import asyncio
import json
import logging
from typing import Any, Dict, List, Optional, Union

from models.chat import ChatPayload
from resolvers.chats import create_chat
from services.auth import login_required
from services.core import CacheStorage
from services.rediscache import redis
from services.schema import query

logger = logging.getLogger('[resolvers.load] ')
logger.setLevel(logging.DEBUG)


async def get_unread_counter(chat_id: str, member_id: int) -> int:
    """Return the length of the unread list of :member_id in :chat_id.

    Reads ``LLEN chats/{chat_id}/unread/{member_id}``; any reply that is not
    an integer is reported as 0.
    """
    unread = await redis.execute('LLEN', f'chats/{chat_id}/unread/{member_id}')
    return unread if isinstance(unread, int) else 0


def _missing_reply_ids(messages: List[Any], loaded_ids: List[Any]) -> List[int]:
    """Collect ids referenced by ``reply_to`` that are not among :loaded_ids.

    Ids are compared by their string form, so an id stored as ``'7'`` and a
    ``reply_to`` of ``7`` are treated as the same message. Each missing id is
    reported once, in first-seen order.
    """
    known = {str(mid) for mid in loaded_ids}
    missing: List[int] = []
    for message in messages:
        if not message:
            continue
        reply_to = message.get('reply_to')
        if not reply_to:
            continue
        reply_id = int(reply_to)
        if str(reply_id) not in known:
            known.add(str(reply_id))
            missing.append(reply_id)
    return missing


# NOTE: not an API handler
async def load_messages(
    chat_id: str, limit: int = 5, offset: int = 0, ids: Optional[List[int]] = None
) -> List[Any]:
    """load :limit messages for :chat_id with :offset

    Message ids are taken from :ids plus, when :limit is truthy, the slice
    ``[offset, offset + limit)`` of the list ``chats/{chat_id}/message_ids``.
    The messages themselves are read with one ``MGET`` and JSON-decoded.
    Messages referenced through ``reply_to`` that were not part of that
    batch are loaded as well and appended to the result.

    Always returns a list; on any failure the error is logged and whatever
    was loaded so far is returned. Entries for keys missing in Redis are
    passed through as returned by ``MGET``.
    """
    logger.info('load_messages')
    messages: List[Any] = []
    try:
        message_ids: List[Any] = list(ids or [])
        if limit:
            # LRANGE treats both bounds as inclusive, hence the "- 1".
            mids = await redis.execute(
                'LRANGE', f'chats/{chat_id}/message_ids', offset, offset + limit - 1
            )
            if isinstance(mids, list):
                message_ids.extend(mids)
        if message_ids:
            message_keys = [f'chats/{chat_id}/messages/{mid}' for mid in message_ids]
            raw_messages = await redis.execute('MGET', *message_keys)
            if isinstance(raw_messages, list):
                messages = [
                    json.loads(m) if isinstance(m, (str, bytes)) else m
                    for m in raw_messages
                ]
                reply_ids = _missing_reply_ids(messages, message_ids)
                if reply_ids:
                    # limit=0: fetch only the replied-to messages by id,
                    # without re-reading a range of the chat.
                    more_messages = await load_messages(chat_id, 0, 0, reply_ids)
                    if isinstance(more_messages, list):
                        messages.extend(more_messages)
    except Exception:
        logger.exception(f'cant load messages of chat {chat_id}')
    return messages


async def _load_chat_payload(cid: Any, author_id: Any, online_ids: set) -> Optional[Dict[str, Any]]:
    """Build the payload of chat :cid for :author_id, or None if not stored.

    The payload is the JSON stored at ``chats/{cid}`` extended with the last
    messages, the unread counter and the members resolved through
    ``CacheStorage.authors_by_id``, each flagged with ``online``.
    """
    chat_str = await redis.execute('GET', f'chats/{cid}')
    if not isinstance(chat_str, str):
        logger.error(f'cant find chat by id {cid}')
        return None

    logger.debug(f'redis GET by {cid}: {chat_str}')
    c: ChatPayload = json.loads(chat_str)
    c['messages'] = (await load_messages(cid, 5, 0)) or []
    c['unread'] = await get_unread_counter(cid, author_id)

    members = []
    for member_id in c.get('members') or []:
        a = CacheStorage.authors_by_id.get(str(member_id))
        if a:
            # Copy the cached author so the shared cache entry is not mutated.
            members.append({**a, 'online': str(a.get('id')) in online_ids})
        else:
            logger.error(f'cant find author by id {member_id}')
    c['members'] = members
    return c


