import traceback
from typing import Any, List, Optional

import aiohttp

from models.member import ChatMember
from settings import API_BASE

headers = {"Content-Type": "application/json"}

# Fields requested for every author-like object returned by the core API.
_AUTHOR_FIELDS = "id slug pic name"


async def _request_endpoint(query_name, body) -> Any:
    """POST a GraphQL request to ``API_BASE`` and return one field of ``data``.

    Args:
        query_name: Name of the top-level field to extract from the ``data``
            object of the response; also used as the label in the log line.
        body: JSON-serialisable GraphQL payload (``query``,
            ``operationName``, ``variables``).

    Returns:
        ``data[query_name]`` from the decoded response, or ``{}`` when that
        field (or ``data`` itself) is missing or null.  ``[]`` when the HTTP
        status is not 200.  ``None`` when the request or decoding raised; the
        traceback is printed and the exception is not propagated.
    """
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(API_BASE, headers=headers, json=body) as response:
                payload_text = await response.text()
                print(f"[services.core] {query_name}: [{response.status}] {len(payload_text)} bytes")
                if response.status != 200:
                    return []
                r = await response.json()
                if not r:
                    raise ValueError("json response error")
                # A GraphQL response may carry ``"data": null`` (e.g. next to
                # ``errors``); treat that the same as a missing field instead
                # of failing on ``None.get``.
                data = r.get("data") or {}
                return data.get(query_name, {})
        except Exception:
            # Best-effort boundary: report the failure and let callers see a
            # falsy result rather than an exception.
            traceback.print_exc()
    return None


async def get_all_authors() -> List[ChatMember]:
    """Fetch every author via the ``authorsAll`` query.

    Returns:
        The value of the ``authorsAll`` field, or an empty list when the
        request failed or produced nothing.
    """
    query_name = "authorsAll"
    operation = "AuthorsAll"
    gql = {
        "query": f"query {operation} {{ {query_name} {{ {_AUTHOR_FIELDS} }} }}",
        "operationName": operation,
        "variables": None,
    }
    # The helper is a coroutine function: it has to be awaited, otherwise the
    # caller receives a coroutine object instead of the result.
    return await _request_endpoint(query_name, gql) or []


async def get_my_followed() -> List[ChatMember]:
    """Fetch the ``authors`` list of the ``get_my_followed`` query.

    Returns:
        The ``authors`` value of the ``get_my_followed`` field, or an empty
        list when the request failed or the field is absent.
    """
    query_name = "get_my_followed"
    operation = "GetMyFollowed"
    gql = {
        "query": f"query {operation} {{ {query_name} {{ authors {{ {_AUTHOR_FIELDS} }} }} }}",
        "operationName": operation,
        "variables": None,
    }
    # Reuse the shared aiohttp-based helper; it performs the same POST and
    # prints the same log line this function used to produce on its own.
    result = await _request_endpoint(query_name, gql)
    if isinstance(result, dict):
        return result.get("authors") or []
    return []


async def get_author(author_id: Optional[int] = None, slug: str = "", user: str = "") -> Any:
    """Fetch a single author via the ``get_author`` query.

    Only the first truthy selector is sent, in the order ``author_id``,
    ``slug``, ``user``.

    Args:
        author_id: Author id to look up.
        slug: Author slug, used when ``author_id`` is falsy.
        user: User identifier, used when both ``author_id`` and ``slug`` are
            falsy.

    Returns:
        Whatever ``_request_endpoint`` yields for the ``get_author`` field:
        the field value, ``{}`` when it is missing, ``[]`` on a non-200
        status, or ``None`` on a request error.
    """
    # The bare names are what the server echoes back (response key) and what
    # ``operationName`` must contain; the variable declarations and field
    # arguments belong only inside the query document.
    query_name = "get_author"
    operation = "GetAuthor"
    declaration = f"{operation}($author_id: Int, $slug: String, $user: String)"
    selection = f"{query_name}(author_id: $author_id, slug: $slug, user: $user)"

    variables = {}
    if author_id:
        variables["author_id"] = author_id
    elif slug:
        variables["slug"] = slug
    elif user:
        variables["user"] = user

    gql = {
        "query": f"query {declaration} {{ {selection} {{ {_AUTHOR_FIELDS} }} }}",
        "operationName": operation,
        "variables": variables or None,
    }
    return await _request_endpoint(query_name, gql)