From 8901ded6622a33d1d1ee1dab790f7ece921f54a7 Mon Sep 17 00:00:00 2001 From: Untone Date: Wed, 24 Jan 2024 00:13:14 +0300 Subject: [PATCH] logs-redis-typings-fix --- resolvers/chats.py | 21 ++++++--------- resolvers/load.py | 59 +++++++++++++++++++++++------------------- resolvers/messages.py | 30 +++++++++------------ resolvers/search.py | 13 +++++----- services/auth.py | 1 + services/core.py | 1 - services/rediscache.py | 16 ++++++------ 7 files changed, 67 insertions(+), 74 deletions(-) diff --git a/resolvers/chats.py b/resolvers/chats.py index 343dbc2..4cf710b 100644 --- a/resolvers/chats.py +++ b/resolvers/chats.py @@ -12,6 +12,7 @@ from services.schema import mutation logger = logging.getLogger("[resolvers.chats] ") logger.setLevel(logging.DEBUG) + @mutation.field("update_chat") @login_required async def update_chat(_, info, chat_new: ChatUpdate): @@ -61,19 +62,13 @@ async def create_chat(_, info, title="", members=None): # NOTE: private chats has no title # reuse private chat created before if exists if len(members) == 2 and title == "": - chatdata1 = await redis.execute("SMEMBERS", f"chats_by_author/{members[0]}") - chatdata2 = await redis.execute("SMEMBERS", f"chats_by_author/{members[1]}") - if isinstance(chatdata1, list) and isinstance(chatdata2, list): - chatset1 = set(chatdata1) - chatset2 = set(chatdata2) - - for c in chatset1.intersection(chatset2): - chat_data = await redis.execute("GET", f"chats/{c}") - if isinstance(chat_data, str): - chat = json.loads(chat_data) - if chat["title"] == "": - logger.info("[inbox] createChat found old chat") - return {"chat": chat, "error": "existed"} + chatset1 = await redis.execute("SMEMBERS", f"chats_by_author/{members[0]}") + chatset2 = await redis.execute("SMEMBERS", f"chats_by_author/{members[1]}") + for c in chatset1.intersection(chatset2): + chat = await redis.execute("GET", f"chats/{c}") + if chat["title"] == "": + logger.info("[inbox] createChat found old chat") + return {"chat": chat, "error": "existed"} chat_id = str(uuid.uuid4()) chat: Chat = { diff --git a/resolvers/load.py b/resolvers/load.py index 06bd0fd..4e06bb2 100644 --- a/resolvers/load.py +++ b/resolvers/load.py @@ -21,7 +21,7 @@ async def get_unread_counter(chat_id: str, member_id: int) -> int: # NOTE: not an API handler async def load_messages( chat_id: str, limit: int = 5, offset: int = 0, ids: Optional[List[int]] = None -) -> List[Message | None]: +): """load :limit messages for :chat_id with :offset""" messages = [] try: @@ -32,18 +32,21 @@ async def load_messages( message_ids.extend(mids) if message_ids: message_keys = [f"chats/{chat_id}/messages/{mid}" for mid in message_ids] - messages = (await redis.mget(*message_keys)) or [] - messages = [json.loads(m) if isinstance(m, str) else m for m in messages] - replies = [] - for m in messages: - if m: - reply_to = m.get("reply_to") - if reply_to: - reply_to = int(reply_to) - if reply_to not in message_ids: - replies.append(reply_to) - if replies: - messages += await load_messages(chat_id, offset, limit, replies) + messages = await redis.execute("MGET", *message_keys) + if isinstance(messages, list): + messages = [json.loads(m) if isinstance(m, str) else m for m in messages] + replies = [] + for m in messages: + if m: + reply_to = m.get("reply_to") + if reply_to: + reply_to = int(reply_to) + if reply_to not in message_ids: + replies.append(reply_to) + if replies: + more_messages = await load_messages(chat_id, offset, limit, replies) + if isinstance(more_messages, list): + messages.extend(more_messages) except Exception: import traceback @@ -59,9 +62,10 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Uni chats = [] if author_id: cids = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}") - if isinstance(cids, list): + if isinstance(cids, set): members_online = (await redis.execute("SMEMBERS", "authors-online")) or [] - cids = cids[offset : (offset + limit)] + # TODO: add sort by chat.created_at with in-memory caching chats service + cids = list(cids)[offset : (offset + limit)] lock = asyncio.Lock() if len(cids) == 0: print(f"[resolvers.load] no chats for user with id={author_id}") @@ -93,9 +97,9 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Uni async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): """load :limit messages of :chat_id with :offset""" author_id = info.context["author_id"] - author_chats = (await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id))) - if isinstance(author_chats, list): - author_chats = [c for c in author_chats] + author_chats = await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id)) + if isinstance(author_chats, set): + author_chats = list(author_chats) messages = [] by_chat = by.get("chat") if by_chat in author_chats: @@ -104,12 +108,13 @@ async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): return {"messages": [], "error": "chat not exist"} # everyone's messages in filtered chat messages = await load_messages(by_chat, limit, offset) - return { - "messages": sorted( - [m for m in messages if m and m.get("created_at")], - key=lambda m: m.get("created_at"), - ), - "error": None, - } - else: - return {"error": "Cannot access messages of this chat"} + if isinstance(messages, list): + sorted_messages = [m for m in messages if m and m.get("created_at")] + return { + "messages": sorted( + sorted_messages, + key=lambda m: m.get("created_at"), + ), + "error": None, + } + return {"error": "Cannot get messages of this chat"} diff --git a/resolvers/messages.py b/resolvers/messages.py index d4d2b83..71cf497 100644 --- a/resolvers/messages.py +++ b/resolvers/messages.py @@ -7,6 +7,11 @@ from services.presence import notify_message from services.rediscache import redis from services.schema import mutation +import logging + +logger = logging.getLogger("[resolvers.messages] ") +logger.setLevel(logging.DEBUG) + @mutation.field("create_message") @login_required @@ -16,7 +21,7 @@ async def create_message(_, info, chat_id: str, body: str, reply_to=None): # Получение данных чата из Redis chat_data = await redis.execute("GET", f"chats/{chat_id}") - print(f"[resolvers.messages] debug chat data: {chat_data}") + logger.debug(f"chat data: {chat_data}") # Если данных чата нет, возвращаем ошибку if not chat_data: @@ -38,7 +43,7 @@ async def create_message(_, info, chat_id: str, body: str, reply_to=None): "body": body, "created_at": int(time.time()), "updated_at": None, - "reply_to": None + "reply_to": None, } # Если есть ответ, добавляем его в сообщение @@ -50,7 +55,7 @@ async def create_message(_, info, chat_id: str, body: str, reply_to=None): # Запись обновленных данных чата обратно в Redis await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat_dict)) - print(f"[inbox] creating message {new_message}") + logger.debug(f"creating message {new_message}") # Запись нового сообщения в Redis await redis.execute( @@ -94,9 +99,7 @@ async def update_message(_, info, message): if message_id: message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}") - if not message: - return {"error": "message not exist"} - elif isinstance(message, str): + if isinstance(message, str): message = json.loads(message) if message["created_by"] != author_id: return {"error": "access denied"} @@ -122,15 +125,10 @@ async def delete_message(_, info, chat_id: str, message_id: int): author_id = info.context["author_id"] chat_str = await redis.execute("GET", f"chats/{chat_id}") - if not chat_str: - return {"error": "chat not exist"} - elif isinstance(chat_str, str): + if isinstance(chat_str, str): chat = json.loads(chat_str) - message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}") - if not message_data: - return {"error": "message not exist"} - elif isinstance(message_data, str): + if isinstance(message_data, str): message: Message = json.loads(message_data) if message["created_by"] != author_id: return {"error": "access denied"} @@ -154,8 +152,6 @@ async def mark_as_read(_, info, chat_id: str, message_id: int): author_id = info.context["author_id"] chat_str = await redis.execute("GET", f"chats/{chat_id}") - if not chat_str: - return {"error": "chat not exist"} if isinstance(chat_str, str): chat = json.loads(chat_str) members = set(chat["members"]) @@ -165,9 +161,7 @@ async def mark_as_read(_, info, chat_id: str, message_id: int): await redis.execute("LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id)) message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}") - if not message_data: - return {"error": "message not exist"} - elif isinstance(message_data, str): + if isinstance(message_data, str): message: Message = json.loads(message_data) await notify_message(message, "seen") diff --git a/resolvers/search.py b/resolvers/search.py index b014f50..4e9b426 100644 --- a/resolvers/search.py +++ b/resolvers/search.py @@ -21,7 +21,7 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0 existed_chats = await redis.execute("SMEMBERS", f"/chats_by_author/{author_id}") if isinstance(existed_chats, set): chats_list = list(existed_chats) - for chat_id in chats_list[offset: (offset + limit)]: + for chat_id in chats_list[offset : (offset + limit)]: members_ids = await redis.execute("SMEMBERS", f"/chats/{chat_id}/members") if isinstance(members_ids, set): for member_id in members_ids: @@ -36,7 +36,6 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0 return {"members": list(result), "error": None} - @query.field("search_messages") @login_required async def search_messages( @@ -45,8 +44,7 @@ async def search_messages( messages_set = set([]) author_id = info.context["author_id"] lookup_chats = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}") - if isinstance(lookup_chats, list): - lookup_chats = set(lookup_chats) + if isinstance(lookup_chats, set): # pre-filter lookup chats by_member = by.get("author") @@ -59,7 +57,6 @@ async def search_messages( # load the messages from lookup chats for c in lookup_chats: chat_id = c.decode() - mmm = await load_messages(chat_id, limit, offset) filter_method = None if by_member: filter_method = lambda mx: mx and mx["created_by"] == by_member @@ -70,7 +67,9 @@ async def search_messages( if days_ago: filter_method = lambda mx: mx and (int(time.time()) - mx["created_by"] < days_ago * 24 * 60 * 60) if filter_method: - mmm = list(filter(filter_method, mmm)) - messages_set |= set(mmm) + mmm = await load_messages(chat_id, limit, offset) + if isinstance(mmm, list): + mmm = list(filter(filter_method, mmm)) + messages_set |= set(mmm) return {"messages": sorted(list(messages_set)), "error": None} diff --git a/services/auth.py b/services/auth.py index 6e9cf0d..af652c7 100644 --- a/services/auth.py +++ b/services/auth.py @@ -10,6 +10,7 @@ import logging logger = logging.getLogger("[services.auth] ") logger.setLevel(logging.DEBUG) + async def check_auth(req) -> str | None: token = req.headers.get("Authorization") user_id = "" diff --git a/services/core.py b/services/core.py index a94a623..c0f4c73 100644 --- a/services/core.py +++ b/services/core.py @@ -20,7 +20,6 @@ def _request_endpoint(query_name, body) -> dict: ts2 = time.time() logger.debug(f"{query_name} response in {ts1-ts2} secs: <{response.status_code}> {response.text[:32]}..") - if response.status_code == 200: try: r = response.json() diff --git a/services/rediscache.py b/services/rediscache.py index 1f5f753..50037ce 100644 --- a/services/rediscache.py +++ b/services/rediscache.py @@ -1,6 +1,10 @@ import redis.asyncio as aredis from settings import REDIS_URL +import logging + +logger = logging.getLogger("[services.redis] ") +logger.setLevel(logging.DEBUG) class RedisCache: @@ -19,11 +23,13 @@ class RedisCache: async def execute(self, command, *args, **kwargs): if self._client: try: - print("[redis] " + command + " " + " ".join(args)) + logger.debug(command + " " + " ".join(args)) r = await self._client.execute_command(command, *args, **kwargs) + logger.debug(type(r)) + logger.debug(r) return r except Exception as e: - print(f"[redis] error: {e}") + logger.error(e) async def subscribe(self, *channels): if self._client: @@ -45,12 +51,6 @@ class RedisCache: return await self._client.publish(channel, data) - async def mget(self, key, *keys): - if self._client: - print(f"[redis] MGET {key} {keys}") - return await self._client.mget(key, *keys) - - redis = RedisCache() __all__ = ["redis"]