core/resolvers/notifications.py

90 lines
2.5 KiB
Python
Raw Normal View History

2023-10-30 21:00:55 +00:00
from sqlalchemy import and_, desc, select, update
2023-10-26 21:07:35 +00:00
from auth.authenticate import login_required
2023-10-30 21:00:55 +00:00
from auth.credentials import AuthCredentials
from base.orm import local_session
2023-10-30 21:00:55 +00:00
from base.resolvers import mutation, query
from orm import Notification
@query.field("loadNotifications")
@login_required
async def load_notifications(_, info, params=None):
if params is None:
params = {}
auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id
2023-10-30 21:00:55 +00:00
limit = params.get("limit", 50)
offset = params.get("offset", 0)
2023-10-30 21:00:55 +00:00
q = (
select(Notification)
.where(Notification.user == user_id)
.order_by(desc(Notification.createdAt))
.limit(limit)
.offset(offset)
)
2023-10-13 15:15:14 +00:00
notifications = []
with local_session() as session:
2023-10-30 21:00:55 +00:00
total_count = session.query(Notification).where(Notification.user == user_id).count()
2023-10-30 21:00:55 +00:00
total_unread_count = (
session.query(Notification)
.where(and_(Notification.user == user_id, Notification.seen == False)) # noqa: E712
.count()
)
2023-10-13 15:15:14 +00:00
for [notification] in session.execute(q):
notification.type = notification.type.name
notifications.append(notification)
return {
"notifications": notifications,
"totalCount": total_count,
2023-10-30 21:00:55 +00:00
"totalUnreadCount": total_unread_count,
}
@mutation.field("markNotificationAsRead")
@login_required
async def mark_notification_as_read(_, info, notification_id: int):
auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id
with local_session() as session:
2023-10-30 21:00:55 +00:00
notification = (
session.query(Notification)
.where(and_(Notification.id == notification_id, Notification.user == user_id))
.one()
)
notification.seen = True
session.commit()
return {}
@mutation.field("markAllNotificationsAsRead")
@login_required
async def mark_all_notifications_as_read(_, info):
auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id
2023-10-30 21:00:55 +00:00
statement = (
update(Notification)
.where(and_(Notification.user == user_id, Notification.seen == False)) # noqa: E712
.values(seen=True)
)
with local_session() as session:
try:
session.execute(statement)
session.commit()
except Exception as e:
session.rollback()
print(f"[mark_all_notifications_as_read] error: {str(e)}")
return {}