import time from typing import Any, Dict, List, Union from resolvers.load import load_messages from services.auth import login_required from services.core import CacheStorage from services.rediscache import redis from services.schema import query @query.field('search_recipients') @login_required async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0): result = set() # TODO: maybe redis scan? author_id = info.context['author_id'] existed_chats = await redis.execute('SMEMBERS', f'/chats_by_author/{author_id}') if isinstance(existed_chats, set): chats_list = list(existed_chats) for chat_id in chats_list[offset : (offset + limit)]: members_ids = await redis.execute('SMEMBERS', f'/chats/{chat_id}/members') if isinstance(members_ids, set): for member_id in members_ids: author = CacheStorage.authors_by_id.get(str(member_id)) if author: if author['name'].startswith(text): result.add(author) more_amount = limit - len(result) if more_amount > 0: result.update(CacheStorage.authors[0:more_amount]) return {'members': list(result), 'error': None} @query.field('search_messages') @login_required async def search_messages( _, info, by: Dict[str, Union[str, int]], limit: int, offset: int ) -> Dict[str, Union[List[Dict[str, Any]], None]]: messages_set = set() author_id = info.context['author_id'] lookup_chats = await redis.execute('SMEMBERS', f'chats_by_author/{author_id}') if isinstance(lookup_chats, set): # pre-filter lookup chats by_member = by.get('author') if by_member: lookup_chats = filter( lambda ca: by_member in ca['members'], list(lookup_chats), ) # load the messages from lookup chats for c in lookup_chats: chat_id = c.decode() fltr = None now = int(time.time()) if by_member: fltr = lambda mx: mx and mx['created_by'] == by_member # noqa E731 body_like = by.get('body') or '' if isinstance(body_like, str): fltr = lambda mx: mx and body_like in mx['body'] # noqa E731 days_ago = int(by.get('days') or '0') if days_ago: ts = days_ago * 24 * 60 * 60 fltr = lambda mx: mx and now - mx['created_by'] < ts # noqa E731 if fltr: mmm = await load_messages(chat_id, limit, offset) if isinstance(mmm, list): mmm = list(filter(fltr, mmm)) messages_set |= set(mmm) return {'messages': sorted(messages_set), 'error': None}