From a577b5510d677250c7361c18e58ce40f3c7c309f Mon Sep 17 00:00:00 2001 From: Untone Date: Tue, 6 Aug 2024 20:55:19 +0300 Subject: [PATCH] cache-fix3 --- services/cache.py | 6 ++- services/triggers.py | 98 ++++++++++++++++++++++---------------------- 2 files changed, 53 insertions(+), 51 deletions(-) diff --git a/services/cache.py b/services/cache.py index c5cb9c81..39b8a747 100644 --- a/services/cache.py +++ b/services/cache.py @@ -23,7 +23,8 @@ async def cache_topic(topic: dict): payload = json.dumps(topic, cls=CustomJSONEncoder) # Одновременное кэширование по id и slug для быстрого доступа await asyncio.gather( - redis.execute("SET", f"topic:id:{topic['id']}", payload), redis.execute("SET", f"topic:slug:{topic['slug']}", payload) + redis.execute("SET", f"topic:id:{topic['id']}", payload), + redis.execute("SET", f"topic:slug:{topic['slug']}", payload), ) @@ -239,6 +240,7 @@ async def get_cached_author_by_user_id(user_id: str): # Возвращаем None, если автор не найден return None + async def get_cached_topic_authors(topic_id: int): """ Получает список авторов для заданной темы, используя кэш или базу данных. @@ -263,7 +265,7 @@ async def get_cached_topic_authors(topic_id: int): .join(ShoutAuthor, ShoutAuthor.shout == Shout.id) .where(and_(ShoutTopic.topic == topic_id, Shout.published_at.is_not(None), Shout.deleted_at.is_(None))) ) - authors_ids = [author_id for author_id, in session.execute(query).all()] + authors_ids = [author_id for (author_id,) in session.execute(query).all()] # Кэшируем полученные ID авторов await redis.set(rkey, json.dumps(authors_ids)) diff --git a/services/triggers.py b/services/triggers.py index 10f148ff..ca9272c5 100644 --- a/services/triggers.py +++ b/services/triggers.py @@ -5,21 +5,15 @@ from orm.reaction import Reaction from orm.shout import Shout, ShoutAuthor from orm.topic import Topic, TopicFollower from resolvers.stat import get_with_stat -from services.cache import cache_author, cache_follows, cache_topic +from services.cache import cache_author, cache_follows, cache_topic, get_cached_author from services.logger import root_logger as logger -DEFAULT_FOLLOWS = { - "topics": [], - "authors": [], - "communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}], -} - -# Limit the number of concurrent tasks +# Инициализация семафора для ограничения одновременных задач semaphore = asyncio.Semaphore(10) async def run_background_task(coro): - """Runs an asynchronous task in the background with concurrency control.""" + """Запуск фоновой асинхронной задачи с контролем одновременности.""" async with semaphore: try: await coro @@ -28,29 +22,38 @@ async def run_background_task(coro): async def batch_cache_updates(authors, topics, followers): + """Обновление кэша для авторов, тем и подписчиков.""" tasks = ( [cache_author(author) for author in authors] + + [cache_topic(topic) for topic in topics] + [ cache_follows(follower["id"], follower["type"], follower["item_id"], follower["is_insert"]) for follower in followers ] - + [cache_topic(topic) for topic in topics] ) await asyncio.gather(*tasks) async def handle_author_follower_change(author_id: int, follower_id: int, is_insert: bool): - queries = [select(Author).filter(Author.id == author_id), select(Author).filter(Author.id == follower_id)] - author_result, follower_result = await asyncio.gather(*(get_with_stat(query) for query in queries)) - - if author_result and follower_result: - authors = [author_result[0].dict()] - followers = [ - {"id": follower_result[0].id, "type": "author", "item_id": author_result[0].id, "is_insert": is_insert} - ] + """Обработка изменений в подписках авторов.""" + # Получение данных с кэша или через запрос, если необходимо + author = await get_cached_author(author_id) + follower = await get_cached_author(follower_id) + if author and follower: + authors = [author.dict()] + followers = [{"id": follower.id, "type": "author", "item_id": author.id, "is_insert": is_insert}] await batch_cache_updates(authors, [], followers) +async def after_shout_update(_mapper, _connection, shout: Shout): + """После обновления shout, обновить информацию об авторах в кэше.""" + authors_updated = await get_with_stat( + select(Author).join(ShoutAuthor, ShoutAuthor.author == Author.id).filter(ShoutAuthor.shout == shout.id) + ) + if authors_updated: + await batch_cache_updates([author.dict() for author in authors_updated], [], []) + + async def handle_topic_follower_change(topic_id: int, follower_id: int, is_insert: bool): queries = [select(Topic).filter(Topic.id == topic_id), select(Author).filter(Author.id == follower_id)] topic_result, follower_result = await asyncio.gather(*(get_with_stat(query) for query in queries)) @@ -63,37 +66,11 @@ async def handle_topic_follower_change(topic_id: int, follower_id: int, is_inser await batch_cache_updates([], topics, followers) -async def after_shout_update(_mapper, _connection, shout: Shout): - authors_query = ( - select(Author).join(ShoutAuthor, ShoutAuthor.author == Author.id).filter(ShoutAuthor.shout == shout.id) - ) - authors_updated = await get_with_stat(authors_query) - await batch_cache_updates([author.dict() for author in authors_updated], [], []) - - -async def after_reaction_update(mapper, connection, reaction: Reaction): - queries = [ - select(Author).where(Author.id == reaction.created_by), - select(Author).join(Reaction, Author.id == Reaction.created_by).where(Reaction.id == reaction.reply_to), - ] - results = await asyncio.gather(*(get_with_stat(query) for query in queries)) - authors = [result[0].dict() for result in results if result] - - shout_query = select(Shout).where(Shout.id == reaction.shout) - shout_result = await connection.execute(shout_query) - shout = shout_result.scalar_one_or_none() - - tasks = [cache_author(author) for author in authors] - if shout: - tasks.append(after_shout_update(mapper, connection, shout)) - await asyncio.gather(*tasks) - - async def after_author_update(_mapper, _connection, author: Author): - author_query = select(Author).where(Author.id == author.id) - result = await get_with_stat(author_query) - if result: - await cache_author(result[0].dict()) + # Обновление кэша для автора + author_dict = author.dict() + await cache_author(author_dict) + logger.info(f"Author updated and cached: {author.id}") async def after_author_follower_insert(_mapper, _connection, target: AuthorFollower): @@ -116,7 +93,30 @@ async def after_topic_follower_delete(_mapper, _connection, target: TopicFollowe await handle_topic_follower_change(target.topic, target.follower, False) +async def after_reaction_update(mapper, connection, reaction: Reaction): + # Получение данных автора реакции и автора, на чью реакцию было отвечено, одновременно + author_query = select(Author).where(Author.id == reaction.created_by) + replied_author_query = ( + select(Author).join(Reaction, Author.id == Reaction.created_by).where(Reaction.id == reaction.reply_to) + ) + results = await asyncio.gather(get_with_stat(author_query), get_with_stat(replied_author_query)) + authors = [result[0].dict() for result in results if result] + + # Кэширование данных авторов + if authors: + await asyncio.gather(*(cache_author(author) for author in authors)) + + # Обновление информации о shout, если связанный с реакцией + if reaction.shout: + shout_query = select(Shout).where(Shout.id == reaction.shout) + shout_result = await connection.execute(shout_query) + shout = shout_result.scalar_one_or_none() + if shout: + await after_shout_update(mapper, connection, shout) + + def events_register(): + """Регистрация обработчиков событий SQLAlchemy.""" event.listen(Shout, "after_insert", after_shout_update) event.listen(Shout, "after_update", after_shout_update) event.listen(Reaction, "after_insert", after_reaction_update) @@ -127,4 +127,4 @@ def events_register(): event.listen(AuthorFollower, "after_delete", after_author_follower_delete) event.listen(TopicFollower, "after_insert", after_topic_follower_insert) event.listen(TopicFollower, "after_delete", after_topic_follower_delete) - logger.info("Cache events were registered!") + logger.info("Event handlers registered successfully.")