"""GraphQL helpers for the core API plus a periodically refreshed author cache."""

import asyncio
import logging
import time
import traceback
from datetime import datetime, timedelta, timezone
from typing import List

import requests

from models.member import ChatMember
from settings import API_BASE

logger = logging.getLogger("[services.core] ")
logger.setLevel(logging.DEBUG)

# Seconds to wait for the API (connect and read) before giving up. Without a
# timeout `requests` waits indefinitely, which would stall every caller and
# keep the cache lock held forever.
REQUEST_TIMEOUT = 30


def _request_endpoint(query_name, body) -> dict:
    """POST a GraphQL request to ``API_BASE`` and return ``data[query_name]``.

    Args:
        query_name: Name of the GraphQL field whose value is extracted from
            the ``data`` object of the response.
        body: JSON-serialisable request payload (query, variables, ...).

    Returns:
        Whatever the API placed under ``data[query_name]`` when it is
        non-empty (the callers in this module treat it as a dict or as a
        list of dicts). An empty dict is returned for a non-200 status,
        a body that is not valid JSON, a missing/null ``data`` object, or
        an empty result.

    Raises:
        requests.RequestException: On connection problems or when the
            request exceeds ``REQUEST_TIMEOUT``.
    """
    started = time.monotonic()
    logger.debug(f"requesting {query_name}...")
    response = requests.post(
        API_BASE,
        headers={"Content-Type": "application/json"},
        json=body,
        timeout=REQUEST_TIMEOUT,
    )
    # End minus start; the reversed subtraction logged negative durations.
    elapsed = time.monotonic() - started
    logger.debug(
        f"{query_name} response in {elapsed} secs: <{response.status_code}> {response.text[:32]}.."
    )

    if response.status_code != 200:
        return {}

    try:
        payload = response.json()
    except ValueError as e:
        logger.error(f"Error decoding JSON response: {e}")
        return {}

    # GraphQL error responses commonly carry `"data": null`; guard against
    # that (and against non-object payloads) instead of calling .get on None.
    data = payload.get("data") if isinstance(payload, dict) else None
    result = data.get(query_name) if isinstance(data, dict) else None
    if not result:
        return {}

    logger.info(f"entries amount in result: {len(result)} ")
    return result


def get_all_authors():
    """Fetch id, slug, pic, name and user of all authors via ``get_authors_all``."""
    query_name = "get_authors_all"
    gql = {
        "query": "query { " + query_name + "{ id slug pic name user } }",
        "variables": None,
    }
    return _request_endpoint(query_name, gql)


def get_author_by_user(user: str):
    """Look up the author id for ``user`` via the ``get_author_id`` query.

    Args:
        user: User identifier; surrounding whitespace is stripped before it
            is sent as the ``$user`` variable.

    Returns:
        The ``get_author_id`` result from the API, or an empty dict when the
        request yields nothing.
    """
    operation = "GetAuthorId"
    query_name = "get_author_id"
    gql = {
        "query": f"query {operation}($user: String!) {{ {query_name}(user: $user){{ id }} }}",
        "operationName": operation,
        "variables": {"user": user.strip()},
    }
    return _request_endpoint(query_name, gql)


def get_my_followed() -> List[ChatMember]:
    """Return the ``authors`` list of the ``get_my_followed`` query.

    Returns:
        The list found under ``authors`` in the response, or an empty list
        when the request yields nothing or ``authors`` is missing/null.
    """
    query_name = "get_my_followed"
    gql = {
        "query": "query { " + query_name + " { authors { id slug pic name } } }",
        "variables": None,
    }
    result = _request_endpoint(query_name, gql)
    # `or []` also covers an explicit null, which .get's default would not.
    return result.get("authors") or []


class CacheStorage:
    """Process-wide author cache refreshed by a background asyncio task.

    All state lives on the class itself; the methods are static and are
    called as ``CacheStorage.init()`` etc. without creating an instance.
    """

    lock = asyncio.Lock()  # serialises init() and update_authors()
    period = 5 * 60  # every 5 mins
    client = None
    authors = []  # last non-empty result of get_all_authors()
    authors_by_user = {}  # author record keyed by its "user" field
    authors_by_id = {}  # author record keyed by str(id)
    # Strong references to running background tasks. The event loop itself
    # only keeps weak references, so an unreferenced task may be collected
    # before it finishes.
    _tasks = set()

    @staticmethod
    async def init():
        """Start the background worker that keeps the cache up to date."""
        self = CacheStorage
        async with self.lock:
            task = asyncio.create_task(self.worker())
            self._tasks.add(task)
            # Drop the reference once the worker ends so the set cannot grow.
            task.add_done_callback(self._tasks.discard)
            logger.info(task)

    @staticmethod
    async def update_authors():
        """Reload all authors from the API and refresh the lookup tables.

        An empty result leaves the cache untouched. Existing entries of
        ``authors_by_user`` / ``authors_by_id`` are updated in place;
        entries absent from the new result are not removed.

        Raises:
            requests.RequestException: Propagated from the HTTP request.
        """
        self = CacheStorage
        async with self.lock:
            # get_all_authors() performs blocking HTTP I/O; run it in the
            # default executor so the event loop stays responsive meanwhile.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, get_all_authors)
            logger.info(f"cache loaded {len(result)}")
            if result:
                self.authors = result
                for a in result:
                    user_id = a.get("user")
                    author_id = str(a.get("id"))
                    self.authors_by_user[user_id] = a
                    self.authors_by_id[author_id] = a

    @staticmethod
    async def worker():
        """Refresh the cache forever: every ``period`` seconds on success.

        After a failed update the next attempt happens 10 seconds later;
        the loop stops for good once more than three consecutive attempts
        have failed.
        """
        failed = 0
        self = CacheStorage
        while True:
            try:
                logger.info(" - updating profiles data...")
                await self.update_authors()
                failed = 0
            except Exception as er:
                failed += 1
                logger.error(f"{er} - update failed #{failed}, wait 10 seconds")
                if failed > 3:
                    logger.error(" - not trying to update anymore")
                    traceback.print_exc()
                    break

            if failed == 0:
                when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
                # Local time as "YYYY-MM-DD HH:MM:SS". strftime never leaks
                # the UTC offset, unlike splitting isoformat() on ".".
                stamp = when.astimezone().strftime("%Y-%m-%d %H:%M:%S")
                logger.info(" ⎩ next update: %s" % stamp)
                await asyncio.sleep(self.period)
            else:
                await asyncio.sleep(10)
                logger.info(" - trying to update data again")