From 37319c20910512211fb51aa30f453ca37cc26dd6 Mon Sep 17 00:00:00 2001 From: Untone Date: Tue, 12 Mar 2024 14:59:36 +0300 Subject: [PATCH] cache-events-fix --- main.py | 2 + resolvers/author.py | 4 +- resolvers/follower.py | 22 +++---- resolvers/stat.py | 103 ++++++++++++++---------------- services/cache.py | 143 +++++++++++++++++++++++------------------- 5 files changed, 142 insertions(+), 132 deletions(-) diff --git a/main.py b/main.py index 46bcc280..621472fb 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ from ariadne.asgi import GraphQL from starlette.applications import Starlette from starlette.routing import Route +from services.cache import events_register from services.rediscache import redis from services.schema import resolvers from services.viewed import ViewedStorage @@ -37,6 +38,7 @@ app = Starlette( ViewedStorage.init, # search_service.info, # start_sentry, + events_register, start, ], on_shutdown=[redis.disconnect], diff --git a/resolvers/author.py b/resolvers/author.py index 7c40c41e..6992636a 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -2,7 +2,7 @@ import asyncio import json import time -from sqlalchemy import select, or_, and_, text, desc, literal +from sqlalchemy import select, or_, and_, text, desc from sqlalchemy.orm import aliased from sqlalchemy_searchable import search @@ -52,7 +52,7 @@ async def get_author(_, _info, slug='', author_id=None): if author_id: cache = await redis.execute('GET', f'id:{author_id}:author') logger.debug(f'result from cache: {cache}') - q = select(Author).where(Author.id == literal(author_id)) + q = select(Author).where(Author.id == author_id) author_dict = None if cache: author_dict = json.loads(cache) diff --git a/resolvers/follower.py b/resolvers/follower.py index 76a63bd7..c5c729e9 100644 --- a/resolvers/follower.py +++ b/resolvers/follower.py @@ -2,7 +2,7 @@ import json import time from typing import List -from sqlalchemy import select, or_, column +from sqlalchemy import select, or_ from sqlalchemy.sql import and_ from orm.author import Author, AuthorFollower @@ -34,7 +34,7 @@ async def follow(_, info, what, slug): user_id = info.context.get('user_id') if not user_id: return {"error": "unauthorized"} - [follower] = get_with_stat(select(Author).select_from(Author).filter(column(Author.user) == user_id)) + [follower] = get_with_stat(select(Author).select_from(Author).filter(Author.user == user_id)) if not follower: return {"error": "cant find follower"} @@ -42,7 +42,7 @@ async def follow(_, info, what, slug): error = author_follow(follower.id, slug) if not error: logger.debug(f'@{follower.slug} followed @{slug}') - [author] = get_with_stat(select(Author).select_from(Author).where(column(Author.slug) == slug)) + [author] = get_with_stat(select(Author).select_from(Author).where(Author.slug == slug)) if not author: return {"error": "author is not found"} follows = await update_follows_for_author(follower, 'author', author.dict(), True) @@ -52,7 +52,7 @@ async def follow(_, info, what, slug): elif what == 'TOPIC': error = topic_follow(follower.id, slug) if not error: - [topic] = get_with_stat(select(Topic).where(column(Topic.slug) == slug)) + [topic] = get_with_stat(select(Topic).where(Topic.slug == slug)) if not topic: return {"error": "topic is not found"} follows = await update_follows_for_author(follower, 'topic', topic.dict(), True) @@ -63,7 +63,7 @@ async def follow(_, info, what, slug): elif what == 'SHOUT': error = reactions_follow(follower.id, slug) if not error: - [shout] = local_session().execute(select(Shout).where(column(Shout.slug) == slug)) + [shout] = local_session().execute(select(Shout).where(Shout.slug == slug)) if not shout: return {"error": "cant find shout"} follows = await update_follows_for_author(follower, 'shout', shout.dict(), True) @@ -79,7 +79,7 @@ async def unfollow(_, info, what, slug): user_id = info.context.get('user_id') if not user_id: return {"error": "unauthorized"} - follower_query = select(Author).filter(column(Author.user) == user_id) + follower_query = select(Author).filter(Author.user == user_id) [follower] = get_with_stat(follower_query) if not follower: return {"error": "follower profile is not found"} @@ -88,7 +88,7 @@ async def unfollow(_, info, what, slug): error = author_unfollow(follower.id, slug) if not error: logger.info(f'@{follower.slug} unfollowing @{slug}') - [author] = get_with_stat(select(Author).where(column(Author.slug) == slug)) + [author] = get_with_stat(select(Author).where(Author.slug == slug)) if not author: return {"error": "cant find author"} _followers = await update_followers_for_author(follower, author, False) @@ -99,7 +99,7 @@ async def unfollow(_, info, what, slug): error = topic_unfollow(follower.id, slug) if not error: logger.info(f'@{follower.slug} unfollowing §{slug}') - [topic] = get_with_stat(select(Topic).where(column(Topic.slug) == slug)) + [topic] = get_with_stat(select(Topic).where(Topic.slug == slug)) if not topic: return {"error": "cant find topic"} follows = await update_follows_for_author(follower, 'topic', topic.dict(), False) @@ -111,7 +111,7 @@ async def unfollow(_, info, what, slug): error = reactions_unfollow(follower.id, slug) if not error: logger.info(f'@{follower.slug} unfollowing §{slug}') - [shout] = local_session().execute(select(Shout).where(column(Shout.slug) == slug)) + [shout] = local_session().execute(select(Shout).where(Shout.slug == slug)) if not shout: return {"error": "cant find shout"} if not error: @@ -276,8 +276,8 @@ def author_unfollow(follower_id, slug): def get_topic_followers(_, _info, slug: str, topic_id: int) -> List[Author]: q = select(Author) q = ( - q.join(TopicFollower, column(TopicFollower.follower) == Author.id) - .join(Topic, column(Topic.id) == column(TopicFollower.topic)) + q.join(TopicFollower, TopicFollower.follower == Author.id) + .join(Topic, Topic.id == TopicFollower.topic) .filter(or_(Topic.slug == slug, Topic.id == topic_id)) ) return get_with_stat(q) diff --git a/resolvers/stat.py b/resolvers/stat.py index c8c55b6f..d8277376 100644 --- a/resolvers/stat.py +++ b/resolvers/stat.py @@ -6,6 +6,7 @@ from orm.topic import TopicFollower, Topic from services.db import local_session from orm.author import AuthorFollower, Author, AuthorRating from orm.shout import ShoutTopic, ShoutAuthor, Shout +from services.logger import root_logger as logger def add_topic_stat_columns(q): @@ -16,26 +17,26 @@ def add_topic_stat_columns(q): q = ( q.outerjoin(aliased_shout_topic, aliased_shout_topic.topic == Topic.id) .add_columns( - func.count(distinct(aliased_shout_topic.shout)).label('shouts_stat') + func.count(distinct(aliased_shout_topic.shout)).label("shouts_stat") ) .outerjoin( aliased_shout_author, aliased_shout_topic.shout == aliased_shout_author.shout, ) .add_columns( - func.count(distinct(aliased_shout_author.author)).label('authors_stat') + func.count(distinct(aliased_shout_author.author)).label("authors_stat") ) .outerjoin(aliased_topic_follower) .add_columns( func.count(distinct(aliased_topic_follower.follower)).label( - 'followers_stat' + "followers_stat" ) ) ) # Create a subquery for comments count _sub_comments = ( select( - Shout.id, func.coalesce(func.count(Reaction.id), 0).label('comments_count') + Shout.id, func.coalesce(func.count(Reaction.id), 0).label("comments_count") ) .join( Reaction, @@ -66,23 +67,23 @@ def add_author_stat_columns(q): q = q.outerjoin(aliased_shout_author, aliased_shout_author.author == Author.id) q = q.add_columns( - func.count(distinct(aliased_shout_author.shout)).label('shouts_stat') + func.count(distinct(aliased_shout_author.shout)).label("shouts_stat") ) q = q.outerjoin(aliased_authors, aliased_authors.follower == Author.id) q = q.add_columns( - func.count(distinct(aliased_authors.author)).label('authors_stat') + func.count(distinct(aliased_authors.author)).label("authors_stat") ) q = q.outerjoin(aliased_followers, aliased_followers.author == Author.id) q = q.add_columns( - func.count(distinct(aliased_followers.follower)).label('followers_stat') + func.count(distinct(aliased_followers.follower)).label("followers_stat") ) # Create a subquery for comments count sub_comments = ( select( - Author.id, func.coalesce(func.count(Reaction.id), 0).label('comments_stat') + Author.id, func.coalesce(func.count(Reaction.id), 0).label("comments_stat") ) .outerjoin( Reaction, @@ -99,23 +100,10 @@ def add_author_stat_columns(q): q = q.outerjoin(sub_comments, Author.id == sub_comments.c.id) q = q.add_columns(sub_comments.c.comments_stat) - # Create a subquery for topics - _sub_topics = ( - select( - ShoutAuthor.author, - func.count(distinct(ShoutTopic.topic)).label('topics_stat'), - ) - .join(Shout, ShoutTopic.shout == Shout.id) - .join(ShoutAuthor, Shout.id == ShoutAuthor.shout) - .group_by(ShoutAuthor.author) - .subquery() + q = q.group_by( + Author.id, sub_comments.c.comments_stat ) - # q = q.outerjoin(sub_topics, Author.id == sub_topics.c.author) - # q = q.add_columns(sub_topics.c.topics_stat) - - q = q.group_by(Author.id, sub_comments.c.comments_stat) #, sub_topics.c.topics_stat) - return q @@ -124,7 +112,7 @@ def add_author_ratings(q): ratings_subquery = ( select( [ - aliased_author.id.label('author_id'), + aliased_author.id.label("author_id"), func.count() .filter( and_( @@ -133,12 +121,12 @@ def add_author_ratings(q): Reaction.deleted_at.is_(None), ) ) - .label('comments_count'), + .label("comments_count"), func.sum(case((AuthorRating.plus == true(), 1), else_=0)).label( - 'likes_count' + "likes_count" ), func.sum(case((AuthorRating.plus != true(), 1), else_=0)).label( - 'dislikes_count' + "dislikes_count" ), func.sum( case( @@ -151,7 +139,7 @@ def add_author_ratings(q): ), else_=0, ) - ).label('shouts_likes'), + ).label("shouts_likes"), func.sum( case( ( @@ -163,7 +151,7 @@ def add_author_ratings(q): ), else_=0, ) - ).label('shouts_dislikes'), + ).label("shouts_dislikes"), ] ) .select_from(aliased_author) @@ -171,42 +159,47 @@ def add_author_ratings(q): .outerjoin(Shout, Shout.authors.any(id=aliased_author.id)) .filter(Reaction.deleted_at.is_(None)) .group_by(aliased_author.id) - .alias('ratings_subquery') + .alias("ratings_subquery") ) return q.join(ratings_subquery, Author.id == ratings_subquery.c.author_id) def get_with_stat(q): - is_author = f'{q}'.lower().startswith('select author') - is_topic = f'{q}'.lower().startswith('select topic') - if is_author: - q = add_author_stat_columns(q) - # q = add_author_ratings(q) # TODO: move rating to cols down there - elif is_topic: - q = add_topic_stat_columns(q) - records = [] - # logger.debug(f'{q}'.replace('\n', ' ')) - with local_session() as session: - for cols in session.execute(q): - entity = cols[0] - entity.stat = {} - entity.stat['shouts'] = cols[1] - entity.stat['authors'] = cols[2] - entity.stat['followers'] = cols[3] - if is_author: - entity.stat['comments'] = cols[4] - # entity.stat['topics'] = cols[5] - # entity.stat['rating'] = cols[5] - cols[6] - # entity.stat['rating_shouts'] = cols[7] - cols[8] - - records.append(entity) - + try: + is_author = f"{q}".lower().startswith("select author") + is_topic = f"{q}".lower().startswith("select topic") + if is_author: + q = add_author_stat_columns(q) + # q = add_author_ratings(q) # TODO: move rating to cols down there + elif is_topic: + q = add_topic_stat_columns(q) + records = [] + # logger.debug(f'{q}'.replace('\n', ' ')) + with local_session() as session: + for cols in session.execute(q): + entity = cols[0] + if not isinstance(entity, dict): + entity = entity.dict() + stat = dict() + stat["shouts"] = cols[1] + stat["authors"] = cols[2] + stat["followers"] = cols[3] + if is_author: + stat["comments"] = cols[4] + # entity.stat['topics'] = cols[5] + # entity.stat['rating'] = cols[5] - cols[6] + # entity.stat['rating_shouts'] = cols[7] - cols[8] + entity['stat'] = stat + records.append(entity) + except Exception as exc: + logger.debug(cols) + raise Exception(exc) return records def author_follows_authors(author_id: int): - af = aliased(AuthorFollower, name='af') + af = aliased(AuthorFollower, name="af") q = ( select(Author) .select_from(join(Author, af, Author.id == af.author)) diff --git a/services/cache.py b/services/cache.py index da29fdb2..6e600307 100644 --- a/services/cache.py +++ b/services/cache.py @@ -14,27 +14,33 @@ from services.logger import root_logger as logger DEFAULT_FOLLOWS = { - 'topics': [], - 'authors': [], - 'communities': [{'id': 1, 'name': 'Дискурс', 'slug': 'discours', 'pic': ''}], + "topics": [], + "authors": [], + "communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}], } async def set_author_cache(author: dict): payload = json.dumps(author, cls=CustomJSONEncoder) - await redis.execute('SET', f'user:{author.get("user")}:author', payload) - await redis.execute('SET', f'id:{author.get("id")}:author', payload) + await redis.execute("SET", f'user:{author.get("user")}:author', payload) + await redis.execute("SET", f'id:{author.get("id")}:author', payload) async def update_author_followers_cache(author_id: int, followers): - payload = json.dumps([f.dict() if isinstance(f, Author) else f for f in followers], cls=CustomJSONEncoder) - await redis.execute('SET', f'author:{author_id}:followers', payload) + payload = json.dumps( + [f.dict() if isinstance(f, Author) else f for f in followers], + cls=CustomJSONEncoder, + ) + await redis.execute("SET", f"author:{author_id}:followers", payload) async def set_follows_topics_cache(follows, author_id: int): try: - payload = json.dumps([a.dict() if isinstance(a, Author) else a for a in follows], cls=CustomJSONEncoder) - await redis.execute('SET', f'author:{author_id}:follows-topics', payload) + payload = json.dumps( + [a.dict() if isinstance(a, Author) else a for a in follows], + cls=CustomJSONEncoder, + ) + await redis.execute("SET", f"author:{author_id}:follows-topics", payload) except Exception as exc: logger.error(exc) import traceback @@ -45,8 +51,11 @@ async def set_follows_topics_cache(follows, author_id: int): async def set_follows_authors_cache(follows, author_id: int): try: - payload = json.dumps([a.dict() if isinstance(a, Author) else a for a in follows], cls=CustomJSONEncoder) - await redis.execute('SET', f'author:{author_id}:follows-authors', payload) + payload = json.dumps( + [a.dict() if isinstance(a, Author) else a for a in follows], + cls=CustomJSONEncoder, + ) + await redis.execute("SET", f"author:{author_id}:follows-authors", payload) except Exception as exc: import traceback @@ -59,22 +68,22 @@ async def update_follows_for_author( follower: Author, entity_type: str, entity: dict, is_insert: bool ): follows = [] - redis_key = f'author:{follower.id}:follows-{entity_type}s' - follows_str = await redis.execute('GET', redis_key) + redis_key = f"author:{follower.id}:follows-{entity_type}s" + follows_str = await redis.execute("GET", redis_key) if isinstance(follows_str, str): follows = json.loads(follows_str) if is_insert: follows.append(entity) else: - entity_id = entity.get('id') + entity_id = entity.get("id") if not entity_id: - raise Exception('wrong entity') + raise Exception("wrong entity") # Remove the entity from follows - follows = [e for e in follows if e['id'] != entity_id] + follows = [e for e in follows if e["id"] != entity_id] logger.debug(f'{entity['slug']} removed from what @{follower.slug} follows') - if entity_type == 'topic': + if entity_type == "topic": await set_follows_topics_cache(follows, follower.id) - if entity_type == 'author': + if entity_type == "author": await set_follows_authors_cache(follows, follower.id) return follows @@ -82,8 +91,8 @@ async def update_follows_for_author( async def update_followers_for_author( follower: Author, author: Author, is_insert: bool ): - redis_key = f'author:{author.id}:followers' - followers_str = await redis.execute('GET', redis_key) + redis_key = f"author:{author.id}:followers" + followers_str = await redis.execute("GET", redis_key) followers = [] if isinstance(followers_str, str): followers = json.loads(followers_str) @@ -91,14 +100,12 @@ async def update_followers_for_author( followers.append(follower) else: # Remove the entity from followers - followers = [e for e in followers if e['id'] != author.id] + followers = [e for e in followers if e["id"] != author.id] await update_author_followers_cache(author.id, followers) return followers -@event.listens_for(Shout, 'after_insert') -@event.listens_for(Shout, 'after_update') -def after_shouts_update(_mapper, _connection, shout: Shout): +def after_shout_update(_mapper, _connection, shout: Shout): # Main query to get authors associated with the shout through ShoutAuthor authors_query = ( select(Author) @@ -111,8 +118,7 @@ def after_shouts_update(_mapper, _connection, shout: Shout): asyncio.create_task(set_author_cache(author_with_stat.dict())) -@event.listens_for(Reaction, 'after_insert') -def after_reaction_insert(mapper, connection, reaction: Reaction): +def after_reaction_update(mapper, connection, reaction: Reaction): try: author_subquery = select(Author).where(Author.id == reaction.created_by) replied_author_subquery = ( @@ -122,20 +128,11 @@ def after_reaction_insert(mapper, connection, reaction: Reaction): ) author_query = ( - select( - author_subquery.subquery().c.id, - author_subquery.subquery().c.slug, - author_subquery.subquery().c.created_at, - author_subquery.subquery().c.name, - ) + select(author_subquery.subquery()) .select_from(author_subquery.subquery()) .union( - select( - replied_author_subquery.subquery().c.id, - replied_author_subquery.subquery().c.slug, - replied_author_subquery.subquery().c.created_at, - replied_author_subquery.subquery().c.name, - ).select_from(replied_author_subquery.subquery()) + select(replied_author_subquery.subquery()) + .select_from(replied_author_subquery.subquery()) ) ) @@ -146,15 +143,14 @@ def after_reaction_insert(mapper, connection, reaction: Reaction): select(Shout).select_from(Shout).where(Shout.id == reaction.shout) ).first() if shout: - after_shouts_update(mapper, connection, shout) + after_shout_update(mapper, connection, shout) except Exception as exc: logger.error(exc) import traceback + traceback.print_exc() -@event.listens_for(Author, 'after_insert') -@event.listens_for(Author, 'after_update') def after_author_update(_mapper, _connection, author: Author): q = select(Author).where(Author.id == author.id) result = get_with_stat(q) @@ -163,35 +159,33 @@ def after_author_update(_mapper, _connection, author: Author): asyncio.create_task(set_author_cache(author_with_stat.dict())) -@event.listens_for(TopicFollower, 'after_insert') def after_topic_follower_insert(_mapper, _connection, target: TopicFollower): asyncio.create_task( handle_topic_follower_change(target.topic, target.follower, True) ) -@event.listens_for(TopicFollower, 'after_delete') def after_topic_follower_delete(_mapper, _connection, target: TopicFollower): asyncio.create_task( handle_topic_follower_change(target.topic, target.follower, False) ) -@event.listens_for(AuthorFollower, 'after_insert') def after_author_follower_insert(_mapper, _connection, target: AuthorFollower): asyncio.create_task( handle_author_follower_change(target.author, target.follower, True) ) -@event.listens_for(AuthorFollower, 'after_delete') def after_author_follower_delete(_mapper, _connection, target: AuthorFollower): asyncio.create_task( handle_author_follower_change(target.author, target.follower, False) ) -async def handle_author_follower_change(author_id: int, follower_id: int, is_insert: bool): +async def handle_author_follower_change( + author_id: int, follower_id: int, is_insert: bool +): author_query = select(Author).select_from(Author).filter(Author.id == author_id) [author] = get_with_stat(author_query) follower_query = select(Author).select_from(Author).filter(Author.id == follower_id) @@ -199,30 +193,32 @@ async def handle_author_follower_change(author_id: int, follower_id: int, is_ins if follower and author: _ = asyncio.create_task(set_author_cache(author.dict())) follows_authors = await redis.execute( - 'GET', f'author:{follower_id}:follows-authors' + "GET", f"author:{follower_id}:follows-authors" ) if follows_authors: follows_authors = json.loads(follows_authors) - if not any(x.get('id') == author.id for x in follows_authors): + if not any(x.get("id") == author.id for x in follows_authors): follows_authors.append(author.dict()) _ = asyncio.create_task(set_follows_authors_cache(follows_authors, follower_id)) _ = asyncio.create_task(set_author_cache(follower.dict())) await update_follows_for_author( follower, - 'author', + "author", { - 'id': author.id, - 'name': author.name, - 'slug': author.slug, - 'pic': author.pic, - 'bio': author.bio, - 'stat': author.stat, + "id": author.id, + "name": author.name, + "slug": author.slug, + "pic": author.pic, + "bio": author.bio, + "stat": author.stat, }, is_insert, ) -async def handle_topic_follower_change(topic_id: int, follower_id: int, is_insert: bool): +async def handle_topic_follower_change( + topic_id: int, follower_id: int, is_insert: bool +): topic_query = select(Topic).filter(Topic.id == topic_id) [topic] = get_with_stat(topic_query) follower_query = select(Author).filter(Author.id == follower_id) @@ -230,22 +226,41 @@ async def handle_topic_follower_change(topic_id: int, follower_id: int, is_inser if follower and topic: _ = asyncio.create_task(set_author_cache(follower.dict())) follows_topics = await redis.execute( - 'GET', f'author:{follower_id}:follows-topics' + "GET", f"author:{follower_id}:follows-topics" ) if follows_topics: follows_topics = json.loads(follows_topics) - if not any(x.get('id') == topic.id for x in follows_topics): + if not any(x.get("id") == topic.id for x in follows_topics): follows_topics.append(topic) _ = asyncio.create_task(set_follows_topics_cache(follows_topics, follower_id)) await update_follows_for_author( follower, - 'topic', + "topic", { - 'id': topic.id, - 'title': topic.title, - 'slug': topic.slug, - 'body': topic.body, - 'stat': topic.stat, + "id": topic.id, + "title": topic.title, + "slug": topic.slug, + "body": topic.body, + "stat": topic.stat, }, is_insert, ) + + +def events_register(): + event.listen(Shout, "after_insert", after_shout_update) + event.listen(Shout, "after_update", after_shout_update) + + event.listen(Reaction, "after_insert", after_reaction_update) + event.listen(Reaction, "after_update", after_reaction_update) + + event.listen(Author, "after_insert", after_author_update) + event.listen(Author, "after_update", after_author_update) + + event.listen(AuthorFollower, "after_insert", after_author_follower_insert) + event.listen(AuthorFollower, "after_delete", after_author_follower_delete) + + event.listen(TopicFollower, "after_insert", after_topic_follower_insert) + event.listen(TopicFollower, "after_delete", after_topic_follower_delete) + + logger.info('cache events were registered!')