notifier/services/core.py
Untone 2d89a4ec86
All checks were successful
deploy / deploy (push) Successful in 1m16s
update-api
2023-11-28 11:33:28 +03:00

62 lines
1.8 KiB
Python

from typing import List
from httpx import AsyncClient
from orm.author import Author
from orm.shout import Shout
from settings import API_BASE
# HTTP headers passed to the POST in `_request_endpoint`; the request body is JSON.
headers = {"Content-Type": "application/json"}
async def _request_endpoint(query_name, body):
    """POST a GraphQL payload to ``API_BASE`` and extract one root field.

    Args:
        query_name: Name of the root field to pull out of the response's
            ``data`` object; also used as a label in the log lines.
        body: JSON-serialisable request payload (``query``,
            ``operationName``, ``variables``).

    Returns:
        ``data[query_name]`` from the decoded response, or ``{}`` when that
        field (or the whole ``data`` object) is missing or null.
        ``[]`` when the HTTP status is not 200.
        ``None`` when the response JSON is empty or when the request or
        decoding raised.
    """
    async with AsyncClient() as client:
        try:
            response = await client.post(API_BASE, headers=headers, json=body)
            print(f"[services.core] {query_name}: [{response.status_code}] {len(response.text)} bytes")
            if response.status_code != 200:
                return []

            payload = response.json()
            if not payload:
                # Empty/falsy JSON document: nothing to extract.
                print(f"[services.core] {query_name}: json response error")
                return None

            # A failed GraphQL operation carries its details under "errors";
            # surface them instead of failing later with an unrelated error.
            errors = payload.get("errors")
            if errors:
                print(f"[services.core] {query_name}: errors: {errors}")

            # "data" may be present but null when the operation failed, so
            # a plain .get("data", {}) default is not enough.
            data = payload.get("data") or {}
            return data.get(query_name, {})
        except Exception:
            # Best-effort boundary: report the failure and signal it with
            # None rather than propagating to the caller.
            import traceback

            traceback.print_exc()
            return None
async def get_author(author_id: int) -> Author:
    """Request the ``get_author`` GraphQL field for the given author id.

    Args:
        author_id: Identifier of the author, sent as the ``$author_id``
            query variable.

    Returns:
        Whatever ``_request_endpoint`` extracts for ``get_author`` (fields
        requested: ``id slug pic name``); a falsy value when the request
        did not succeed.
    """
    query_name = "get_author"
    operation = "GetAuthor"
    query_fields = "id slug pic name"

    # The id must be declared as a variable and passed to the field,
    # otherwise the server never learns which author is being requested.
    # NOTE(review): the argument name `author_id` mirrors the sibling
    # `load_shouts_followed` query in this file - confirm against the schema.
    query = (
        f"query {operation}($author_id: Int!) "
        f"{{ {query_name}(author_id: $author_id) {{ {query_fields} }} }}"
    )
    body = {
        "query": query,
        "operationName": operation,
        "variables": {"author_id": author_id},
    }
    return await _request_endpoint(query_name, body)
async def get_followed_shouts(author_id: int) -> List[Shout]:
    """Request the ``load_shouts_followed`` GraphQL field for an author.

    Args:
        author_id: Identifier of the author, sent as the ``$author_id``
            query variable.

    Returns:
        Whatever ``_request_endpoint`` extracts for ``load_shouts_followed``
        (fields requested: ``id slug title``); a falsy value when the
        request did not succeed.
    """
    query_name = "load_shouts_followed"
    query_type = "query"
    operation = "GetFollowedShouts"
    query_fields = "id slug title"

    # Every variable definition needs the `$` sigil; without it on `limit`
    # and `offset` the document is not a valid GraphQL operation.
    query = f"""{query_type} {operation}($author_id: Int!, $limit: Int, $offset: Int) {{
        {query_name}(author_id: $author_id, limit: $limit, offset: $offset) {{ {query_fields} }}
    }}"""

    body = {
        "query": query,
        "operationName": operation,
        "variables": {"author_id": author_id, "limit": 1000, "offset": 0},  # FIXME: too big
    }
    return await _request_endpoint(query_name, body)