diff --git a/main.py b/main.py index 44ad110a..675598dd 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ from ariadne.asgi import GraphQL from starlette.applications import Starlette from starlette.routing import Route +from resolvers.author_events import update_cache, scheduled_cache_update from services.rediscache import redis from services.schema import resolvers from services.search import search_service @@ -36,6 +37,8 @@ app = Starlette( on_startup=[ redis.connect, ViewedStorage.init, + update_cache, + scheduled_cache_update, search_service.info, # start_sentry, start, diff --git a/orm/author.py b/orm/author.py index ab2e7558..94670452 100644 --- a/orm/author.py +++ b/orm/author.py @@ -1,11 +1,6 @@ import time -import asyncio from sqlalchemy import JSON, Boolean, Column, ForeignKey, Integer, String -from sqlalchemy import event, select - -from services.rediscache import redis -from orm.topic import Topic, TopicFollower from services.db import Base @@ -43,77 +38,3 @@ class Author(Base): last_seen = Column(Integer, nullable=False, default=lambda: int(time.time())) updated_at = Column(Integer, nullable=False, default=lambda: int(time.time())) deleted_at = Column(Integer, nullable=True, comment="Deleted at") - - -@event.listens_for(Author, "after_insert") -@event.listens_for(Author, "after_update") -def after_author_update(mapper, connection, target): - redis_key = f"user:{target.user}:author" - asyncio.create_task(redis.execute("HSET", redis_key, **vars(target))) - - -async def update_follows_for_user(connection, user_id, entity_type, entity, is_insert): - redis_key = f"user:{user_id}:follows" - follows = await redis.execute("HGET", redis_key) - if not follows: - follows = { - "topics": [], - "authors": [], - "communities": [ - {"slug": "discours", "name": "Дискурс", "id": 1, "desc": ""} - ], - } - if is_insert: - follows[f"{entity_type}s"].append(entity) - else: - # Remove the entity from follows - follows[f"{entity_type}s"] = [ - e for e in follows[f"{entity_type}s"] if e["id"] != entity.id - ] - - await redis.execute("HSET", redis_key, **vars(follows)) - - -async def handle_author_follower_change(connection, author_id, follower_id, is_insert): - async with connection.begin() as conn: - author = await conn.execute(select(Author).filter(Author.id == author_id)).first() - follower = await conn.execute(select(Author).filter(Author.id == follower_id)).first() - if follower and author: - await update_follows_for_user( - connection, follower.user, "author", author, is_insert - ) - - -async def handle_topic_follower_change(connection, topic_id, follower_id, is_insert): - topic = connection.execute(select(Topic).filter(Topic.id == topic_id)).first() - follower = connection.execute( - select(Author).filter(Author.id == follower_id) - ).first() - if follower and topic: - await update_follows_for_user( - connection, follower.user, "topic", topic, is_insert - ) - - -@event.listens_for(TopicFollower, "after_insert") -def after_topic_follower_insert(mapper, connection, target): - asyncio.create_task(handle_topic_follower_change(connection, target.topic, target.follower, True)) - - -@event.listens_for(TopicFollower, "after_delete") -def after_topic_follower_delete(mapper, connection, target): - asyncio.create_task(handle_topic_follower_change(connection, target.topic, target.follower, False)) - - -@event.listens_for(AuthorFollower, "after_insert") -def after_author_follower_insert(mapper, connection, target): - asyncio.create_task(handle_author_follower_change( - connection, target.author, target.follower, True - )) - - -@event.listens_for(AuthorFollower, "after_delete") -def after_author_follower_delete(mapper, connection, target): - asyncio.create_task(handle_author_follower_change( - connection, target.author, target.follower, False - )) diff --git a/pyproject.toml b/pyproject.toml index 7c17ae01..606d31b2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ opensearch-py = "^2.4.2" httpx = "^0.26.0" dogpile-cache = "^1.3.1" colorlog = "^6.8.2" +aiocron = "^1.8" [tool.poetry.group.dev.dependencies] ruff = "^0.2.1" diff --git a/resolvers/author.py b/resolvers/author.py index bb6b1ef1..ed3f6482 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -192,7 +192,7 @@ async def get_author(_, _info, slug="", author_id=None): async def get_author_by_user_id(user_id: str): redis_key = f"user:{user_id}:author" - res = await redis.execute("HGET", redis_key) + res = await redis.hget(redis_key) if isinstance(res, dict) and res.get("id"): logger.debug(f"got cached author: {res}") return res @@ -200,7 +200,7 @@ async def get_author_by_user_id(user_id: str): logger.info(f"getting author id for {user_id}") q = select(Author).filter(Author.user == user_id) author = await load_author_with_stats(q) - await redis.execute("HSET", redis_key, **author.dict()) + await redis.hset(redis_key, **author.dict()) return author diff --git a/resolvers/author_events.py b/resolvers/author_events.py new file mode 100644 index 00000000..c1602d3d --- /dev/null +++ b/resolvers/author_events.py @@ -0,0 +1,128 @@ +import asyncio +from aiocron import crontab +from sqlalchemy import select, event + +from orm.author import Author, AuthorFollower +from orm.topic import Topic, TopicFollower +from resolvers.author import add_author_stat_columns, get_author_follows +from resolvers.topic import add_topic_stat_columns +from services.db import local_session +from services.rediscache import redis +from services.viewed import ViewedStorage + + +async def update_cache(): + with local_session() as session: + for author in session.query(Author).all(): + redis_key = f"user:{author.user}:author" + await redis.hset(redis_key, **vars(author)) + follows = await get_author_follows(None, None, user=author.user) + if isinstance(follows, dict): + redis_key = f"user:{author.user}:follows" + await redis.hset(redis_key, **follows) + + +@crontab("*/10 * * * *", func=update_cache) +async def scheduled_cache_update(): + pass + + +@event.listens_for(Author, "after_insert") +@event.listens_for(Author, "after_update") +def after_author_update(mapper, connection, target): + redis_key = f"user:{target.user}:author" + asyncio.create_task(redis.hset(redis_key, **vars(target))) + + +@event.listens_for(TopicFollower, "after_insert") +def after_topic_follower_insert(mapper, connection, target): + asyncio.create_task( + handle_topic_follower_change(connection, target.topic, target.follower, True) + ) + + +@event.listens_for(TopicFollower, "after_delete") +def after_topic_follower_delete(mapper, connection, target): + asyncio.create_task( + handle_topic_follower_change(connection, target.topic, target.follower, False) + ) + + +@event.listens_for(AuthorFollower, "after_insert") +def after_author_follower_insert(mapper, connection, target): + asyncio.create_task( + handle_author_follower_change(connection, target.author, target.follower, True) + ) + + +@event.listens_for(AuthorFollower, "after_delete") +def after_author_follower_delete(mapper, connection, target): + asyncio.create_task( + handle_author_follower_change(connection, target.author, target.follower, False) + ) + + +async def update_follows_for_user(connection, user_id, entity_type, entity, is_insert): + redis_key = f"user:{user_id}:follows" + follows = await redis.hget(redis_key) + if not follows: + follows = { + "topics": [], + "authors": [], + "communities": [ + {"slug": "discours", "name": "Дискурс", "id": 1, "desc": ""} + ], + } + if is_insert: + follows[f"{entity_type}s"].append(entity) + else: + # Remove the entity from follows + follows[f"{entity_type}s"] = [ + e for e in follows[f"{entity_type}s"] if e["id"] != entity.id + ] + + await redis.hset(redis_key, **vars(follows)) + + +async def handle_author_follower_change(connection, author_id, follower_id, is_insert): + q = select(Author).filter(Author.id == author_id) + q = add_author_stat_columns(q) + async with connection.begin() as conn: + [author, shouts_stat, followers_stat, followings_stat] = await conn.execute( + q + ).first() + author.stat = { + "shouts": shouts_stat, + "viewed": await ViewedStorage.get_author(author.slug), + "followers": followers_stat, + "followings": followings_stat, + } + follower = await conn.execute( + select(Author).filter(Author.id == follower_id) + ).first() + if follower and author: + await update_follows_for_user( + connection, follower.user, "author", author, is_insert + ) + + +async def handle_topic_follower_change(connection, topic_id, follower_id, is_insert): + q = select(Topic).filter(Topic.id == topic_id) + q = add_topic_stat_columns(q) + async with connection.begin() as conn: + [topic, shouts_stat, authors_stat, followers_stat] = await conn.execute( + q + ).first() + topic.stat = { + "shouts": shouts_stat, + "authors": authors_stat, + "followers": followers_stat, + "viewed": await ViewedStorage.get_topic(topic.slug), + } + follower = connection.execute( + select(Author).filter(Author.id == follower_id) + ).first() + if follower and topic: + await update_follows_for_user( + connection, follower.user, "topic", topic, is_insert + ) diff --git a/resolvers/follower.py b/resolvers/follower.py index 0ede1358..9eed33ef 100644 --- a/resolvers/follower.py +++ b/resolvers/follower.py @@ -131,13 +131,13 @@ def query_follows(user_id: str): async def get_follows_by_user_id(user_id: str): if user_id: redis_key = f"user:{user_id}:follows" - res = await redis.execute("HGET", redis_key) + res = await redis.hget(redis_key) if res: return res logger.debug(f"getting follows for {user_id}") follows = query_follows(user_id) - await redis.execute("HSET", redis_key, **follows) + await redis.hset(redis_key, **follows) return follows diff --git a/services/rediscache.py b/services/rediscache.py index 452d0368..bc2f819f 100644 --- a/services/rediscache.py +++ b/services/rediscache.py @@ -21,6 +21,10 @@ class RedisCache: if self._client: try: logger.debug(f"{command} {args} {kwargs}") + for arg in args: + if isinstance(arg, dict): + if arg.get('_sa_instance_state'): + del arg['_sa_instance_state'] r = await self._client.execute_command(command, *args, **kwargs) logger.debug(type(r)) logger.debug(r) @@ -48,6 +52,13 @@ class RedisCache: return await self._client.publish(channel, data) + async def hset(self, hash_key: str, fields_values: dict): + return await self._client.hset(hash_key, mapping=fields_values) + + + async def hget(self, hash_key: str): + return await self._client.hget(hash_key) + redis = RedisCache()