This commit is contained in:
2024-01-26 03:40:49 +03:00
parent 7beddea5b1
commit 59a1f8c902
14 changed files with 249 additions and 188 deletions

View File

@@ -1,11 +1,13 @@
from orm.notification import Notification, NotificationAction, NotificationEntity
from resolvers.model import NotificationReaction, NotificationAuthor, NotificationShout
from services.db import local_session
from services.rediscache import redis
import asyncio
import logging
logger = logging.getLogger(f"[listener.listen_task] ")
from orm.notification import Notification
from resolvers.model import NotificationAuthor, NotificationReaction, NotificationShout
from services.db import local_session
from services.rediscache import redis
logger = logging.getLogger('[listener.listen_task] ')
logger.setLevel(logging.DEBUG)
@@ -19,8 +21,8 @@ async def handle_notification(n: ServiceMessage, channel: str):
"""создаеёт новое хранимое уведомление"""
with local_session() as session:
try:
if channel.startswith("follower:"):
author_id = int(channel.split(":")[1])
if channel.startswith('follower:'):
author_id = int(channel.split(':')[1])
if isinstance(n.payload, NotificationAuthor):
n.payload.following_id = author_id
n = Notification(action=n.action, entity=n.entity, payload=n.payload)
@@ -28,7 +30,7 @@ async def handle_notification(n: ServiceMessage, channel: str):
session.commit()
except Exception as e:
session.rollback()
logger.error(f"[listener.handle_reaction] error: {str(e)}")
logger.error(f'[listener.handle_reaction] error: {str(e)}')
async def listen_task(pattern):
@@ -38,9 +40,9 @@ async def listen_task(pattern):
notification_message = ServiceMessage(**message_data)
await handle_notification(notification_message, str(channel))
except Exception as e:
logger.error(f"Error processing notification: {str(e)}")
logger.error(f'Error processing notification: {str(e)}')
async def notifications_worker():
# Use asyncio.gather to run tasks concurrently
await asyncio.gather(listen_task("follower:*"), listen_task("reaction"), listen_task("shout"))
await asyncio.gather(listen_task('follower:*'), listen_task('reaction'), listen_task('shout'))

View File

