import asyncio
import logging
from dataclasses import dataclass

from orm.notification import Notification
from resolvers.model import NotificationAuthor, NotificationReaction, NotificationShout
from services.db import local_session
from services.rediscache import redis

logger = logging.getLogger("[listener.listen_task] ")
logger.setLevel(logging.DEBUG)


# eq=False keeps the default identity-based __eq__/__hash__, so the only
# behavioral addition is the generated keyword constructor (and __repr__).
@dataclass(eq=False)
class ServiceMessage:
    """Message received from a Redis channel describing one notification.

    The class is a dataclass so that ``ServiceMessage(**message_data)`` (as
    done in ``listen_task``) actually works: a plain class carrying only
    annotations has no ``__init__`` accepting these fields and would raise
    ``TypeError`` for every incoming message.
    """

    action: str
    entity: str
    payload: NotificationReaction | NotificationAuthor | NotificationShout


async def handle_notification(n: ServiceMessage, channel: str):
    """Create and persist a new stored notification.

    Args:
        n: The service message received from Redis.
        channel: Name of the channel the message arrived on. For channels
            starting with ``"follower:"`` the second ``:``-separated part is
            parsed as the author id being followed.

    Any exception (including a malformed channel name) is logged and the
    session is rolled back; nothing is re-raised to the caller.
    """
    with local_session() as session:
        try:
            if channel.startswith("follower:"):
                author_id = int(channel.split(":")[1])
                # NOTE(review): only payloads that are already
                # NotificationAuthor instances get the id attached; confirm
                # what type the Redis listener actually delivers here.
                if isinstance(n.payload, NotificationAuthor):
                    n.payload.following_id = author_id
            # Use a separate name instead of rebinding the `n` parameter to
            # an object of a different type.
            notification = Notification(
                action=n.action, entity=n.entity, payload=n.payload
            )
            session.add(notification)
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"[listener.handle_reaction] error: {str(e)}")


async def listen_task(pattern):
    """Consume messages from ``redis.listen(pattern)`` and store each one.

    Args:
        pattern: Channel name or pattern passed through to ``redis.listen``.

    Falsy messages are skipped. A failure while processing one message is
    logged and does not stop the loop.
    """
    async for message_data, channel in redis.listen(pattern):
        try:
            if message_data:
                notification_message = ServiceMessage(**message_data)
                await handle_notification(notification_message, str(channel))
        except Exception as e:
            logger.error(f"Error processing notification: {str(e)}")


async def notifications_worker():
    """Run the follower, reaction and shout listeners concurrently."""
    # Use asyncio.gather to run tasks concurrently
    await asyncio.gather(
        listen_task("follower:*"),
        listen_task("reaction"),
        listen_task("shout"),
    )