from typing import Any, List, Optional

import strawberry
from sqlalchemy import and_
from sqlalchemy.orm import aliased
from strawberry_sqlalchemy_mapper import StrawberrySQLAlchemyMapper

from orm.author import Author
from orm.notification import Notification as NotificationMessage, NotificationSeen
from services.auth import login_required
from services.db import local_session

strawberry_sqlalchemy_mapper = StrawberrySQLAlchemyMapper()

# Error text returned to the client by both "mark as read" mutations.
_MARK_AS_READ_ERROR = "cant mark as read"


# GraphQL view of a stored notification, as seen by the requesting author.
@strawberry_sqlalchemy_mapper.type(NotificationMessage)
class Notification:
    id: int
    action: str  # create update delete join follow etc.
    entity: str  # REACTION SHOUT
    created_at: int
    payload: str  # JSON data
    # Holds the requesting author's id when that author has seen the
    # notification, otherwise empty.
    seen: List[int]


# Result of a "mark as read" mutation; `error` is None on success.
@strawberry.type
class NotificationSeenResult:
    # Optional so that a successful result (error=None) is a valid, nullable
    # GraphQL value instead of violating a non-null `String!` field.
    error: Optional[str] = strawberry.field(default=None, name="error")


# One page of notifications plus counters.
@strawberry.type
class NotificationsResult:
    notifications: List[Notification]
    unread: int  # notifications on this page the author has not seen
    total: int  # number of stored notifications, ignoring pagination


@strawberry.type
class Query:
    @strawberry.field
    @login_required
    async def load_notifications(
        self, info, limit: int = 50, offset: int = 0
    ) -> NotificationsResult:
        """Return one page of notifications for the logged-in user's author.

        Args:
            info: resolver info; ``info.context["user_id"]`` identifies the user.
            limit: maximum number of notifications to return.
            offset: number of notifications to skip.

        Returns:
            The requested page, the count of unread notifications on that
            page, and the total number of stored notifications. An empty
            result is returned when the user has no author record.
        """
        user_id = info.context["user_id"]
        with local_session() as session:
            author = session.query(Author).filter(Author.user == user_id).first()
            if not author:
                return NotificationsResult(notifications=[], unread=0, total=0)

            # Join the SAME alias that is selected; selecting from an alias
            # that is never joined would yield a cartesian product.
            seen_by_author = aliased(NotificationSeen)
            rows = (
                session.query(
                    NotificationMessage,
                    seen_by_author.viewer.label("seen"),
                )
                .outerjoin(
                    seen_by_author,
                    and_(
                        seen_by_author.viewer == author.id,
                        seen_by_author.notification == NotificationMessage.id,
                    ),
                )
                # LIMIT/OFFSET without ORDER BY gives unstable pages.
                .order_by(NotificationMessage.id)
                .limit(limit)
                .offset(offset)
                .all()
            )

            # Each row is (NotificationMessage, viewer id or None).
            notifications = [
                Notification(
                    id=message.id,
                    action=message.action,
                    entity=message.entity,
                    created_at=message.created_at,
                    payload=message.payload,
                    seen=[] if viewer_id is None else [viewer_id],
                )
                for message, viewer_id in rows
            ]

            return NotificationsResult(
                notifications=notifications,
                unread=sum(1 for ntf in notifications if not ntf.seen),
                total=session.query(NotificationMessage).count(),
            )


@strawberry.type
class Mutation:
    @strawberry.mutation
    @login_required
    async def mark_notification_as_read(
        self, info, notification_id: int
    ) -> NotificationSeenResult:
        """Record that the logged-in user's author has seen one notification.

        Args:
            info: resolver info; ``info.context["user_id"]`` identifies the user.
            notification_id: id of the notification to mark.

        Returns:
            A result with ``error`` unset on success, or set to
            "cant mark as read" when the author is missing or the write fails.
        """
        user_id = info.context["user_id"]
        with local_session() as session:
            try:
                author = session.query(Author).filter(Author.user == user_id).first()
                if not author:
                    return NotificationSeenResult(error=_MARK_AS_READ_ERROR)

                already_seen = (
                    session.query(NotificationSeen)
                    .filter(
                        NotificationSeen.viewer == author.id,
                        NotificationSeen.notification == notification_id,
                    )
                    .first()
                )
                # Re-marking a notification is a no-op, not an error or a
                # duplicate row.
                if already_seen is None:
                    session.add(
                        NotificationSeen(
                            notification=notification_id, viewer=author.id
                        )
                    )
                    session.commit()
            except Exception as e:
                session.rollback()
                print(f"[mark_notification_as_read] error: {str(e)}")
                return NotificationSeenResult(error=_MARK_AS_READ_ERROR)
        return NotificationSeenResult()

    @strawberry.mutation
    @login_required
    async def mark_all_notifications_as_read(self, info) -> NotificationSeenResult:
        """Record that the logged-in user's author has seen every notification.

        Args:
            info: resolver info; ``info.context["user_id"]`` identifies the user.

        Returns:
            A result with ``error`` unset on success (including when the user
            has no author record, in which case nothing is written), or set
            to "cant mark as read" when the write fails.
        """
        user_id = info.context["user_id"]
        with local_session() as session:
            try:
                author = session.query(Author).filter(Author.user == user_id).first()
                if author:
                    seen_ids = {
                        seen_id
                        for (seen_id,) in session.query(NotificationSeen.notification)
                        .filter(NotificationSeen.viewer == author.id)
                        .all()
                    }
                    all_ids = [
                        ntf_id
                        for (ntf_id,) in session.query(NotificationMessage.id).all()
                    ]
                    # Only add rows for notifications not yet marked as seen.
                    session.add_all(
                        NotificationSeen(notification=ntf_id, viewer=author.id)
                        for ntf_id in all_ids
                        if ntf_id not in seen_ids
                    )
                    session.commit()
            except Exception as e:
                session.rollback()
                print(f"[mark_all_notifications_as_read] error: {str(e)}")
                return NotificationSeenResult(error=_MARK_AS_READ_ERROR)
        return NotificationSeenResult()


schema = strawberry.Schema(query=Query, mutation=Mutation)