inbox/resolvers/messages.py
Untone e0d2ae37eb
All checks were successful
deploy / deploy (push) Successful in 1m9s
debug-load-messages-by-2
2023-10-16 21:44:13 +03:00

153 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from datetime import datetime, timezone
from typing import List
from services.auth import login_required
from services.presence import notify_message
from services.rediscache import redis
from services.schema import mutation
from validators.chat import Message
@mutation.field("createMessage")
@login_required
async def create_message(_, info, chat: str, body: str, reply_to=None):
"""Создание сообщения с телом :body для чата :chat_id с возможным ответом на :reply_to"""
author_id = info.context["author_id"]
# Получение данных чата из Redis
chat_data = await redis.execute("GET", f"chats/{chat}")
print(f"[resolvers.messages] debug chat data: {chat_data}")
# Если данных чата нет, возвращаем ошибку
if not chat_data:
return {"error": "chat is not exist"}
else:
# Преобразование данных чата из строки JSON в словарь
chat_dict = json.loads(chat_data)
print(chat_dict)
# Получение ID следующего сообщения
message_id = await redis.execute("GET", f"chats/{chat_dict['id']}/next_message_id")
message_id = int(message_id) if message_id else 0
# Создание нового сообщения
new_message: Message = {
"chat": chat_dict["id"],
"id": message_id,
"author": author_id,
"body": body,
"createdAt": int(datetime.now(tz=timezone.utc).timestamp()),
"updatedAt": None,
}
# Если есть ответ, добавляем его в сообщение
if reply_to:
new_message["replyTo"] = reply_to
# Обновление времени последнего обновления чата
chat_dict["updatedAt"] = new_message["createdAt"]
# Запись обновленных данных чата обратно в Redis
await redis.execute("SET", f"chats/{chat_dict['id']}", json.dumps(chat_dict))
print(f"[inbox] creating message {new_message}")
# Запись нового сообщения в Redis
await redis.execute(
"SET",
f"chats/{chat_dict['id']}/messages/{message_id}",
json.dumps(new_message),
)
# Добавление ID нового сообщения в список ID сообщений чата
await redis.execute("LPUSH", f"chats/{chat_dict['id']}/message_ids", str(message_id))
# Обновление ID следующего сообщения
await redis.execute("SET", f"chats/{chat_dict['id']}/next_message_id", str(message_id + 1))
# Добавление нового сообщения в список непрочитанных сообщений для каждого участника чата
members = chat_dict["members"]
for member_id in members:
await redis.execute("LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id))
# Отправка уведомления о новом сообщении
await notify_message(new_message, chat_dict["id"])
return {"message": new_message, "error": None}
@mutation.field("updateMessage")
@login_required
async def update_message(_, info, chat_id: str, message_id: int, body: str):
author_id = info.context["author_id"]
chat_str = await redis.execute("GET", f"chats/{chat_id}")
if not chat_str:
return {"error": "chat not exist"}
message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}")
if not message:
return {"error": "message not exist"}
message = json.loads(message)
if message["author"] != author_id:
return {"error": "access denied"}
message["body"] = body
message["updatedAt"] = int(datetime.now(tz=timezone.utc).timestamp())
await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message))
# TODO: use presence service to notify about updated message
return {"message": message, "error": None}
@mutation.field("deleteMessage")
@login_required
async def delete_message(_, info, chat_id: str, message_id: int):
author_id = info.context["author_id"]
chat_str = await redis.execute("GET", f"chats/{chat_id}")
if not chat_str:
return {"error": "chat not exist"}
chat = json.loads(chat_str)
message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}")
if not message_data:
return {"error": "message not exist"}
message: Message = json.loads(message_data)
if message["author"] != author_id:
return {"error": "access denied"}
await redis.execute("LREM", f"chats/{chat_id}/message_ids", 0, str(message_id))
await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}")
members = chat["members"]
for member_id in members:
await redis.execute("LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id))
# TODO: use presence service to notify about deleted message
return {}
@mutation.field("markAsRead")
@login_required
async def mark_as_read(_, info, chat_id: str, messages: List[int]):
author_id = info.context["author_id"]
chat_str = await redis.execute("GET", f"chats/{chat_id}")
if not chat_str:
return {"error": "chat not exist"}
chat = json.loads(chat_str)
members = set(chat["members"])
if author_id not in members:
return {"error": "access denied"}
for message_id in messages:
await redis.execute("LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id))
return {"error": None}