follows-cache-fix
All checks were successful
Deploy on push / deploy (push) Successful in 23s

This commit is contained in:
Untone 2024-02-26 03:49:56 +03:00
parent 5478ff45e7
commit a00c68068f
3 changed files with 71 additions and 43 deletions

View File

@ -47,7 +47,7 @@ async def get_author(_, _info, slug='', author_id=None):
[author] = session.execute(q) [author] = session.execute(q)
author_id = cast(Author.id, Integer) author_id = cast(Author.id, Integer)
if author_id: if bool(author_id):
cache = await redis.execute('GET', f'id:{author_id}:author') cache = await redis.execute('GET', f'id:{author_id}:author')
author = json.loads(cache) if cache else get_with_stat(select(Author).where(Author.id == author_id)).first() author = json.loads(cache) if cache else get_with_stat(select(Author).where(Author.id == author_id)).first()
if author: if author:

View File

@ -12,11 +12,11 @@ from orm.reaction import Reaction
from orm.shout import Shout, ShoutReactionsFollower from orm.shout import Shout, ShoutReactionsFollower
from orm.topic import Topic, TopicFollower from orm.topic import Topic, TopicFollower
from resolvers.community import community_follow, community_unfollow from resolvers.community import community_follow, community_unfollow
from resolvers.topic import topic_follow, topic_unfollow from resolvers.topic import topic_unfollow
from resolvers.stat import get_with_stat, author_follows_topics, author_follows_authors from resolvers.stat import get_with_stat, author_follows_topics, author_follows_authors
from services.auth import login_required from services.auth import login_required
from services.db import local_session from services.db import local_session
from services.event_listeners import DEFAULT_FOLLOWS from services.event_listeners import DEFAULT_FOLLOWS, update_follows_for_author
from services.notify import notify_follower from services.notify import notify_follower
from services.schema import mutation, query from services.schema import mutation, query
from services.logger import root_logger as logger from services.logger import root_logger as logger
@ -29,24 +29,23 @@ async def follow(_, info, what, slug):
try: try:
user_id = info.context['user_id'] user_id = info.context['user_id']
with local_session() as session: with local_session() as session:
actor = session.query(Author).filter(Author.user == user_id).first() follower = session.query(Author).filter(Author.user == user_id).first()
if actor: if follower:
follower_id = actor.id
if what == 'AUTHOR': if what == 'AUTHOR':
if author_follow(follower_id, slug): if author_unfollow(follower.id, slug):
author = ( author = session.query(Author).where(Author.slug == slug).first()
session.query(Author.id).where(Author.slug == slug).one() if author:
) await update_follows_for_author(session, follower, 'author', author, True)
follower = ( await notify_follower(follower.dict(), author.id, 'unfollow')
session.query(Author).where(Author.id == follower_id).one()
)
await notify_follower(follower.dict(), author.id)
elif what == 'TOPIC': elif what == 'TOPIC':
topic_follow(follower_id, slug) topic = session.query(Topic).where(Topic.slug == slug).first()
if topic:
await update_follows_for_author(session, follower, 'topic', topic, True)
topic_unfollow(follower.id, slug)
elif what == 'COMMUNITY': elif what == 'COMMUNITY':
community_follow(follower_id, slug) community_follow(follower.id, slug)
elif what == 'REACTIONS': elif what == 'REACTIONS':
reactions_follow(follower_id, slug) reactions_follow(follower.id, slug)
except Exception as e: except Exception as e:
logger.debug(info, what, slug) logger.debug(info, what, slug)
logger.error(e) logger.error(e)
@ -61,24 +60,23 @@ async def unfollow(_, info, what, slug):
user_id = info.context['user_id'] user_id = info.context['user_id']
try: try:
with local_session() as session: with local_session() as session:
actor = session.query(Author).filter(Author.user == user_id).first() follower = session.query(Author).filter(Author.user == user_id).first()
if actor: if follower:
follower_id = actor.id
if what == 'AUTHOR': if what == 'AUTHOR':
if author_unfollow(follower_id, slug): if author_unfollow(follower.id, slug):
author = ( author = session.query(Author).where(Author.slug == slug).first()
session.query(Author.id).where(Author.slug == slug).one() if author:
) await update_follows_for_author(session, follower, 'author', author, False)
follower = (
session.query(Author).where(Author.id == follower_id).one()
)
await notify_follower(follower.dict(), author.id, 'unfollow') await notify_follower(follower.dict(), author.id, 'unfollow')
elif what == 'TOPIC': elif what == 'TOPIC':
topic_unfollow(follower_id, slug) topic = session.query(Topic).where(Topic.slug == slug).first()
if topic:
await update_follows_for_author(session, follower, 'topic', topic, False)
topic_unfollow(follower.id, slug)
elif what == 'COMMUNITY': elif what == 'COMMUNITY':
community_unfollow(follower_id, slug) community_unfollow(follower.id, slug)
elif what == 'REACTIONS': elif what == 'REACTIONS':
reactions_unfollow(follower_id, slug) reactions_unfollow(follower.id, slug)
except Exception as e: except Exception as e:
return {'error': str(e)} return {'error': str(e)}

