"""GraphQL helpers for the core API plus an in-process cache of author profiles."""

import asyncio
import logging
import traceback
from datetime import datetime, timedelta, timezone
from typing import Any, List

import aiohttp
import requests  # noqa: F401  (no longer used here; kept so existing imports are not removed)

from models.member import ChatMember
from settings import API_BASE

logger = logging.getLogger('[services.core] ')
logger.setLevel(logging.DEBUG)

# Headers sent with every request to API_BASE.
# NOTE(review): the original module referenced `headers` without defining it;
# a JSON content type is assumed here - confirm whether auth headers are needed.
headers = {'Content-Type': 'application/json'}


# TODO: rewrite to orm usage


async def _request_endpoint(query_name, body) -> Any:
    """POST a GraphQL request to ``API_BASE`` and return the ``query_name`` payload.

    Args:
        query_name: Name of the root GraphQL field; used to pick the result out
            of the ``data`` object of the response.
        body: JSON-serialisable request body (``query``, ``variables``, ...).

    Returns:
        The value stored under ``data[query_name]`` (``{}`` when the key is
        absent), or ``[]`` when the HTTP status is not 200 or the JSON body is
        empty.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(API_BASE, headers=headers, json=body) as response:
            print(f'[services.core] {query_name} HTTP Response {response.status} {await response.text()}')
            if response.status == 200:
                r = await response.json()
                if r:
                    # GraphQL error responses may carry `"data": null`;
                    # fall back to an empty mapping instead of crashing.
                    return (r.get('data') or {}).get(query_name, {})
    return []


async def get_followed_shouts(author_id: int):
    """Load the shouts followed by the author with the given id.

    Args:
        author_id: Numeric author id passed as the ``author_id`` variable.

    Returns:
        Whatever ``_request_endpoint`` yields for ``load_shouts_followed``.
    """
    query_name = 'load_shouts_followed'
    operation = 'GetFollowedShouts'

    # Every variable declaration needs the `$` prefix to be valid GraphQL.
    query = f"""query {operation}($author_id: Int!, $limit: Int, $offset: Int) {{
        {query_name}(author_id: $author_id, limit: $limit, offset: $offset) {{ id slug title }}
    }}"""

    gql = {
        'query': query,
        'operationName': operation,
        'variables': {'author_id': author_id, 'limit': 1000, 'offset': 0},  # FIXME: too big limit
    }

    return await _request_endpoint(query_name, gql)


async def get_shout(shout_id):
    """Load a single shout (with its authors) by numeric id.

    Args:
        shout_id: Value passed as the ``shout_id`` variable; ``slug`` is sent as null.

    Returns:
        Whatever ``_request_endpoint`` yields for ``get_shout``.
    """
    query_name = 'get_shout'
    operation = 'GetShout'

    query = f"""query {operation}($slug: String, $shout_id: Int) {{
        {query_name}(slug: $slug, shout_id: $shout_id) {{ id slug title authors {{ id slug name pic }} }}
    }}"""

    gql = {'query': query, 'operationName': operation, 'variables': {'slug': None, 'shout_id': shout_id}}

    return await _request_endpoint(query_name, gql)


async def get_all_authors():
    """Load every author (id, slug, pic, name, user).

    Returns:
        Whatever ``_request_endpoint`` yields for ``get_authors_all``.
    """
    query_name = 'get_authors_all'

    gql = {
        'query': 'query { ' + query_name + '{ id slug pic name user } }',
        'variables': None,
    }

    return await _request_endpoint(query_name, gql)


async def get_author_by_user(user: str):
    """Resolve the author id for a user identifier.

    Args:
        user: User identifier; surrounding whitespace is stripped before sending.

    Returns:
        Whatever ``_request_endpoint`` yields for ``get_author_id``.
    """
    operation = 'GetAuthorId'
    query_name = 'get_author_id'
    gql = {
        'query': f'query {operation}($user: String!) {{ {query_name}(user: $user){{ id }} }}',  # noqa E201, E202
        'operationName': operation,
        'variables': {'user': user.strip()},
    }

    return await _request_endpoint(query_name, gql)


async def get_my_followed() -> List[ChatMember]:
    """Load the authors followed by the current user.

    This is a coroutine: the previous synchronous version called ``.get`` on an
    un-awaited coroutine object and therefore could never succeed.

    Returns:
        The ``authors`` list of the ``get_my_followed`` payload, or ``[]`` when
        the request failed or the payload has no ``authors`` key.
    """
    query_name = 'get_my_followed'

    gql = {
        'query': 'query { ' + query_name + ' { authors { id slug pic name } } }',
        'variables': None,
    }

    result = await _request_endpoint(query_name, gql)
    # `_request_endpoint` returns a list on failure, which has no `.get`.
    if isinstance(result, dict):
        return result.get('authors', [])
    return []


class CacheStorage:
    """Process-wide cache of author profiles, refreshed by a background task.

    All state lives on the class itself; the static methods bind ``self`` to
    the class so it behaves like a singleton.
    """

    lock = asyncio.Lock()
    period = 5 * 60  # every 5 mins
    client = None
    authors = []
    authors_by_user = {}
    authors_by_id = {}
    # Strong reference to the worker task so it is not garbage-collected mid-run.
    _task = None

    @staticmethod
    async def init():
        """Start the background worker that periodically refreshes the cache."""
        self = CacheStorage
        async with self.lock:
            self._task = asyncio.create_task(self.worker())
            logger.info(self._task)

    @staticmethod
    async def update_authors():
        """Fetch all authors and rebuild the by-user and by-id lookup tables.

        The cache is only touched when the fetch returned a non-empty result.
        """
        self = CacheStorage
        async with self.lock:
            # Normalise a null payload to an empty list so `len` is always valid.
            result = await get_all_authors() or []
            logger.info(f'cache loaded {len(result)}')
            if result:
                CacheStorage.authors = result
                for a in result:
                    user_id = a.get('user')
                    author_id = str(a.get('id'))  # ids are keyed as strings
                    self.authors_by_user[user_id] = a
                    self.authors_by_id[author_id] = a

    @staticmethod
    async def worker():
        """Refresh the cache forever: every ``period`` seconds on success,
        every 10 seconds after a failure, giving up after more than 3
        consecutive failures."""
        self = CacheStorage
        failed = 0
        while True:
            try:
                logger.info(' - updating profiles data...')
                await self.update_authors()
                failed = 0
            except Exception as er:
                failed += 1
                logger.error(f'{er} - update failed #{failed}, wait 10 seconds')
                if failed > 3:
                    logger.error(' - not trying to update anymore')
                    traceback.print_exc()
                    break
            if failed == 0:
                when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
                # Local time as "YYYY-MM-DD HH:MM:SS"; strftime avoids leaking
                # the UTC offset when the microsecond part happens to be zero.
                stamp = when.astimezone().strftime('%Y-%m-%d %H:%M:%S')
                logger.info(' ⎩ next update: %s', stamp)
                await asyncio.sleep(self.period)
            else:
                await asyncio.sleep(10)
                logger.info(' - trying to update data again')