notifier/resolvers/listener.py
2024-01-26 03:40:49 +03:00

49 lines
1.7 KiB
Python

import asyncio
import logging
from orm.notification import Notification
from resolvers.model import NotificationAuthor, NotificationReaction, NotificationShout
from services.db import local_session
from services.rediscache import redis
# Module-level logger used by the coroutines below; the DEBUG threshold lets
# every record emitted through it pass the logger's own level check.
logger = logging.getLogger(name='[listener.listen_task] ')
logger.setLevel(level=logging.DEBUG)
class ServiceMessage:
    """One notification event as delivered by the Redis listener.

    Instances are built from the decoded message mapping via
    ``ServiceMessage(**message_data)``, so the mapping is expected to carry
    exactly the keys ``action``, ``entity`` and ``payload``.

    Attributes:
        action: copied into ``Notification.action`` when the message is stored.
        entity: copied into ``Notification.entity`` when the message is stored.
        payload: event body; copied into ``Notification.payload``.
    """

    action: str
    entity: str
    # Quoted forward reference: keeps the declared type for type checkers
    # without evaluating the union while the class body executes.
    payload: 'NotificationReaction | NotificationAuthor | NotificationShout'

    def __init__(self, action, entity, payload) -> None:
        # A class that only declares annotations inherits object.__init__,
        # which rejects any arguments; without this initializer the keyword
        # construction used by the listener raises TypeError on every message.
        self.action = action
        self.entity = entity
        self.payload = payload
async def handle_notification(n: ServiceMessage, channel: str):
    """Create and persist a new stored notification from a service message.

    Args:
        n: incoming message; its ``action``, ``entity`` and ``payload`` are
            copied into a new ``Notification``.
        channel: name of the channel the message arrived on. For channels
            starting with ``follower:`` the segment after the first colon is
            parsed as an integer author id and, when the payload is a
            ``NotificationAuthor``, written to ``payload.following_id``.

    Errors are not propagated: any exception raised while building or saving
    the notification rolls the session back and is logged.
    """
    with local_session() as session:
        try:
            if channel.startswith('follower:'):
                author_id = int(channel.split(':')[1])
                if isinstance(n.payload, NotificationAuthor):
                    n.payload.following_id = author_id
            # Use a separate name instead of rebinding the ``n`` parameter to
            # an object of a different type.
            notification = Notification(action=n.action, entity=n.entity, payload=n.payload)
            session.add(notification)
            session.commit()
        except Exception as e:
            session.rollback()
            # Tag the record with this function's real name so log searches
            # lead to the right place.
            logger.error(f'[listener.handle_notification] error: {str(e)}')
async def listen_task(pattern):
    """Consume messages for ``pattern`` and hand each one to the notification handler.

    Iterates ``redis.listen(pattern)``, which yields ``(message_data, channel)``
    pairs. Falsy messages are skipped; every other message is unpacked into a
    ``ServiceMessage`` and passed to ``handle_notification`` together with the
    channel converted to ``str``. A failure while processing one message is
    logged and does not stop the loop.
    """
    async for raw_message, source_channel in redis.listen(pattern):
        try:
            if not raw_message:
                # Nothing to store for an empty message.
                continue
            service_message = ServiceMessage(**raw_message)
            await handle_notification(service_message, str(source_channel))
        except Exception as exc:
            logger.error(f'Error processing notification: {str(exc)}')
async def notifications_worker():
    """Run one ``listen_task`` per subscription pattern, all concurrently.

    Returns only after every listener awaited by ``asyncio.gather`` finishes.
    """
    subscription_patterns = ('follower:*', 'reaction', 'shout')
    # Coroutines are created in declaration order, then awaited together.
    listeners = [listen_task(subscription) for subscription in subscription_patterns]
    await asyncio.gather(*listeners)