import json from datetime import datetime, timezone from typing import List from services.auth import login_required from services.presence import notify_message from services.rediscache import redis from services.schema import mutation from validators.chat import Message @mutation.field("createMessage") @login_required async def create_message(_, info, chat: str, body: str, reply_to=None): """ create message with :body for :chat_id replying to :reply_to optionally """ author_id = info.context["author_id"] chat_data = await redis.execute("GET", f"chats/{chat}") print(f"[resolvers.messages] debug chat data: {chat_data}") if not chat_data: return {"error": "chat is not exist"} else: chat_dict = json.loads(chat_data) print(chat_dict) message_id = await redis.execute("GET", f"chats/{chat_dict['id']}/next_message_id") message_id = int(message_id) if message_id else 0 new_message: Message = { "chat": chat_dict["id"], "id": message_id, "author": author_id, "body": body, "createdAt": int(datetime.now(tz=timezone.utc).timestamp()), "updatedAt": None, } if reply_to: new_message["replyTo"] = reply_to chat_dict["updatedAt"] = new_message["createdAt"] await redis.execute("SET", f"chats/{chat_dict['id']}", json.dumps(chat)) print(f"[inbox] creating message {new_message}") await redis.execute( "SET", f"chats/{chat_dict['id']}/messages/{message_id}", json.dumps(new_message), ) await redis.execute("LPUSH", f"chats/{chat_dict['id']}/message_ids", str(message_id)) await redis.execute("SET", f"chats/{chat_dict['id']}/next_message_id", str(message_id + 1)) members = chat_dict["members"] for member_id in members: await redis.execute("LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id)) # result = FollowingResult("NEW", "chat", new_message) # await FollowingManager.push("chat", result) # subscribe on updates await notify_message(new_message, chat_dict["id"]) return {"message": new_message, "error": None} @mutation.field("updateMessage") @login_required async def update_message(_, info, chat_id: str, message_id: int, body: str): author_id = info.context["author_id"] chat = await redis.execute("GET", f"chats/{chat_id}") if not chat: return {"error": "chat not exist"} message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}") if not message: return {"error": "message not exist"} message = json.loads(message) if message["author"] != author_id: return {"error": "access denied"} message["body"] = body message["updatedAt"] = int(datetime.now(tz=timezone.utc).timestamp()) await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message)) # result = FollowingResult("UPDATE", "chat", new_message) # await FollowingManager.push("chat", result) # TODO: subscribe on updates return {"message": message, "error": None} @mutation.field("deleteMessage") @login_required async def delete_message(_, info, chat_id: str, message_id: int): author_id = info.context["author_id"] chat = await redis.execute("GET", f"chats/{chat_id}") if not chat: return {"error": "chat not exist"} chat = json.loads(chat) message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}") if not message_data: return {"error": "message not exist"} message: Message = json.loads(message_data) if message["author"] != author_id: return {"error": "access denied"} await redis.execute("LREM", f"chats/{chat_id}/message_ids", 0, str(message_id)) await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}") members = chat["members"] for member_id in members: await redis.execute("LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id)) # result = FollowingResult("DELETED", "chat", message) # await FollowingManager.push(result) # TODO ? return {} @mutation.field("markAsRead") @login_required async def mark_as_read(_, info, chat_id: str, messages: List[int]): author_id = info.context["author_id"] chat = await redis.execute("GET", f"chats/{chat_id}") if not chat: return {"error": "chat not exist"} chat = json.loads(chat) members = set(chat["members"]) if author_id not in members: return {"error": "access denied"} for message_id in messages: await redis.execute("LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id)) return {"error": None}