diff --git a/services/follows.py b/services/follows.py index c3341e72..e823b85c 100644 --- a/services/follows.py +++ b/services/follows.py @@ -6,31 +6,33 @@ import json from orm.author import Author, AuthorFollower from orm.topic import Topic, TopicFollower from resolvers.stat import add_author_stat_columns, add_topic_stat_columns -from resolvers.author import get_author_follows -from services.logger import root_logger as logger -from services.db import local_session from services.rediscache import redis -# from services.viewed import ViewedStorage + + +async def update_author(author): + redis_key = f'user:{author.user}:author' + + await redis.execute( + 'SET', + redis_key, + json.dumps( + { + 'id': author.id, + 'name': author.name, + 'slug': author.slug, + 'pic': author.pic, + 'bio': author.bio, + 'stat': author.stat, + } + ), + ) + await redis.execute('EXPIRE', redis_key, 25 * 60 * 60) @event.listens_for(Author, 'after_insert') @event.listens_for(Author, 'after_update') def after_author_update(mapper, connection, author: Author): - redis_key = f'user:{author.user}:author' - asyncio.create_task( - redis.execute( - 'set', - redis_key, - json.dumps( - { - 'id': author.id, - 'name': author.name, - 'slug': author.slug, - 'pic': author.pic, - } - ), - ) - ) + asyncio.create_task(update_author(author)) @event.listens_for(TopicFollower, 'after_insert') @@ -149,88 +151,3 @@ async def handle_topic_follower_change(connection, topic_id, follower_id, is_ins }, is_insert, ) - - -BATCH_SIZE = 33 - - -class FollowsCached: - lock = asyncio.Lock() - - @staticmethod - async def update_cache(): - with local_session() as session: - q = select(Author) - q = add_author_stat_columns(q) - authors = session.execute(q) - redis_updates = [] # Store Redis update tasks - - while True: - batch = authors.fetchmany(BATCH_SIZE) - if not batch: - break - else: - for [author, shouts_stat, followers_stat, followings_stat] in batch: - redis_key = f'user:{author.user}:author' - redis_data = { - 'id': author.id, - 'name': author.name, - 'slug': author.slug, - 'pic': author.pic, - 'bio': author.bio, - 'stat': { - 'followings': followings_stat, - 'shouts': shouts_stat, - 'followers': followers_stat, - }, - } - # Add Redis update task to the list - redis_updates.append( - redis.execute('SET', redis_key, json.dumps(redis_data)) - ) - - # Execute Redis update tasks concurrently - await asyncio.gather(*redis_updates) - - @staticmethod - async def update_author_cache(author: Author): - redis_key = f'user:{author.user}:author' - if isinstance(author, Author): - await redis.execute( - 'set', - redis_key, - json.dumps( - { - 'id': author.id, - 'name': author.name, - 'slug': author.slug, - 'pic': author.pic, - 'bio': author.bio, - 'stat': author.stat, - } - ), - ) - follows = await get_author_follows(None, None, user=author.user) - if isinstance(follows, dict): - redis_key = f'user:{author.user}:follows' - await redis.execute('set', redis_key, json.dumps(follows)) - - @staticmethod - async def worker(): - """Асинхронная задача обновления""" - self = FollowsCached - while True: - try: - await self.update_cache() - await asyncio.sleep(10 * 60 * 60) - except asyncio.CancelledError: - # Handle cancellation due to SIGTERM - logger.info('Cancellation requested. Cleaning up...') - # Perform any necessary cleanup before exiting the loop - break - except Exception as exc: - logger.error(exc) - - -async def start_cached_follows(): - await FollowsCached.worker()