diff --git a/resolvers/author.py b/resolvers/author.py index f99af7b7..cab72588 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -1,7 +1,7 @@ import json import time -from sqlalchemy import select, or_, and_, text, desc +from sqlalchemy import select, or_, and_, text, desc, cast, Integer from sqlalchemy.orm import aliased from sqlalchemy_searchable import search @@ -9,7 +9,7 @@ from orm.author import Author, AuthorFollower from orm.shout import ShoutAuthor, ShoutTopic from orm.topic import Topic from resolvers.stat import get_with_stat, author_follows_authors, author_follows_topics -from services.event_listeners import update_author_cache +from services.cache import update_author_cache from services.auth import login_required from services.db import local_session from services.rediscache import redis @@ -214,26 +214,27 @@ def create_author(user_id: str, slug: str, name: str = ''): @query.field('get_author_followers') -def get_author_followers(_, _info, slug: str): +async def get_author_followers(_, _info, slug: str): logger.debug(f'getting followers for @{slug}') try: with local_session() as session: author_alias = aliased(Author) author_id_result = ( - session.query(author_alias.id).filter(author_alias.slug == slug).first() + session.query(author_alias).filter(author_alias.slug == slug).first() ) - author_id = author_id_result[0] if author_id_result else None - - author_follower_alias = aliased(AuthorFollower, name='af') - q = select(Author).join( - author_follower_alias, - and_( - author_follower_alias.author == author_id, - author_follower_alias.follower == Author.id, - ), - ) - - return get_with_stat(q) + author = author_id_result[0] if author_id_result else None + author_id = cast(author.id, Integer) + cached = await redis.execute('GET', f'id:{author_id}:followers') + if not cached: + author_follower_alias = aliased(AuthorFollower, name='af') + q = select(Author).join( + author_follower_alias, + and_( + author_follower_alias.author == author_id, + author_follower_alias.follower == Author.id, + ), + ) + return json.loads(cached) if cached else get_with_stat(q) except Exception as exc: logger.error(exc) return [] diff --git a/resolvers/follower.py b/resolvers/follower.py index 978ba212..3c6822bb 100644 --- a/resolvers/follower.py +++ b/resolvers/follower.py @@ -16,7 +16,7 @@ from resolvers.topic import topic_unfollow from resolvers.stat import get_with_stat, author_follows_topics, author_follows_authors from services.auth import login_required from services.db import local_session -from services.event_listeners import DEFAULT_FOLLOWS, update_follows_for_author +from services.cache import DEFAULT_FOLLOWS, update_follows_for_author, update_followers_for_author from services.notify import notify_follower from services.schema import mutation, query from services.logger import root_logger as logger @@ -28,24 +28,27 @@ from services.rediscache import redis async def follow(_, info, what, slug): try: user_id = info.context['user_id'] - with local_session() as session: - follower = session.query(Author).filter(Author.user == user_id).first() - if follower: - if what == 'AUTHOR': - if author_unfollow(follower.id, slug): - author = session.query(Author).where(Author.slug == slug).first() - if author: - await update_follows_for_author(session, follower, 'author', author, True) - await notify_follower(follower.dict(), author.id, 'unfollow') - elif what == 'TOPIC': - topic = session.query(Topic).where(Topic.slug == slug).first() - if topic: - await update_follows_for_author(session, follower, 'topic', topic, True) - topic_unfollow(follower.id, slug) - elif what == 'COMMUNITY': - community_follow(follower.id, slug) - elif what == 'REACTIONS': - reactions_follow(follower.id, slug) + follower_query = select(Author).select_from(Author).filter(Author.user == user_id) + [follower] = get_with_stat(follower_query) + if follower: + if what == 'AUTHOR': + if author_unfollow(follower.id, slug): + author_query = select(Author).select_from(Author).where(Author.slug == slug) + [author] = get_with_stat(author_query) + if author: + await update_follows_for_author(follower, 'author', author, True) + await update_followers_for_author(follower, author, True) + await notify_follower(follower.dict(), author.id, 'unfollow') + elif what == 'TOPIC': + topic_query = select(Topic).where(Topic.slug == slug) + [topic] = get_with_stat(topic_query) + if topic: + await update_follows_for_author(follower, 'topic', topic, True) + topic_unfollow(follower.id, slug) + elif what == 'COMMUNITY': + community_follow(follower.id, slug) + elif what == 'REACTIONS': + reactions_follow(follower.id, slug) except Exception as e: logger.debug(info, what, slug) logger.error(e) @@ -59,24 +62,27 @@ async def follow(_, info, what, slug): async def unfollow(_, info, what, slug): user_id = info.context['user_id'] try: - with local_session() as session: - follower = session.query(Author).filter(Author.user == user_id).first() - if follower: - if what == 'AUTHOR': - if author_unfollow(follower.id, slug): - author = session.query(Author).where(Author.slug == slug).first() - if author: - await update_follows_for_author(session, follower, 'author', author, False) - await notify_follower(follower.dict(), author.id, 'unfollow') - elif what == 'TOPIC': - topic = session.query(Topic).where(Topic.slug == slug).first() - if topic: - await update_follows_for_author(session, follower, 'topic', topic, False) - topic_unfollow(follower.id, slug) - elif what == 'COMMUNITY': - community_unfollow(follower.id, slug) - elif what == 'REACTIONS': - reactions_unfollow(follower.id, slug) + follower_query = select(Author).filter(Author.user == user_id) + [follower] = get_with_stat(follower_query) + if follower: + if what == 'AUTHOR': + if author_unfollow(follower.id, slug): + author_query = select(Author).where(Author.slug == slug) + [author] = get_with_stat(author_query) + if author: + await update_follows_for_author(follower, 'author', author, False) + await update_followers_for_author(follower, author, False) + await notify_follower(follower.dict(), author.id, 'unfollow') + elif what == 'TOPIC': + topic_query = select(Topic).where(Topic.slug == slug) + [topic] = get_with_stat(topic_query) + if topic: + await update_follows_for_author(follower, 'topic', topic, False) + topic_unfollow(follower.id, slug) + elif what == 'COMMUNITY': + community_unfollow(follower.id, slug) + elif what == 'REACTIONS': + reactions_unfollow(follower.id, slug) except Exception as e: return {'error': str(e)} diff --git a/services/cache.py b/services/cache.py index 2b2e3c41..86c74c2f 100644 --- a/services/cache.py +++ b/services/cache.py @@ -1,26 +1,219 @@ -from functools import wraps +import asyncio -from dogpile.cache import make_region +from sqlalchemy import select, event +import json -# Создание региона кэша с TTL 300 секунд -cache_region = make_region().configure('dogpile.cache.memory', expiration_time=300) +from orm.author import Author, AuthorFollower +from orm.reaction import Reaction +from orm.shout import ShoutAuthor, Shout +from orm.topic import Topic, TopicFollower +from resolvers.stat import get_with_stat +from services.rediscache import redis +from services.logger import root_logger as logger -# Декоратор для кэширования методов -def cache_method(cache_key: str): - def decorator(f): - @wraps(f) - def decorated_function(*args, **kwargs): - # Генерация ключа для кэширования - key = cache_key.format(*args, **kwargs) - # Получение значения из кэша - result = cache_region.get(key) - if result is None: - # Если значение отсутствует в кэше, вызываем функцию и кэшируем результат - result = f(*args, **kwargs) - cache_region.set(key, result) - return result +DEFAULT_FOLLOWS = { + 'topics': [], + 'authors': [], + 'communities': [{'id': 1, 'name': 'Дискурс', 'slug': 'discours', 'pic': ''}], +} - return decorated_function - return decorator +async def update_author_cache(author: dict, ttl=25 * 60 * 60): + payload = json.dumps(author) + await redis.execute('SETEX', f'user:{author.get("user")}:author', ttl, payload) + await redis.execute('SETEX', f'id:{author.get("id")}:author', ttl, payload) + + +async def update_follows_topics_cache(follows, author_id: int, ttl=25 * 60 * 60): + try: + payload = json.dumps(follows) + await redis.execute('SETEX', f'author:{author_id}:follows-topics', ttl, payload) + except Exception as exc: + logger.error(exc) + import traceback + + exc = traceback.format_exc() + logger.error(exc) + + +async def update_follows_authors_cache(follows, author_id: int, ttl=25 * 60 * 60): + try: + payload = json.dumps(follows) + await redis.execute('SETEX', f'author:{author_id}:follows-authors', ttl, payload) + except Exception: + import traceback + + exc = traceback.format_exc() + logger.error(exc) + + +@event.listens_for(Shout, 'after_insert') +@event.listens_for(Shout, 'after_update') +def after_shouts_update(mapper, connection, shout: Shout): + # Main query to get authors associated with the shout through ShoutAuthor + authors_query = ( + select(Author) + .select_from(ShoutAuthor) # Select from ShoutAuthor + .join(Author, Author.id == ShoutAuthor.author) # Join with Author + .where(ShoutAuthor.shout == shout.id) # Filter by shout.id + ) + + for author_with_stat in get_with_stat(authors_query): + asyncio.create_task(update_author_cache(author_with_stat.dict())) + + +@event.listens_for(Reaction, 'after_insert') +def after_reaction_insert(mapper, connection, reaction: Reaction): + try: + author_subquery = select(Author).where(Author.id == reaction.created_by) + replied_author_subquery = ( + select(Author) + .join(Reaction, Author.id == Reaction.created_by) + .where(Reaction.id == reaction.reply_to) + ) + + author_query = select( + author_subquery.subquery().c.id, + author_subquery.subquery().c.slug, + author_subquery.subquery().c.created_at, + author_subquery.subquery().c.name, + ).select_from(author_subquery.subquery()).union( + select( + replied_author_subquery.subquery().c.id, + ) + .select_from(replied_author_subquery.subquery()) + ) + + for author_with_stat in get_with_stat(author_query): + asyncio.create_task(update_author_cache(author_with_stat.dict())) + + shout = connection.execute(select(Shout).select_from(Shout).where(Shout.id == reaction.shout)).first() + if shout: + after_shouts_update(mapper, connection, shout) + except Exception as exc: + logger.error(exc) + + +@event.listens_for(Author, 'after_insert') +@event.listens_for(Author, 'after_update') +def after_author_update(mapper, connection, author: Author): + q = select(Author).where(Author.id == author.id) + [author_with_stat] = get_with_stat(q) + asyncio.create_task(update_author_cache(author_with_stat.dict())) + + +@event.listens_for(TopicFollower, 'after_insert') +def after_topic_follower_insert(mapper, connection, target: TopicFollower): + asyncio.create_task( + handle_topic_follower_change(connection, target.topic, target.follower, True) + ) + + +@event.listens_for(TopicFollower, 'after_delete') +def after_topic_follower_delete(mapper, connection, target: TopicFollower): + asyncio.create_task( + handle_topic_follower_change(connection, target.topic, target.follower, False) + ) + + +@event.listens_for(AuthorFollower, 'after_insert') +def after_author_follower_insert(mapper, connection, target: AuthorFollower): + asyncio.create_task( + handle_author_follower_change(connection, target.author, target.follower, True) + ) + + +@event.listens_for(AuthorFollower, 'after_delete') +def after_author_follower_delete(mapper, connection, target: AuthorFollower): + asyncio.create_task( + handle_author_follower_change(connection, target.author, target.follower, False) + ) + + +async def update_follows_for_author(follower: Author, entity_type: str, entity: dict, is_insert: bool): + ttl = 25 * 60 * 60 + redis_key = f'id:{follower.id}:follows-{entity_type}s' + follows_str = await redis.get(redis_key) + follows = json.loads(follows_str) if follows_str else [] + if is_insert: + follows.append(entity) + else: + # Remove the entity from follows + follows = [e for e in follows if e['id'] != entity['id']] + await redis.execute('SETEX', redis_key, ttl, json.dumps(follows)) + + +async def update_followers_for_author(follower: Author, author: Author, is_insert: bool): + ttl = 25 * 60 * 60 + redis_key = f'id:{author.id}:followers' + followers_str = await redis.get(redis_key) + followers = json.loads(followers_str) if followers_str else [] + if is_insert: + followers.append(follower) + else: + # Remove the entity from follows + follows = [e for e in followers if e['id'] != author.id] + await redis.execute('SETEX', redis_key, ttl, json.dumps(follows)) + + +async def handle_author_follower_change( + connection, author_id: int, follower_id: int, is_insert: bool +): + author_query = select(Author).select_from(Author).filter(Author.id == author_id) + [author] = get_with_stat(author_query) + follower_query = select(Author).select_from(Author).filter(Author.id == follower_id) + follower = get_with_stat(follower_query) + if follower and author: + _ = asyncio.create_task(update_author_cache(author.dict())) + follows_authors = await redis.execute('GET', f'author:{follower_id}:follows-authors') + if follows_authors: + follows_authors = json.loads(follows_authors) + if not any(x.get('id') == author.id for x in follows_authors): + follows_authors.append(author.dict()) + _ = asyncio.create_task(update_follows_authors_cache(follows_authors, follower_id)) + _ = asyncio.create_task(update_author_cache(follower.dict())) + await update_follows_for_author( + connection, + follower, + 'author', + { + 'id': author.id, + 'name': author.name, + 'slug': author.slug, + 'pic': author.pic, + 'bio': author.bio, + 'stat': author.stat, + }, + is_insert, + ) + + +async def handle_topic_follower_change( + connection, topic_id: int, follower_id: int, is_insert: bool +): + q = select(Topic).filter(Topic.id == topic_id) + topics = get_with_stat(q) + topic = topics[0] + follower_query = select(Author).filter(Author.id == follower_id) + follower = get_with_stat(follower_query) + if follower and topic: + _ = asyncio.create_task(update_author_cache(follower.dict())) + follows_topics = await redis.execute('GET', f'author:{follower_id}:follows-topics') + if follows_topics: + follows_topics = json.loads(follows_topics) + if not any(x.get('id') == topic.id for x in follows_topics): + follows_topics.append(topic) + _ = asyncio.create_task(update_follows_topics_cache(follows_topics, follower_id)) + await update_follows_for_author( + follower, + 'topic', + { + 'id': topic.id, + 'title': topic.title, + 'slug': topic.slug, + 'body': topic.body, + 'stat': topic.stat, + }, + is_insert, + ) diff --git a/services/event_listeners.py b/services/event_listeners.py deleted file mode 100644 index ac03e62f..00000000 --- a/services/event_listeners.py +++ /dev/null @@ -1,210 +0,0 @@ -import asyncio - -from sqlalchemy import select, event -import json - -from orm.author import Author, AuthorFollower -from orm.reaction import Reaction -from orm.shout import ShoutAuthor, Shout -from orm.topic import Topic, TopicFollower -from resolvers.stat import get_with_stat -from services.rediscache import redis -from services.logger import root_logger as logger - - -DEFAULT_FOLLOWS = { - 'topics': [], - 'authors': [], - 'communities': [{'id': 1, 'name': 'Дискурс', 'slug': 'discours', 'pic': ''}], -} - - -async def update_author_cache(author: dict, ttl=25 * 60 * 60): - payload = json.dumps(author) - await redis.execute('SETEX', f'user:{author.get("user")}:author', ttl, payload) - await redis.execute('SETEX', f'id:{author.get("id")}:author', ttl, payload) - - -async def update_follows_topics_cache(follows, author_id: int, ttl=25 * 60 * 60): - try: - payload = json.dumps(follows) - await redis.execute('SETEX', f'author:{author_id}:follows-topics', ttl, payload) - except Exception: - import traceback - - exc = traceback.format_exc() - logger.error(exc) - - -async def update_follows_authors_cache(follows, author_id: int, ttl=25 * 60 * 60): - try: - payload = json.dumps(follows) - await redis.execute('SETEX', f'author:{author_id}:follows-authors', ttl, payload) - except Exception: - import traceback - - exc = traceback.format_exc() - logger.error(exc) - - -@event.listens_for(Shout, 'after_insert') -@event.listens_for(Shout, 'after_update') -def after_shouts_update(mapper, connection, shout: Shout): - # Main query to get authors associated with the shout through ShoutAuthor - authors_query = ( - select(Author) - .select_from(ShoutAuthor) # Select from ShoutAuthor - .join(Author, Author.id == ShoutAuthor.author) # Join with Author - .where(ShoutAuthor.shout == shout.id) # Filter by shout.id - ) - - for author_with_stat in get_with_stat(authors_query): - asyncio.create_task(update_author_cache(author_with_stat.dict())) - - -@event.listens_for(Reaction, 'after_insert') -def after_reaction_insert(mapper, connection, reaction: Reaction): - try: - author_subquery = select(Author).where(Author.id == reaction.created_by) - replied_author_subquery = ( - select(Author) - .join(Reaction, Author.id == Reaction.created_by) - .where(Reaction.id == reaction.reply_to) - ) - - author_query = select( - author_subquery.subquery().c.id, - author_subquery.subquery().c.slug, - author_subquery.subquery().c.created_at, - author_subquery.subquery().c.name, - ).select_from(author_subquery.subquery()).union( - select( - replied_author_subquery.subquery().c.id, - ) - .select_from(replied_author_subquery.subquery()) - ) - - for author_with_stat in get_with_stat(author_query): - asyncio.create_task(update_author_cache(author_with_stat.dict())) - - shout = connection.execute(select(Shout).select_from(Shout).where(Shout.id == reaction.shout)).first() - if shout: - after_shouts_update(mapper, connection, shout) - except Exception as exc: - logger.error(exc) - - -@event.listens_for(Author, 'after_insert') -@event.listens_for(Author, 'after_update') -def after_author_update(mapper, connection, author: Author): - q = select(Author).where(Author.id == author.id) - [author_with_stat] = get_with_stat(q) - asyncio.create_task(update_author_cache(author_with_stat.dict())) - - -@event.listens_for(TopicFollower, 'after_insert') -def after_topic_follower_insert(mapper, connection, target: TopicFollower): - asyncio.create_task( - handle_topic_follower_change(connection, target.topic, target.follower, True) - ) - - -@event.listens_for(TopicFollower, 'after_delete') -def after_topic_follower_delete(mapper, connection, target: TopicFollower): - asyncio.create_task( - handle_topic_follower_change(connection, target.topic, target.follower, False) - ) - - -@event.listens_for(AuthorFollower, 'after_insert') -def after_author_follower_insert(mapper, connection, target: AuthorFollower): - asyncio.create_task( - handle_author_follower_change(connection, target.author, target.follower, True) - ) - - -@event.listens_for(AuthorFollower, 'after_delete') -def after_author_follower_delete(mapper, connection, target: AuthorFollower): - asyncio.create_task( - handle_author_follower_change(connection, target.author, target.follower, False) - ) - - -async def update_follows_for_author( - connection, follower, entity_type, entity: dict, is_insert -): - ttl = 25 * 60 * 60 - redis_key = f'id:{follower.id}:follows-{entity_type}s' - follows_str = await redis.get(redis_key) - follows = json.loads(follows_str) if follows_str else [] - if is_insert: - follows[f'{entity_type}s'].append(entity) - else: - # Remove the entity from follows - follows[f'{entity_type}s'] = [ - e for e in follows[f'{entity_type}s'] if e['id'] != entity['id'] - ] - await redis.execute('SETEX', redis_key, ttl, json.dumps(follows)) - - -async def handle_author_follower_change( - connection, author_id: int, follower_id: int, is_insert: bool -): - author_query = select(Author).select_from(Author).filter(Author.id == author_id) - [author] = get_with_stat(author_query) - follower_query = select(Author).select_from(Author).filter(Author.id == follower_id) - follower = get_with_stat(follower_query) - if follower and author: - _ = asyncio.create_task(update_author_cache(author.dict())) - follows_authors = await redis.execute('GET', f'author:{follower_id}:follows-authors') - if follows_authors: - follows_authors = json.loads(follows_authors) - if not any(x.get('id') == author.id for x in follows_authors): - follows_authors.append(author.dict()) - _ = asyncio.create_task(update_follows_authors_cache(follows_authors, follower_id)) - _ = asyncio.create_task(update_author_cache(follower.dict())) - await update_follows_for_author( - connection, - follower, - 'author', - { - 'id': author.id, - 'name': author.name, - 'slug': author.slug, - 'pic': author.pic, - 'bio': author.bio, - 'stat': author.stat, - }, - is_insert, - ) - - -async def handle_topic_follower_change( - connection, topic_id: int, follower_id: int, is_insert: bool -): - q = select(Topic).filter(Topic.id == topic_id) - topics = get_with_stat(q) - topic = topics[0] - follower_query = select(Author).filter(Author.id == follower_id) - follower = get_with_stat(follower_query) - if follower and topic: - _ = asyncio.create_task(update_author_cache(follower.dict())) - follows_topics = await redis.execute('GET', f'author:{follower_id}:follows-topics') - if follows_topics: - follows_topics = json.loads(follows_topics) - if not any(x.get('id') == topic.id for x in follows_topics): - follows_topics.append(topic) - _ = asyncio.create_task(update_follows_topics_cache(follows_topics, follower_id)) - await update_follows_for_author( - connection, - follower, - 'topic', - { - 'id': topic.id, - 'title': topic.title, - 'slug': topic.slug, - 'body': topic.body, - 'stat': topic.stat, - }, - is_insert, - ) diff --git a/services/memorycache.py b/services/memorycache.py new file mode 100644 index 00000000..2b2e3c41 --- /dev/null +++ b/services/memorycache.py @@ -0,0 +1,26 @@ +from functools import wraps + +from dogpile.cache import make_region + +# Создание региона кэша с TTL 300 секунд +cache_region = make_region().configure('dogpile.cache.memory', expiration_time=300) + + +# Декоратор для кэширования методов +def cache_method(cache_key: str): + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + # Генерация ключа для кэширования + key = cache_key.format(*args, **kwargs) + # Получение значения из кэша + result = cache_region.get(key) + if result is None: + # Если значение отсутствует в кэше, вызываем функцию и кэшируем результат + result = f(*args, **kwargs) + cache_region.set(key, result) + return result + + return decorated_function + + return decorator