@@ -1,27 +1,36 @@
import json
import logging
import time
from typing import Dict, List
import strawberry
from sqlalchemy import and_, select
from sqlalchemy.orm import aliased
from sqlalchemy.sql import not_
from services.db import local_session
from orm.notification import (
Notification,
NotificationAction,
NotificationEntity,
NotificationSeen,
)
from resolvers.model import (
NotificationReaction,
NotificationGroup,
NotificationShout,
NotificationAuthor,
NotificationGroup,
NotificationReaction,
NotificationShout,
NotificationsResult,
)
from orm.notification import NotificationAction, NotificationEntity, NotificationSeen, Notification
from typing import Dict, List
import time, json
import strawberry
from sqlalchemy.orm import aliased
from sqlalchemy.sql.expression import or_
from sqlalchemy import select, and_
import logging
from services.db import local_session
logger = logging.getLogger("[resolvers.schema] ")
logger = logging.getLogger('[resolvers.schema] ')
logger.setLevel(logging.DEBUG)
async def get_notifications_grouped(author_id: int, after: int = 0, limit: int = 10, offset: int = 0):
async def get_notifications_grouped( # noqa: C901
author_id: int, after: int = 0, limit: int = 10, offset: int = 0
):
"""
Retrieves notifications for a given author.
@@ -47,10 +56,13 @@ async def get_notifications_grouped(author_id: int, after: int = 0, limit: int =
authors: List[NotificationAuthor], # List of authors involved in the thread.
}
"""
NotificationSeenAlias = aliased(NotificationSeen)
query = select(Notification, NotificationSeenAlias.viewer.label("seen")).outerjoin(
seen_alias = aliased(NotificationSeen)
query = select(Notification, seen_alias.viewer.label('seen')).outerjoin(
NotificationSeen,
and_(NotificationSeen.viewer == author_id, NotificationSeen.notification == Notification.id),
and_(
NotificationSeen.viewer == author_id,
NotificationSeen.notification == Notification.id,
),
)
if after:
query = query.filter(Notification.created_at > after)
@@ -62,23 +74,36 @@ async def get_notifications_grouped(author_id: int, after: int = 0, limit: int =
notifications_by_thread: Dict[str, List[Notification]] = {}
groups_by_thread: Dict[str, NotificationGroup] = {}
with local_session() as session:
total = session.query(Notification).filter(and_(Notification.action == NotificationAction.CREATE.value, Notification.created_at > after)).count()
unread = session.query(Notification).filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
not_(Notification.seen)
total = (
session.query(Notification)
.filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
)
)
).count()
.count()
)
unread = (
session.query(Notification)
.filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
not_(Notification.seen),
)
)
.count()
)
notifications_result = session.execute(query)
for n, seen in notifications_result:
thread_id = ""
for n, _seen in notifications_result:
thread_id = ''
payload = json.loads(n.payload)
logger.debug(f"[resolvers.schema] {n.action} {n.entity}: {payload}")
if n.entity == "shout" and n.action == "create":
logger.debug(f'[resolvers.schema] {n.action} {n.entity}: {payload}')
if n.entity == 'shout' and n.action == 'create':
shout: NotificationShout = payload
thread_id += f"{shout.id}"
logger.debug(f"create shout: {shout}")
thread_id += f'{shout.id}'
logger.debug(f'create shout: {shout}')
group = groups_by_thread.get(thread_id) or NotificationGroup(
id=thread_id,
entity=n.entity,
@@ -86,8 +111,8 @@ async def get_notifications_grouped(author_id: int, after: int = 0, limit: int =
authors=shout.authors,
updated_at=shout.created_at,
reactions=[],
action="create",
seen=author_id in n.seen
action='create',
seen=author_id in n.seen,
)
# store group in result
groups_by_thread[thread_id] = group
@@ -99,11 +124,11 @@ async def get_notifications_grouped(author_id: int, after: int = 0, limit: int =
elif n.entity == NotificationEntity.REACTION.value and n.action == NotificationAction.CREATE.value:
reaction: NotificationReaction = payload
shout: NotificationShout = reaction.shout
thread_id += f"{reaction.shout}"
if reaction.kind == "LIKE" or reaction.kind == "DISLIKE":
thread_id += f'{reaction.shout}'
if not bool(reaction.reply_to) and (reaction.kind == 'LIKE' or reaction.kind == 'DISLIKE'):
# TODO: making published reaction vote announce
pass
elif reaction.kind == "COMMENT":
elif reaction.kind == 'COMMENT':
if reaction.reply_to:
thread_id += f"{'::' + str(reaction.reply_to)}"
group: NotificationGroup | None = groups_by_thread.get(thread_id)
@@ -128,8 +153,9 @@ async def get_notifications_grouped(author_id: int, after: int = 0, limit: int =
break
else:
# init notification group
reactions = []
reactions.append(reaction.id)
reactions = [
reaction.id,
]
group = NotificationGroup(
id=thread_id,
action=n.action,
@@ -140,7 +166,7 @@ async def get_notifications_grouped(author_id: int, after: int = 0, limit: int =
authors=[
reaction.created_by,
],
seen=author_id in n.seen
seen=author_id in n.seen,
)
# store group in result
groups_by_thread[thread_id] = group
@@ -149,20 +175,22 @@ async def get_notifications_grouped(author_id: int, after: int = 0, limit: int =
notifications.append(n)
notifications_by_thread[thread_id] = notifications
elif n.entity == "follower":
thread_id = "followers"
elif n.entity == 'follower':
thread_id = 'followers'
follower: NotificationAuthor = payload
group = groups_by_thread.get(thread_id) or NotificationGroup(
id=thread_id,
authors=[follower],
updated_at=int(time.time()),
shout=None,
reactions=[],
entity="follower",
action="follow",
seen=author_id in n.seen
)
group.authors = [follower, ]
id=thread_id,
authors=[follower],
updated_at=int(time.time()),
shout=None,
reactions=[],
entity='follower',
action='follow',
seen=author_id in n.seen,
)
group.authors = [
follower,
]
group.updated_at = int(time.time())
# store group in result
groups_by_thread[thread_id] = group
@@ -182,7 +210,7 @@ async def get_notifications_grouped(author_id: int, after: int = 0, limit: int =
class Query:
@strawberry.field
async def load_notifications(self, info, after: int, limit: int = 50, offset: int = 0) -> NotificationsResult:
author_id = info.context.get("author_id")
author_id = info.context.get('author_id')
groups: Dict[str, NotificationGroup] = {}
if author_id:
groups, notifications, total, unread = await get_notifications_grouped(author_id, after, limit, offset)

View File

@@ -1,8 +1,11 @@
import strawberry
from typing import List, Optional
import strawberry
from strawberry_sqlalchemy_mapper import StrawberrySQLAlchemyMapper
from orm.notification import Notification as NotificationMessage
strawberry_sqlalchemy_mapper = StrawberrySQLAlchemyMapper()

View File

@@ -1,12 +1,12 @@
import strawberry
from strawberry.schema.config import StrawberryConfig
from services.auth import LoginRequiredMiddleware
from resolvers.load import Query
from resolvers.seen import Mutation
from services.auth import LoginRequiredMiddleware
from services.db import Base, engine
schema = strawberry.Schema(
query=Query, mutation=Mutation, config=StrawberryConfig(auto_camel_case=False), extensions=[LoginRequiredMiddleware]
)

View File

@@ -1,14 +1,14 @@
from sqlalchemy import and_
from orm.notification import NotificationSeen
from services.db import local_session
from resolvers.model import Notification, NotificationSeenResult, NotificationReaction
import json
import logging
import strawberry
import logging
import json
from sqlalchemy import and_
from sqlalchemy.exc import SQLAlchemyError
from orm.notification import NotificationSeen
from resolvers.model import Notification, NotificationReaction, NotificationSeenResult
from services.db import local_session
logger = logging.getLogger(__name__)
@@ -17,7 +17,7 @@ logger = logging.getLogger(__name__)
class Mutation:
@strawberry.mutation
async def mark_seen(self, info, notification_id: int) -> NotificationSeenResult:
author_id = info.context.get("author_id")
author_id = info.context.get('author_id')
if author_id:
with local_session() as session:
try:
@@ -27,9 +27,9 @@ class Mutation:
except SQLAlchemyError as e:
session.rollback()
logger.error(
f"[mark_notification_as_read] Ошибка при обновлении статуса прочтения уведомления: {str(e)}"
f'[mark_notification_as_read] Ошибка при обновлении статуса прочтения уведомления: {str(e)}'
)
return NotificationSeenResult(error="cant mark as read")
return NotificationSeenResult(error='cant mark as read')
return NotificationSeenResult(error=None)
@strawberry.mutation
@@ -37,7 +37,7 @@ class Mutation:
# TODO: use latest loaded notification_id as input offset parameter
error = None
try:
author_id = info.context.get("author_id")
author_id = info.context.get('author_id')
if author_id:
with local_session() as session:
nnn = session.query(Notification).filter(and_(Notification.created_at > after)).all()
@@ -46,26 +46,26 @@ class Mutation:
ns = NotificationSeen(notification=n.id, viewer=author_id)
session.add(ns)
session.commit()
except SQLAlchemyError as e:
except SQLAlchemyError:
session.rollback()
except Exception as e:
print(e)
error = "cant mark as read"
error = 'cant mark as read'
return NotificationSeenResult(error=error)
@strawberry.mutation
async def mark_seen_thread(self, info, thread: str, after: int) -> NotificationSeenResult:
error = None
author_id = info.context.get("author_id")
author_id = info.context.get('author_id')
if author_id:
[shout_id, reply_to_id] = thread.split("::")
[shout_id, reply_to_id] = thread.split('::')
with local_session() as session:
# TODO: handle new follower and new shout notifications
new_reaction_notifications = (
session.query(Notification)
.filter(
Notification.action == "create",
Notification.entity == "reaction",
Notification.action == 'create',
Notification.entity == 'reaction',
Notification.created_at > after,
)
.all()
@@ -73,13 +73,13 @@ class Mutation:
removed_reaction_notifications = (
session.query(Notification)
.filter(
Notification.action == "delete",
Notification.entity == "reaction",
Notification.action == 'delete',
Notification.entity == 'reaction',
Notification.created_at > after,
)
.all()
)
exclude = set([])
exclude = set()
for nr in removed_reaction_notifications:
reaction: NotificationReaction = json.loads(nr.payload)
exclude.add(reaction.id)
@@ -97,5 +97,5 @@ class Mutation:
except Exception:
session.rollback()
else:
error = "You are not logged in"
error = 'You are not logged in'
return NotificationSeenResult(error=error)