import json
import logging
import time
from typing import Dict, List

import strawberry
from sqlalchemy import and_, select
from sqlalchemy.orm import aliased
from sqlalchemy.sql import not_

from services.db import local_session
from resolvers.model import (
    NotificationReaction,
    NotificationGroup,
    NotificationShout,
    NotificationAuthor,
    NotificationsResult,
)
from orm.notification import NotificationAction, NotificationEntity, NotificationSeen, Notification

logger = logging.getLogger("[resolvers.schema] ")
logger.setLevel(logging.DEBUG)


def _remember_notification(
    notifications_by_thread: Dict[str, List[Notification]], thread_id: str, notification: Notification
) -> None:
    """Record *notification* under *thread_id* once, creating the thread's list on first use."""
    bucket = notifications_by_thread.setdefault(thread_id, [])
    if notification not in bucket:
        bucket.append(notification)


async def get_notifications_grouped(author_id: int, after: int = 0, limit: int = 10, offset: int = 0):
    """
    Retrieve notifications for an author and fold them into per-thread groups.

    Args:
        author_id (int): The ID of the author for whom notifications are retrieved.
        after (int, optional): If non-zero, only notifications created after this timestamp are selected.
        limit (int, optional): Processing stops once the internal group counter exceeds this value.
        offset (int, optional): Offset for pagination. NOTE(review): accepted but not applied
            anywhere in this function yet.

    Returns:
        Tuple[Dict[str, NotificationGroup], Dict[str, List[Notification]], int, int]:
            ``(groups_by_thread, notifications_by_thread, unread, total)`` -- the groups keyed by
            thread ID, the raw notifications keyed by the same thread ID, and the unread / total
            counts of "create" notifications newer than ``after``.

    Thread IDs are built as follows:
        * shout creation:   ``"<shout id>"``
        * comment reaction: ``"<shout>"`` or ``"<shout>::<reply_to>"`` for replies
        * follower:         the fixed string ``"followers"``

    NotificationGroup structure:
    {
        entity: str,  # Type of entity (e.g., 'reaction', 'shout', 'follower').
        updated_at: int,  # Timestamp of the latest update in the thread.
        shout: Optional[NotificationShout]
        reactions: List[int],  # List of reaction ids within the thread.
        authors: List[NotificationAuthor],  # List of authors involved in the thread.
    }
    """
    # The alias must be used for the SELECT column, the JOIN target and the GROUP BY alike;
    # mixing the alias with the bare entity leaves one of them un-joined (cartesian product).
    seen_alias = aliased(NotificationSeen)
    query = select(Notification, seen_alias.viewer.label("seen")).outerjoin(
        seen_alias,
        and_(seen_alias.viewer == author_id, seen_alias.notification == Notification.id),
    )
    if after:
        query = query.filter(Notification.created_at > after)
    query = query.group_by(seen_alias.notification, Notification.created_at)

    groups_amount = 0
    unread = 0
    total = 0
    notifications_by_thread: Dict[str, List[Notification]] = {}
    groups_by_thread: Dict[str, NotificationGroup] = {}
    with local_session() as session:
        total = (
            session.query(Notification)
            .filter(and_(Notification.action == NotificationAction.CREATE.value, Notification.created_at > after))
            .count()
        )
        unread = (
            session.query(Notification)
            .filter(
                and_(
                    Notification.action == NotificationAction.CREATE.value,
                    Notification.created_at > after,
                    not_(Notification.seen),
                )
            )
            .count()
        )
        notifications_result = session.execute(query)
        for n, _seen in notifications_result:
            thread_id = ""
            # json.loads yields plain dicts, so payload fields are read with dict access below.
            payload = json.loads(n.payload)
            logger.debug(f"[resolvers.schema] {n.action} {n.entity}: {payload}")
            if n.entity == "shout" and n.action == "create":
                shout = payload
                thread_id += f"{shout.get('id')}"
                logger.debug(f"create shout: {shout}")
                # NOTE(review): `n.seen` is defined on the Notification model, which is not
                # visible here -- confirm it is a collection that can contain author ids.
                group = groups_by_thread.get(thread_id) or NotificationGroup(
                    id=thread_id,
                    entity=n.entity,
                    shout=shout,
                    authors=shout.get("authors"),
                    updated_at=shout.get("created_at"),
                    reactions=[],
                    action="create",
                    seen=author_id in n.seen,
                )
                # store group in result
                groups_by_thread[thread_id] = group
                _remember_notification(notifications_by_thread, thread_id, n)
                groups_amount += 1
            elif n.entity == NotificationEntity.REACTION.value and n.action == NotificationAction.CREATE.value:
                reaction = payload
                shout = reaction.get("shout")
                thread_id += f"{shout}"
                kind = reaction.get("kind")
                if kind == "LIKE" or kind == "DISLIKE":
                    # TODO: making published reaction vote announce
                    pass
                elif kind == "COMMENT":
                    reply_to = reaction.get("reply_to")
                    if reply_to:
                        # replies get their own sub-thread under the shout
                        thread_id += f"::{reply_to}"
                    group: NotificationGroup | None = groups_by_thread.get(thread_id)
                    notifications: List[Notification] = notifications_by_thread.get(thread_id, [])
                    if group and notifications:
                        # extend the existing thread group with this comment
                        group.seen = False  # any not seen notification make it false
                        group.shout = shout
                        group.authors.append(reaction.get("created_by"))
                        if not group.reactions:
                            group.reactions = []
                        group.reactions.append(reaction.get("id"))
                        # store group in result
                        groups_by_thread[thread_id] = group
                        _remember_notification(notifications_by_thread, thread_id, n)
                        groups_amount += 1
                    else:
                        groups_amount += 1
                        if groups_amount > limit:
                            # do not open a new group once the limit is exceeded
                            break
                        # init notification group
                        group = NotificationGroup(
                            id=thread_id,
                            action=n.action,
                            entity=n.entity,
                            updated_at=reaction.get("created_at"),
                            reactions=[reaction.get("id")],
                            shout=shout,
                            authors=[
                                reaction.get("created_by"),
                            ],
                            seen=author_id in n.seen,
                        )
                        # store group in result
                        groups_by_thread[thread_id] = group
                        _remember_notification(notifications_by_thread, thread_id, n)
            elif n.entity == "follower":
                thread_id = "followers"
                follower = payload
                group = groups_by_thread.get(thread_id) or NotificationGroup(
                    id=thread_id,
                    authors=[follower],
                    updated_at=int(time.time()),
                    shout=None,
                    reactions=[],
                    entity="follower",
                    action="follow",
                    seen=author_id in n.seen,
                )
                # NOTE(review): this replaces the author list, so only the most recently
                # processed follower is kept -- confirm that appending was not intended.
                group.authors = [
                    follower,
                ]
                group.updated_at = int(time.time())
                # store group in result
                groups_by_thread[thread_id] = group
                _remember_notification(notifications_by_thread, thread_id, n)
                groups_amount += 1

            if groups_amount > limit:
                break

        return groups_by_thread, notifications_by_thread, unread, total


@strawberry.type
class Query:
    @strawberry.field
    async def load_notifications(self, info, after: int, limit: int = 50, offset: int = 0) -> NotificationsResult:
        """Return the current author's notification groups, newest first, with unread/total counts.

        When the request context carries no ``author_id`` an empty result with zero counts
        is returned.
        """
        author_id = info.context.get("author_id")
        groups: Dict[str, NotificationGroup] = {}
        unread = 0
        total = 0
        if author_id:
            # order matches the helper's return: (groups, notifications, unread, total)
            groups, _notifications, unread, total = await get_notifications_grouped(
                author_id, after, limit, offset
            )
        notifications = sorted(groups.values(), key=lambda group: group.updated_at, reverse=True)
        return NotificationsResult(notifications=notifications, total=total, unread=unread, error=None)