cached-storage-authors

Untone committed 2023-12-23 09:11:04 +03:00
parent e81eabd0d0, commit ce02ce0130
5 changed files with 141 additions and 85 deletions

main.py

@@ -8,42 +8,43 @@ from sentry_sdk.integrations.aiohttp import AioHttpIntegration
 from sentry_sdk.integrations.ariadne import AriadneIntegration
 from sentry_sdk.integrations.redis import RedisIntegration
 from starlette.applications import Starlette
+import asyncio
 from services.rediscache import redis
 from services.schema import resolvers
 from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN
+from services.core import CacheStorage

 import_module("resolvers")
 schema = make_executable_schema(load_schema_from_path("inbox.graphql"), resolvers)  # type: ignore


 async def start_up():
+    await redis.connect()
+    await CacheStorage.init()
     if MODE == "dev":
-        if exists(DEV_SERVER_PID_FILE_NAME):
-            await redis.connect()
-            return
-        else:
+        if not exists(DEV_SERVER_PID_FILE_NAME):
             with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f:
                 f.write(str(os.getpid()))
     else:
-        await redis.connect()
-
-    # startup sentry monitoring services
-    try:
-        import sentry_sdk
-
-        sentry_sdk.init(
-            SENTRY_DSN,
-            enable_tracing=True,
-            integrations=[
-                AriadneIntegration(),
-                RedisIntegration(),
-                AioHttpIntegration(),
-            ],
-        )
-    except Exception as e:
-        print("[sentry] init error")
-        print(e)
+        # startup sentry monitoring services
+        try:
+            import sentry_sdk
+
+            sentry_sdk.init(
+                SENTRY_DSN,
+                enable_tracing=True,
+                integrations=[
+                    AriadneIntegration(),
+                    RedisIntegration(),
+                    AioHttpIntegration(),
+                ],
+            )
+        except Exception as e:
+            print("[sentry] init error")
+            print(e)


 async def shutdown():
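With this change Redis connects and the author cache starts unconditionally on startup, while Sentry is initialized only in the production branch. A minimal sketch of how these hooks are usually wired (the Starlette() construction is outside this hunk, so the exact call below is an assumption, not part of the commit):

# Assumed wiring, not shown in the diff above: Starlette runs the
# start_up/shutdown coroutines around the application lifespan.
app = Starlette(debug=(MODE == "dev"), on_startup=[start_up], on_shutdown=[shutdown])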

resolvers/load.py

@@ -5,14 +5,17 @@ from typing import Any, Dict, List, Optional, Union
 from models.chat import ChatPayload, Message
 from resolvers.chats import create_chat
 from services.auth import login_required
-from services.core import get_all_authors
+from services.core import CacheStorage
 from services.rediscache import redis
 from services.schema import query


 async def get_unread_counter(chat_id: str, member_id: int) -> int:
     unread = await redis.execute("LLEN", f"chats/{chat_id}/unread/{member_id}")
-    return unread or 0
+    if isinstance(unread, int):
+        return unread
+    else:
+        return 0


 # NOTE: not an API handler
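This isinstance() narrowing in get_unread_counter is the first instance of a pattern the commit applies throughout: the redis wrapper's execute() is treated as returning an int (LLEN), a list (SMEMBERS), a str (GET), or None, so results are type-checked before use instead of relying on truthiness. A hedged sketch of the same idea for GET (the key name is invented, the return shapes are assumed from standard Redis semantics):

async def read_chat(cid: str):
    # GET is assumed to yield str or None; anything unexpected falls
    # through to None rather than crashing inside json.loads.
    value = await redis.execute("GET", f"chats/{cid}")
    return json.loads(value) if isinstance(value, str) else None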
@@ -55,9 +58,9 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Uni
     chats = []
     if author_id:
         cids = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
-        if cids:
+        if isinstance(cids, list):
             members_online = (await redis.execute("SMEMBERS", "authors-online")) or []
-            cids = list(cids)[offset : (offset + limit)]
+            cids = cids[offset : (offset + limit)]
             lock = asyncio.Lock()
             if len(cids) == 0:
                 print(f"[resolvers.load] no chats for user with id={author_id}")
@@ -65,21 +68,18 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Uni
                 print(f"[resolvers.load] created chat: {r['chat_id']}")
                 cids.append(r["chat"]["id"])

-        authors = get_all_authors()
-        authors_by_id = {a["id"]: a for a in authors}
         for cid in cids:
             async with lock:
-                chat_str: str = await redis.execute("GET", f"chats/{cid}")
-                print(f"[resolvers.load] redis GET by {cid}: {chat_str}")
-                if chat_str:
+                chat_str = await redis.execute("GET", f"chats/{cid}")
+                if isinstance(chat_str, str):
+                    print(f"[resolvers.load] redis GET by {cid}: {chat_str}")
                     c: ChatPayload = json.loads(chat_str)
-                    c["messages"] = await load_messages(cid, 5, 0)
+                    c["messages"] = (await load_messages(cid, 5, 0)) or []
                     c["unread"] = await get_unread_counter(cid, author_id)
                     member_ids = c["members"].copy()
                     c["members"] = []
                     for member_id in member_ids:
-                        a = authors_by_id.get(member_id)
+                        a = CacheStorage.authors_by_id.get(member_id)
                         if a:
                             a["online"] = a.get("id") in members_online
                             c["members"].append(a)
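Dropping the get_all_authors() pair removes an HTTP round trip to the core API from every load_chats call; member hydration now reads the process-local index that the cache worker keeps fresh. A before/after sketch with an invented id:

member_id = 42  # hypothetical id, for illustration only
# before: authors_by_id = {a["id"]: a for a in get_all_authors()}  # HTTP call per request
author = CacheStorage.authors_by_id.get(member_id)  # after: O(1) in-process lookup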
@@ -92,9 +92,9 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Uni
 async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0):
     """load :limit messages of :chat_id with :offset"""
     author_id = info.context["author_id"]
-    author_chats = (await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id))) or []
-    author_chats = [c for c in author_chats]
-    if author_chats:
+    author_chats = await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id))
+    if isinstance(author_chats, list):
+        author_chats = [c for c in author_chats]
         messages = []
         by_chat = by.get("chat")
         if by_chat in author_chats:
@@ -105,7 +105,7 @@ async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0):
             messages = await load_messages(by_chat, limit, offset)
         return {
             "messages": sorted(
-                [m for m in messages if m.get("created_at")],
+                [m for m in messages if m and m.get("created_at")],
                 key=lambda m: m.get("created_at"),
             ),
             "error": None,

resolvers/search.py

@@ -1,10 +1,10 @@
 import json
-from datetime import datetime, timedelta, timezone
+import time
 from typing import Any, Dict, List, Union

 from resolvers.load import load_messages
 from services.auth import login_required
-from services.core import get_all_authors
+from services.core import CacheStorage
 from services.rediscache import redis
 from services.schema import query
@@ -18,22 +18,21 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0
     author_id = info.context["author_id"]

-    authors = get_all_authors()
-    authors_by_id = {a["id"]: a for a in authors}
     existed_chats = await redis.execute("SMEMBERS", f"/chats_by_author/{author_id}")
-    if existed_chats:
-        for chat_id in list(json.loads(existed_chats))[offset : (offset + limit)]:
+    if isinstance(existed_chats, str):
+        chats_list = list(json.loads(existed_chats))
+        for chat_id in chats_list[offset : (offset + limit)]:
             members_ids = await redis.execute("GET", f"/chats/{chat_id}/members")
-            for member_id in members_ids:
-                author = authors_by_id.get(member_id)
-                if author:
-                    if author["name"].startswith(text):
-                        result.add(author)
+            if isinstance(members_ids, list):
+                for member_id in members_ids:
+                    author = CacheStorage.authors_by_id.get(member_id)
+                    if author:
+                        if author["name"].startswith(text):
+                            result.add(author)

     more_amount = limit - len(result)
     if more_amount > 0:
-        result.update(authors_by_id.values()[0:more_amount])
+        result.update(CacheStorage.authors[0:more_amount])
     return {"members": list(result), "error": None}
@@ -42,39 +41,35 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0
 async def search_messages(
     _, info, by: Dict[str, Union[str, int]], limit: int, offset: int
 ) -> Dict[str, Union[List[Dict[str, Any]], None]]:
-    author_id = info.context["author_id"]
-    lookup_chats = set((await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or [])
     messages_set = set([])
+    author_id = info.context["author_id"]
+    lookup_chats = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
+    if isinstance(lookup_chats, list):
+        lookup_chats = set(lookup_chats)

-    by_member = by.get("author")
-    body_like = by.get("body")
-    days_ago = by.get("days")
-
-    # pre-filter lookup chats
-    if by_member:
-        lookup_chats = filter(
-            lambda ca: by_member in ca["members"],
-            list(lookup_chats),
-        )
-
-    # load the messages from lookup chats
-    for c in lookup_chats:
-        chat_id = c.decode()
-        mmm = await load_messages(chat_id, limit, offset)
-        if by_member:
-            mmm = list(filter(lambda mx: mx["author"] == by_member, mmm))
-        if body_like:
-            mmm = list(filter(lambda mx: body_like in mx["body"], mmm))
-        if days_ago:
-            mmm = list(
-                filter(
-                    lambda msg: int(datetime.now(tz=timezone.utc)) - int(msg["created_at"])
-                    < int(timedelta(days=days_ago)),
-                    mmm,
-                )
-            )
-        messages_set.union(set(mmm))
+        # pre-filter lookup chats
+        by_member = by.get("author")
+        if by_member:
+            lookup_chats = filter(
+                lambda ca: by_member in ca["members"],
+                list(lookup_chats),
+            )

-    messages_sorted = sorted(list(messages_set))
-    return {"messages": messages_sorted, "error": None}
+        # load the messages from lookup chats
+        for c in lookup_chats:
+            chat_id = c.decode()
+            mmm = await load_messages(chat_id, limit, offset)
+            filter_method = None
+            if by_member:
+                filter_method = lambda mx: mx and mx["created_by"] == by_member
+            body_like = by.get("body") or ""
+            if isinstance(body_like, str):
+                filter_method = lambda mx: mx and body_like in mx["body"]
+            days_ago = int(by.get("days") or "0")
+            if days_ago:
+                # compare against the message timestamp, not its author id
+                filter_method = lambda mx: mx and (
+                    int(time.time()) - mx["created_at"] < days_ago * 24 * 60 * 60
+                )
+            if filter_method:
+                mmm = list(filter(filter_method, mmm))
+            messages_set |= set(mmm)

+    return {"messages": sorted(list(messages_set)), "error": None}

services/core.py

@@ -1,6 +1,8 @@
 from typing import List
+import asyncio
 import requests
+from datetime import datetime, timezone, timedelta

 from models.member import ChatMember
 from settings import API_BASE
@@ -8,7 +10,7 @@ from settings import API_BASE
 def _request_endpoint(query_name, body) -> dict:
     print(f"[services.core] requesting {query_name}...")
     response = requests.post(API_BASE, headers={"Content-Type": "application/json"}, json=body)
-    print(f"[services.core] {query_name} response: <{response.status_code}> {response.text[:65]}..")
+    print(f"[services.core] {query_name} response: <{response.status_code}> {response.text}..")

     if response.status_code == 200:
         try:
@@ -55,3 +57,61 @@ def get_my_followed() -> List[ChatMember]:
     result = _request_endpoint(query_name, gql)
     return result.get("authors", [])
+
+
+class CacheStorage:
+    lock = asyncio.Lock()
+    period = 5 * 60  # every 5 mins
+    client = None
+    authors = []
+    authors_by_user = {}
+    authors_by_id = {}
+
+    @staticmethod
+    async def init():
+        """graphql client connection using permanent token"""
+        self = CacheStorage
+        async with self.lock:
+            task = asyncio.create_task(self.worker())
+            print(task)
+
+    @staticmethod
+    async def update_authors():
+        self = CacheStorage
+        async with self.lock:
+            result = get_all_authors()
+            print(f"[services.core] loaded {len(result)}")
+            if result:
+                CacheStorage.authors = result
+                for a in result:
+                    # authors arrive as plain dicts from the GraphQL API,
+                    # so they are indexed by key, not by attribute
+                    self.authors_by_user[a["user"]] = a
+                    self.authors_by_id[a["id"]] = a
+
+    @staticmethod
+    async def worker():
+        """async task worker"""
+        failed = 0
+        self = CacheStorage
+        while True:
+            try:
+                print("[services.core] - updating views...")
+                await self.update_authors()
+                failed = 0
+            except Exception:
+                failed += 1
+                print("[services.core] - update failed #%d, wait 10 seconds" % failed)
+                if failed > 3:
+                    print("[services.core] - not trying to update anymore")
+                    break
+            if failed == 0:
+                when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
+                t = format(when.astimezone().isoformat())
+                print(
+                    "[services.core] ⎩ next update: %s"
+                    % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])
+                )
+                await asyncio.sleep(self.period)
+            else:
+                await asyncio.sleep(10)
+                print("[services.core] - trying to update data again")

settings.py

@@ -2,8 +2,8 @@ from os import environ

 PORT = 80
 REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1"
-API_BASE = environ.get("API_BASE") or "https://v2.discours.io/"
-AUTH_URL = environ.get("AUTH_URL") or "https://v2.discours.io/"
+API_BASE = environ.get("API_BASE") or "https://core.discours.io/"
+AUTH_URL = environ.get("AUTH_URL") or "https://auth.discours.io/"
 MODE = environ.get("MODE") or "production"
 SENTRY_DSN = environ.get("SENTRY_DSN")
 DEV_SERVER_PID_FILE_NAME = "dev-server.pid"