@query.field('load_chats')
@login_required
async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Union[List[Dict[str, Any]], None]]:
    """load :limit chats of current user with :offset

    Chat ids come from the set ``chats_by_author/{author_id}``; they are
    sorted so that paging with :offset is stable between calls. When the
    user has no chats at all, a chat with member id 2 is created and
    returned instead.

    Returns ``{'chats': [...], 'error': None}``. Failures are logged and the
    chats collected so far are returned.
    """
    logger.info('load_chats')
    author_id = info.context['author_id']
    chats: List[Dict[str, Any]] = []
    try:
        if author_id:
            logger.debug('got author %s', author_id)
            cids = await redis.execute('SMEMBERS', f'chats_by_author/{author_id}')
            logger.debug('got cids %s', cids)
            members_online = (await redis.execute('SMEMBERS', 'authors-online')) or []  # to show online status
            logger.debug('members online %s', members_online)
            # Compare by string form: cached author ids and the Redis set
            # members may differ in type. NOTE(review): confirm id types.
            online_ids = {str(member) for member in members_online}

            if isinstance(cids, (set, list)):
                # TODO: add sort by chat.created_at with in-memory caching chats service
                all_cids = sorted(cids, key=str)
                if all_cids:
                    page = all_cids[offset : (offset + limit)]
                else:
                    # Only a user without any chat gets the default one;
                    # paging past the end must not create chats.
                    page = []
                    logger.debug(f'no chats for user with id={author_id}')
                    r = await create_chat(None, info, members=[2])  # member with id = 2 is discours
                    # NOTE(review): assumes create_chat returns {'chat': {'id': ...}} - confirm
                    created = r.get('chat') if isinstance(r, dict) else None
                    if created:
                        logger.debug(f"created chat: {created['id']}")
                        page.append(created['id'])
                    else:
                        logger.error(f'cant create default chat for user with id={author_id}')

                logger.debug(f"getting data for {len(page)} user's chats")
                for cid in page:
                    c = await _load_chat_payload(cid, author_id, online_ids)
                    if c is not None:
                        chats.append(c)
    except Exception:
        logger.exception(f'cant load chats of user with id={author_id}')
    return {'chats': chats, 'error': None}


@query.field('load_messages_by')
@login_required
async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0):
    """load :limit messages of :chat_id with :offset

    :by is a mapping whose ``chat`` entry names the chat. Messages are
    returned only when that chat id is a member of
    ``chats_by_author/{author_id}``; they are sorted by ``created_at`` and
    entries without it are dropped.

    Returns ``{'messages': [...], 'error': None}`` on success and
    ``{'messages': [], 'error': <text>}`` otherwise.
    """
    logger.info('load_messages_by')
    author_id = info.context['author_id']
    try:
        author_chats = await redis.execute('SMEMBERS', f'chats_by_author/{author_id}')
        by_chat = by.get('chat')
        if isinstance(author_chats, (set, list)) and by_chat in author_chats:
            chat = await redis.execute('GET', f'chats/{by_chat}')
            if not chat:
                return {'messages': [], 'error': 'chat not exist'}

            # everyone's messages in filtered chat
            messages = await load_messages(by_chat, limit, offset)
            if isinstance(messages, list):
                dated_messages = [m for m in messages if m and m.get('created_at')]
                return {
                    'messages': sorted(dated_messages, key=lambda m: m.get('created_at')),
                    'error': None,
                }
    except Exception:
        logger.exception('cant load messages by filter')
    return {'messages': [], 'error': 'Cannot get messages of this chat'}