From b823862cec5486467944953b7678a0ef3a689ec4 Mon Sep 17 00:00:00 2001 From: Untone Date: Tue, 6 Aug 2024 18:53:25 +0300 Subject: [PATCH] caching-fix --- resolvers/reaction.py | 17 +++---- resolvers/reader.py | 3 ++ services/triggers.py | 114 ++++++++++++++++++++++++------------------ 3 files changed, 75 insertions(+), 59 deletions(-) diff --git a/resolvers/reaction.py b/resolvers/reaction.py index 84fd99f4..53cbfed7 100644 --- a/resolvers/reaction.py +++ b/resolvers/reaction.py @@ -21,14 +21,13 @@ from services.viewed import ViewedStorage def add_reaction_stat_columns(q, aliased_reaction): - q = q.outerjoin( - aliased_reaction, aliased_reaction.deleted_at.is_(None)).add_columns( - func.sum(aliased_reaction.id).label("reacted_stat"), - func.sum(case((aliased_reaction.kind == str(ReactionKind.COMMENT.value), 1), else_=0)).label("comments_stat"), - func.sum(case((aliased_reaction.kind == str(ReactionKind.LIKE.value), 1), else_=0)).label("likes_stat"), - func.sum(case((aliased_reaction.kind == str(ReactionKind.DISLIKE.value), 1), else_=0)).label("dislikes_stat"), - func.max(aliased_reaction.created_at).label("last_comment_stat") - ) + q = q.outerjoin(aliased_reaction, aliased_reaction.deleted_at.is_(None)).add_columns( + func.sum(aliased_reaction.id).label("reacted_stat"), + func.sum(case((aliased_reaction.kind == str(ReactionKind.COMMENT.value), 1), else_=0)).label("comments_stat"), + func.sum(case((aliased_reaction.kind == str(ReactionKind.LIKE.value), 1), else_=0)).label("likes_stat"), + func.sum(case((aliased_reaction.kind == str(ReactionKind.DISLIKE.value), 1), else_=0)).label("dislikes_stat"), + func.max(aliased_reaction.created_at).label("last_comment_stat"), + ) return q @@ -427,7 +426,7 @@ async def load_reactions_by(_, info, by, limit=50, offset=0): "rating": int(likes_stat or 0) - int(dislikes_stat or 0), "reacted": reacted_stat, "commented": commented_stat, - "last_reacted_at": last_reacted_at + "last_reacted_at": last_reacted_at, } reactions.add(reaction) diff --git a/resolvers/reader.py b/resolvers/reader.py index 9a7c1565..da4ea632 100644 --- a/resolvers/reader.py +++ b/resolvers/reader.py @@ -30,6 +30,7 @@ def query_shouts(): select(Shout) .options(joinedload(Shout.authors), joinedload(Shout.topics)) .where(and_(Shout.published_at.is_not(None), Shout.deleted_at.is_(None))) + .execution_options(populate_existing=True) ) @@ -37,9 +38,11 @@ def filter_my(info, session, q): user_id = info.context.get("user_id") reader_id = info.context.get("author", {}).get("id") if user_id and reader_id: + # Предварительный расчет ID для автора и темы reader_followed_authors = select(AuthorFollower.author).where(AuthorFollower.follower == reader_id) reader_followed_topics = select(TopicFollower.topic).where(TopicFollower.follower == reader_id) + # Используйте подзапросы для фильтрации subquery = ( select(Shout.id) .where(Shout.id == ShoutAuthor.shout) diff --git a/services/triggers.py b/services/triggers.py index b3ddb351..a4edd94d 100644 --- a/services/triggers.py +++ b/services/triggers.py @@ -17,81 +17,100 @@ DEFAULT_FOLLOWS = { } -def handle_author_follower_change(author_id: int, follower_id: int, is_insert: bool): - logger.info(author_id) - author_query = select(Author).select_from(Author).filter(Author.id == author_id) - author_result = get_with_stat(author_query) - follower_query = select(Author).select_from(Author).filter(Author.id == follower_id) - follower_result = get_with_stat(follower_query) +def run_background_task(coro): + """Запускает асинхронную задачу в фоне и обрабатывает исключения.""" + task = asyncio.create_task(coro) + task.add_done_callback(handle_task_result) + + +def handle_task_result(task): + """Обработка результата завершенной задачи.""" + try: + task.result() + except Exception as e: + logger.error(f"Error in background task: {e}") + + +async def handle_author_follower_change(author_id: int, follower_id: int, is_insert: bool): + logger.info( + f"Handling author follower change: author_id={author_id}, follower_id={follower_id}, is_insert={is_insert}" + ) + + author_query = select(Author).filter(Author.id == author_id) + author_result = await get_with_stat(author_query) + + follower_query = select(Author).filter(Author.id == follower_id) + follower_result = await get_with_stat(follower_query) + if follower_result and author_result: author_with_stat = author_result[0] follower = follower_result[0] if author_with_stat: author_dict = author_with_stat.dict() - # await cache_author(author_with_stat) - asyncio.create_task(cache_author(author_dict)) - asyncio.create_task(cache_follows(follower.id, "author", author_with_stat.id, is_insert)) + run_background_task(cache_author(author_dict)) + run_background_task(cache_follows(follower.id, "author", author_with_stat.id, is_insert)) async def handle_topic_follower_change(topic_id: int, follower_id: int, is_insert: bool): - logger.info(topic_id) + logger.info( + f"Handling topic follower change: topic_id={topic_id}, follower_id={follower_id}, is_insert={is_insert}" + ) + topic_query = select(Topic).filter(Topic.id == topic_id) - topic = get_with_stat(topic_query) + topic = await get_with_stat(topic_query) + follower_query = select(Author).filter(Author.id == follower_id) - follower = get_with_stat(follower_query) + follower = await get_with_stat(follower_query) + if isinstance(follower[0], Author) and isinstance(topic[0], Topic): topic = topic[0] follower = follower[0] - await cache_topic(topic.dict()) - await cache_author(follower.dict()) - await cache_follows(follower.id, "topic", topic.id, is_insert) + run_background_task(cache_topic(topic.dict())) + run_background_task(cache_author(follower.dict())) + run_background_task(cache_follows(follower.id, "topic", topic.id, is_insert)) -# handle_author_follow and handle_topic_follow -> cache_author, cache_follows, cache_followers - - -def after_shout_update(_mapper, _connection, shout: Shout): +async def after_shout_update(_mapper, _connection, shout: Shout): logger.info("after shout update") - # Main query to get authors associated with the shout through ShoutAuthor + authors_query = ( select(Author) - .select_from(ShoutAuthor) # Select from ShoutAuthor - .join(Author, Author.id == ShoutAuthor.author) # Join with Author - .filter(ShoutAuthor.shout == shout.id) # Filter by shout.id + .join(ShoutAuthor, ShoutAuthor.author == Author.id) # Use join directly with Author + .filter(ShoutAuthor.shout == shout.id) ) - authors_updated = get_with_stat(authors_query) + authors_updated = await get_with_stat(authors_query) for author_with_stat in authors_updated: - asyncio.create_task(cache_author(author_with_stat.dict())) + run_background_task(cache_author(author_with_stat.dict())) -def after_reaction_update(mapper, connection, reaction: Reaction): +async def after_reaction_update(mapper, connection, reaction: Reaction): logger.info("after reaction update") try: # reaction author author_subquery = select(Author).where(Author.id == reaction.created_by) - result = get_with_stat(author_subquery) + result = await get_with_stat(author_subquery) if result: author_with_stat = result[0] if isinstance(author_with_stat, Author): author_dict = author_with_stat.dict() - # await cache_author(author_dict) - asyncio.create_task(cache_author(author_dict)) + run_background_task(cache_author(author_dict)) # reaction repliers replied_author_subquery = ( select(Author).join(Reaction, Author.id == Reaction.created_by).where(Reaction.id == reaction.reply_to) ) - authors_with_stat = get_with_stat(replied_author_subquery) + authors_with_stat = await get_with_stat(replied_author_subquery) for author_with_stat in authors_with_stat: - asyncio.create_task(cache_author(author_with_stat.dict())) + run_background_task(cache_author(author_with_stat.dict())) - shout_query = select(Shout).select_from(Shout).where(Shout.id == reaction.shout) - [shout] = connection.execute(shout_query) + shout_query = select(Shout).where(Shout.id == reaction.shout) + shout_result = await connection.execute(shout_query) + shout = shout_result.scalar_one_or_none() if shout: - after_shout_update(mapper, connection, shout) + await after_shout_update(mapper, connection, shout) except Exception as exc: logger.error(exc) import traceback @@ -99,39 +118,34 @@ def after_reaction_update(mapper, connection, reaction: Reaction): traceback.print_exc() -def after_author_update(_mapper, _connection, author: Author): +async def after_author_update(_mapper, _connection, author: Author): logger.info("after author update") author_query = select(Author).where(Author.id == author.id) - result = get_with_stat(author_query) + result = await get_with_stat(author_query) if result: author_with_stat = result[0] author_dict = author_with_stat.dict() - # await cache_author(author_with_stat) - asyncio.create_task(cache_author(author_dict)) + run_background_task(cache_author(author_dict)) -def after_topic_follower_insert(_mapper, _connection, target: TopicFollower): +async def after_topic_follower_insert(_mapper, _connection, target: TopicFollower): logger.info(target) - asyncio.create_task( - handle_topic_follower_change(target.topic, target.follower, True) # type: ignore - ) + run_background_task(handle_topic_follower_change(target.topic, target.follower, True)) -def after_topic_follower_delete(_mapper, _connection, target: TopicFollower): +async def after_topic_follower_delete(_mapper, _connection, target: TopicFollower): logger.info(target) - asyncio.create_task( - handle_topic_follower_change(target.topic, target.follower, False) # type: ignore - ) + run_background_task(handle_topic_follower_change(target.topic, target.follower, False)) -def after_author_follower_insert(_mapper, _connection, target: AuthorFollower): +async def after_author_follower_insert(_mapper, _connection, target: AuthorFollower): logger.info(target) - handle_author_follower_change(target.author, target.follower, True) + run_background_task(handle_author_follower_change(target.author, target.follower, True)) -def after_author_follower_delete(_mapper, _connection, target: AuthorFollower): +async def after_author_follower_delete(_mapper, _connection, target: AuthorFollower): logger.info(target) - handle_author_follower_change(target.author, target.follower, False) + run_background_task(handle_author_follower_change(target.author, target.follower, False)) def events_register():