cache-fixed
Some checks failed
Deploy to core / deploy (push) Failing after 15m39s

This commit is contained in:
Untone 2024-02-21 17:37:58 +03:00
parent 63f5a708b7
commit e69046a1f8
6 changed files with 63 additions and 44 deletions

View File

@ -7,7 +7,7 @@ from ariadne.asgi import GraphQL
from starlette.applications import Starlette from starlette.applications import Starlette
from starlette.routing import Route from starlette.routing import Route
from resolvers.author_events import update_cache, scheduled_cache_update from services.follows import FollowsCached
from services.rediscache import redis from services.rediscache import redis
from services.schema import resolvers from services.schema import resolvers
from services.search import search_service from services.search import search_service
@ -37,8 +37,7 @@ app = Starlette(
on_startup=[ on_startup=[
redis.connect, redis.connect,
ViewedStorage.init, ViewedStorage.init,
update_cache, FollowsCached.worker,
scheduled_cache_update,
search_service.info, search_service.info,
# start_sentry, # start_sentry,
start, start,

View File

@ -22,7 +22,6 @@ opensearch-py = "^2.4.2"
httpx = "^0.26.0" httpx = "^0.26.0"
dogpile-cache = "^1.3.1" dogpile-cache = "^1.3.1"
colorlog = "^6.8.2" colorlog = "^6.8.2"
aiocron = "^1.8"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
ruff = "^0.2.1" ruff = "^0.2.1"

View File

@ -1,3 +1,4 @@
import json
import time import time
from typing import List from typing import List
@ -192,16 +193,18 @@ async def get_author(_, _info, slug="", author_id=None):
async def get_author_by_user_id(user_id: str): async def get_author_by_user_id(user_id: str):
redis_key = f"user:{user_id}:author" redis_key = f"user:{user_id}:author"
res = await redis.hget(redis_key) res = await redis.execute('GET', redis_key)
if isinstance(res, dict) and res.get("id"): if isinstance(res, str):
logger.debug(f"got cached author: {res}") author = json.loads(res)
return res if author.get("id"):
logger.debug(f"got cached author: {author}")
return author
logger.info(f"getting author id for {user_id}") logger.info(f"getting author id for {user_id}")
q = select(Author).filter(Author.user == user_id) q = select(Author).filter(Author.user == user_id)
author = await load_author_with_stats(q) author = await load_author_with_stats(q)
await redis.hset(redis_key, **author.dict()) if author:
await redis.execute('set', redis_key, json.dumps(author.dict()))
return author return author

View File

@ -1,3 +1,4 @@
import json
from typing import List from typing import List
from sqlalchemy import select, or_ from sqlalchemy import select, or_
@ -131,13 +132,14 @@ def query_follows(user_id: str):
async def get_follows_by_user_id(user_id: str): async def get_follows_by_user_id(user_id: str):
if user_id: if user_id:
redis_key = f"user:{user_id}:follows" redis_key = f"user:{user_id}:follows"
res = await redis.hget(redis_key) res = await redis.execute('GET', redis_key)
if res: if isinstance(res, str):
return res follows = json.loads(res)
return follows
logger.debug(f"getting follows for {user_id}") logger.debug(f"getting follows for {user_id}")
follows = query_follows(user_id) follows = query_follows(user_id)
await redis.hset(redis_key, **follows) await redis.execute('SET', redis_key, json.dumps(follows))
return follows return follows

View File

@ -1,37 +1,22 @@
import asyncio import asyncio
from aiocron import crontab
from sqlalchemy import select, event from sqlalchemy import select, event
import json
from orm.author import Author, AuthorFollower from orm.author import Author, AuthorFollower
from orm.topic import Topic, TopicFollower from orm.topic import Topic, TopicFollower
from resolvers.author import add_author_stat_columns, get_author_follows from resolvers.author import add_author_stat_columns, get_author_follows
from resolvers.topic import add_topic_stat_columns from resolvers.topic import add_topic_stat_columns
from services.logger import root_logger as logger
from services.db import local_session from services.db import local_session
from services.rediscache import redis from services.rediscache import redis
from services.viewed import ViewedStorage from services.viewed import ViewedStorage
async def update_cache():
with local_session() as session:
for author in session.query(Author).all():
redis_key = f"user:{author.user}:author"
await redis.hset(redis_key, **vars(author))
follows = await get_author_follows(None, None, user=author.user)
if isinstance(follows, dict):
redis_key = f"user:{author.user}:follows"
await redis.hset(redis_key, **follows)
@crontab("*/10 * * * *", func=update_cache)
async def scheduled_cache_update():
pass
@event.listens_for(Author, "after_insert") @event.listens_for(Author, "after_insert")
@event.listens_for(Author, "after_update") @event.listens_for(Author, "after_update")
def after_author_update(mapper, connection, target): def after_author_update(mapper, connection, target):
redis_key = f"user:{target.user}:author" redis_key = f"user:{target.user}:author"
asyncio.create_task(redis.hset(redis_key, **vars(target))) asyncio.create_task(redis.execute('set', redis_key, json.dumps(vars(target))))
@event.listens_for(TopicFollower, "after_insert") @event.listens_for(TopicFollower, "after_insert")
@ -64,8 +49,10 @@ def after_author_follower_delete(mapper, connection, target):
async def update_follows_for_user(connection, user_id, entity_type, entity, is_insert): async def update_follows_for_user(connection, user_id, entity_type, entity, is_insert):
redis_key = f"user:{user_id}:follows" redis_key = f"user:{user_id}:follows"
follows = await redis.hget(redis_key) follows_str = await redis.get(redis_key)
if not follows: if follows_str:
follows = json.loads(follows_str)
else:
follows = { follows = {
"topics": [], "topics": [],
"authors": [], "authors": [],
@ -80,8 +67,7 @@ async def update_follows_for_user(connection, user_id, entity_type, entity, is_i
follows[f"{entity_type}s"] = [ follows[f"{entity_type}s"] = [
e for e in follows[f"{entity_type}s"] if e["id"] != entity.id e for e in follows[f"{entity_type}s"] if e["id"] != entity.id
] ]
await redis.execute('set', redis_key, json.dumps(follows))
await redis.hset(redis_key, **vars(follows))
async def handle_author_follower_change(connection, author_id, follower_id, is_insert): async def handle_author_follower_change(connection, author_id, follower_id, is_insert):
@ -126,3 +112,40 @@ async def handle_topic_follower_change(connection, topic_id, follower_id, is_ins
await update_follows_for_user( await update_follows_for_user(
connection, follower.user, "topic", topic, is_insert connection, follower.user, "topic", topic, is_insert
) )
class FollowsCached:
lock = asyncio.Lock()
@staticmethod
async def update_cache():
with local_session() as session:
for author in session.query(Author).all():
if isinstance(author, Author):
redis_key = f"user:{author.user}:author"
author_dict = author.dict()
if isinstance(author_dict, dict):
filtered_author_dict = {k: v for k, v in author_dict.items() if v is not None}
await redis.execute('set', redis_key, json.dumps(filtered_author_dict))
follows = await get_author_follows(None, None, user=author.user)
if isinstance(follows, dict):
filtered_follows = {k: v for k, v in follows.items() if v is not None}
redis_key = f"user:{author.user}:follows"
await redis.execute('set', redis_key, json.dumps(filtered_follows))
@staticmethod
async def worker():
"""Асинхронная задача обновления"""
self = FollowsCached
while True:
try:
await self.update_cache()
await asyncio.sleep(10 * 60 * 60)
except asyncio.CancelledError:
# Handle cancellation due to SIGTERM
logger.info("Cancellation requested. Cleaning up...")
# Perform any necessary cleanup before exiting the loop
break
except Exception as exc:
logger.error(exc)

View File

@ -52,13 +52,6 @@ class RedisCache:
return return
await self._client.publish(channel, data) await self._client.publish(channel, data)
async def hset(self, hash_key: str, fields_values: dict):
return await self._client.hset(hash_key, mapping=fields_values)
async def hget(self, hash_key: str):
return await self._client.hget(hash_key)
redis = RedisCache() redis = RedisCache()