From 4c1fbf64a235d589c4b6805db8ed892f97daf079 Mon Sep 17 00:00:00 2001 From: Untone Date: Tue, 21 May 2024 01:40:57 +0300 Subject: [PATCH] cache-reimplement-2 --- CHANGELOG.txt | 5 + pyproject.toml | 2 +- resolvers/__init__.py | 16 ++- resolvers/author.py | 149 +++++-------------------- resolvers/follower.py | 120 +++++++++----------- resolvers/stat.py | 21 +++- resolvers/topic.py | 22 ++++ schema/mutation.graphql | 4 +- schema/query.graphql | 1 + schema/type.graphql | 7 +- services/auth.py | 17 +-- services/cache.py | 236 +++++++++++++++++++++++++--------------- services/triggers.py | 18 +-- 13 files changed, 292 insertions(+), 326 deletions(-) diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 0f14c14b..3d47ffe3 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,3 +1,8 @@ +[0.3.5] +- cache isolated to services +- topics followers and authors cached +- redis stores lists of ids + [0.3.4] - load_authors_by from cache diff --git a/pyproject.toml b/pyproject.toml index aab99a75..2d8d0c75 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "core" -version = "0.3.4" +version = "0.3.5" description = "core module for discours.io" authors = ["discoursio devteam"] license = "MIT" diff --git a/resolvers/__init__.py b/resolvers/__init__.py index bff24eef..96d9588f 100644 --- a/resolvers/__init__.py +++ b/resolvers/__init__.py @@ -11,7 +11,7 @@ from resolvers.author import ( # search_authors, ) from resolvers.community import get_communities_all, get_community from resolvers.editor import create_shout, delete_shout, update_shout -from resolvers.follower import follow, get_shout_followers, get_topic_followers, unfollow +from resolvers.follower import follow, get_shout_followers, unfollow from resolvers.notifier import ( load_notifications, notification_mark_seen, @@ -35,7 +35,14 @@ from resolvers.reader import ( load_shouts_search, load_shouts_unrated, ) -from resolvers.topic import get_topic, get_topics_all, get_topics_by_author, get_topics_by_community +from resolvers.topic import ( + get_topic, + get_topic_authors, + get_topic_followers, + get_topics_all, + get_topics_by_author, + get_topics_by_community, +) from services.triggers import events_register events_register() @@ -44,6 +51,7 @@ __all__ = [ # author "get_author", "get_author_id", + "get_author_followers", "get_author_follows", "get_author_follows_topics", "get_author_follows_authors", @@ -60,6 +68,8 @@ __all__ = [ "get_topics_all", "get_topics_by_community", "get_topics_by_author", + "get_topic_followers", + "get_topic_authors", # reader "get_shout", "load_shouts_by", @@ -72,9 +82,7 @@ __all__ = [ # follower "follow", "unfollow", - "get_topic_followers", "get_shout_followers", - "get_author_followers", # editor "create_shout", "update_shout", diff --git a/resolvers/author.py b/resolvers/author.py index 739e8a5f..56d0fb7e 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -1,19 +1,22 @@ -import json import time -from sqlalchemy import and_, desc, or_, select, text -from sqlalchemy.orm import aliased +from sqlalchemy import desc, or_, select, text -from orm.author import Author, AuthorFollower +from orm.author import Author from orm.shout import ShoutAuthor, ShoutTopic from orm.topic import Topic -from resolvers.stat import author_follows_authors, author_follows_topics, get_with_stat +from resolvers.stat import get_with_stat from services.auth import login_required -from services.cache import cache_author, cache_follow_author_change +from services.cache import ( + cache_author, + get_cached_author, + get_cached_author_by_user_id, + get_cached_author_followers, + get_cached_author_follows_authors, + get_cached_author_follows_topics, +) from services.db import local_session -from services.encoders import CustomJSONEncoder from services.logger import root_logger as logger -from services.rediscache import redis from services.schema import mutation, query @@ -70,10 +73,7 @@ async def get_author(_, _info, slug="", author_id=0): if found_author: logger.debug(f"found author id: {found_author.id}") author_id = found_author.id if found_author.id else author_id - if author_id: - cached_result = await redis.execute("GET", f"author:{author_id}") - if isinstance(cached_result, str): - author_dict = json.loads(cached_result) + author_dict = await get_cached_author(author_id) # update stat from db if not author_dict or not author_dict.get("stat"): @@ -97,17 +97,11 @@ async def get_author(_, _info, slug="", author_id=0): async def get_author_by_user_id(user_id: str): logger.info(f"getting author id for {user_id}") - redis_key = f"user:{user_id}" author = None try: - res = await redis.execute("GET", redis_key) - if isinstance(res, str): - author = json.loads(res) - author_id = author.get("id") - author_slug = author.get("slug") - if author_id: - logger.debug(f"got author @{author_slug} #{author_id} cached") - return author + author = await get_cached_author_by_user_id(user_id) + if author: + return author author_query = select(Author).filter(Author.user == user_id) result = get_with_stat(author_query) @@ -156,11 +150,7 @@ async def load_authors_by(_, _info, by, limit, offset): for [a] in authors_nostat: author_dict = None if isinstance(a, Author): - author_id = a.id - if bool(author_id): - cached_result = await redis.execute("GET", f"author:{author_id}") - if isinstance(cached_result, str): - author_dict = json.loads(cached_result) + author_dict = await get_cached_author(a.id) if not author_dict or not isinstance(author_dict.get("shouts"), int): break @@ -196,28 +186,9 @@ async def get_author_follows(_, _info, slug="", user=None, author_id=0): topics = [] authors = [] if bool(author_id): - rkey = f"author:{author_id}:follows-authors" logger.debug(f"getting {author_id} follows authors") - cached = await redis.execute("GET", rkey) - if not cached: - authors = author_follows_authors(author_id) # type: ignore - prepared = [author.dict() for author in authors] - await redis.execute( - "SET", rkey, json.dumps(prepared, cls=CustomJSONEncoder) - ) - elif isinstance(cached, str): - authors = json.loads(cached) - - rkey = f"author:{author_id}:follows-topics" - cached = await redis.execute("GET", rkey) - if cached and isinstance(cached, str): - topics = json.loads(cached) - if not cached: - topics = author_follows_topics(author_id) # type: ignore - prepared = [topic.dict() for topic in topics] - await redis.execute( - "SET", rkey, json.dumps(prepared, cls=CustomJSONEncoder) - ) + authors = await get_cached_author_follows_authors(author_id) + topics = await get_cached_author_follows_topics(author_id) return { "topics": topics, "authors": authors, @@ -244,19 +215,7 @@ async def get_author_follows_topics(_, _info, slug="", user=None, author_id=None author_id = author_id_result[0] if author_id_result else None if not author_id: raise ValueError("Author not found") - logger.debug(f"getting {author_id} follows topics") - rkey = f"author:{author_id}:follows-topics" - cached = await redis.execute("GET", rkey) - topics = [] - if isinstance(cached, str): - topics = json.loads(cached) - if not cached: - topics = author_follows_topics(author_id) - prepared = [topic.dict() for topic in topics] - await redis.execute( - "SET", rkey, json.dumps(prepared, cls=CustomJSONEncoder) - ) - return topics + return get_author_follows_topics(author_id) @query.field("get_author_follows_authors") @@ -269,22 +228,9 @@ async def get_author_follows_authors(_, _info, slug="", user=None, author_id=Non .first() ) author_id = author_id_result[0] if author_id_result else None - if author_id: - logger.debug(f"getting {author_id} follows authors") - rkey = f"author:{author_id}:follows-authors" - cached = await redis.execute("GET", rkey) - authors = [] - if isinstance(cached, str): - authors = json.loads(cached) - if not authors: - authors = author_follows_authors(author_id) - prepared = [author.dict() for author in authors] - await redis.execute( - "SET", rkey, json.dumps(prepared, cls=CustomJSONEncoder) - ) - return authors - else: + if not author_id: raise ValueError("Author not found") + return await get_cached_author_follows_authors(author_id) def create_author(user_id: str, slug: str, name: str = ""): @@ -307,51 +253,8 @@ def create_author(user_id: str, slug: str, name: str = ""): @query.field("get_author_followers") async def get_author_followers(_, _info, slug: str): logger.debug(f"getting followers for @{slug}") - try: - author_alias = aliased(Author) - author_query = select(author_alias).filter(author_alias.slug == slug) - result = local_session().execute(author_query).first() - followers = [] - if result: - [author] = result - author_id = author.id - cached = await redis.execute("GET", f"author:{author_id}:followers") - if cached: - followers_ids = [] - followers = [] - if isinstance(cached, str): - followers_cached = json.loads(cached) - if isinstance(followers_cached, list): - logger.debug( - f"@{slug} got {len(followers_cached)} followers cached" - ) - for fc in followers_cached: - if fc["id"] not in followers_ids and fc["id"] != author_id: - followers.append(fc) - followers_ids.append(fc["id"]) - return followers - - author_follower_alias = aliased(AuthorFollower, name="af") - followers_query = select(Author).join( - author_follower_alias, - and_( - author_follower_alias.author == author_id, - author_follower_alias.follower == Author.id, - Author.id != author_id, # exclude the author from the followers - ), - ) - followers = get_with_stat(followers_query) - if isinstance(followers, list): - followers_ids = [r.id for r in followers] - for follower in followers: - if follower.id not in followers_ids: - await cache_follow_author_change(follower.dict(), author.dict()) - followers_ids.append(follower.id) - logger.debug(f"@{slug} cache updated with {len(followers)} followers") - return followers - except Exception as exc: - import traceback - - logger.error(exc) - logger.error(traceback.format_exc()) - return [] + author_query = select(Author.id).filter(Author.slug == slug).first() + author_id_result = local_session().execute(author_query) + author_id = author_id_result[0] if author_id_result else None + followers = await get_cached_author_followers(author_id) + return followers diff --git a/resolvers/follower.py b/resolvers/follower.py index 8efcb577..939186ce 100644 --- a/resolvers/follower.py +++ b/resolvers/follower.py @@ -1,5 +1,3 @@ -import json -import time from typing import List from sqlalchemy import select @@ -10,13 +8,18 @@ from orm.community import Community from orm.reaction import Reaction from orm.shout import Shout, ShoutReactionsFollower from orm.topic import Topic, TopicFollower -from resolvers.stat import author_follows_authors, author_follows_topics, get_with_stat +from resolvers.stat import get_with_stat from services.auth import login_required -from services.cache import DEFAULT_FOLLOWS, cache_author, cache_topic +from services.cache import ( + cache_author, + cache_topic, + get_cached_author_by_user_id, + get_cached_author_follows_authors, + get_cached_author_follows_topics, +) from services.db import local_session from services.logger import root_logger as logger from services.notify import notify_follower -from services.rediscache import redis from services.schema import mutation, query @@ -47,30 +50,28 @@ async def follow(_, info, what, slug): follower_id = follower_dict.get("id") entity = what.lower() - follows = [] - follows_str = await redis.execute("GET", f"author:{follower_id}:follows-{entity}s") - if isinstance(follows_str, str): - follows = json.loads(follows_str) or [] if what == "AUTHOR": + follows = await get_cached_author_follows_authors(follower_id) follower_id = int(follower_id) error = author_follow(follower_id, slug) if not error: - author_query = select(Author).filter(Author.slug == slug) - [author] = get_with_stat(author_query) - if author: - author_dict = author.dict() - author_id = int(author_dict.get("id", 0)) - follows_ids = set(int(a.get("id")) for a in follows) - if author_id not in follows_ids: + [author_id] = ( + local_session().query(Author.id).filter(Author.slug == slug).first() + ) + if author_id and author_id not in follows: + follows.append(author_id) + await cache_author(follower_dict) + await notify_follower(follower_dict, author_id, "follow") + [author] = get_with_stat(select(Author).filter(Author.id == author_id)) + if author: + author_dict = author.dict() await cache_author(author_dict) - await cache_author(follower_dict) - await notify_follower(follower_dict, author_id, "follow") - follows.append(author_dict) elif what == "TOPIC": error = topic_follow(follower_id, slug) - _topic_dict = await cache_by_slug(what, slug) + topic_dict = await cache_by_slug(what, slug) + await cache_topic(topic_dict) elif what == "COMMUNITY": # FIXME: when more communities @@ -95,31 +96,32 @@ async def unfollow(_, info, what, slug): entity = what.lower() follows = [] - follows_str = await redis.execute("GET", f"author:{follower_id}:follows-{entity}s") - if isinstance(follows_str, str): - follows = json.loads(follows_str) or [] if what == "AUTHOR": + follows = await get_cached_author_follows_authors(follower_id) + follower_id = int(follower_id) error = author_unfollow(follower_id, slug) # NOTE: after triggers should update cached stats if not error: logger.info(f"@{follower_dict.get('slug')} unfollowed @{slug}") - author_query = select(Author).filter(Author.slug == slug) - [author] = get_with_stat(author_query) - if author: - author_dict = author.dict() - author_id = author.id - await cache_author(author_dict) - for idx, item in enumerate(follows): - if item["id"] == author_id: - await cache_author(follower_dict) - await notify_follower(follower_dict, author_id, "unfollow") - follows.pop(idx) - break + [author_id] = ( + local_session().query(Author.id).filter(Author.slug == slug).first() + ) + if author_id and author_id in follows: + follows.remove(author_id) + await cache_author(follower_dict) + await notify_follower(follower_dict, author_id, "follow") + [author] = get_with_stat(select(Author).filter(Author.id == author_id)) + if author: + author_dict = author.dict() + await cache_author(author_dict) elif what == "TOPIC": error = topic_unfollow(follower_id, slug) - _topic_dict = await cache_by_slug(what, slug) + if not error: + follows = await get_cached_author_follows_topics(follower_id) + topic_dict = await cache_by_slug(what, slug) + await cache_topic(topic_dict) elif what == "COMMUNITY": follows = local_session().execute(select(Community)) @@ -133,36 +135,23 @@ async def unfollow(_, info, what, slug): async def get_follows_by_user_id(user_id: str): if not user_id: return {"error": "unauthorized"} - author = await redis.execute("GET", f"user:{user_id}") - if isinstance(author, str): - author = json.loads(author) + author = await get_cached_author_by_user_id(user_id) if not author: with local_session() as session: author = session.query(Author).filter(Author.user == user_id).first() if not author: return {"error": "cant find author"} author = author.dict() - last_seen = author.get("last_seen", 0) if isinstance(author, dict) else 0 - follows = DEFAULT_FOLLOWS - day_old = int(time.time()) - last_seen > 24 * 60 * 60 - if day_old: - author_id = json.loads(str(author)).get("id") - if author_id: - topics = author_follows_topics(author_id) - authors = author_follows_authors(author_id) - follows = { - "topics": topics, - "authors": authors, - "communities": [ - {"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""} - ], - } - else: - logger.debug(f"getting follows for {user_id} from redis") - res = await redis.execute("GET", f"user:{user_id}:follows") - if isinstance(res, str): - follows = json.loads(res) - return follows + + author_id = author.get("id") + if author_id: + topics = await get_cached_author_follows_topics(author_id) + authors = await get_cached_author_follows_authors(author_id) + return { + "topics": topics or [], + "authors": authors or [], + "communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}], + } def topic_follow(follower_id, slug): @@ -282,17 +271,6 @@ def author_unfollow(follower_id, slug): return "cant unfollow" -@query.field("get_topic_followers") -async def get_topic_followers(_, _info, slug: str) -> List[Author]: - topic_followers_query = select(Author) - topic_followers_query = ( - topic_followers_query.join(TopicFollower, TopicFollower.follower == Author.id) - .join(Topic, Topic.id == TopicFollower.topic) - .filter(Topic.slug == slug) - ) - return get_with_stat(topic_followers_query) - - @query.field("get_shout_followers") def get_shout_followers( _, _info, slug: str = "", shout_id: int | None = None diff --git a/resolvers/stat.py b/resolvers/stat.py index 7a5ed004..15829dd6 100644 --- a/resolvers/stat.py +++ b/resolvers/stat.py @@ -56,10 +56,9 @@ def get_topic_shouts_stat(topic_id: int): return result[0] if result else 0 -def get_topic_authors_stat(topic_id: int): - # authors - q = ( - select(func.count(distinct(ShoutAuthor.author))) +def get_topic_authors_query(topic_id): + return ( + select(ShoutAuthor.author) .select_from(join(ShoutTopic, Shout, ShoutTopic.shout == Shout.id)) .join(ShoutAuthor, ShoutAuthor.shout == Shout.id) .filter( @@ -70,8 +69,18 @@ def get_topic_authors_stat(topic_id: int): ) ) ) - result = local_session().execute(q).first() - return result[0] if result else 0 + + +def get_topic_authors_stat(topic_id: int): + # authors query + topic_authors_query = get_topic_authors_query(topic_id) + + # Оборачиваем запрос в другой запрос, чтобы посчитать уникальных авторов + count_query = select(func.count(distinct(topic_authors_query.subquery().c.author))) + + # Выполняем запрос и получаем результат + result = local_session().execute(count_query).scalar() + return result if result is not None else 0 def get_topic_followers_stat(topic_id: int): diff --git a/resolvers/topic.py b/resolvers/topic.py index e7907247..6e937171 100644 --- a/resolvers/topic.py +++ b/resolvers/topic.py @@ -5,7 +5,9 @@ from orm.shout import ShoutTopic from orm.topic import Topic from resolvers.stat import get_with_stat from services.auth import login_required +from services.cache import get_cached_topic_authors, get_cached_topic_followers from services.db import local_session +from services.logger import root_logger as logger from services.memorycache import cache_region from services.schema import mutation, query @@ -124,3 +126,23 @@ def get_topics_random(_, _info, amount=12): topics.append(topic) return topics + + +@query.field("get_topic_followers") +async def get_topic_followers(_, _info, slug: str): + logger.debug(f"getting followers for @{slug}") + topic_query = select(Topic.id).filter(Topic.slug == slug).first() + topic_id_result = local_session().execute(topic_query) + topic_id = topic_id_result[0] if topic_id_result else None + followers = await get_cached_topic_followers(topic_id) + return followers + + +@query.field("get_topic_authors") +async def get_topic_authors(_, _info, slug: str): + logger.debug(f"getting authors for @{slug}") + topic_query = select(Topic.id).filter(Topic.slug == slug).first() + topic_id_result = local_session().execute(topic_query) + topic_id = topic_id_result[0] if topic_id_result else None + authors = await get_cached_topic_authors(topic_id) + return authors diff --git a/schema/mutation.graphql b/schema/mutation.graphql index c5ec107d..e3336e1d 100644 --- a/schema/mutation.graphql +++ b/schema/mutation.graphql @@ -9,8 +9,8 @@ type Mutation { delete_shout(shout_id: Int!): CommonResult! # follower - follow(what: FollowingEntity!, slug: String!): CommonResult! - unfollow(what: FollowingEntity!, slug: String!): CommonResult! + follow(what: FollowingEntity!, slug: String!): AuthorFollowsResult! + unfollow(what: FollowingEntity!, slug: String!): AuthorFollowsResult! # topic create_topic(input: TopicInput!): CommonResult! diff --git a/schema/query.graphql b/schema/query.graphql index f32ad93e..6a0fe934 100644 --- a/schema/query.graphql +++ b/schema/query.graphql @@ -13,6 +13,7 @@ type Query { # follower get_shout_followers(slug: String, shout_id: Int): [Author] get_topic_followers(slug: String): [Author] + get_topic_authors(slug: String): [Author] get_author_followers(slug: String, user: String, author_id: Int): [Author] get_author_follows(slug: String, user: String, author_id: Int): CommonResult! get_author_follows_topics(slug: String, user: String, author_id: Int): [Topic] diff --git a/schema/type.graphql b/schema/type.graphql index a155df32..1e1a3be5 100644 --- a/schema/type.graphql +++ b/schema/type.graphql @@ -174,10 +174,9 @@ type Invite { } type AuthorFollowsResult { - topics: [Topic] - authors: [Author] - # shouts: [Shout] - communities: [Community] + topics: [Int] + authors: [Int] + communities: [Int] error: String } diff --git a/services/auth.py b/services/auth.py index 0f61b9d5..66c4e8fe 100644 --- a/services/auth.py +++ b/services/auth.py @@ -1,25 +1,12 @@ -import json from functools import wraps import httpx +from services.cache import get_cached_author_by_user_id from services.logger import root_logger as logger -from services.rediscache import redis from settings import ADMIN_SECRET, AUTH_URL -async def get_author_by_user(user: str): - author = None - redis_key = f"user:{user}" - - result = await redis.execute("GET", redis_key) - if isinstance(result, str): - author = json.loads(result) - if author: - return author - return - - async def request_data(gql, headers=None): if headers is None: headers = {"Content-Type": "application/json"} @@ -99,7 +86,7 @@ def login_required(f): logger.info(f" got {user_id} roles: {user_roles}") info.context["user_id"] = user_id.strip() info.context["roles"] = user_roles - author = await get_author_by_user(user_id) + author = await get_cached_author_by_user_id(user_id) if not author: logger.error(f"author profile not found for user {user_id}") info.context["author"] = author diff --git a/services/cache.py b/services/cache.py index fd9d5616..34e1184a 100644 --- a/services/cache.py +++ b/services/cache.py @@ -1,8 +1,13 @@ import json -from orm.topic import TopicFollower +from sqlalchemy import and_, join, select + +from orm.author import Author, AuthorFollower +from orm.topic import Topic, TopicFollower +from resolvers.stat import get_topic_authors_query, get_with_stat from services.db import local_session from services.encoders import CustomJSONEncoder +from services.logger import root_logger as logger from services.rediscache import redis DEFAULT_FOLLOWS = { @@ -12,11 +17,20 @@ DEFAULT_FOLLOWS = { } +async def cache_topic(topic: dict): + await redis.execute( + "SET", + f"topic:id:{topic.get('id')}", + json.dumps(topic.dict(), cls=CustomJSONEncoder), + ) + await redis.execute("SET", f"topic:slug:{topic.get('slug')}", topic.id) + + async def cache_author(author: dict): author_id = author.get("id") payload = json.dumps(author, cls=CustomJSONEncoder) - await redis.execute("SET", f'user:{author.get("user")}', payload) - await redis.execute("SET", f"author:{author_id}", payload) + await redis.execute("SET", f'user:id:{author.get("user")}', author_id) + await redis.execute("SET", f"author:id:{author_id}", payload) # update stat all field for followers' caches in list followers_str = await redis.execute("GET", f"author:{author_id}:followers") @@ -70,108 +84,158 @@ async def cache_author(author: dict): ) -async def cache_follows(follower: dict, entity_type: str, entity: dict, is_insert=True): +async def cache_follows( + follower_id: int, entity_type: str, entity_id: int, is_insert=True +): # prepare follows = [] - follower_id = follower.get("id") - if follower_id: - redis_key = f"author:{follower_id}:follows-{entity_type}s" - follows_str = await redis.execute("GET", redis_key) - if isinstance(follows_str, str): - follows = json.loads(follows_str) - if is_insert: - follows.append(entity) - else: - entity_id = entity.get("id") - if not entity_id: - raise Exception("wrong entity") - # Remove the entity from follows - follows = [e for e in follows if e["id"] != entity_id] + redis_key = f"author:follows-{entity_type}s:{follower_id}" + follows_str = await redis.execute("GET", redis_key) + if isinstance(follows_str, str): + follows = json.loads(follows_str) + if is_insert: + if entity_id not in follows: + follows.append(entity_id) + else: + if not entity_id: + raise Exception("wrong entity") + # Remove the entity from follows + follows = [eid for eid in follows if eid != entity_id] - # update follows cache - payload = json.dumps(follows, cls=CustomJSONEncoder) - await redis.execute("SET", redis_key, payload) + # update follows cache + payload = json.dumps(follows, cls=CustomJSONEncoder) + await redis.execute("SET", redis_key, payload) - # update follower's stats everywhere - follower_str = await redis.execute("GET", f"author:{follower_id}") - if isinstance(follower_str, str): - follower = json.loads(follower_str) - follower["stat"][f"{entity_type}s"] = len(follows) - await cache_author(follower) + # update follower's stats everywhere + follower_str = await redis.execute("GET", f"author:id:{follower_id}") + if isinstance(follower_str, str): + follower = json.loads(follower_str) + follower["stat"][f"{entity_type}s"] = len(follows) + await cache_author(follower) return follows -async def cache_follow_author_change(follower: dict, author: dict, is_insert=True): - author_id = author.get("id") - follower_id = follower.get("id") - followers = [] - if author_id and follower_id: - redis_key = f"author:{author_id}:followers" - followers_str = await redis.execute("GET", redis_key) - followers = json.loads(followers_str) if isinstance(followers_str, str) else [] - - # Remove the author from the list of followers, if present - followers = [f for f in followers if f["id"] != author_id] - - # If inserting, add the new follower to the list if not already present - if is_insert and not any(f["id"] == follower_id for f in followers): - followers.append(follower) - - # Remove the follower from the list if not inserting and present +async def get_cached_author(author_id: int): + if author_id: + rkey = f"author:id:{author_id}" + cached_result = await redis.execute("GET", rkey) + if isinstance(cached_result, str): + return json.loads(cached_result) else: - followers = [f for f in followers if f["id"] != follower_id] + author_query = select(Author).filter(Author.id == author_id) + [author] = get_with_stat(author_query) + if author: + await cache_author(author.dict()) - # Ensure followers are unique based on their 'id' field - followers = list({f["id"]: f for f in followers}.values()) - # Update follower's stats everywhere - follower_str = await redis.execute("GET", f"author:{follower_id}") - if isinstance(follower_str, str): - follower = json.loads(follower_str) - follower["stat"]["followers"] = len(followers) - await cache_author(follower) +async def get_cached_author_by_user_id(user_id: str): + author_id = await redis.execute("GET", f"user:id:{user_id}") + if author_id: + return await get_cached_author(int(author_id)) - payload = json.dumps(followers, cls=CustomJSONEncoder) - await redis.execute("SET", redis_key, payload) +async def get_cached_author_follows_topics(author_id: int): + topics = [] + rkey = f"author:follows-topics:{author_id}" + cached = await redis.execute("GET", rkey) + if cached and isinstance(cached, str): + topics = json.loads(cached) + if not cached: + topics = ( + local_session() + .query(Topic.id) + .select_from(join(Topic, TopicFollower, Topic.id == TopicFollower.topic)) + .where(TopicFollower.follower == author_id) + .all() + ) + await redis.execute("SET", rkey, json.dumps(topics)) + return topics + + +async def get_cached_author_follows_authors(author_id: int): + authors = [] + rkey = f"author:follows-authors:{author_id}" + cached = await redis.execute("GET", rkey) + if not cached: + with local_session() as session: + authors = ( + session.query(Author.id) + .select_from( + join(Author, AuthorFollower, Author.id == AuthorFollower.author) + ) + .where(AuthorFollower.follower == author_id) + .all() + ) + await redis.execute("SET", rkey, json.dumps(authors)) + elif isinstance(cached, str): + authors = json.loads(cached) + return authors + + +async def get_cached_author_followers(author_id: int): + followers = [] + rkey = f"author:followers:{author_id}" + cached = await redis.execute("GET", rkey) + if isinstance(cached, str): + followers = json.loads(cached) + if isinstance(followers, list): + return followers + + followers = ( + local_session() + .query(Author.id) + .join( + AuthorFollower, + and_( + AuthorFollower.author == author_id, + AuthorFollower.follower == Author.id, + Author.id != author_id, # exclude the author from the followers + ), + ) + .all() + ) + if followers: + await redis.execute("SET", rkey, json.dumps(followers)) + logger.debug(f"author#{author_id} cache updated with {len(followers)} followers") return followers -async def cache_topic(topic_dict: dict): - # update stat all field for followers' caches in list +async def get_cached_topic_followers(topic_id: int): + followers = [] + rkey = f"topic:followers:{topic_id}" + cached = await redis.execute("GET", rkey) + if isinstance(cached, str): + followers = json.loads(cached) + if isinstance(followers, list): + return followers + followers = ( local_session() - .query(TopicFollower) - .filter(TopicFollower.topic == topic_dict.get("id")) + .query(Author.id) + .join( + TopicFollower, + and_(TopicFollower.topic == topic_id, TopicFollower.follower == Author.id), + ) .all() ) - for tf in followers: - follower_id = tf.follower - follower_follows_topics = [] - follower_follows_topics_str = await redis.execute( - "GET", f"author:{follower_id}:follows-topics" - ) - if isinstance(follower_follows_topics_str, str): - follower_follows_topics = json.loads(follower_follows_topics_str) - c = 0 - for old_topic in follower_follows_topics: - if int(old_topic.get("id")) == int(topic_dict.get("id", 0)): - follower_follows_topics[c] = topic_dict - break # exit the loop since we found and updated the topic - c += 1 - else: - # topic not found in the list, so add the new topic with the updated stat field - follower_follows_topics.append(topic_dict) + if followers: + await redis.execute("SET", rkey, json.dumps(followers)) + logger.debug(f"topic#{topic_id} cache updated with {len(followers)} followers") + return followers - await redis.execute( - "SET", - f"author:{follower_id}:follows-topics", - json.dumps(follower_follows_topics, cls=CustomJSONEncoder), - ) - # update topic's stat - topic_dict["stat"]["followers"] = len(followers) +async def get_cached_topic_authors(topic_id: int): + authors = [] + rkey = f"topic:authors:{topic_id}" + cached = await redis.execute("GET", rkey) + if isinstance(cached, str): + authors = json.loads(cached) + if isinstance(authors, list): + return authors - # save in cache - payload = json.dumps(topic_dict, cls=CustomJSONEncoder) - await redis.execute("SET", f'topic:{topic_dict.get("slug")}', payload) + authors = local_session().execute(get_topic_authors_query(topic_id)) + # should be id list + if authors: + await redis.execute("SET", rkey, json.dumps(authors)) + logger.debug(f"topic#{topic_id} cache updated with {len(authors)} authors") + return authors diff --git a/services/triggers.py b/services/triggers.py index cdeb9087..1bd8c054 100644 --- a/services/triggers.py +++ b/services/triggers.py @@ -1,5 +1,4 @@ import asyncio -import json from sqlalchemy import event, select @@ -8,10 +7,8 @@ from orm.reaction import Reaction from orm.shout import Shout, ShoutAuthor from orm.topic import Topic, TopicFollower from resolvers.stat import get_with_stat -from services.cache import cache_author, cache_follow_author_change, cache_follows -from services.encoders import CustomJSONEncoder +from services.cache import cache_author, cache_follows, cache_topic from services.logger import root_logger as logger -from services.rediscache import redis DEFAULT_FOLLOWS = { "topics": [], @@ -30,12 +27,7 @@ async def handle_author_follower_change( [follower] = get_with_stat(follower_query) if follower and author: await cache_author(author.dict()) - await cache_follows( - follower.dict(), "author", author.dict(), is_insert - ) # cache_author(follower_dict) inside - await cache_follow_author_change( - follower.dict(), author.dict(), is_insert - ) # cache_author(follower_dict) inside + await cache_follows(follower.id, "author", author.id, is_insert) async def handle_topic_follower_change( @@ -47,11 +39,9 @@ async def handle_topic_follower_change( follower_query = select(Author).filter(Author.id == follower_id) [follower] = get_with_stat(follower_query) if follower and topic: + await cache_topic(topic.dict()) await cache_author(follower.dict()) - await redis.execute( - "SET", f"topic:{topic.id}", json.dumps(topic.dict(), cls=CustomJSONEncoder) - ) - await cache_follows(follower.dict(), "topic", topic.dict(), is_insert) + await cache_follows(follower.id, "topic", topic.id, is_insert) # handle_author_follow and handle_topic_follow -> cache_author, cache_follows, cache_followers