From d3ae078b2018183b47e8fd25ab27e42f9b3c27ba Mon Sep 17 00:00:00 2001 From: Untone Date: Tue, 9 Apr 2024 11:17:32 +0300 Subject: [PATCH] refactored-cache-following --- resolvers/__init__.py | 2 +- resolvers/author.py | 103 ++++++++-------- resolvers/follower.py | 97 +++++---------- resolvers/stat.py | 54 +-------- services/cache.py | 268 ++++++------------------------------------ services/triggers.py | 122 +++++++++++++++++++ 6 files changed, 243 insertions(+), 403 deletions(-) create mode 100644 services/triggers.py diff --git a/resolvers/__init__.py b/resolvers/__init__.py index a071ea72..e0de7c41 100644 --- a/resolvers/__init__.py +++ b/resolvers/__init__.py @@ -19,7 +19,7 @@ from resolvers.reader import (get_shout, load_shouts_by, load_shouts_feed, load_shouts_search, load_shouts_unrated) from resolvers.topic import (get_topic, get_topics_all, get_topics_by_author, get_topics_by_community) -from services.cache import events_register +from services.triggers import events_register events_register() diff --git a/resolvers/author.py b/resolvers/author.py index 6dc08383..f32b8801 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -1,4 +1,3 @@ -import asyncio import json import time @@ -9,10 +8,9 @@ from sqlalchemy_searchable import search from orm.author import Author, AuthorFollower from orm.shout import ShoutAuthor, ShoutTopic from orm.topic import Topic -from resolvers.stat import (author_follows_authors, author_follows_topics, - get_authors_with_stat_cached, get_with_stat) +from resolvers.stat import author_follows_authors, author_follows_topics, get_with_stat from services.auth import login_required -from services.cache import set_author_cache, update_author_followers_cache +from services.cache import cache_author, cache_follower from services.db import local_session from services.encoders import CustomJSONEncoder from services.logger import root_logger as logger @@ -55,22 +53,25 @@ async def get_author(_, _info, slug='', author_id=0): author = None author_dict = None try: - author_query = select(Author).filter( - or_(Author.slug == slug, Author.id == author_id) - ) - result = await get_authors_with_stat_cached(author_query) - if not result: - raise ValueError('Author not found') - [author] = result - author_id = author.id - logger.debug(f'found @{slug} with id {author_id}') - if isinstance(author, Author): - if not author.stat: - [author] = get_with_stat(author_query) # FIXME: with_rating=True) - if author: - await set_author_cache(author.dict()) - logger.debug('updated author stored in cache') - author_dict = author.dict() + # lookup for cached author + author_query = select(Author).filter(or_(Author.slug == slug, Author.id == author_id)) + found_author = local_session().execute(author_query).first() + logger.debug(f'found author id: {found_author.id}') + author_id = found_author.id if not found_author.id else author_id + cached_result = await redis.execute('GET', f'author:{author_id}') + author_dict = json.loads(cached_result) if cached_result else None + + # update stat from db + if not author_dict or not author_dict.get('stat'): + result = get_with_stat(author_query) + if not result: + raise ValueError('Author not found') + [author] = result + # use found author + if isinstance(author, Author): + logger.debug(f'update @{author.slug} with id {author.id}') + author_dict = author.dict() + await cache_author(author_dict) except ValueError: pass except Exception: @@ -95,11 +96,11 @@ async def get_author_by_user_id(user_id: str): logger.debug(f'got author @{author_slug} #{author_id} cached') return author - q = select(Author).filter(Author.user == user_id) - result = await get_authors_with_stat_cached(q) + author_query = select(Author).filter(Author.user == user_id) + result = get_with_stat(author_query) if result: [author] = result - await set_author_cache(author.dict()) + await cache_author(author.dict()) except Exception as exc: import traceback @@ -286,36 +287,32 @@ def create_author(user_id: str, slug: str, name: str = ''): async def get_author_followers(_, _info, slug: str): logger.debug(f'getting followers for @{slug}') try: - with local_session() as session: - author_alias = aliased(Author) - result = ( - session.query(author_alias).filter(author_alias.slug == slug).first() - ) - if result: - [author] = result - author_id = author.id - cached = await redis.execute('GET', f'author:{author_id}:followers') - if not cached: - author_follower_alias = aliased(AuthorFollower, name='af') - q = select(Author).join( - author_follower_alias, - and_( - author_follower_alias.author == author_id, - author_follower_alias.follower == Author.id, - ), - ) - results = await get_authors_with_stat_cached(q) - _ = asyncio.create_task( - update_author_followers_cache( - author_id, [x.dict() for x in results] - ) - ) + author_alias = aliased(Author) + author_query = select(author_alias).filter(author_alias.slug == slug) + result = local_session().execute(author_query).first() + if result: + [author] = result + author_id = author.id + cached = await redis.execute('GET', f'author:{author_id}:followers') + if not cached: + author_follower_alias = aliased(AuthorFollower, name='af') + q = select(Author).join( + author_follower_alias, + and_( + author_follower_alias.author == author_id, + author_follower_alias.follower == Author.id, + ), + ) + results = get_with_stat(q) + if isinstance(results, list): + for follower in results: + await cache_follower(follower, author) logger.debug(f'@{slug} cache updated with {len(results)} followers') - return results - else: - logger.debug(f'@{slug} got followers cached') - if isinstance(cached, str): - return json.loads(cached) + return results + else: + logger.debug(f'@{slug} got followers cached') + if isinstance(cached, str): + return json.loads(cached) except Exception as exc: import traceback @@ -327,4 +324,4 @@ async def get_author_followers(_, _info, slug: str): @query.field('search_authors') async def search_authors(_, _info, what: str): q = search(select(Author), what) - return await get_authors_with_stat_cached(q) + return get_with_stat(q) diff --git a/resolvers/follower.py b/resolvers/follower.py index 6f9fcc76..36e82da9 100644 --- a/resolvers/follower.py +++ b/resolvers/follower.py @@ -8,16 +8,12 @@ from sqlalchemy.sql import and_ from orm.author import Author, AuthorFollower from orm.community import Community -# from orm.community import Community from orm.reaction import Reaction from orm.shout import Shout, ShoutReactionsFollower from orm.topic import Topic, TopicFollower -from resolvers.stat import (author_follows_authors, author_follows_topics, - get_authors_with_stat_cached, - get_topics_with_stat_cached) +from resolvers.stat import author_follows_authors, author_follows_topics, get_with_stat from services.auth import login_required -from services.cache import (DEFAULT_FOLLOWS, update_followers_for_author, - update_follows_for_author) +from services.cache import DEFAULT_FOLLOWS from services.db import local_session from services.logger import root_logger as logger from services.notify import notify_follower @@ -33,53 +29,37 @@ async def follow(_, info, what, slug): user_id = info.context.get('user_id') if not user_id: return {'error': 'unauthorized'} - [follower] = await get_authors_with_stat_cached( - select(Author).select_from(Author).filter(Author.user == user_id) - ) + + follower_query = select(Author).select_from(Author).filter(Author.user == user_id) + [follower] = local_session().execute(follower_query) if not follower: return {'error': 'cant find follower'} if what == 'AUTHOR': error = author_follow(follower.id, slug) if not error: - logger.debug(f'@{follower.slug} followed @{slug}') - [author] = await get_authors_with_stat_cached( - select(Author).select_from(Author).where(Author.slug == slug) - ) - if not author: - return {'error': 'author is not found'} - follows = await update_follows_for_author( - follower, 'author', author.dict(), True - ) - _followers = await update_followers_for_author(follower, author, True) - await notify_follower(follower.dict(), author.id, 'unfollow') + author_query = select(Author).where(Author.slug == slug) + [author] = local_session().execute(author_query) + await notify_follower(follower.dict(), author.id, 'follow') elif what == 'TOPIC': error = topic_follow(follower.id, slug) - if not error: - [topic] = await get_topics_with_stat_cached( - select(Topic).where(Topic.slug == slug) - ) - if not topic: - return {'error': 'topic is not found'} - follows = await update_follows_for_author( - follower, 'topic', topic.dict(), True - ) elif what == 'COMMUNITY': + # FIXME: when more communities follows = local_session().execute(select(Community)) elif what == 'SHOUT': error = reactions_follow(follower.id, slug) - if not error: - [shout] = local_session().execute(select(Shout).where(Shout.slug == slug)) - if not shout: - return {'error': 'cant find shout'} - follows = await update_follows_for_author( - follower, 'shout', shout.dict(), True - ) - return {f'{what.lower()}s': follows, 'error': error} + if error: + return {'error': error} + + entity = what.lower() + follows_str = await redis.execute('GET', f'author:{follower.id}:follows-{entity}s') + if follows_str: + follows = json.loads(follows_str) + return { f'{entity}s': follows } @mutation.field('unfollow') @@ -91,54 +71,33 @@ async def unfollow(_, info, what, slug): if not user_id: return {'error': 'unauthorized'} follower_query = select(Author).filter(Author.user == user_id) - [follower] = await get_authors_with_stat_cached(follower_query) + [follower] = local_session().execute(follower_query) if not follower: return {'error': 'follower profile is not found'} if what == 'AUTHOR': error = author_unfollow(follower.id, slug) + # NOTE: after triggers should update cached stats if not error: - logger.info(f'@{follower.slug} unfollowing @{slug}') - [author] = await get_authors_with_stat_cached( - select(Author).where(Author.slug == slug) - ) - if not author: - return {'error': 'cant find author'} - _followers = await update_followers_for_author(follower, author, False) + logger.info(f'@{follower.slug} unfollowed @{slug}') + author_query = select(Author).where(Author.slug == slug) + [author] = local_session().execute(author_query) await notify_follower(follower.dict(), author.id, 'unfollow') - follows = await update_follows_for_author( - follower, 'author', author.dict(), False - ) elif what == 'TOPIC': error = topic_unfollow(follower.id, slug) - if not error: - logger.info(f'@{follower.slug} unfollowing §{slug}') - [topic] = await get_topics_with_stat_cached( - select(Topic).where(Topic.slug == slug) - ) - if not topic: - return {'error': 'cant find topic'} - follows = await update_follows_for_author( - follower, 'topic', topic.dict(), False - ) elif what == 'COMMUNITY': follows = local_session().execute(select(Community)) elif what == 'SHOUT': error = reactions_unfollow(follower.id, slug) - if not error: - logger.info(f'@{follower.slug} unfollowing §{slug}') - [shout] = local_session().execute(select(Shout).where(Shout.slug == slug)) - if not shout: - return {'error': 'cant find shout'} - if not error: - follows = await update_follows_for_author( - follower, 'shout', shout.dict(), False - ) - return {'error': error, f'{what.lower()}s': follows} + entity = what.lower() + follows_str = await redis.execute('GET', f'author:{follower.id}:follows-{entity}s') + if follows_str: + follows = json.loads(follows_str) + return {'error': error, f'{entity}s': follows} async def get_follows_by_user_id(user_id: str): @@ -321,7 +280,7 @@ async def get_topic_followers(_, _info, slug: str, topic_id: int) -> List[Author .join(Topic, Topic.id == TopicFollower.topic) .filter(or_(Topic.slug == slug, Topic.id == topic_id)) ) - return await get_authors_with_stat_cached(q) + return get_with_stat(q) @query.field('get_shout_followers') diff --git a/resolvers/stat.py b/resolvers/stat.py index 32efda6d..c40121ec 100644 --- a/resolvers/stat.py +++ b/resolvers/stat.py @@ -1,5 +1,3 @@ -import json - from sqlalchemy import and_, distinct, func, join, select from sqlalchemy.orm import aliased @@ -7,10 +5,7 @@ from orm.author import Author, AuthorFollower from orm.reaction import Reaction, ReactionKind from orm.shout import Shout, ShoutAuthor, ShoutTopic from orm.topic import Topic, TopicFollower -from resolvers.rating import add_author_rating_columns from services.db import local_session -from services.logger import root_logger as logger -from services.rediscache import redis def add_topic_stat_columns(q): @@ -65,7 +60,7 @@ def add_topic_stat_columns(q): return q -def add_author_stat_columns(q, with_rating=False): +def add_author_stat_columns(q): aliased_shout_author = aliased(ShoutAuthor) aliased_authors = aliased(AuthorFollower) aliased_followers = aliased(AuthorFollower) @@ -106,20 +101,17 @@ def add_author_stat_columns(q, with_rating=False): q = q.add_columns(sub_comments.c.comments_count) group_list = [Author.id, sub_comments.c.comments_count] - if with_rating: - q, group_list = add_author_rating_columns(q, group_list) - q = q.group_by(*group_list) return q -def get_with_stat(q, with_rating=False): +def get_with_stat(q): try: is_author = f'{q}'.lower().startswith('select author') is_topic = f'{q}'.lower().startswith('select topic') if is_author: - q = add_author_stat_columns(q, with_rating) + q = add_author_stat_columns(q) elif is_topic: q = add_topic_stat_columns(q) records = [] @@ -133,11 +125,6 @@ def get_with_stat(q, with_rating=False): stat['followers'] = cols[3] if is_author: stat['comments'] = cols[4] - if with_rating: - logger.debug(cols) - stat['rating'] = cols[6] - stat['rating_shouts'] = cols[7] - stat['rating_comments'] = cols[8] entity.stat = stat records.append(entity) except Exception as exc: @@ -148,41 +135,6 @@ def get_with_stat(q, with_rating=False): return records -async def get_authors_with_stat_cached(q): - # logger.debug(q) - try: - records = [] - with local_session() as session: - for [x] in session.execute(q): - stat_str = await redis.execute('GET', f'author:{x.id}') - x.stat = ( - json.loads(stat_str).get('stat') - if isinstance(stat_str, str) - else {} - ) - records.append(x) - except Exception as exc: - raise Exception(exc) - return records - - -async def get_topics_with_stat_cached(q): - try: - records = [] - current = None - with local_session() as session: - for [x] in session.execute(q): - current = x - stat_str = await redis.execute('GET', f'topic:{x.id}') - if isinstance(stat_str, str): - x.stat = json.loads(stat_str).get('stat') - records.append(x) - except Exception as exc: - logger.error(current) - raise Exception(exc) - return records - - def author_follows_authors(author_id: int): af = aliased(AuthorFollower, name='af') q = ( diff --git a/services/cache.py b/services/cache.py index 41c7b83c..ec29ae1b 100644 --- a/services/cache.py +++ b/services/cache.py @@ -1,12 +1,9 @@ -import asyncio import json -from sqlalchemy import event, select +from sqlalchemy import select -from orm.author import Author, AuthorFollower -from orm.reaction import Reaction -from orm.shout import Shout, ShoutAuthor -from orm.topic import Topic, TopicFollower +from orm.author import Author +from orm.topic import Topic from resolvers.stat import get_with_stat from services.encoders import CustomJSONEncoder from services.logger import root_logger as logger @@ -19,7 +16,7 @@ DEFAULT_FOLLOWS = { } -async def set_author_cache(author: dict): +async def cache_author(author: dict): payload = json.dumps(author, cls=CustomJSONEncoder) await redis.execute('SET', f'user:{author.get("user")}', payload) await redis.execute('SET', f'author:{author.get("id")}', payload) @@ -66,65 +63,9 @@ async def set_author_cache(author: dict): # author not found in the list, so add the new author with the updated stat field followed_author_followers.append(author) -async def update_author_followers_cache(author_id: int, followers): - updated_followers = [f.dict() if isinstance(f, Author) else f for f in followers] - payload = json.dumps( - updated_followers, - cls=CustomJSONEncoder, - ) - await redis.execute('SET', f'author:{author_id}:followers', payload) - author_str = await redis.execute('GET', f'author:{author_id}') - if author_str: - author = json.loads(author_str) - author['stat']['followers'] = len(updated_followers) - await set_author_cache(author) - -async def set_topic_cache(topic: dict): - payload = json.dumps(topic, cls=CustomJSONEncoder) - await redis.execute('SET', f'topic:{topic.get("id")}', payload) - - -async def set_follows_topics_cache(follows, author_id: int): - try: - payload = json.dumps( - [a.dict() if isinstance(a, Author) else a for a in follows], - cls=CustomJSONEncoder, - ) - await redis.execute('SET', f'author:{author_id}:follows-topics', payload) - except Exception as exc: - logger.error(exc) - import traceback - - exc = traceback.format_exc() - logger.error(exc) - - -async def set_follows_authors_cache(follows, author_id: int): - updated_follows = [a.dict() if isinstance(a, Author) else a for a in follows] - try: - payload = json.dumps( - updated_follows, - cls=CustomJSONEncoder, - ) - await redis.execute('SET', f'author:{author_id}:follows-authors', payload) - # update author everywhere - author_str = await redis.execute('GET', f'author:{author_id}') - if author_str: - author = json.loads(author_str) - author['stat']['authors'] = len(updated_follows) - await set_author_cache(author) - except Exception as exc: - import traceback - - logger.error(exc) - exc = traceback.format_exc() - logger.error(exc) - - -async def update_follows_for_author( - follower: Author, entity_type: str, entity: dict, is_insert: bool -): +async def cache_follows(follower: Author, entity_type: str, entity, is_insert=True): + # prepare follows = [] redis_key = f'author:{follower.id}:follows-{entity_type}s' follows_str = await redis.execute('GET', redis_key) @@ -138,197 +79,66 @@ async def update_follows_for_author( raise Exception('wrong entity') # Remove the entity from follows follows = [e for e in follows if e['id'] != entity_id] - logger.debug(f'{entity['slug']} removed from what @{follower.slug} follows') - if entity_type == 'topic': - await set_follows_topics_cache(follows, follower.id) - if entity_type == 'author': - await set_follows_authors_cache(follows, follower.id) + + # update follows cache + updated_data = [t.dict() if isinstance(t, Topic) else t for t in follows] + payload = json.dumps(updated_data, cls=CustomJSONEncoder) + await redis.execute('SET', redis_key, payload) + + # update follower's stats everywhere + author_str = await redis.execute('GET', f'author:{follower.id}') + if author_str: + author = json.loads(author_str) + author['stat'][f'{entity_type}s'] = len(updated_data) + await cache_author(author) return follows -async def update_followers_for_author( - follower: Author, author: Author, is_insert: bool -): +async def cache_follower(follower: Author, author: Author, is_insert=True): redis_key = f'author:{author.id}:followers' followers_str = await redis.execute('GET', redis_key) followers = [] if isinstance(followers_str, str): followers = json.loads(followers_str) if is_insert: - followers.append(follower) - else: # Remove the entity from followers followers = [e for e in followers if e['id'] != author.id] - await update_author_followers_cache(author.id, followers) + else: + followers.append(follower) + updated_followers = [f.dict() if isinstance(f, Author) else f for f in followers] + payload = json.dumps(updated_followers, cls=CustomJSONEncoder) + await redis.execute('SET', redis_key, payload) + author_str = await redis.execute('GET', f'author:{follower.id}') + if author_str: + author = json.loads(author_str) + author['stat']['followers'] = len(updated_followers) + await cache_author(author) return followers -def after_shout_update(_mapper, _connection, shout: Shout): - logger.info('after shout update') - # Main query to get authors associated with the shout through ShoutAuthor - authors_query = ( - select(Author) - .select_from(ShoutAuthor) # Select from ShoutAuthor - .join(Author, Author.id == ShoutAuthor.author) # Join with Author - .filter(ShoutAuthor.shout == shout.id) # Filter by shout.id - ) - - for author_with_stat in get_with_stat(authors_query): - asyncio.create_task(set_author_cache(author_with_stat.dict())) - - -def after_reaction_update(mapper, connection, reaction: Reaction): - logger.info('after reaction update') - try: - author_subquery = select(Author).where(Author.id == reaction.created_by) - replied_author_subquery = ( - select(Author) - .join(Reaction, Author.id == Reaction.created_by) - .where(Reaction.id == reaction.reply_to) - ) - - author_query = ( - select(author_subquery.subquery()) - .select_from(author_subquery.subquery()) - .union( - select(replied_author_subquery.subquery()).select_from( - replied_author_subquery.subquery() - ) - ) - ) - - for author_with_stat in get_with_stat(author_query): - asyncio.create_task(set_author_cache(author_with_stat.dict())) - - shout = connection.execute( - select(Shout).select_from(Shout).where(Shout.id == reaction.shout) - ).first() - if shout: - after_shout_update(mapper, connection, shout) - except Exception as exc: - logger.error(exc) - import traceback - - traceback.print_exc() - - -def after_author_update(_mapper, _connection, author: Author): - logger.info('after author update') - q = select(Author).where(Author.id == author.id) - result = get_with_stat(q) - if result: - [author_with_stat] = result - asyncio.create_task(set_author_cache(author_with_stat.dict())) - - -def after_topic_follower_insert(_mapper, _connection, target: TopicFollower): - logger.info(target) - asyncio.create_task( - handle_topic_follower_change(target.topic, target.follower, True) - ) - - -def after_topic_follower_delete(_mapper, _connection, target: TopicFollower): - logger.info(target) - asyncio.create_task( - handle_topic_follower_change(target.topic, target.follower, False) - ) - - -def after_author_follower_insert(_mapper, _connection, target: AuthorFollower): - logger.info(target) - asyncio.create_task( - handle_author_follower_change(target.author, target.follower, True) - ) - - -def after_author_follower_delete(_mapper, _connection, target: AuthorFollower): - logger.info(target) - asyncio.create_task( - handle_author_follower_change(target.author, target.follower, False) - ) - - -async def handle_author_follower_change( - author_id: int, follower_id: int, is_insert: bool -): +async def handle_author_follower_change(author_id: int, follower_id: int, is_insert: bool): logger.info(author_id) author_query = select(Author).select_from(Author).filter(Author.id == author_id) [author] = get_with_stat(author_query) follower_query = select(Author).select_from(Author).filter(Author.id == follower_id) [follower] = get_with_stat(follower_query) if follower and author: - _ = asyncio.create_task(set_author_cache(author.dict())) - follows_authors = await redis.execute( - 'GET', f'author:{follower_id}:follows-authors' - ) - if isinstance(follows_authors, str): - follows_authors = json.loads(follows_authors) - if not any(x.get('id') == author.id for x in follows_authors): - follows_authors.append(author.dict()) - _ = asyncio.create_task(set_follows_authors_cache(follows_authors, follower_id)) - _ = asyncio.create_task(set_author_cache(follower.dict())) - await update_follows_for_author( - follower, - 'author', - { - 'id': author.id, - 'name': author.name, - 'slug': author.slug, - 'pic': author.pic, - 'bio': author.bio, - 'stat': author.stat, - }, - is_insert, - ) + await cache_author(author.dict()) + await cache_author(follower.dict()) + await cache_follows(follower, 'author', author.dict(), is_insert) + await cache_follower(follower, author, is_insert) -async def handle_topic_follower_change( - topic_id: int, follower_id: int, is_insert: bool -): +async def handle_topic_follower_change(topic_id: int, follower_id: int, is_insert: bool): logger.info(topic_id) topic_query = select(Topic).filter(Topic.id == topic_id) [topic] = get_with_stat(topic_query) follower_query = select(Author).filter(Author.id == follower_id) [follower] = get_with_stat(follower_query) if follower and topic: - _ = asyncio.create_task(set_author_cache(follower.dict())) - follows_topics = await redis.execute( - 'GET', f'author:{follower_id}:follows-topics' - ) - if isinstance(follows_topics, str): - follows_topics = json.loads(follows_topics) - if not any(x.get('id') == topic.id for x in follows_topics): - follows_topics.append(topic) - _ = asyncio.create_task(set_follows_topics_cache(follows_topics, follower_id)) - await update_follows_for_author( - follower, - 'topic', - { - 'id': topic.id, - 'title': topic.title, - 'slug': topic.slug, - 'body': topic.body, - 'stat': topic.stat, - }, - is_insert, - ) + await cache_author(follower.dict()) + await redis.execute('SET', f'topic:{topic.id}', json.dumps(topic.dict(), cls=CustomJSONEncoder)) + await cache_follows(follower, 'topic', topic.dict(), is_insert) -def events_register(): - event.listen(Shout, 'after_insert', after_shout_update) - event.listen(Shout, 'after_update', after_shout_update) - - event.listen(Reaction, 'after_insert', after_reaction_update) - event.listen(Reaction, 'after_update', after_reaction_update) - - event.listen(Author, 'after_insert', after_author_update) - event.listen(Author, 'after_update', after_author_update) - - event.listen(AuthorFollower, 'after_insert', after_author_follower_insert) - event.listen(AuthorFollower, 'after_delete', after_author_follower_delete) - - event.listen(TopicFollower, 'after_insert', after_topic_follower_insert) - event.listen(TopicFollower, 'after_delete', after_topic_follower_delete) - - logger.info('cache events were registered!') +# handle_author_follow and handle_topic_follow -> cache_author, cache_follows, cache_followers diff --git a/services/triggers.py b/services/triggers.py new file mode 100644 index 00000000..2788c029 --- /dev/null +++ b/services/triggers.py @@ -0,0 +1,122 @@ +import asyncio + +from sqlalchemy import event, select + +from orm.author import Author, AuthorFollower +from orm.reaction import Reaction +from orm.shout import Shout, ShoutAuthor +from orm.topic import TopicFollower +from resolvers.stat import get_with_stat +from services.logger import root_logger as logger +from services.cache import cache_author, handle_topic_follower_change, handle_author_follower_change + +DEFAULT_FOLLOWS = { + 'topics': [], + 'authors': [], + 'communities': [{'id': 1, 'name': 'Дискурс', 'slug': 'discours', 'pic': ''}], +} + + +def after_shout_update(_mapper, _connection, shout: Shout): + logger.info('after shout update') + # Main query to get authors associated with the shout through ShoutAuthor + authors_query = ( + select(Author) + .select_from(ShoutAuthor) # Select from ShoutAuthor + .join(Author, Author.id == ShoutAuthor.author) # Join with Author + .filter(ShoutAuthor.shout == shout.id) # Filter by shout.id + ) + + for author_with_stat in get_with_stat(authors_query): + asyncio.create_task(cache_author(author_with_stat.dict())) + + +def after_reaction_update(mapper, connection, reaction: Reaction): + logger.info('after reaction update') + try: + author_subquery = select(Author).where(Author.id == reaction.created_by) + replied_author_subquery = ( + select(Author) + .join(Reaction, Author.id == Reaction.created_by) + .where(Reaction.id == reaction.reply_to) + ) + + author_query = ( + select(author_subquery.subquery()) + .select_from(author_subquery.subquery()) + .union( + select(replied_author_subquery.subquery()).select_from( + replied_author_subquery.subquery() + ) + ) + ) + + for author_with_stat in get_with_stat(author_query): + asyncio.create_task(cache_author(author_with_stat.dict())) + + shout = connection.execute( + select(Shout).select_from(Shout).where(Shout.id == reaction.shout) + ).first() + if shout: + after_shout_update(mapper, connection, shout) + except Exception as exc: + logger.error(exc) + import traceback + + traceback.print_exc() + + +def after_author_update(_mapper, _connection, author: Author): + logger.info('after author update') + q = select(Author).where(Author.id == author.id) + result = get_with_stat(q) + if result: + [author_with_stat] = result + asyncio.create_task(cache_author(author_with_stat.dict())) + + +def after_topic_follower_insert(_mapper, _connection, target: TopicFollower): + logger.info(target) + asyncio.create_task( + handle_topic_follower_change(target.topic, target.follower, True) + ) + + +def after_topic_follower_delete(_mapper, _connection, target: TopicFollower): + logger.info(target) + asyncio.create_task( + handle_topic_follower_change(target.topic, target.follower, False) + ) + + +def after_author_follower_insert(_mapper, _connection, target: AuthorFollower): + logger.info(target) + asyncio.create_task( + handle_author_follower_change(target.author, target.follower, True) + ) + + +def after_author_follower_delete(_mapper, _connection, target: AuthorFollower): + logger.info(target) + asyncio.create_task( + handle_author_follower_change(target.author, target.follower, False) + ) + + +def events_register(): + event.listen(Shout, 'after_insert', after_shout_update) + event.listen(Shout, 'after_update', after_shout_update) + + event.listen(Reaction, 'after_insert', after_reaction_update) + event.listen(Reaction, 'after_update', after_reaction_update) + + event.listen(Author, 'after_insert', after_author_update) + event.listen(Author, 'after_update', after_author_update) + + event.listen(AuthorFollower, 'after_insert', after_author_follower_insert) + event.listen(AuthorFollower, 'after_delete', after_author_follower_delete) + + event.listen(TopicFollower, 'after_insert', after_topic_follower_insert) + event.listen(TopicFollower, 'after_delete', after_topic_follower_delete) + + logger.info('cache events were registered!')