From 3016a75332216c66b440d44bcdca509fbce9aef8 Mon Sep 17 00:00:00 2001 From: Untone Date: Mon, 4 Mar 2024 10:35:33 +0300 Subject: [PATCH] notifier-integration --- orm/notification.py | 41 ++++++ resolvers/__init__.py | 1 + resolvers/notifier.py | 287 ++++++++++++++++++++++++++++++++++++++++ schema/input.graphql | 5 + schema/mutation.graphql | 5 + schema/query.graphql | 3 + schema/type.graphql | 31 +++++ services/notify.py | 13 ++ 8 files changed, 386 insertions(+) create mode 100644 orm/notification.py create mode 100644 resolvers/notifier.py diff --git a/orm/notification.py b/orm/notification.py new file mode 100644 index 00000000..2b09ea11 --- /dev/null +++ b/orm/notification.py @@ -0,0 +1,41 @@ +import time +from enum import Enum as Enumeration + +from sqlalchemy import JSON, Column, ForeignKey, Integer, String +from sqlalchemy.orm import relationship + +from orm.author import Author +from services.db import Base + + +class NotificationEntity(Enumeration): + REACTION = 'reaction' + SHOUT = 'shout' + FOLLOWER = 'follower' + + +class NotificationAction(Enumeration): + CREATE = 'create' + UPDATE = 'update' + DELETE = 'delete' + SEEN = 'seen' + FOLLOW = 'follow' + UNFOLLOW = 'unfollow' + + +class NotificationSeen(Base): + __tablename__ = 'notification_seen' + + viewer = Column(ForeignKey('author.id')) + notification = Column(ForeignKey('notification.id')) + + +class Notification(Base): + __tablename__ = 'notification' + + created_at = Column(Integer, server_default=str(int(time.time()))) + entity = Column(String, nullable=False) + action = Column(String, nullable=False) + payload = Column(JSON, nullable=True) + + seen = relationship(lambda: Author, secondary='notification_seen') diff --git a/resolvers/__init__.py b/resolvers/__init__.py index 2db8de57..d464237e 100644 --- a/resolvers/__init__.py +++ b/resolvers/__init__.py @@ -43,6 +43,7 @@ from resolvers.topic import ( ) + __all__ = [ # author 'get_author', diff --git a/resolvers/notifier.py b/resolvers/notifier.py new file mode 100644 index 00000000..41d58bea --- /dev/null +++ b/resolvers/notifier.py @@ -0,0 +1,287 @@ +import json +import time +from typing import List, Tuple + +from sqlalchemy.exc import SQLAlchemyError + +from services.auth import login_required +from services.schema import mutation, query +from sqlalchemy import and_, select +from sqlalchemy.orm import aliased +from sqlalchemy.sql import not_ + +from orm.notification import ( + Notification, + NotificationAction, + NotificationEntity, + NotificationSeen, +) +from services.db import local_session +from services.logger import root_logger as logger + + +def query_notifications(author_id: int, after: int = 0) -> Tuple[int, int, List[Tuple[Notification, bool]]]: + notification_seen_alias = aliased(NotificationSeen) + q = ( + select(Notification, notification_seen_alias.viewer.label("seen")) + .outerjoin( + NotificationSeen, + and_( + NotificationSeen.viewer == author_id, + NotificationSeen.notification == Notification.id, + ), + ) + ) + if after: + q = q.filter(Notification.created_at > after) + q = q.group_by(NotificationSeen.notification, Notification.created_at) + + with local_session() as session: + total = ( + session.query(Notification) + .filter( + and_( + Notification.action == NotificationAction.CREATE.value, + Notification.created_at > after, + ) + ) + .count() + ) + + unread = ( + session.query(Notification) + .filter( + and_( + Notification.action == NotificationAction.CREATE.value, + Notification.created_at > after, + not_(Notification.seen), + ) + ) + .count() + ) + + notifications_result = session.execute(q) + notifications = [] + for n, seen in notifications_result: + notifications.append((n, seen)) + + return total, unread, notifications + + +def group_shout(shout_dict, seen: bool, action: str): + return { + "thread": f'shout-{shout_dict.get("id")}', + "entity": 'shout', + "shout": shout_dict, + "authors": shout_dict.get('authors'), + "updated_at": shout_dict.get('created_at'), + "reactions": [], + "action": action, + "seen": seen + } + + +def group_reaction(reaction_dict, seen: bool, action): + thread_id = reaction_dict['shout'] + if reaction_dict['kind'] == "COMMENT" and reaction_dict.get('reply_to'): + thread_id += f"shout-{reaction_dict.get('shout')}::{reaction_dict.get('reply_to')}" + return { + "thread": thread_id, + "entity": 'reaction', + "updated_at": reaction_dict['created_at'], + "reactions": [reaction_dict['id']], + "shout": reaction_dict.get('shout'), + "authors": [reaction_dict.get('created_by'), ], + "action": action, + "seen": seen + } + + +def group_follower(follower, seen: bool): + return { + "thread": "followers", + "authors": [follower], + "updated_at": int(time.time()), + "shout": None, + "reactions": [], + "entity": "follower", + "action": "follow", + "seen": seen + } + + +def get_notifications_grouped(author_id: int, after: int = 0, limit: int = 10): + """ + Retrieves notifications for a given author. + + Args: + author_id (int): The ID of the author for whom notifications are retrieved. + after (int, optional): If provided, selects only notifications created after this timestamp will be considered. + limit (int, optional): The maximum number of groupa to retrieve. + + Returns: + Dict[str, NotificationGroup], int, int: A dictionary where keys are thread IDs + and values are NotificationGroup objects, unread and total amounts. + + This function queries the database to retrieve notifications for the specified author, considering optional filters. + The result is a dictionary where each key is a thread ID, and the corresponding value is a NotificationGroup + containing information about the notifications within that thread. + + NotificationGroup structure: + { + entity: str, # Type of entity (e.g., 'reaction', 'shout', 'follower'). + updated_at: int, # Timestamp of the latest update in the thread. + shout: Optional[NotificationShout] + reactions: List[int], # List of reaction ids within the thread. + authors: List[NotificationAuthor], # List of authors involved in the thread. + } + """ + total, unread, notifications = query_notifications(author_id, after) + groups_by_thread = {} + groups_amount = 0 + + for notification, seen in notifications: + if groups_amount >= limit: + break + + payload = notification.payload + + if notification.entity == NotificationEntity.SHOUT.value: + group = group_shout(payload, seen, notification.action) + thread_id = group.get('thread') + groups_by_thread[thread_id] = group + groups_amount += 1 + + elif notification.entity == NotificationEntity.REACTION.value: + shout_id = payload.get('shout') + author_id = payload.get('created_by') + reply_id = payload.get('reply_to') + thread_id = f'shout-{shout_id}' + if reply_id and payload.get('kind', '').lower() == 'comment': + thread_id += f'{reply_id}' + existing_group = groups_by_thread.get(thread_id) + if existing_group: + existing_group['seen'] = False + existing_group['authors'].append(author_id) + existing_group['reactions'] = existing_group['reactions'] or [] + existing_group['reactions'].append(payload) + groups_by_thread[thread_id] = existing_group + else: + group = group_reaction(payload, seen, notification.action) + if group: + groups_by_thread[thread_id] = group + groups_amount += 1 + + elif notification.entity == "follower": + thread_id = 'followers' if notification.action == 'follow' else 'unfollowers' + group = groups_by_thread.get(thread_id) + if group: + group['authors'].append(payload) + else: + group = group_follower(payload, seen) + groups_amount += 1 + groups_by_thread[thread_id] = group + return groups_by_thread, unread, total + + +@query.field('load_notifications') +@login_required +async def load_notifications(_, info, after: int, limit: int = 50): + author_id = info.context.get("author_id") + if author_id: + groups, unread, total = get_notifications_grouped(author_id, after, limit) + notifications = sorted(groups.values(), key=lambda group: group.updated_at, reverse=True) + return {"notifications": notifications, "total": total, "unread": unread, "error": None} + return {"notifications": [], "total": 0, "unread": 0, "error": None} + + +@mutation.field('notification_mark_seen') +@login_required +async def notification_mark_seen(_, info, notification_id: int): + author_id = info.context.get('author_id') + if author_id: + with local_session() as session: + try: + ns = NotificationSeen(notification=notification_id, viewer=author_id) + session.add(ns) + session.commit() + except SQLAlchemyError as e: + session.rollback() + logger.error(f'seen mutation failed: {e}') + return {"error": 'cant mark as read'} + return {"error": None} + + +@mutation.field('notifications_seen_after') +@login_required +async def notifications_seen_after(_, info, after: int): + # TODO: use latest loaded notification_id as input offset parameter + error = None + try: + author_id = info.context.get('author_id') + if author_id: + with local_session() as session: + nnn = session.query(Notification).filter(and_(Notification.created_at > after)).all() + for n in nnn: + try: + ns = NotificationSeen(notification=n.id, viewer=author_id) + session.add(ns) + session.commit() + except SQLAlchemyError: + session.rollback() + except Exception as e: + print(e) + error = 'cant mark as read' + return {"error": error} + + +@mutation.field('notifications_seen_thread') +@login_required +async def notifications_seen_thread(_, info, thread: str, after: int): + error = None + author_id = info.context.get('author_id') + if author_id: + [shout_id, reply_to_id] = thread.split('::') + with local_session() as session: + # TODO: handle new follower and new shout notifications + new_reaction_notifications = ( + session.query(Notification) + .filter( + Notification.action == 'create', + Notification.entity == 'reaction', + Notification.created_at > after, + ) + .all() + ) + removed_reaction_notifications = ( + session.query(Notification) + .filter( + Notification.action == 'delete', + Notification.entity == 'reaction', + Notification.created_at > after, + ) + .all() + ) + exclude = set() + for nr in removed_reaction_notifications: + reaction = json.loads(nr.payload) + reaction_id = reaction.get('id') + exclude.add(reaction_id) + for n in new_reaction_notifications: + reaction = json.loads(n.payload) + reaction_id = reaction.get('id') + if ( + reaction_id not in exclude + and str(reaction.get('shout')) == str(shout_id) + and str(reaction.get('reply_to')) == str(reply_to_id) + ): + try: + ns = NotificationSeen(notification=n.id, viewer=author_id) + session.add(ns) + session.commit() + except Exception as e: + logger.warn(e) + session.rollback() + else: + error = 'You are not logged in' + return {"error": error} diff --git a/schema/input.graphql b/schema/input.graphql index a49a049d..213da17b 100644 --- a/schema/input.graphql +++ b/schema/input.graphql @@ -79,3 +79,8 @@ input ReactionBy { after: Int sort: ReactionSort } + +input NotificationSeenInput { + notifications: [Int] + thread: Int +} diff --git a/schema/mutation.graphql b/schema/mutation.graphql index ab90cfad..c5ec107d 100644 --- a/schema/mutation.graphql +++ b/schema/mutation.graphql @@ -28,4 +28,9 @@ type Mutation { remove_invite(invite_id: Int!): CommonResult! accept_invite(invite_id: Int!): CommonResult! reject_invite(invite_id: Int!): CommonResult! + + # notifier + notification_mark_seen(notification_id: Int!, seen: Boolean): CommonResult! + notifications_seen_after(after: Int!, seen: Boolean): CommonResult! + notifications_seen_thread(thread_id: String!, seen: Boolean): CommonResult! } diff --git a/schema/query.graphql b/schema/query.graphql index b4d2f5d2..4e6fd28f 100644 --- a/schema/query.graphql +++ b/schema/query.graphql @@ -41,4 +41,7 @@ type Query { get_topics_random(amount: Int): [Topic] get_topics_by_author(slug: String, user: String, author_id: Int): [Topic] get_topics_by_community(slug: String, community_id: Int): [Topic] + + # notifier + get_notifications: NotificationsResult! } diff --git a/schema/type.graphql b/schema/type.graphql index cdcdf789..c98dfb8c 100644 --- a/schema/type.graphql +++ b/schema/type.graphql @@ -178,3 +178,34 @@ type AuthorFollows { # shouts: [Shout] communities: [Community] } + +type Notification { + id: Int! + action: String! + entity: String! + created_at: Int! + payload: String! + seen: [Author] +} + +type NotificationSeenResult { + error: String +} + +type NotificationGroup { + id: Int! + authors: [Author] + updated_at: Int! + entity: String! + action: String + shout: Shout + reactions: [Reaction] + seen: Boolean +} + +type NotificationsResult { + notifications: [NotificationGroup!]! + unread: Int! + total: Int! + error: String +} diff --git a/services/notify.py b/services/notify.py index 588c1eee..f68b8c7a 100644 --- a/services/notify.py +++ b/services/notify.py @@ -1,12 +1,22 @@ import json +from orm.notification import Notification +from services.db import local_session from services.rediscache import redis +def save_notification(action: str, entity: str, payload): + with local_session() as session: + n = Notification(action=action, entity=entity, payload=payload) + session.add(n) + session.commit() + + async def notify_reaction(reaction, action: str = 'create'): channel_name = 'reaction' data = {'payload': reaction, 'action': action} try: + save_notification(action, channel_name, data.get('payload')) await redis.publish(channel_name, json.dumps(data)) except Exception as e: print(f'[services.notify] Failed to publish to channel {channel_name}: {e}') @@ -16,6 +26,7 @@ async def notify_shout(shout, action: str = 'update'): channel_name = 'shout' data = {'payload': shout, 'action': action} try: + save_notification(action, channel_name, data.get('payload')) await redis.publish(channel_name, json.dumps(data)) except Exception as e: print(f'[services.notify] Failed to publish to channel {channel_name}: {e}') @@ -36,6 +47,8 @@ async def notify_follower(follower: dict, author_id: int, action: str = 'follow' if not json_data: raise ValueError('Empty data to publish.') + save_notification(action, channel_name, data.get('payload')) + # Use the 'await' keyword when publishing await redis.publish(channel_name, json_data)