cache-reimplement-2
All checks were successful
Deploy on push / deploy (push) Successful in 1m4s

This commit is contained in:
Untone 2024-05-21 01:40:57 +03:00
parent 3742528e3a
commit 4c1fbf64a2
13 changed files with 292 additions and 326 deletions

View File

@@ -1,3 +1,8 @@
+[0.3.5]
+- cache isolated to services
+- topics followers and authors cached
+- redis stores lists of ids
 [0.3.4]
 - load_authors_by from cache
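
Note on the 0.3.5 entry above: the cache now stores plain lists of integer ids in Redis instead of serialized objects. Below is a minimal sketch of that pattern using the project's redis wrapper and session helper as they appear in the diffs that follow; the helper name get_cached_topic_follower_ids and its key layout are illustrative assumptions, not code from this commit.

import json

from orm.topic import TopicFollower
from services.db import local_session
from services.rediscache import redis


async def get_cached_topic_follower_ids(topic_id: int) -> list[int]:
    # hypothetical helper: cache only integer ids, never serialized ORM objects
    rkey = f"topic:followers:{topic_id}"
    cached = await redis.execute("GET", rkey)
    if isinstance(cached, str):
        return json.loads(cached)
    with local_session() as session:
        rows = (
            session.query(TopicFollower.follower)
            .filter(TopicFollower.topic == topic_id)
            .all()
        )
    ids = [row[0] for row in rows]
    # store the id list as JSON so readers can hydrate full objects on demand
    await redis.execute("SET", rkey, json.dumps(ids))
    return ids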

View File

@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "core"
-version = "0.3.4"
+version = "0.3.5"
 description = "core module for discours.io"
 authors = ["discoursio devteam"]
 license = "MIT"

View File

@@ -11,7 +11,7 @@ from resolvers.author import ( # search_authors,
 )
 from resolvers.community import get_communities_all, get_community
 from resolvers.editor import create_shout, delete_shout, update_shout
-from resolvers.follower import follow, get_shout_followers, get_topic_followers, unfollow
+from resolvers.follower import follow, get_shout_followers, unfollow
 from resolvers.notifier import (
     load_notifications,
     notification_mark_seen,
@@ -35,7 +35,14 @@ from resolvers.reader import (
     load_shouts_search,
     load_shouts_unrated,
 )
-from resolvers.topic import get_topic, get_topics_all, get_topics_by_author, get_topics_by_community
+from resolvers.topic import (
+    get_topic,
+    get_topic_authors,
+    get_topic_followers,
+    get_topics_all,
+    get_topics_by_author,
+    get_topics_by_community,
+)
 from services.triggers import events_register
 events_register()
@@ -44,6 +51,7 @@ __all__ = [
     # author
     "get_author",
     "get_author_id",
+    "get_author_followers",
     "get_author_follows",
     "get_author_follows_topics",
     "get_author_follows_authors",
@@ -60,6 +68,8 @@ __all__ = [
     "get_topics_all",
     "get_topics_by_community",
     "get_topics_by_author",
+    "get_topic_followers",
+    "get_topic_authors",
     # reader
     "get_shout",
     "load_shouts_by",
@@ -72,9 +82,7 @@ __all__ = [
     # follower
     "follow",
     "unfollow",
-    "get_topic_followers",
     "get_shout_followers",
-    "get_author_followers",
     # editor
     "create_shout",
     "update_shout",

View File

@@ -1,19 +1,22 @@
-import json
 import time
-from sqlalchemy import and_, desc, or_, select, text
-from sqlalchemy.orm import aliased
-from orm.author import Author, AuthorFollower
+from sqlalchemy import desc, or_, select, text
+from orm.author import Author
 from orm.shout import ShoutAuthor, ShoutTopic
 from orm.topic import Topic
-from resolvers.stat import author_follows_authors, author_follows_topics, get_with_stat
+from resolvers.stat import get_with_stat
 from services.auth import login_required
-from services.cache import cache_author, cache_follow_author_change
+from services.cache import (
+    cache_author,
+    get_cached_author,
+    get_cached_author_by_user_id,
+    get_cached_author_followers,
+    get_cached_author_follows_authors,
+    get_cached_author_follows_topics,
+)
 from services.db import local_session
-from services.encoders import CustomJSONEncoder
 from services.logger import root_logger as logger
-from services.rediscache import redis
 from services.schema import mutation, query
@@ -70,10 +73,7 @@ async def get_author(_, _info, slug="", author_id=0):
         if found_author:
             logger.debug(f"found author id: {found_author.id}")
             author_id = found_author.id if found_author.id else author_id
-            if author_id:
-                cached_result = await redis.execute("GET", f"author:{author_id}")
-                if isinstance(cached_result, str):
-                    author_dict = json.loads(cached_result)
+            author_dict = await get_cached_author(author_id)
         # update stat from db
         if not author_dict or not author_dict.get("stat"):
@@ -97,17 +97,11 @@ async def get_author(_, _info, slug="", author_id=0):
 async def get_author_by_user_id(user_id: str):
     logger.info(f"getting author id for {user_id}")
-    redis_key = f"user:{user_id}"
     author = None
     try:
-        res = await redis.execute("GET", redis_key)
-        if isinstance(res, str):
-            author = json.loads(res)
-            author_id = author.get("id")
-            author_slug = author.get("slug")
-            if author_id:
-                logger.debug(f"got author @{author_slug} #{author_id} cached")
-                return author
+        author = await get_cached_author_by_user_id(user_id)
+        if author:
+            return author
         author_query = select(Author).filter(Author.user == user_id)
         result = get_with_stat(author_query)
@@ -156,11 +150,7 @@ async def load_authors_by(_, _info, by, limit, offset):
     for [a] in authors_nostat:
         author_dict = None
         if isinstance(a, Author):
-            author_id = a.id
-            if bool(author_id):
-                cached_result = await redis.execute("GET", f"author:{author_id}")
-                if isinstance(cached_result, str):
-                    author_dict = json.loads(cached_result)
+            author_dict = await get_cached_author(a.id)
         if not author_dict or not isinstance(author_dict.get("shouts"), int):
             break
@@ -196,28 +186,9 @@ async def get_author_follows(_, _info, slug="", user=None, author_id=0):
     topics = []
     authors = []
     if bool(author_id):
-        rkey = f"author:{author_id}:follows-authors"
         logger.debug(f"getting {author_id} follows authors")
-        cached = await redis.execute("GET", rkey)
-        if not cached:
-            authors = author_follows_authors(author_id)  # type: ignore
-            prepared = [author.dict() for author in authors]
-            await redis.execute(
-                "SET", rkey, json.dumps(prepared, cls=CustomJSONEncoder)
-            )
-        elif isinstance(cached, str):
-            authors = json.loads(cached)
-        rkey = f"author:{author_id}:follows-topics"
-        cached = await redis.execute("GET", rkey)
-        if cached and isinstance(cached, str):
-            topics = json.loads(cached)
-        if not cached:
-            topics = author_follows_topics(author_id)  # type: ignore
-            prepared = [topic.dict() for topic in topics]
-            await redis.execute(
-                "SET", rkey, json.dumps(prepared, cls=CustomJSONEncoder)
-            )
+        authors = await get_cached_author_follows_authors(author_id)
+        topics = await get_cached_author_follows_topics(author_id)
     return {
         "topics": topics,
         "authors": authors,
@@ -244,19 +215,7 @@ async def get_author_follows_topics(_, _info, slug="", user=None, author_id=None
         author_id = author_id_result[0] if author_id_result else None
         if not author_id:
             raise ValueError("Author not found")
-        logger.debug(f"getting {author_id} follows topics")
-        rkey = f"author:{author_id}:follows-topics"
-        cached = await redis.execute("GET", rkey)
-        topics = []
-        if isinstance(cached, str):
-            topics = json.loads(cached)
-        if not cached:
-            topics = author_follows_topics(author_id)
-            prepared = [topic.dict() for topic in topics]
-            await redis.execute(
-                "SET", rkey, json.dumps(prepared, cls=CustomJSONEncoder)
-            )
-        return topics
+        return get_author_follows_topics(author_id)
 @query.field("get_author_follows_authors")
@@ -269,22 +228,9 @@ async def get_author_follows_authors(_, _info, slug="", user=None, author_id=Non
             .first()
         )
         author_id = author_id_result[0] if author_id_result else None
-        if author_id:
-            logger.debug(f"getting {author_id} follows authors")
-            rkey = f"author:{author_id}:follows-authors"
-            cached = await redis.execute("GET", rkey)
-            authors = []
-            if isinstance(cached, str):
-                authors = json.loads(cached)
-            if not authors:
-                authors = author_follows_authors(author_id)
-                prepared = [author.dict() for author in authors]
-                await redis.execute(
-                    "SET", rkey, json.dumps(prepared, cls=CustomJSONEncoder)
-                )
-            return authors
-        else:
+        if not author_id:
             raise ValueError("Author not found")
+        return await get_cached_author_follows_authors(author_id)
 def create_author(user_id: str, slug: str, name: str = ""):
@@ -307,51 +253,8 @@ def create_author(user_id: str, slug: str, name: str = ""):
 @query.field("get_author_followers")
 async def get_author_followers(_, _info, slug: str):
     logger.debug(f"getting followers for @{slug}")
-    try:
-        author_alias = aliased(Author)
-        author_query = select(author_alias).filter(author_alias.slug == slug)
-        result = local_session().execute(author_query).first()
-        followers = []
-        if result:
-            [author] = result
-            author_id = author.id
-            cached = await redis.execute("GET", f"author:{author_id}:followers")
-            if cached:
-                followers_ids = []
-                followers = []
-                if isinstance(cached, str):
-                    followers_cached = json.loads(cached)
-                    if isinstance(followers_cached, list):
-                        logger.debug(
-                            f"@{slug} got {len(followers_cached)} followers cached"
-                        )
-                        for fc in followers_cached:
-                            if fc["id"] not in followers_ids and fc["id"] != author_id:
-                                followers.append(fc)
-                                followers_ids.append(fc["id"])
-                        return followers
-            author_follower_alias = aliased(AuthorFollower, name="af")
-            followers_query = select(Author).join(
-                author_follower_alias,
-                and_(
-                    author_follower_alias.author == author_id,
-                    author_follower_alias.follower == Author.id,
-                    Author.id != author_id,  # exclude the author from the followers
-                ),
-            )
-            followers = get_with_stat(followers_query)
-            if isinstance(followers, list):
-                followers_ids = [r.id for r in followers]
-                for follower in followers:
-                    if follower.id not in followers_ids:
-                        await cache_follow_author_change(follower.dict(), author.dict())
-                        followers_ids.append(follower.id)
-                logger.debug(f"@{slug} cache updated with {len(followers)} followers")
-            return followers
-    except Exception as exc:
-        import traceback
-        logger.error(exc)
-        logger.error(traceback.format_exc())
-    return []
+    author_query = select(Author.id).filter(Author.slug == slug).first()
+    author_id_result = local_session().execute(author_query)
+    author_id = author_id_result[0] if author_id_result else None
+    followers = await get_cached_author_followers(author_id)
+    return followers

View File

@@ -1,5 +1,3 @@
-import json
-import time
 from typing import List
 from sqlalchemy import select
@@ -10,13 +8,18 @@ from orm.community import Community
 from orm.reaction import Reaction
 from orm.shout import Shout, ShoutReactionsFollower
 from orm.topic import Topic, TopicFollower
-from resolvers.stat import author_follows_authors, author_follows_topics, get_with_stat
+from resolvers.stat import get_with_stat
 from services.auth import login_required
-from services.cache import DEFAULT_FOLLOWS, cache_author, cache_topic
+from services.cache import (
+    cache_author,
+    cache_topic,
+    get_cached_author_by_user_id,
+    get_cached_author_follows_authors,
+    get_cached_author_follows_topics,
+)
 from services.db import local_session
 from services.logger import root_logger as logger
 from services.notify import notify_follower
-from services.rediscache import redis
 from services.schema import mutation, query
@@ -47,30 +50,28 @@ async def follow(_, info, what, slug):
     follower_id = follower_dict.get("id")
     entity = what.lower()
-    follows = []
-    follows_str = await redis.execute("GET", f"author:{follower_id}:follows-{entity}s")
-    if isinstance(follows_str, str):
-        follows = json.loads(follows_str) or []
     if what == "AUTHOR":
+        follows = await get_cached_author_follows_authors(follower_id)
         follower_id = int(follower_id)
         error = author_follow(follower_id, slug)
         if not error:
-            author_query = select(Author).filter(Author.slug == slug)
-            [author] = get_with_stat(author_query)
-            if author:
-                author_dict = author.dict()
-                author_id = int(author_dict.get("id", 0))
-                follows_ids = set(int(a.get("id")) for a in follows)
-                if author_id not in follows_ids:
-                    await cache_author(author_dict)
-                    await cache_author(follower_dict)
-                    await notify_follower(follower_dict, author_id, "follow")
-                    follows.append(author_dict)
+            [author_id] = (
+                local_session().query(Author.id).filter(Author.slug == slug).first()
+            )
+            if author_id and author_id not in follows:
+                follows.append(author_id)
+                await cache_author(follower_dict)
+                await notify_follower(follower_dict, author_id, "follow")
+                [author] = get_with_stat(select(Author).filter(Author.id == author_id))
+                if author:
+                    author_dict = author.dict()
+                    await cache_author(author_dict)
     elif what == "TOPIC":
         error = topic_follow(follower_id, slug)
-        _topic_dict = await cache_by_slug(what, slug)
+        topic_dict = await cache_by_slug(what, slug)
+        await cache_topic(topic_dict)
     elif what == "COMMUNITY":
         # FIXME: when more communities
@@ -95,31 +96,32 @@ async def unfollow(_, info, what, slug):
     entity = what.lower()
     follows = []
-    follows_str = await redis.execute("GET", f"author:{follower_id}:follows-{entity}s")
-    if isinstance(follows_str, str):
-        follows = json.loads(follows_str) or []
     if what == "AUTHOR":
+        follows = await get_cached_author_follows_authors(follower_id)
+        follower_id = int(follower_id)
         error = author_unfollow(follower_id, slug)
         # NOTE: after triggers should update cached stats
         if not error:
             logger.info(f"@{follower_dict.get('slug')} unfollowed @{slug}")
-            author_query = select(Author).filter(Author.slug == slug)
-            [author] = get_with_stat(author_query)
-            if author:
-                author_dict = author.dict()
-                author_id = author.id
-                await cache_author(author_dict)
-                for idx, item in enumerate(follows):
-                    if item["id"] == author_id:
-                        await cache_author(follower_dict)
-                        await notify_follower(follower_dict, author_id, "unfollow")
-                        follows.pop(idx)
-                        break
+            [author_id] = (
+                local_session().query(Author.id).filter(Author.slug == slug).first()
+            )
+            if author_id and author_id in follows:
+                follows.remove(author_id)
+                await cache_author(follower_dict)
+                await notify_follower(follower_dict, author_id, "follow")
+                [author] = get_with_stat(select(Author).filter(Author.id == author_id))
+                if author:
+                    author_dict = author.dict()
+                    await cache_author(author_dict)
     elif what == "TOPIC":
         error = topic_unfollow(follower_id, slug)
-        _topic_dict = await cache_by_slug(what, slug)
+        if not error:
+            follows = await get_cached_author_follows_topics(follower_id)
+            topic_dict = await cache_by_slug(what, slug)
+            await cache_topic(topic_dict)
     elif what == "COMMUNITY":
         follows = local_session().execute(select(Community))
@@ -133,36 +135,23 @@ async def unfollow(_, info, what, slug):
 async def get_follows_by_user_id(user_id: str):
     if not user_id:
         return {"error": "unauthorized"}
-    author = await redis.execute("GET", f"user:{user_id}")
-    if isinstance(author, str):
-        author = json.loads(author)
+    author = await get_cached_author_by_user_id(user_id)
     if not author:
         with local_session() as session:
             author = session.query(Author).filter(Author.user == user_id).first()
             if not author:
                 return {"error": "cant find author"}
             author = author.dict()
-    last_seen = author.get("last_seen", 0) if isinstance(author, dict) else 0
-    follows = DEFAULT_FOLLOWS
-    day_old = int(time.time()) - last_seen > 24 * 60 * 60
-    if day_old:
-        author_id = json.loads(str(author)).get("id")
-        if author_id:
-            topics = author_follows_topics(author_id)
-            authors = author_follows_authors(author_id)
-            follows = {
-                "topics": topics,
-                "authors": authors,
-                "communities": [
-                    {"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}
-                ],
-            }
-    else:
-        logger.debug(f"getting follows for {user_id} from redis")
-        res = await redis.execute("GET", f"user:{user_id}:follows")
-        if isinstance(res, str):
-            follows = json.loads(res)
-    return follows
+    author_id = author.get("id")
+    if author_id:
+        topics = await get_cached_author_follows_topics(author_id)
+        authors = await get_cached_author_follows_authors(author_id)
+        return {
+            "topics": topics or [],
+            "authors": authors or [],
+            "communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}],
+        }
 def topic_follow(follower_id, slug):
@@ -282,17 +271,6 @@ def author_unfollow(follower_id, slug):
     return "cant unfollow"
-@query.field("get_topic_followers")
-async def get_topic_followers(_, _info, slug: str) -> List[Author]:
-    topic_followers_query = select(Author)
-    topic_followers_query = (
-        topic_followers_query.join(TopicFollower, TopicFollower.follower == Author.id)
-        .join(Topic, Topic.id == TopicFollower.topic)
-        .filter(Topic.slug == slug)
-    )
-    return get_with_stat(topic_followers_query)
 @query.field("get_shout_followers")
 def get_shout_followers(
     _, _info, slug: str = "", shout_id: int | None = None

View File

@@ -56,10 +56,9 @@ def get_topic_shouts_stat(topic_id: int):
     return result[0] if result else 0
-def get_topic_authors_stat(topic_id: int):
-    # authors
-    q = (
-        select(func.count(distinct(ShoutAuthor.author)))
+def get_topic_authors_query(topic_id):
+    return (
+        select(ShoutAuthor.author)
         .select_from(join(ShoutTopic, Shout, ShoutTopic.shout == Shout.id))
         .join(ShoutAuthor, ShoutAuthor.shout == Shout.id)
         .filter(
@@ -70,8 +69,18 @@ def get_topic_authors_stat(topic_id: int):
             )
         )
     )
-    result = local_session().execute(q).first()
-    return result[0] if result else 0
+def get_topic_authors_stat(topic_id: int):
+    # authors query
+    topic_authors_query = get_topic_authors_query(topic_id)
+    # wrap the base query in an outer query that counts distinct authors
+    count_query = select(func.count(distinct(topic_authors_query.subquery().c.author)))
+    # execute the query and fetch the scalar result
+    result = local_session().execute(count_query).scalar()
+    return result if result is not None else 0
 def get_topic_followers_stat(topic_id: int):
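
For reference, the count-over-a-subquery pattern introduced in get_topic_authors_stat above can be sketched in isolation as follows; the ShoutAuthorDemo model and its filter are hypothetical stand-ins for illustration, not part of this commit.

from sqlalchemy import Column, Integer, distinct, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ShoutAuthorDemo(Base):
    # stand-in for the real ShoutAuthor association table
    __tablename__ = "shout_author_demo"
    id = Column(Integer, primary_key=True)
    shout = Column(Integer)
    author = Column(Integer)


def count_distinct_authors(session: Session) -> int:
    # reusable select of author ids, analogous to get_topic_authors_query()
    base_query = select(ShoutAuthorDemo.author).where(ShoutAuthorDemo.shout < 100)
    # wrap it in an outer query that counts distinct authors over the subquery
    count_query = select(func.count(distinct(base_query.subquery().c.author)))
    return session.execute(count_query).scalar() or 0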

View File

@@ -5,7 +5,9 @@ from orm.shout import ShoutTopic
 from orm.topic import Topic
 from resolvers.stat import get_with_stat
 from services.auth import login_required
+from services.cache import get_cached_topic_authors, get_cached_topic_followers
 from services.db import local_session
+from services.logger import root_logger as logger
 from services.memorycache import cache_region
 from services.schema import mutation, query
@@ -124,3 +126,23 @@ def get_topics_random(_, _info, amount=12):
         topics.append(topic)
     return topics
+@query.field("get_topic_followers")
+async def get_topic_followers(_, _info, slug: str):
+    logger.debug(f"getting followers for @{slug}")
+    topic_query = select(Topic.id).filter(Topic.slug == slug).first()
+    topic_id_result = local_session().execute(topic_query)
+    topic_id = topic_id_result[0] if topic_id_result else None
+    followers = await get_cached_topic_followers(topic_id)
+    return followers
+@query.field("get_topic_authors")
+async def get_topic_authors(_, _info, slug: str):
+    logger.debug(f"getting authors for @{slug}")
+    topic_query = select(Topic.id).filter(Topic.slug == slug).first()
+    topic_id_result = local_session().execute(topic_query)
+    topic_id = topic_id_result[0] if topic_id_result else None
+    authors = await get_cached_topic_authors(topic_id)
+    return authors

View File

@@ -9,8 +9,8 @@ type Mutation {
     delete_shout(shout_id: Int!): CommonResult!
     # follower
-    follow(what: FollowingEntity!, slug: String!): CommonResult!
-    unfollow(what: FollowingEntity!, slug: String!): CommonResult!
+    follow(what: FollowingEntity!, slug: String!): AuthorFollowsResult!
+    unfollow(what: FollowingEntity!, slug: String!): AuthorFollowsResult!
     # topic
     create_topic(input: TopicInput!): CommonResult!

View File

@@ -13,6 +13,7 @@ type Query {
     # follower
     get_shout_followers(slug: String, shout_id: Int): [Author]
     get_topic_followers(slug: String): [Author]
+    get_topic_authors(slug: String): [Author]
     get_author_followers(slug: String, user: String, author_id: Int): [Author]
     get_author_follows(slug: String, user: String, author_id: Int): CommonResult!
     get_author_follows_topics(slug: String, user: String, author_id: Int): [Topic]

View File

@@ -174,10 +174,9 @@ type Invite {
 }
 type AuthorFollowsResult {
-    topics: [Topic]
-    authors: [Author]
-    # shouts: [Shout]
-    communities: [Community]
+    topics: [Int]
+    authors: [Int]
+    communities: [Int]
     error: String
 }

View File

@@ -1,25 +1,12 @@
-import json
 from functools import wraps
 import httpx
+from services.cache import get_cached_author_by_user_id
 from services.logger import root_logger as logger
-from services.rediscache import redis
 from settings import ADMIN_SECRET, AUTH_URL
-async def get_author_by_user(user: str):
-    author = None
-    redis_key = f"user:{user}"
-    result = await redis.execute("GET", redis_key)
-    if isinstance(result, str):
-        author = json.loads(result)
-    if author:
-        return author
-    return
 async def request_data(gql, headers=None):
     if headers is None:
         headers = {"Content-Type": "application/json"}
@@ -99,7 +86,7 @@ def login_required(f):
         logger.info(f" got {user_id} roles: {user_roles}")
         info.context["user_id"] = user_id.strip()
         info.context["roles"] = user_roles
-        author = await get_author_by_user(user_id)
+        author = await get_cached_author_by_user_id(user_id)
         if not author:
             logger.error(f"author profile not found for user {user_id}")
         info.context["author"] = author

View File

@@ -1,8 +1,13 @@
 import json
-from orm.topic import TopicFollower
+from sqlalchemy import and_, join, select
+from orm.author import Author, AuthorFollower
+from orm.topic import Topic, TopicFollower
+from resolvers.stat import get_topic_authors_query, get_with_stat
 from services.db import local_session
 from services.encoders import CustomJSONEncoder
+from services.logger import root_logger as logger
 from services.rediscache import redis
 DEFAULT_FOLLOWS = {
@@ -12,11 +17,20 @@ DEFAULT_FOLLOWS = {
 }
+async def cache_topic(topic: dict):
+    await redis.execute(
+        "SET",
+        f"topic:id:{topic.get('id')}",
+        json.dumps(topic.dict(), cls=CustomJSONEncoder),
+    )
+    await redis.execute("SET", f"topic:slug:{topic.get('slug')}", topic.id)
 async def cache_author(author: dict):
     author_id = author.get("id")
     payload = json.dumps(author, cls=CustomJSONEncoder)
-    await redis.execute("SET", f'user:{author.get("user")}', payload)
-    await redis.execute("SET", f"author:{author_id}", payload)
+    await redis.execute("SET", f'user:id:{author.get("user")}', author_id)
+    await redis.execute("SET", f"author:id:{author_id}", payload)
     # update stat all field for followers' caches in <authors> list
     followers_str = await redis.execute("GET", f"author:{author_id}:followers")
@@ -70,108 +84,158 @@ async def cache_author(author: dict):
     )
-async def cache_follows(follower: dict, entity_type: str, entity: dict, is_insert=True):
+async def cache_follows(
+    follower_id: int, entity_type: str, entity_id: int, is_insert=True
+):
     # prepare
     follows = []
-    follower_id = follower.get("id")
-    if follower_id:
-        redis_key = f"author:{follower_id}:follows-{entity_type}s"
-        follows_str = await redis.execute("GET", redis_key)
-        if isinstance(follows_str, str):
-            follows = json.loads(follows_str)
-        if is_insert:
-            follows.append(entity)
-        else:
-            entity_id = entity.get("id")
-            if not entity_id:
-                raise Exception("wrong entity")
-            # Remove the entity from follows
-            follows = [e for e in follows if e["id"] != entity_id]
+    redis_key = f"author:follows-{entity_type}s:{follower_id}"
+    follows_str = await redis.execute("GET", redis_key)
+    if isinstance(follows_str, str):
+        follows = json.loads(follows_str)
+    if is_insert:
+        if entity_id not in follows:
+            follows.append(entity_id)
+    else:
+        if not entity_id:
+            raise Exception("wrong entity")
+        # Remove the entity from follows
+        follows = [eid for eid in follows if eid != entity_id]
     # update follows cache
     payload = json.dumps(follows, cls=CustomJSONEncoder)
     await redis.execute("SET", redis_key, payload)
     # update follower's stats everywhere
-    follower_str = await redis.execute("GET", f"author:{follower_id}")
+    follower_str = await redis.execute("GET", f"author:id:{follower_id}")
     if isinstance(follower_str, str):
         follower = json.loads(follower_str)
         follower["stat"][f"{entity_type}s"] = len(follows)
         await cache_author(follower)
     return follows
-async def cache_follow_author_change(follower: dict, author: dict, is_insert=True):
-    author_id = author.get("id")
-    follower_id = follower.get("id")
-    followers = []
-    if author_id and follower_id:
-        redis_key = f"author:{author_id}:followers"
-        followers_str = await redis.execute("GET", redis_key)
-        followers = json.loads(followers_str) if isinstance(followers_str, str) else []
-        # Remove the author from the list of followers, if present
-        followers = [f for f in followers if f["id"] != author_id]
-        # If inserting, add the new follower to the list if not already present
-        if is_insert and not any(f["id"] == follower_id for f in followers):
-            followers.append(follower)
-        # Remove the follower from the list if not inserting and present
-        else:
-            followers = [f for f in followers if f["id"] != follower_id]
-        # Ensure followers are unique based on their 'id' field
-        followers = list({f["id"]: f for f in followers}.values())
-        # Update follower's stats everywhere
-        follower_str = await redis.execute("GET", f"author:{follower_id}")
-        if isinstance(follower_str, str):
-            follower = json.loads(follower_str)
-            follower["stat"]["followers"] = len(followers)
-            await cache_author(follower)
-        payload = json.dumps(followers, cls=CustomJSONEncoder)
-        await redis.execute("SET", redis_key, payload)
-    return followers
+async def get_cached_author(author_id: int):
+    if author_id:
+        rkey = f"author:id:{author_id}"
+        cached_result = await redis.execute("GET", rkey)
+        if isinstance(cached_result, str):
+            return json.loads(cached_result)
+        else:
+            author_query = select(Author).filter(Author.id == author_id)
+            [author] = get_with_stat(author_query)
+            if author:
+                await cache_author(author.dict())
+async def get_cached_author_by_user_id(user_id: str):
+    author_id = await redis.execute("GET", f"user:id:{user_id}")
+    if author_id:
+        return await get_cached_author(int(author_id))
+async def get_cached_author_follows_topics(author_id: int):
+    topics = []
+    rkey = f"author:follows-topics:{author_id}"
+    cached = await redis.execute("GET", rkey)
+    if cached and isinstance(cached, str):
+        topics = json.loads(cached)
+    if not cached:
+        topics = (
+            local_session()
+            .query(Topic.id)
+            .select_from(join(Topic, TopicFollower, Topic.id == TopicFollower.topic))
+            .where(TopicFollower.follower == author_id)
+            .all()
+        )
+        await redis.execute("SET", rkey, json.dumps(topics))
+    return topics
+async def get_cached_author_follows_authors(author_id: int):
+    authors = []
+    rkey = f"author:follows-authors:{author_id}"
+    cached = await redis.execute("GET", rkey)
+    if not cached:
+        with local_session() as session:
+            authors = (
+                session.query(Author.id)
+                .select_from(
+                    join(Author, AuthorFollower, Author.id == AuthorFollower.author)
+                )
+                .where(AuthorFollower.follower == author_id)
+                .all()
+            )
+            await redis.execute("SET", rkey, json.dumps(authors))
+    elif isinstance(cached, str):
+        authors = json.loads(cached)
+    return authors
+async def get_cached_author_followers(author_id: int):
+    followers = []
+    rkey = f"author:followers:{author_id}"
+    cached = await redis.execute("GET", rkey)
+    if isinstance(cached, str):
+        followers = json.loads(cached)
+        if isinstance(followers, list):
+            return followers
+    followers = (
+        local_session()
+        .query(Author.id)
+        .join(
+            AuthorFollower,
+            and_(
+                AuthorFollower.author == author_id,
+                AuthorFollower.follower == Author.id,
+                Author.id != author_id,  # exclude the author from the followers
+            ),
+        )
+        .all()
+    )
+    if followers:
+        await redis.execute("SET", rkey, json.dumps(followers))
+        logger.debug(f"author#{author_id} cache updated with {len(followers)} followers")
+    return followers
-async def cache_topic(topic_dict: dict):
-    # update stat all field for followers' caches in <topics> list
-    followers = (
-        local_session()
-        .query(TopicFollower)
-        .filter(TopicFollower.topic == topic_dict.get("id"))
-        .all()
-    )
-    for tf in followers:
-        follower_id = tf.follower
-        follower_follows_topics = []
-        follower_follows_topics_str = await redis.execute(
-            "GET", f"author:{follower_id}:follows-topics"
-        )
-        if isinstance(follower_follows_topics_str, str):
-            follower_follows_topics = json.loads(follower_follows_topics_str)
-        c = 0
-        for old_topic in follower_follows_topics:
-            if int(old_topic.get("id")) == int(topic_dict.get("id", 0)):
-                follower_follows_topics[c] = topic_dict
-                break  # exit the loop since we found and updated the topic
-            c += 1
-        else:
-            # topic not found in the list, so add the new topic with the updated stat field
-            follower_follows_topics.append(topic_dict)
-        await redis.execute(
-            "SET",
-            f"author:{follower_id}:follows-topics",
-            json.dumps(follower_follows_topics, cls=CustomJSONEncoder),
-        )
-    # update topic's stat
-    topic_dict["stat"]["followers"] = len(followers)
-    # save in cache
-    payload = json.dumps(topic_dict, cls=CustomJSONEncoder)
-    await redis.execute("SET", f'topic:{topic_dict.get("slug")}', payload)
+async def get_cached_topic_followers(topic_id: int):
+    followers = []
+    rkey = f"topic:followers:{topic_id}"
+    cached = await redis.execute("GET", rkey)
+    if isinstance(cached, str):
+        followers = json.loads(cached)
+        if isinstance(followers, list):
+            return followers
+    followers = (
+        local_session()
+        .query(Author.id)
+        .join(
+            TopicFollower,
+            and_(TopicFollower.topic == topic_id, TopicFollower.follower == Author.id),
+        )
+        .all()
+    )
+    if followers:
+        await redis.execute("SET", rkey, json.dumps(followers))
+        logger.debug(f"topic#{topic_id} cache updated with {len(followers)} followers")
+    return followers
+async def get_cached_topic_authors(topic_id: int):
+    authors = []
+    rkey = f"topic:authors:{topic_id}"
+    cached = await redis.execute("GET", rkey)
+    if isinstance(cached, str):
+        authors = json.loads(cached)
+        if isinstance(authors, list):
+            return authors
+    authors = local_session().execute(get_topic_authors_query(topic_id))
+    # should be id list
+    if authors:
+        await redis.execute("SET", rkey, json.dumps(authors))
+        logger.debug(f"topic#{topic_id} cache updated with {len(authors)} authors")
+    return authors

View File

@@ -1,5 +1,4 @@
 import asyncio
-import json
 from sqlalchemy import event, select
@@ -8,10 +7,8 @@ from orm.reaction import Reaction
 from orm.shout import Shout, ShoutAuthor
 from orm.topic import Topic, TopicFollower
 from resolvers.stat import get_with_stat
-from services.cache import cache_author, cache_follow_author_change, cache_follows
+from services.cache import cache_author, cache_follows, cache_topic
-from services.encoders import CustomJSONEncoder
 from services.logger import root_logger as logger
-from services.rediscache import redis
 DEFAULT_FOLLOWS = {
     "topics": [],
@@ -30,12 +27,7 @@ async def handle_author_follower_change(
     [follower] = get_with_stat(follower_query)
     if follower and author:
         await cache_author(author.dict())
-        await cache_follows(
-            follower.dict(), "author", author.dict(), is_insert
-        )  # cache_author(follower_dict) inside
-        await cache_follow_author_change(
-            follower.dict(), author.dict(), is_insert
-        )  # cache_author(follower_dict) inside
+        await cache_follows(follower.id, "author", author.id, is_insert)
 async def handle_topic_follower_change(
@@ -47,11 +39,9 @@ async def handle_topic_follower_change(
     follower_query = select(Author).filter(Author.id == follower_id)
     [follower] = get_with_stat(follower_query)
     if follower and topic:
+        await cache_topic(topic.dict())
         await cache_author(follower.dict())
-        await redis.execute(
-            "SET", f"topic:{topic.id}", json.dumps(topic.dict(), cls=CustomJSONEncoder)
-        )
-        await cache_follows(follower.dict(), "topic", topic.dict(), is_insert)
+        await cache_follows(follower.id, "topic", topic.id, is_insert)
 # handle_author_follow and handle_topic_follow -> cache_author, cache_follows, cache_followers