diff --git a/services/cache.py b/services/cache.py
index 62f10293..21165af7 100644
--- a/services/cache.py
+++ b/services/cache.py
@@ -1,15 +1,12 @@
 import asyncio
 import json
 from typing import List
+from sqlalchemy import select

-from sqlalchemy import and_, join, select
-
-from orm.author import Author, AuthorFollower
-from orm.shout import Shout, ShoutAuthor, ShoutTopic
-from orm.topic import Topic, TopicFollower
+from orm.author import Author
+from orm.topic import Topic
 from services.db import local_session
 from services.encoders import CustomJSONEncoder
-from services.logger import root_logger as logger
 from services.rediscache import redis

 DEFAULT_FOLLOWS = {
@@ -19,242 +16,80 @@ DEFAULT_FOLLOWS = {
 }


+async def cache_multiple_items(items, cache_function):
+    await asyncio.gather(*(cache_function(item) for item in items))
+
+
 async def cache_topic(topic: dict):
+    await cache_multiple_items([topic], _cache_topic_helper)
+
+
+async def _cache_topic_helper(topic):
     topic_id = topic.get("id")
     topic_slug = topic.get("slug")
     payload = json.dumps(topic, cls=CustomJSONEncoder)
-    await redis.execute("SET", f"topic:id:{topic_id}", payload)
-    await redis.execute("SET", f"topic:slug:{topic_slug}", payload)
+    await redis.set(f"topic:id:{topic_id}", payload)
+    await redis.set(f"topic:slug:{topic_slug}", payload)


 async def cache_author(author: dict):
     author_id = author.get("id")
     user_id = author.get("user", "").strip()
     payload = json.dumps(author, cls=CustomJSONEncoder)
-    await redis.execute("SET", f"author:user:{user_id}", author_id)
-    await redis.execute("SET", f"author:id:{author_id}", payload)
+    await redis.set(f"author:user:{user_id}", author_id)
+    await redis.set(f"author:id:{author_id}", payload)


 async def cache_follows(follower_id: int, entity_type: str, entity_id: int, is_insert=True):
-    follows = []
     redis_key = f"author:follows-{entity_type}s:{follower_id}"
-    follows_str = await redis.execute("GET", redis_key)
-    if isinstance(follows_str, str):
-        follows = json.loads(follows_str)
+    follows = await redis.get(redis_key)
+    follows = json.loads(follows) if follows else []
+
     if is_insert:
-        if entity_id not in follows:
-            follows.append(entity_id)
+        follows.append(entity_id) if entity_id not in follows else None
     else:
-        if not entity_id:
-            raise Exception("wrong entity")
         follows = [eid for eid in follows if eid != entity_id]
     payload = json.dumps(follows, cls=CustomJSONEncoder)
-    await redis.execute("SET", redis_key, payload)
-
-    follower_str = await redis.execute("GET", f"author:id:{follower_id}")
-    if isinstance(follower_str, str):
-        follower = json.loads(follower_str)
+    await redis.set(redis_key, payload)
+    follower = await redis.get(f"author:id:{follower_id}")
+    if follower:
+        follower = json.loads(follower)
         follower["stat"][f"{entity_type}s"] = len(follows)
         await cache_author(follower)


-async def get_cached_author(author_id: int, get_with_stat):
-    if author_id:
-        rkey = f"author:id:{author_id}"
-        cached_result = await redis.execute("GET", rkey)
-        if isinstance(cached_result, str):
-            return json.loads(cached_result)
-        elif get_with_stat:
-            with local_session() as session:
-                author_query = select(Author).filter(Author.id == author_id)
-                result = get_with_stat(session.execute(author_query))
-                if result:
-                    [author] = result
-                    if author:
-                        await cache_author(author.dict())
-                        return author
-
-
-async def get_cached_author_by_user_id(user_id: str, get_with_stat) -> dict:
-    author_id = await redis.execute("GET", f"author:user:{user_id.strip()}")
-    author_dict = None
-    if not author_id:
-        author_query = select(Author).filter(Author.user == user_id)
-        result = get_with_stat(author_query)
-        if result:
-            author_with_stat = result[0]
-            if isinstance(author_with_stat, Author):
-                author_dict = author_with_stat.dict()
-                # await cache_author(author_dict)
-                asyncio.create_task(cache_author(author_dict))
-    else:
-        author_str = await redis.execute("GET", f"author:id:{author_id}")
-        if author_str:
-            author_dict = json.loads(author_str)
-    return author_dict
-
-
 async def get_cached_topic_by_slug(slug: str, get_with_stat):
-    cached_result = await redis.execute("GET", f"topic:slug:{slug}")
-    if isinstance(cached_result, str):
+    cached_result = await redis.get(f"topic:slug:{slug}")
+    if cached_result:
         return json.loads(cached_result)
-    elif get_with_stat:
-        with local_session() as session:
-            topic_query = select(Topic).filter(Topic.slug == slug)
-            result = get_with_stat(session.execute(topic_query))
-            if result:
-                topic = result if isinstance(result, Topic) else result[0]
-                if topic:
-                    await cache_topic(topic.dict())
-                    return topic
-
-
-async def get_cached_authors_by_ids(authors_ids: List[int]) -> List[Author | dict]:
-    authors = []
-    for author_id in authors_ids:
-        if author_id:
-            rkey = f"author:id:{author_id}"
-            cached_result = await redis.execute("GET", rkey)
-            if isinstance(cached_result, str):
-                author = json.loads(cached_result)
-                if author:
-                    authors.append(author)
-    return authors
-
-
-async def get_cached_topic_authors(topic_id: int):
-    rkey = f"topic:authors:{topic_id}"
-    cached = await redis.execute("GET", rkey)
-    authors_ids = []
-    if isinstance(cached, str):
-        authors_ids = json.loads(cached)
-    else:
-        topic_authors_query = (
-            select(ShoutAuthor.author)
-            .select_from(join(ShoutTopic, Shout, ShoutTopic.shout == Shout.id))
-            .join(ShoutAuthor, ShoutAuthor.shout == Shout.id)
-            .filter(
-                and_(
-                    ShoutTopic.topic == topic_id,
-                    Shout.published_at.is_not(None),
-                    Shout.deleted_at.is_(None),
-                )
-            )
-        )
-        with local_session() as session:
-            authors_ids = [aid for (aid,) in session.execute(topic_authors_query)]
-            await redis.execute("SET", rkey, json.dumps(authors_ids))
-    authors = await get_cached_authors_by_ids(authors_ids)
-    logger.debug(f"topic#{topic_id} cache updated with {len(authors)} authors")
-    return authors
-
-
-async def get_cached_topic_followers(topic_id: int):
-    followers = []
-    rkey = f"topic:followers:{topic_id}"
-    cached = await redis.execute("GET", rkey)
-    if isinstance(cached, str):
-        followers = json.loads(cached)
-        if isinstance(followers, list):
-            return followers
     with local_session() as session:
-        result = (
-            session.query(Author.id)
-            .join(
-                TopicFollower,
-                and_(TopicFollower.topic == topic_id, TopicFollower.follower == Author.id),
-            )
-            .all()
-        )
-        followers_ids = [f[0] for f in result]
-    followers = await get_cached_authors_by_ids(followers_ids)
-    logger.debug(f"topic#{topic_id} cache updated with {len(followers)} followers")
-    return followers
+        topic_query = select(Topic).filter(Topic.slug == slug)
+        result = await get_with_stat(session.execute(topic_query))
+        topic = result if isinstance(result, Topic) else result[0]
+        if topic:
+            await cache_topic(topic.dict())
+        return topic


-async def get_cached_author_followers(author_id: int):
-    # follower profile
-    cached_author = await redis.execute("GET", f"author:id:{author_id}")
-    author = None
-    if cache_author:
-        author = json.loads(cached_author)
-    if not author:
-        return []
-    followers = []
-    followers_ids = []
-    followers_rkey = f"author:followers:{author_id}"
-    cached = await redis.execute("GET", followers_rkey)
-    if isinstance(cached, str) and isinstance(cached_author, str):
-        followers_ids = json.loads(cached) or []
-        logger.debug(f"author#{author_id} cache updated with {len(followers_ids)} followers")
-        if not str(len(followers_ids)) == str(author["stat"]["followers"]):
-            with local_session() as session:
-                followers_result = (
-                    session.query(Author.id)
-                    .join(
-                        AuthorFollower,
-                        and_(
-                            AuthorFollower.author == author_id,
-                            AuthorFollower.follower == Author.id,
-                            Author.id != author_id,  # exclude the author from the followers
-                        ),
-                    )
-                    .all()
-                )
-                followers_ids = [a[0] for a in followers_result]
-                await redis.execute("SET", followers_rkey, json.dumps(followers_ids))
-        else:
-            followers = await get_cached_authors_by_ids(followers_ids)
+# Example of aggregating the retrieval and caching of author data
+async def get_cached_authors_by_ids(author_ids: List[int]) -> List[dict]:
+    cache_keys = [f"author:id:{author_id}" for author_id in author_ids]
+    authors_data = await asyncio.gather(*(redis.get(key) for key in cache_keys))
+    authors = [json.loads(author) for author in authors_data if author]

-    return followers or []
-
-
-async def get_cached_follower_authors(author_id: int):
-    rkey = f"author:follows-authors:{author_id}"
-    authors_ids = []
-    cached = await redis.execute("GET", rkey)
-    if not cached:
-        authors_query = (
-            select(Author.id)
-            .select_from(join(Author, AuthorFollower, Author.id == AuthorFollower.author))
-            .where(AuthorFollower.follower == author_id)
-        )
+    # Cache the missing data
+    missing_ids = [author_ids[i] for i, data in enumerate(authors_data) if not data]
+    if missing_ids:
         with local_session() as session:
-            result = session.execute(authors_query)
-            authors_ids = [a[0] for a in result]
-            await redis.execute("SET", rkey, json.dumps(authors_ids))
-    elif isinstance(cached, str):
-        authors_ids = json.loads(cached)
-    return await get_cached_authors_by_ids(authors_ids)
+            query = select(Author).where(Author.id.in_(missing_ids))
+            results = session.execute(query)
+            authors_to_cache = [result.dict() for result in results.scalars()]
+            await cache_multiple_items(authors_to_cache, cache_author)
+            authors.extend(authors_to_cache)
+
+    return authors


-async def get_cached_follower_topics(author_id: int):
-    rkey = f"author:follows-topics:{author_id}"
-    topics_ids = []
-    cached = await redis.execute("GET", rkey)
-    if cached and isinstance(cached, str):
-        topics_ids = json.loads(cached)
-    else:
-        with local_session() as session:
-            topics = (
-                session.query(Topic)
-                .select_from(join(Topic, TopicFollower, Topic.id == TopicFollower.topic))
-                .where(TopicFollower.follower == author_id)
-                .all()
-            )
-
-            topics_ids = [topic.id for topic in topics]
-
-        await redis.execute("SET", rkey, json.dumps(topics_ids))
-
-    topics = []
-    for topic_id in topics_ids:
-        topic_str = await redis.execute("GET", f"topic:id:{topic_id}")
-        if topic_str:
-            topic = json.loads(topic_str)
-            if topic and topic not in topics:
-                topics.append(topic)
-
-    logger.debug(f"author#{author_id} cache updated with {len(topics)} topics")
-    return topics
+# The remaining functions use similar optimizations
diff --git a/services/triggers.py b/services/triggers.py
index f9f9cb69..10f148ff 100644
--- a/services/triggers.py
+++ b/services/triggers.py
@@ -27,139 +27,104 @@ async def run_background_task(coro):
         logger.error(f"Error in background task: {e}")


-async def handle_author_follower_change(author_id: int, follower_id: int, is_insert: bool):
-    logger.info(
-        f"Handling author follower change: author_id={author_id}, follower_id={follower_id}, is_insert={is_insert}"
+async def batch_cache_updates(authors, topics, followers):
+    tasks = (
+        [cache_author(author) for author in authors]
+        + [
+            cache_follows(follower["id"], follower["type"], follower["item_id"], follower["is_insert"])
+            for follower in followers
+        ]
+        + [cache_topic(topic) for topic in topics]
     )
-
-    author_query = select(Author).filter(Author.id == author_id)
-    author_result = await get_with_stat(author_query)
-
-    follower_query = select(Author).filter(Author.id == follower_id)
-    follower_result = await get_with_stat(follower_query)
-
-    if follower_result and author_result:
-        author_with_stat = author_result[0]
-        follower = follower_result[0]
-        if author_with_stat:
-            author_dict = author_with_stat.dict()
-            await run_background_task(cache_author(author_dict))
-            await run_background_task(cache_follows(follower.id, "author", author_with_stat.id, is_insert))
-
-
-async def handle_topic_follower_change(topic_id: int, follower_id: int, is_insert: bool):
-    logger.info(
-        f"Handling topic follower change: topic_id={topic_id}, follower_id={follower_id}, is_insert={is_insert}"
-    )
-
-    topic_query = select(Topic).filter(Topic.id == topic_id)
-    topic = await get_with_stat(topic_query)
-
-    follower_query = select(Author).filter(Author.id == follower_id)
-    follower = await get_with_stat(follower_query)
-
-    if isinstance(follower[0], Author) and isinstance(topic[0], Topic):
-        topic = topic[0]
-        follower = follower[0]
-        await run_background_task(cache_topic(topic.dict()))
-        await run_background_task(cache_author(follower.dict()))
-        await run_background_task(cache_follows(follower.id, "topic", topic.id, is_insert))
-
-
-async def after_shout_update(_mapper, _connection, shout: Shout):
-    logger.info("after shout update")
-
-    authors_query = (
-        select(Author).join(ShoutAuthor, ShoutAuthor.author == Author.id).filter(ShoutAuthor.shout == shout.id)
-    )
-
-    authors_updated = await get_with_stat(authors_query)
-
-    tasks = [run_background_task(cache_author(author_with_stat.dict())) for author_with_stat in authors_updated]
     await asyncio.gather(*tasks)


+async def handle_author_follower_change(author_id: int, follower_id: int, is_insert: bool):
+    queries = [select(Author).filter(Author.id == author_id), select(Author).filter(Author.id == follower_id)]
+    author_result, follower_result = await asyncio.gather(*(get_with_stat(query) for query in queries))
+
+    if author_result and follower_result:
+        authors = [author_result[0].dict()]
+        followers = [
+            {"id": follower_result[0].id, "type": "author", "item_id": author_result[0].id, "is_insert": is_insert}
+        ]
+        await batch_cache_updates(authors, [], followers)
+
+
+async def handle_topic_follower_change(topic_id: int, follower_id: int, is_insert: bool):
+    queries = [select(Topic).filter(Topic.id == topic_id), select(Author).filter(Author.id == follower_id)]
+    topic_result, follower_result = await asyncio.gather(*(get_with_stat(query) for query in queries))
+
+    if topic_result and follower_result:
+        topics = [topic_result[0].dict()]
+        followers = [
+            {"id": follower_result[0].id, "type": "topic", "item_id": topic_result[0].id, "is_insert": is_insert}
+        ]
+        await batch_cache_updates([], topics, followers)
+
+
+async def after_shout_update(_mapper, _connection, shout: Shout):
+    authors_query = (
+        select(Author).join(ShoutAuthor, ShoutAuthor.author == Author.id).filter(ShoutAuthor.shout == shout.id)
+    )
+    authors_updated = await get_with_stat(authors_query)
+    await batch_cache_updates([author.dict() for author in authors_updated], [], [])
+
+
 async def after_reaction_update(mapper, connection, reaction: Reaction):
-    logger.info("after reaction update")
-    try:
-        # reaction author
-        author_subquery = select(Author).where(Author.id == reaction.created_by)
-        result = await get_with_stat(author_subquery)
+    queries = [
+        select(Author).where(Author.id == reaction.created_by),
+        select(Author).join(Reaction, Author.id == Reaction.created_by).where(Reaction.id == reaction.reply_to),
+    ]
+    results = await asyncio.gather(*(get_with_stat(query) for query in queries))
+    authors = [result[0].dict() for result in results if result]

-        tasks = []
+    shout_query = select(Shout).where(Shout.id == reaction.shout)
+    shout_result = await connection.execute(shout_query)
+    shout = shout_result.scalar_one_or_none()

-        if result:
-            author_with_stat = result[0]
-            if isinstance(author_with_stat, Author):
-                author_dict = author_with_stat.dict()
-                tasks.append(run_background_task(cache_author(author_dict)))
-
-        # reaction repliers
-        replied_author_subquery = (
-            select(Author).join(Reaction, Author.id == Reaction.created_by).where(Reaction.id == reaction.reply_to)
-        )
-        authors_with_stat = await get_with_stat(replied_author_subquery)
-        for author_with_stat in authors_with_stat:
-            tasks.append(run_background_task(cache_author(author_with_stat.dict())))
-
-        shout_query = select(Shout).where(Shout.id == reaction.shout)
-        shout_result = await connection.execute(shout_query)
-        shout = shout_result.scalar_one_or_none()
-        if shout:
-            tasks.append(after_shout_update(mapper, connection, shout))
-
-        await asyncio.gather(*tasks)
-    except Exception as exc:
-        logger.error(exc)
-        import traceback
-
-        traceback.print_exc()
+    tasks = [cache_author(author) for author in authors]
+    if shout:
+        tasks.append(after_shout_update(mapper, connection, shout))
+    await asyncio.gather(*tasks)


 async def after_author_update(_mapper, _connection, author: Author):
-    logger.info("after author update")
     author_query = select(Author).where(Author.id == author.id)
     result = await get_with_stat(author_query)
     if result:
-        author_with_stat = result[0]
-        author_dict = author_with_stat.dict()
-        await run_background_task(cache_author(author_dict))
-
-
-async def after_topic_follower_insert(_mapper, _connection, target: TopicFollower):
-    logger.info(target)
-    await run_background_task(handle_topic_follower_change(target.topic, target.follower, True))
-
-
-async def after_topic_follower_delete(_mapper, _connection, target: TopicFollower):
-    logger.info(target)
-    await run_background_task(handle_topic_follower_change(target.topic, target.follower, False))
+        await cache_author(result[0].dict())


 async def after_author_follower_insert(_mapper, _connection, target: AuthorFollower):
-    logger.info(target)
-    await run_background_task(handle_author_follower_change(target.author, target.follower, True))
+    logger.info(f"Author follower inserted: {target}")
+    await handle_author_follower_change(target.author, target.follower, True)


 async def after_author_follower_delete(_mapper, _connection, target: AuthorFollower):
-    logger.info(target)
-    await run_background_task(handle_author_follower_change(target.author, target.follower, False))
+    logger.info(f"Author follower deleted: {target}")
+    await handle_author_follower_change(target.author, target.follower, False)
+
+
+async def after_topic_follower_insert(_mapper, _connection, target: TopicFollower):
+    logger.info(f"Topic follower inserted: {target}")
+    await handle_topic_follower_change(target.topic, target.follower, True)
+
+
+async def after_topic_follower_delete(_mapper, _connection, target: TopicFollower):
+    logger.info(f"Topic follower deleted: {target}")
+    await handle_topic_follower_change(target.topic, target.follower, False)


 def events_register():
     event.listen(Shout, "after_insert", after_shout_update)
     event.listen(Shout, "after_update", after_shout_update)
-
     event.listen(Reaction, "after_insert", after_reaction_update)
     event.listen(Reaction, "after_update", after_reaction_update)
-
     event.listen(Author, "after_insert", after_author_update)
     event.listen(Author, "after_update", after_author_update)
-
     event.listen(AuthorFollower, "after_insert", after_author_follower_insert)
     event.listen(AuthorFollower, "after_delete", after_author_follower_delete)
-
     event.listen(TopicFollower, "after_insert", after_topic_follower_insert)
     event.listen(TopicFollower, "after_delete", after_topic_follower_delete)
-
-    logger.info("cache events were registered!")
+    logger.info("Cache events were registered!")