import json import logging import time from models.chat import Message from services.auth import login_required from services.presence import notify_message from services.rediscache import redis from services.schema import mutation logger = logging.getLogger("[resolvers.messages] ") logger.setLevel(logging.DEBUG) @mutation.field("create_message") @login_required async def create_message(_, info, chat_id: str, body: str, reply_to=None): """Создание сообщения с телом :body для чата :chat_id с возможным ответом на :reply_to""" author_id = info.context["author_id"] # Получение данных чата из Redis chat_data = await redis.execute("GET", f"chats/{chat_id}") logger.debug(f"chat data: {chat_data}") # Если данных чата нет, возвращаем ошибку if not chat_data: return {"error": "chat is not exist"} elif isinstance(chat_data, str): # Преобразование данных чата из строки JSON в словарь chat_dict = json.loads(chat_data) chat_id = chat_dict["id"] # Получение ID следующего сообщения message_id = await redis.execute( "GET", f"chats/{chat_dict['id']}/next_message_id" ) if isinstance(message_id, str) or isinstance(message_id, int): message_id = int(message_id) if message_id else 0 # Создание нового сообщения new_message: Message = { "chat_id": chat_id, "id": message_id, "created_by": author_id, "body": body, "created_at": int(time.time()), "updated_at": None, "reply_to": None, } # Если есть ответ, добавляем его в сообщение if reply_to: new_message["reply_to"] = reply_to # Обновление времени последнего обновления чата chat_dict["updated_at"] = new_message["created_at"] # Запись обновленных данных чата обратно в Redis await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat_dict)) logger.debug(f"creating message {new_message}") # Запись нового сообщения в Redis await redis.execute( "SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(new_message), ) # Добавление ID нового сообщения в список ID сообщений чата await redis.execute( "LPUSH", f"chats/{chat_id}/message_ids", str(message_id) ) # Обновление ID следующего сообщения await redis.execute( "SET", f"chats/{chat_id}/next_message_id", str(message_id + 1) ) # Добавление нового сообщения в список непрочитанных сообщений для каждого участника чата members = chat_dict["members"] for member_id in members: await redis.execute( "LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id), ) # Отправка уведомления о новом сообщении new_message["chat_id"] = chat_id await notify_message(new_message, "create") return {"message": new_message, "error": None} return {"error": "cannot create message"} @mutation.field("update_message") @login_required async def update_message(_, info, message): author_id = info.context["author_id"] chat_id = message.get("chat_id") chat_str = "" if chat_id: chat_str = await redis.execute("GET", f"chats/{chat_id}") if not chat_str: return {"error": "chat not exist"} message_id = message.get("id") body = message.get("body") if message_id: message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}") if isinstance(message, str): message = json.loads(message) if message["created_by"] != author_id: return {"error": "access denied"} if body: message["body"] = body message["updated_at"] = int(time.time()) await redis.execute( "SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message) ) # Отправка уведомления message["chat_id"] = chat_id await notify_message(message, "update") return {"message": message, "error": None} return {"message": message, "error": "cannot update"} @mutation.field("delete_message") @login_required async def delete_message(_, info, chat_id: str, message_id: int): author_id = info.context["author_id"] chat_str = await redis.execute("GET", f"chats/{chat_id}") if isinstance(chat_str, str): chat = json.loads(chat_str) message_data = await redis.execute( "GET", f"chats/{chat_id}/messages/{str(message_id)}" ) if isinstance(message_data, str): message: Message = json.loads(message_data) if message["created_by"] != author_id: return {"error": "access denied"} await redis.execute( "LREM", f"chats/{chat_id}/message_ids", 0, str(message_id) ) await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}") members = chat["members"] for member_id in members: await redis.execute( "LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id) ) message["chat_id"] = chat_id await notify_message(message, "delete") return {} @mutation.field("mark_as_read") @login_required async def mark_as_read(_, info, chat_id: str, message_id: int): author_id = info.context["author_id"] chat_str = await redis.execute("GET", f"chats/{chat_id}") if isinstance(chat_str, str): chat = json.loads(chat_str) members = set(chat["members"]) if author_id not in members: return {"error": "access denied"} await redis.execute( "LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id) ) message_data = await redis.execute( "GET", f"chats/{chat_id}/messages/{str(message_id)}" ) if isinstance(message_data, str): message: Message = json.loads(message_data) await notify_message(message, "seen") return {"error": None}