View File

@ -24,6 +24,26 @@ async def update_author_cache(author: dict, ttl=25 * 60 * 60):
await redis.execute('SETEX', f'id:{author.get("id")}:author', ttl, payload) await redis.execute('SETEX', f'id:{author.get("id")}:author', ttl, payload)
async def update_follows_topics_cache(follows, author_id: int, ttl=25 * 60 * 60):
try:
payload = json.dumps(follows)
await redis.execute('SETEX', f'author:{author_id}:follows-topics', ttl, payload)
except Exception:
import traceback
traceback.print_exc()
async def update_follows_authors_cache(follows, author_id: int, ttl=25 * 60 * 60):
try:
payload = json.dumps(follows)
await redis.execute('SETEX', f'author:{author_id}:follows-authors', ttl, payload)
except Exception:
import traceback
traceback.print_exc()
@event.listens_for(Shout, 'after_insert') @event.listens_for(Shout, 'after_insert')
@event.listens_for(Shout, 'after_update') @event.listens_for(Shout, 'after_update')
def after_shouts_update(mapper, connection, shout: Shout): def after_shouts_update(mapper, connection, shout: Shout):
@ -107,15 +127,13 @@ def after_author_follower_delete(mapper, connection, target: AuthorFollower):
) )
async def update_follows_for_user( async def update_follows_for_author(
connection, user_id, entity_type, entity: dict, is_insert connection, follower, entity_type, entity: dict, is_insert
): ):
redis_key = f'user:{user_id}:follows' ttl = 25 * 60 * 60
redis_key = f'id:{follower.id}:follows-{entity_type}s'
follows_str = await redis.get(redis_key) follows_str = await redis.get(redis_key)
if follows_str: follows = json.loads(follows_str) if follows_str else []
follows = json.loads(follows_str)
else:
follows = DEFAULT_FOLLOWS
if is_insert: if is_insert:
follows[f'{entity_type}s'].append(entity) follows[f'{entity_type}s'].append(entity)
else: else:
@ -123,7 +141,7 @@ async def update_follows_for_user(
follows[f'{entity_type}s'] = [ follows[f'{entity_type}s'] = [
e for e in follows[f'{entity_type}s'] if e['id'] != entity['id'] e for e in follows[f'{entity_type}s'] if e['id'] != entity['id']
] ]
await redis.execute('SET', redis_key, json.dumps(follows)) await redis.execute('SETEX', redis_key, ttl, json.dumps(follows))
async def handle_author_follower_change( async def handle_author_follower_change(
@ -135,10 +153,16 @@ async def handle_author_follower_change(
follower = get_with_stat(follower_query) follower = get_with_stat(follower_query)
if follower and author: if follower and author:
_ = asyncio.create_task(update_author_cache(author.dict())) _ = asyncio.create_task(update_author_cache(author.dict()))
follows_authors = await redis.execute('GET', f'author:{follower_id}:follows-authors')
if follows_authors:
follows_authors = json.loads(follows_authors)
if not any(x.get('id') == author.id for x in follows_authors):
follows_authors.append(author.dict())
_ = asyncio.create_task(update_follows_authors_cache(follows_authors, follower_id))
_ = asyncio.create_task(update_author_cache(follower.dict())) _ = asyncio.create_task(update_author_cache(follower.dict()))
await update_follows_for_user( await update_follows_for_author(
connection, connection,
follower.user, follower,
'author', 'author',
{ {
'id': author.id, 'id': author.id,
@ -162,9 +186,15 @@ async def handle_topic_follower_change(
follower = get_with_stat(follower_query) follower = get_with_stat(follower_query)
if follower and topic: if follower and topic:
_ = asyncio.create_task(update_author_cache(follower.dict())) _ = asyncio.create_task(update_author_cache(follower.dict()))
await update_follows_for_user( follows_topics = await redis.execute('GET', f'author:{follower_id}:follows-topics')
if follows_topics:
follows_topics = json.loads(follows_topics)
if not any(x.get('id') == topic.id for x in follows_topics):
follows_topics.append(topic)
_ = asyncio.create_task(update_follows_topics_cache(follows_topics, follower_id))
await update_follows_for_author(
connection, connection,
follower.user, follower,
'topic', 'topic',
{ {
'id': topic.id, 'id': topic.id,