notifier/services/core.py

43 lines
1.4 KiB
Python
Raw Normal View History

2023-11-28 16:04:45 +00:00
from typing import List, Any
2023-11-30 06:42:41 +00:00
import aiohttp
2023-11-26 10:18:57 +00:00
from settings import API_BASE
2023-11-23 22:58:55 +00:00
2023-11-26 10:18:57 +00:00
headers = {"Content-Type": "application/json"}
2023-11-23 22:58:55 +00:00
2023-11-26 10:18:57 +00:00
async def _request_endpoint(query_name, body):
2023-11-30 06:42:41 +00:00
async with aiohttp.ClientSession() as session:
2023-11-23 22:58:55 +00:00
try:
2023-11-30 06:42:41 +00:00
async with session.post(API_BASE, headers=headers, json=body) as response:
print(f"[services.core] {query_name}: [{response.status}] {len(await response.text())} bytes")
if response.status != 200:
return []
r = await response.json()
if r:
return r.get("data", {}).get(query_name, {})
else:
raise Exception("json response error")
2023-11-23 22:58:55 +00:00
except Exception:
import traceback
traceback.print_exc()
2023-11-28 16:04:45 +00:00
async def get_followed_shouts(author_id: int) -> List[Any]:
2023-11-28 08:33:28 +00:00
query_name = "load_shouts_followed"
2023-11-24 02:18:02 +00:00
query_type = "query"
2023-11-26 10:18:57 +00:00
operation = "GetFollowedShouts"
query_fields = "id slug title"
2023-11-24 02:18:02 +00:00
2023-11-28 08:33:28 +00:00
query = f"""{query_type} {operation}($author_id: Int!, limit: Int, offset: Int) {{
{query_name}(author_id: $author_id, limit: $limit, offset: $offset) {{ {query_fields} }}
}}"""
2023-11-24 02:18:02 +00:00
2023-11-28 08:33:28 +00:00
body = {
"query": query,
"operationName": operation,
2023-11-28 16:04:45 +00:00
"variables": {"author_id": author_id, "limit": 1000, "offset": 0}, # FIXME: too big limit
2023-11-28 08:33:28 +00:00
}
2023-11-24 02:18:02 +00:00
2023-11-26 10:18:57 +00:00
return await _request_endpoint(query_name, body)