diff --git a/README.md b/README.md index 4e77da5..ff5cad6 100644 --- a/README.md +++ b/README.md @@ -2,15 +2,12 @@ ### Что делает - - сохраняет тех, кому уведомления уже были отправлены (redis: authors-online) - - формирует дайджесты для остальных - слушает Redis PubSub канал с обновлениями реакций ### Что НЕ делает - не отправляет сообщения по SSE - - не определяет кому их отправлять ## Как разрабатывать локально diff --git a/main.py b/main.py index 7274f47..ee3d16d 100644 --- a/main.py +++ b/main.py @@ -9,23 +9,23 @@ from sentry_sdk.integrations.strawberry import StrawberryIntegration from starlette.applications import Starlette from strawberry.asgi import GraphQL -from resolvers.listener import reactions_worker +from resolvers.listener import notifications_worker from resolvers.schema import schema from services.rediscache import redis from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN async def start_up(): + await redis.connect() + + task = asyncio.create_task(notifications_worker()) + print(task) + if MODE == "dev": if exists(DEV_SERVER_PID_FILE_NAME): - await redis.connect() - else: with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f: f.write(str(os.getpid())) else: - await redis.connect() - notification_service_task = asyncio.create_task(reactions_worker()) - print(f"[main.start_up] {notification_service_task}") try: import sentry_sdk diff --git a/orm/notification.py b/orm/notification.py index 2a5c799..98351ad 100644 --- a/orm/notification.py +++ b/orm/notification.py @@ -1,7 +1,6 @@ -import time from enum import Enum as Enumeration -from sqlalchemy import JSON as JSONType +from sqlalchemy import JSON as JSONType, func, cast from sqlalchemy import Column, Enum, ForeignKey, Integer from sqlalchemy.orm import relationship @@ -34,7 +33,7 @@ class NotificationSeen(Base): class Notification(Base): __tablename__ = "notification" - created_at = Column(Integer, default=lambda: int(time.time())) + created_at = Column(Integer, 
server_default=cast(func.current_timestamp(), Integer)) entity = Column(Enum(NotificationEntity), nullable=False) action = Column(Enum(NotificationAction), nullable=False) payload = Column(JSONType, nullable=True) diff --git a/pyproject.toml b/pyproject.toml index dceab60..99edf96 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "discoursio-notifier" -version = "0.0.3" +version = "0.1.0" description = "notifier server for discours.io" authors = ["discours.io devteam"] diff --git a/resolvers/listener.py b/resolvers/listener.py index dfca4fa..5487c85 100644 --- a/resolvers/listener.py +++ b/resolvers/listener.py @@ -1,13 +1,25 @@ from orm.notification import Notification +from resolvers.model import NotificationReaction, NotificationAuthor, NotificationShout from services.db import local_session from services.rediscache import redis +import asyncio -async def handle_reaction(notification: dict[str, str | int]): +class ServiceMessage: + action: str + entity: str + payload: NotificationReaction | NotificationAuthor | NotificationShout + + +async def handle_notification(n: ServiceMessage, channel: str): """создаеёт новое хранимое уведомление""" with local_session() as session: try: - n = Notification(**notification) + if channel.startswith("follower:"): + author_id = int(channel.split(":")[1]) + if isinstance(n.payload, NotificationAuthor): + n.payload.following_id = author_id + n = Notification(action=n.action, entity=n.entity, payload=n.payload) session.add(n) session.commit() except Exception as e: @@ -15,7 +27,15 @@ async def handle_reaction(notification: dict[str, str | int]): print(f"[listener.handle_reaction] error: {str(e)}") -async def reactions_worker(): - async for message in redis.listen("reaction"): - if message: - await handle_reaction(message) +async def listen_task(pattern): + async for message_data, channel in redis.listen(pattern): + try: + notification_message = 
ServiceMessage(**message_data) + await handle_notification(notification_message, str(channel)) + except Exception as e: + print(f"[listener.listen_task] Error processing notification: {str(e)}") + + +async def notifications_worker(): + # Use asyncio.gather to run tasks concurrently + await asyncio.gather(listen_task("follower:*"), listen_task("reaction"), listen_task("shout")) diff --git a/resolvers/load.py b/resolvers/load.py new file mode 100644 index 0000000..7afba79 --- /dev/null +++ b/resolvers/load.py @@ -0,0 +1,153 @@ +from services.db import local_session +from resolvers.model import ( + NotificationReaction, + Notification as NotificationMessage, + NotificationGroup, + NotificationShout, + NotificationAuthor, + NotificationsResult, +) +from orm.notification import NotificationSeen +from typing import Dict +import time, json +import strawberry +from sqlalchemy.orm import aliased +from sqlalchemy import select, and_ + + +async def get_notifications_grouped( + author_id: int, after: int = 0, limit: int = 10, offset: int = 0, mark_as_read=False +) -> Dict[str, NotificationGroup]: + """ + Retrieves notifications for a given author. + + Args: + author_id (int): The ID of the author for whom notifications are retrieved. + session: Database connection session + after (int, optional): If provided, only notifications created after this timestamp will be considered. + limit (int, optional): The maximum number of notifications to retrieve. + + Returns: + Dict[str, NotificationGroup]: A dictionary where keys are thread IDs and values are NotificationGroup objects. + + This function queries the database to retrieve notifications for the specified author, considering optional filters. + The result is a dictionary where each key is a thread ID, and the corresponding value is a NotificationGroup + containing information about the notifications within that thread. + + NotificationGroup structure: + { + entity: str, # Type of entity (e.g., 'reaction', 'shout', 'follower'). 
+ updated_at: int, # Timestamp of the latest update in the thread. + reactions: List[Reaction], # List of reactions within the thread. + authors: List[Author], # List of authors involved in the thread. + } + """ + NotificationSeenAlias = aliased(NotificationSeen) + query = select(NotificationMessage, NotificationSeenAlias.viewer.label("seen")).outerjoin( + NotificationSeen, + and_(NotificationSeen.viewer == author_id, NotificationSeen.notification == NotificationMessage.id), + ) + if after: + query = query.filter(NotificationMessage.created_at > after) + query = query.group_by(NotificationSeen.notification) + + notifications: Dict[str, NotificationGroup] = {} + counter = 0 + with local_session() as session: + for n, seen in session.execute(query): + thread_id = "" + payload = json.loads(n.payload) + print(f"[resolvers.schema] {n.action} {n.entity}: {payload}") + if n.entity == "shout": + shout: NotificationShout = payload + thread_id += f"{shout.id}" + if n.action == "delete": + del notifications[thread_id] + elif n.action == "create": + print(f"[resolvers.schema] create shout: {shout}") + notification_group = NotificationGroup( + entity=n.entity, + shout=shout, + authors=shout.authors, + updated_at=shout.created_at, + reactions=[], + action="create", + ) + # store group in result + notifications[thread_id] = notification_group + counter += 1 + elif n.entity == "reaction": + reaction: NotificationReaction = payload + shout: NotificationShout = reaction.shout + thread_id += f"{reaction.shout}" + if reaction.kind == "LIKE" or reaction.kind == "DISLIKE": + # TODO: making published reaction vote announce + pass + elif reaction.kind == "COMMENT": + if reaction.reply_to: + thread_id += f"{'::' + str(reaction.reply_to)}" + notification_group: NotificationGroup | None = notifications.get(thread_id) + if notification_group: + notification_group.shout = shout + notification_group.authors.append(reaction.created_by) + if not notification_group.reactions: + 
notification_group.reactions = [] + notification_group.reactions.append(reaction.id) + # store group in result + notifications[thread_id] = notification_group + counter += 1 + else: + counter += 1 + if counter > limit: + break + else: + # init notification group + reactions = [] + reactions.append(reaction.id) + notification_group = NotificationGroup( + action=n.action, + entity=n.entity, + updated_at=reaction.created_at, + reactions=reactions, + shout=shout, + authors=[ + reaction.created_by, + ], + ) + # store group in result + notifications[thread_id] = notification_group + elif n.entity == "follower": + thread_id = "followers" + follower: NotificationAuthor = payload + notification_group = notifications.get(thread_id) + if not notification_group: + notification_group = NotificationGroup( + authors=[follower], + updated_at=int(time.time()), + shout=None, + reactions=[], + entity="follower", + action="follow", + ) + # store group in result + notifications[thread_id] = notification_group + counter += 1 + + if counter > limit: + break + + return notifications + + +@strawberry.type +class Query: + @strawberry.field + async def load_notifications(self, info, after: int, limit: int = 50, offset: int = 0) -> NotificationsResult: + author_id = info.context.get("author_id") + notification_groups: Dict[str, NotificationGroup] = {} + if author_id: + # TODO: add total counter calculation + # TODO: add unread counter calculation + notification_groups = await get_notifications_grouped(author_id, after, limit, offset) + notifications = sorted(notification_groups.values(), key=lambda group: group.updated_at, reverse=True) + return NotificationsResult(notifications=notifications, total=0, unread=0, error=None) diff --git a/resolvers/model.py b/resolvers/model.py new file mode 100644 index 0000000..2792322 --- /dev/null +++ b/resolvers/model.py @@ -0,0 +1,72 @@ +import strawberry +from typing import List, Optional +from strawberry_sqlalchemy_mapper import 
StrawberrySQLAlchemyMapper +from orm.notification import Notification as NotificationMessage + +strawberry_sqlalchemy_mapper = StrawberrySQLAlchemyMapper() + + +@strawberry_sqlalchemy_mapper.type(NotificationMessage) +class Notification: + id: int + action: str # create update delete join follow etc. + entity: str # REACTION SHOUT FOLLOWER + created_at: int + payload: str # JSON data + seen: List[int] # NOTE: adds author_id when seen + # TODO: add recipient defining field + + +@strawberry.type +class NotificationSeenResult: + error: str | None + + +@strawberry.type +class NotificationAuthor: + id: int + slug: str + name: str + pic: str + following_id: Optional[int] + + +@strawberry.type +class NotificationShout: + id: int + slug: str + title: str + created_at: int + authors: List[NotificationAuthor] + + +@strawberry.type +class NotificationReaction: + id: int + kind: str + shout: NotificationShout + reply_to: int + created_by: NotificationAuthor + created_at: int + + +@strawberry.type +class NotificationGroup: + authors: List[NotificationAuthor] + updated_at: int + entity: str + action: Optional[str] + shout: Optional[NotificationShout] + reactions: Optional[List[int]] + # latest reaction.created_at for reactions-updates + # no timestamp for followers-updates + # latest shout.created_at for shouts-updates + # you are invited in authors list + + +@strawberry.type +class NotificationsResult: + notifications: List[NotificationGroup] + unread: int + total: int + error: Optional[str] diff --git a/resolvers/schema.py b/resolvers/schema.py index f7155b8..7d05927 100644 --- a/resolvers/schema.py +++ b/resolvers/schema.py @@ -1,145 +1,9 @@ -import logging -from typing import List -from sqlalchemy.schema import Column - import strawberry -from sqlalchemy import and_, select -from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.orm import aliased from strawberry.schema.config import StrawberryConfig -from strawberry_sqlalchemy_mapper import StrawberrySQLAlchemyMapper 
-from orm.author import Author -from orm.notification import Notification as NotificationMessage -from orm.notification import NotificationSeen from services.auth import LoginRequiredMiddleware -from services.db import local_session - -strawberry_sqlalchemy_mapper = StrawberrySQLAlchemyMapper() - -# Инициализация логгера -logger = logging.getLogger(__name__) - - -@strawberry_sqlalchemy_mapper.type(NotificationMessage) -class Notification: - id: int - action: str # create update delete join follow etc. - entity: str # REACTION SHOUT - created_at: int - payload: str # JSON data - seen: List[int] - - -@strawberry.type -class NotificationSeenResult: - error: str | None - - -@strawberry.type -class NotificationsResult: - notifications: List[Notification] - unread: int - total: int - - -def get_notifications(author_id: int, session, after: int | Column[int], limit: int = 9999, offset: int = 0) -> List[Notification]: - NotificationSeenAlias = aliased(NotificationSeen) - query = ( - select(NotificationMessage, NotificationSeenAlias.viewer.label("seen")) - .outerjoin( - NotificationSeen, - and_(NotificationSeen.viewer == author_id, NotificationSeen.notification == NotificationMessage.id), - ) - .filter(NotificationMessage.created_at > after) - .group_by(NotificationSeen.notification) - ) - if limit: - query = query.limit(limit) - if offset: - query = query.offset(offset) - - notifications = [] - for n, seen in session.execute(query): - ntf = Notification( - id=n.id, - payload=n.payload, - entity=n.entity, - action=n.action, - created_at=n.created_at, - seen=seen, - ) - if ntf: - notifications.append(ntf) - return notifications - - -@strawberry.type -class Query: - @strawberry.field - async def load_notifications(self, info, after: int, limit: int = 50, offset: int = 0) -> NotificationsResult: - author_id = info.context.get("author_id") - with local_session() as session: - try: - if author_id: - notifications = get_notifications(author_id, session, after, limit, offset) - if 
notifications and len(notifications) > 0: - nr = NotificationsResult( - notifications=notifications, - unread=sum(1 for n in notifications if author_id in n.seen), - total=session.query(NotificationMessage).count(), - ) - return nr - except Exception as ex: - import traceback - - traceback.print_exc() - logger.error(f"[load_notifications] Ошибка при выполнении запроса к базе данных: {ex}") - return NotificationsResult(notifications=[], total=0, unread=0) - - -@strawberry.type -class Mutation: - @strawberry.mutation - async def mark_notification_as_read(self, info, notification_id: int) -> NotificationSeenResult: - author_id = info.context.get("author_id") - if author_id: - with local_session() as session: - try: - ns = NotificationSeen(notification=notification_id, viewer=author_id) - session.add(ns) - session.commit() - except SQLAlchemyError as e: - session.rollback() - logger.error( - f"[mark_notification_as_read] Ошибка при обновлении статуса прочтения уведомления: {str(e)}" - ) - return NotificationSeenResult(error="cant mark as read") - return NotificationSeenResult(error=None) - - @strawberry.mutation - async def mark_all_notifications_as_read(self, info) -> NotificationSeenResult: - author_id = info.context.get("author_id") - if author_id: - with local_session() as session: - try: - author = session.query(Author).filter(Author.id == author_id).first() - if author: - after = author.last_seen - nslist = get_notifications(author_id, session, after) - for n in nslist: - if author_id not in n.seen: - ns = NotificationSeen(viewer=author_id, notification=n.id) - session.add(ns) - session.commit() - except SQLAlchemyError as e: - session.rollback() - logger.error( - f"[mark_all_notifications_as_read] Ошибка обновления статуса прочтения всех уведомлений: {e}" - ) - return NotificationSeenResult(error="cant mark as read") - return NotificationSeenResult(error=None) - +from resolvers.load import Query +from resolvers.seen import Mutation schema = strawberry.Schema( 
query=Query, mutation=Mutation, config=StrawberryConfig(auto_camel_case=False), extensions=[LoginRequiredMiddleware] diff --git a/resolvers/seen.py b/resolvers/seen.py new file mode 100644 index 0000000..ff4aaca --- /dev/null +++ b/resolvers/seen.py @@ -0,0 +1,45 @@ +from orm.notification import NotificationSeen +from services.db import local_session +from resolvers.model import NotificationSeenResult +from resolvers.load import get_notifications_grouped +import strawberry +import logging + +from sqlalchemy.exc import SQLAlchemyError + + +logger = logging.getLogger(__name__) + + +@strawberry.type +class Mutation: + @strawberry.mutation + async def mark_notification_as_read(self, info, notification_id: int) -> NotificationSeenResult: + author_id = info.context.get("author_id") + if author_id: + with local_session() as session: + try: + ns = NotificationSeen(notification=notification_id, viewer=author_id) + session.add(ns) + session.commit() + except SQLAlchemyError as e: + session.rollback() + logger.error( + f"[mark_notification_as_read] Ошибка при обновлении статуса прочтения уведомления: {str(e)}" + ) + return NotificationSeenResult(error="cant mark as read") + return NotificationSeenResult(error=None) + + @strawberry.mutation + async def mark_all_notifications_as_read(self, info, limit: int = 10, offset: int = 0) -> NotificationSeenResult: + # TODO: use latest loaded notification_id as input offset parameter + ngroups = {} + error = None + try: + author_id = info.context.get("author_id") + if author_id: + ngroups = await get_notifications_grouped(author_id, limit=limit, offset=offset, mark_as_read=True) + except Exception as e: + print(e) + error = "cant mark as read" + return NotificationSeenResult(error=error) diff --git a/services/auth.py b/services/auth.py index 2e5c100..d463e32 100644 --- a/services/auth.py +++ b/services/auth.py @@ -1,4 +1,3 @@ -from functools import wraps from aiohttp import ClientSession from starlette.exceptions import HTTPException from 
strawberry.extensions import Extension @@ -7,12 +6,13 @@ from settings import AUTH_URL from services.db import local_session from orm.author import Author + async def check_auth(req) -> str | None: token = req.headers.get("Authorization") user_id = "" if token: # Logging the authentication token - print(f"[services.auth] checking auth token: {token}") + # print(f"[services.auth] checking auth token: {token}") query_name = "validate_jwt_token" operation = "ValidateToken" headers = { @@ -42,11 +42,16 @@ async def check_auth(req) -> str | None: print(f"[services.auth] errors: {errors}") else: user_id = data.get("data", {}).get(query_name, {}).get("claims", {}).get("sub") - return user_id + if user_id: + print(f"[services.auth] got user_id: {user_id}") + return user_id except Exception as e: # Handling and logging exceptions during authentication check print(f"[services.auth] {e}") + if not user_id: + raise HTTPException(status_code=401, detail="Unauthorized") + class LoginRequiredMiddleware(Extension): async def on_request_start(self): @@ -60,3 +65,5 @@ class LoginRequiredMiddleware(Extension): if author: context["author_id"] = author.id context["user_id"] = user_id or None + + self.execution_context.context = context diff --git a/services/core.py b/services/core.py index 66cdcc5..0d1f3e6 100644 --- a/services/core.py +++ b/services/core.py @@ -1,4 +1,4 @@ -from typing import Any, List +from typing import Any import aiohttp @@ -11,7 +11,7 @@ api_base = API_BASE or "https://core.discours.io" async def _request_endpoint(query_name, body) -> Any: async with aiohttp.ClientSession() as session: async with session.post(API_BASE, headers=headers, json=body) as response: - print(f"[services.core] {query_name} response: <{response.status}> {await response.text()}") + print(f"[services.core] {query_name} HTTP Response {response.status} {await response.text()}") if response.status == 200: r = await response.json() if r: @@ -27,10 +27,23 @@ async def get_followed_shouts(author_id: 
int): {query_name}(author_id: $author_id, limit: $limit, offset: $offset) {{ id slug title }} }}""" - body = { + gql = { "query": query, "operationName": operation, "variables": {"author_id": author_id, "limit": 1000, "offset": 0}, # FIXME: too big limit } - return await _request_endpoint(query_name, body) + return await _request_endpoint(query_name, gql) + + +async def get_shout(shout_id): + query_name = "get_shout" + operation = "GetShout" + + query = f"""query {operation}($slug: String, $shout_id: Int) {{ + {query_name}(slug: $slug, shout_id: $shout_id) {{ id slug title authors {{ id slug name pic }} }} + }}""" + + gql = {"query": query, "operationName": operation, "variables": {"slug": None, "shout_id": shout_id}} + + return await _request_endpoint(query_name, gql) diff --git a/services/rediscache.py b/services/rediscache.py index 96f0b93..64ce35e 100644 --- a/services/rediscache.py +++ b/services/rediscache.py @@ -1,4 +1,3 @@ -import asyncio import json import redis.asyncio as aredis @@ -49,19 +48,19 @@ class RedisCache: return await self._client.publish(channel, data) - async def listen(self, channel): + async def listen(self, pattern): if self._client: pubsub = self._client.pubsub() - await pubsub.subscribe(channel) + await pubsub.psubscribe(pattern) while True: message = await pubsub.get_message() if message and isinstance(message["data"], (str, bytes, bytearray)): + print(f"[services.rediscache] msg: {message}") try: - yield json.loads(message["data"]) - except json.JSONDecodeError as e: - print(f"Error decoding JSON: {e}") - await asyncio.sleep(0.1) + yield json.loads(message["data"]), message.get("channel") + except Exception as e: + print(f"[services.rediscache] Error: {e}") redis = RedisCache()