cache-refactored
All checks were successful
Deploy on push / deploy (push) Successful in 24s

This commit is contained in:
Untone 2024-02-27 15:40:53 +03:00
parent 564a8c10b7
commit 2e68128dfc
5 changed files with 299 additions and 283 deletions

View File

@@ -1,7 +1,7 @@
import json
import time
from sqlalchemy import select, or_, and_, text, desc
from sqlalchemy import select, or_, and_, text, desc, cast, Integer
from sqlalchemy.orm import aliased
from sqlalchemy_searchable import search
@ -9,7 +9,7 @@ from orm.author import Author, AuthorFollower
from orm.shout import ShoutAuthor, ShoutTopic
from orm.topic import Topic
from resolvers.stat import get_with_stat, author_follows_authors, author_follows_topics
from services.event_listeners import update_author_cache
from services.cache import update_author_cache
from services.auth import login_required
from services.db import local_session
from services.rediscache import redis
@ -214,26 +214,27 @@ def create_author(user_id: str, slug: str, name: str = ''):
@query.field('get_author_followers')
def get_author_followers(_, _info, slug: str):
async def get_author_followers(_, _info, slug: str):
logger.debug(f'getting followers for @{slug}')
try:
with local_session() as session:
author_alias = aliased(Author)
author_id_result = (
session.query(author_alias.id).filter(author_alias.slug == slug).first()
session.query(author_alias).filter(author_alias.slug == slug).first()
)
author_id = author_id_result[0] if author_id_result else None
author_follower_alias = aliased(AuthorFollower, name='af')
q = select(Author).join(
author_follower_alias,
and_(
author_follower_alias.author == author_id,
author_follower_alias.follower == Author.id,
),
)
return get_with_stat(q)
author = author_id_result[0] if author_id_result else None
author_id = cast(author.id, Integer)
cached = await redis.execute('GET', f'id:{author_id}:followers')
if not cached:
author_follower_alias = aliased(AuthorFollower, name='af')
q = select(Author).join(
author_follower_alias,
and_(
author_follower_alias.author == author_id,
author_follower_alias.follower == Author.id,
),
)
return json.loads(cached) if cached else get_with_stat(q)
except Exception as exc:
logger.error(exc)
return []

View File

@ -16,7 +16,7 @@ from resolvers.topic import topic_unfollow
from resolvers.stat import get_with_stat, author_follows_topics, author_follows_authors
from services.auth import login_required
from services.db import local_session
from services.event_listeners import DEFAULT_FOLLOWS, update_follows_for_author
from services.cache import DEFAULT_FOLLOWS, update_follows_for_author, update_followers_for_author
from services.notify import notify_follower
from services.schema import mutation, query
from services.logger import root_logger as logger
@ -28,24 +28,27 @@ from services.rediscache import redis
async def follow(_, info, what, slug):
try:
user_id = info.context['user_id']
with local_session() as session:
follower = session.query(Author).filter(Author.user == user_id).first()
if follower:
if what == 'AUTHOR':
if author_unfollow(follower.id, slug):
author = session.query(Author).where(Author.slug == slug).first()
if author:
await update_follows_for_author(session, follower, 'author', author, True)
await notify_follower(follower.dict(), author.id, 'unfollow')
elif what == 'TOPIC':
topic = session.query(Topic).where(Topic.slug == slug).first()
if topic:
await update_follows_for_author(session, follower, 'topic', topic, True)
topic_unfollow(follower.id, slug)
elif what == 'COMMUNITY':
community_follow(follower.id, slug)
elif what == 'REACTIONS':
reactions_follow(follower.id, slug)
follower_query = select(Author).select_from(Author).filter(Author.user == user_id)
[follower] = get_with_stat(follower_query)
if follower:
if what == 'AUTHOR':
if author_unfollow(follower.id, slug):
author_query = select(Author).select_from(Author).where(Author.slug == slug)
[author] = get_with_stat(author_query)
if author:
await update_follows_for_author(follower, 'author', author, True)
await update_followers_for_author(follower, author, True)
await notify_follower(follower.dict(), author.id, 'unfollow')
elif what == 'TOPIC':
topic_query = select(Topic).where(Topic.slug == slug)
[topic] = get_with_stat(topic_query)
if topic:
await update_follows_for_author(follower, 'topic', topic, True)
topic_unfollow(follower.id, slug)
elif what == 'COMMUNITY':
community_follow(follower.id, slug)
elif what == 'REACTIONS':
reactions_follow(follower.id, slug)
except Exception as e:
logger.debug(info, what, slug)
logger.error(e)
@ -59,24 +62,27 @@ async def follow(_, info, what, slug):
async def unfollow(_, info, what, slug):
user_id = info.context['user_id']
try:
with local_session() as session:
follower = session.query(Author).filter(Author.user == user_id).first()
if follower:
if what == 'AUTHOR':
if author_unfollow(follower.id, slug):
author = session.query(Author).where(Author.slug == slug).first()
if author:
await update_follows_for_author(session, follower, 'author', author, False)
await notify_follower(follower.dict(), author.id, 'unfollow')
elif what == 'TOPIC':
topic = session.query(Topic).where(Topic.slug == slug).first()
if topic:
await update_follows_for_author(session, follower, 'topic', topic, False)
topic_unfollow(follower.id, slug)
elif what == 'COMMUNITY':
community_unfollow(follower.id, slug)
elif what == 'REACTIONS':
reactions_unfollow(follower.id, slug)
follower_query = select(Author).filter(Author.user == user_id)
[follower] = get_with_stat(follower_query)
if follower:
if what == 'AUTHOR':
if author_unfollow(follower.id, slug):
author_query = select(Author).where(Author.slug == slug)
[author] = get_with_stat(author_query)
if author:
await update_follows_for_author(follower, 'author', author, False)
await update_followers_for_author(follower, author, False)
await notify_follower(follower.dict(), author.id, 'unfollow')
elif what == 'TOPIC':
topic_query = select(Topic).where(Topic.slug == slug)
[topic] = get_with_stat(topic_query)
if topic:
await update_follows_for_author(follower, 'topic', topic, False)
topic_unfollow(follower.id, slug)
elif what == 'COMMUNITY':
community_unfollow(follower.id, slug)
elif what == 'REACTIONS':
reactions_unfollow(follower.id, slug)
except Exception as e:
return {'error': str(e)}

View File

@@ -1,26 +1,219 @@
from functools import wraps
import asyncio
from dogpile.cache import make_region
from sqlalchemy import select, event
import json
# Создание региона кэша с TTL 300 секунд
cache_region = make_region().configure('dogpile.cache.memory', expiration_time=300)
from orm.author import Author, AuthorFollower
from orm.reaction import Reaction
from orm.shout import ShoutAuthor, Shout
from orm.topic import Topic, TopicFollower
from resolvers.stat import get_with_stat
from services.rediscache import redis
from services.logger import root_logger as logger
# Декоратор для кэширования методов
def cache_method(cache_key: str):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Генерация ключа для кэширования
key = cache_key.format(*args, **kwargs)
# Получение значения из кэша
result = cache_region.get(key)
if result is None:
# Если значение отсутствует в кэше, вызываем функцию и кэшируем результат
result = f(*args, **kwargs)
cache_region.set(key, result)
return result
DEFAULT_FOLLOWS = {
'topics': [],
'authors': [],
'communities': [{'id': 1, 'name': 'Дискурс', 'slug': 'discours', 'pic': ''}],
}
return decorated_function
return decorator
async def update_author_cache(author: dict, ttl=25 * 60 * 60):
    """Cache the serialized author payload in Redis under both its
    user-id key and its author-id key, expiring after *ttl* seconds."""
    serialized = json.dumps(author)
    for redis_key in (f'user:{author.get("user")}:author', f'id:{author.get("id")}:author'):
        await redis.execute('SETEX', redis_key, ttl, serialized)
async def update_follows_topics_cache(follows, author_id: int, ttl=25 * 60 * 60):
    """Cache the author's followed-topics list in Redis for *ttl* seconds.

    ``follows`` must be JSON-serializable (a list of topic dicts).
    Failures are logged, never raised — cache refresh is best-effort.
    """
    try:
        payload = json.dumps(follows)
        await redis.execute('SETEX', f'author:{author_id}:follows-topics', ttl, payload)
    except Exception:
        # Log the full traceback once. The original logged the exception,
        # rebound `exc` to the traceback string and logged again — duplicate
        # noise, and inconsistent with update_follows_authors_cache.
        import traceback
        logger.error(traceback.format_exc())
async def update_follows_authors_cache(follows, author_id: int, ttl=25 * 60 * 60):
    """Cache the author's followed-authors list in Redis for *ttl* seconds.

    Best-effort: serialization or Redis errors are logged, not raised.
    """
    redis_key = f'author:{author_id}:follows-authors'
    try:
        await redis.execute('SETEX', redis_key, ttl, json.dumps(follows))
    except Exception:
        import traceback
        logger.error(traceback.format_exc())
@event.listens_for(Shout, 'after_insert')
@event.listens_for(Shout, 'after_update')
def after_shouts_update(mapper, connection, shout: Shout):
    """Refresh the Redis author cache for every author of the created/updated shout."""
    # Main query to get authors associated with the shout through ShoutAuthor
    authors_query = (
        select(Author)
        .select_from(ShoutAuthor)  # Select from ShoutAuthor
        .join(Author, Author.id == ShoutAuthor.author)  # Join with Author
        .where(ShoutAuthor.shout == shout.id)  # Filter by shout.id
    )
    # get_with_stat returns Author rows; each is cached asynchronously.
    # NOTE(review): asyncio.create_task requires a running event loop, but this
    # SQLAlchemy listener is synchronous — confirm a loop is active when it fires.
    for author_with_stat in get_with_stat(authors_query):
        asyncio.create_task(update_author_cache(author_with_stat.dict()))
@event.listens_for(Reaction, 'after_insert')
def after_reaction_insert(mapper, connection, reaction: Reaction):
    """On a new reaction: refresh cached stats for the reaction's author (and
    the author being replied to, if any), then refresh the reacted shout's
    author caches via after_shouts_update."""
    try:
        # Author of the new reaction.
        author_subquery = select(Author).where(Author.id == reaction.created_by)
        # Author of the reaction this one replies to (empty when reply_to is NULL).
        replied_author_subquery = (
            select(Author)
            .join(Reaction, Author.id == Reaction.created_by)
            .where(Reaction.id == reaction.reply_to)
        )
        # NOTE(review): the two arms of this UNION select different column
        # counts (4 vs 1) — most databases reject that; confirm this query
        # actually executes as intended.
        author_query = select(
            author_subquery.subquery().c.id,
            author_subquery.subquery().c.slug,
            author_subquery.subquery().c.created_at,
            author_subquery.subquery().c.name,
        ).select_from(author_subquery.subquery()).union(
            select(
                replied_author_subquery.subquery().c.id,
            )
            .select_from(replied_author_subquery.subquery())
        )
        for author_with_stat in get_with_stat(author_query):
            asyncio.create_task(update_author_cache(author_with_stat.dict()))
        # Re-run the shout cache refresh for the shout this reaction belongs to.
        shout = connection.execute(select(Shout).select_from(Shout).where(Shout.id == reaction.shout)).first()
        if shout:
            after_shouts_update(mapper, connection, shout)
    except Exception as exc:
        logger.error(exc)
@event.listens_for(Author, 'after_insert')
@event.listens_for(Author, 'after_update')
def after_author_update(mapper, connection, author: Author):
    """Refresh the Redis cache entry for an inserted or updated author."""
    author_query = select(Author).where(Author.id == author.id)
    [author_with_stat] = get_with_stat(author_query)
    asyncio.create_task(update_author_cache(author_with_stat.dict()))
@event.listens_for(TopicFollower, 'after_insert')
def after_topic_follower_insert(mapper, connection, target: TopicFollower):
    # Schedule an async cache refresh for a new topic follow (is_insert=True).
    asyncio.create_task(
        handle_topic_follower_change(connection, target.topic, target.follower, True)
    )


@event.listens_for(TopicFollower, 'after_delete')
def after_topic_follower_delete(mapper, connection, target: TopicFollower):
    # Schedule an async cache refresh for a removed topic follow (is_insert=False).
    asyncio.create_task(
        handle_topic_follower_change(connection, target.topic, target.follower, False)
    )


@event.listens_for(AuthorFollower, 'after_insert')
def after_author_follower_insert(mapper, connection, target: AuthorFollower):
    # Schedule an async cache refresh for a new author follow.
    asyncio.create_task(
        handle_author_follower_change(connection, target.author, target.follower, True)
    )


@event.listens_for(AuthorFollower, 'after_delete')
def after_author_follower_delete(mapper, connection, target: AuthorFollower):
    # Schedule an async cache refresh for a removed author follow.
    asyncio.create_task(
        handle_author_follower_change(connection, target.author, target.follower, False)
    )
async def update_follows_for_author(follower: Author, entity_type: str, entity: dict, is_insert: bool):
    """Add or remove *entity* in the follower's cached follows list.

    The list lives at ``id:{follower.id}:follows-{entity_type}s`` and is
    re-stored with a 25-hour TTL.
    """
    ttl = 25 * 60 * 60
    redis_key = f'id:{follower.id}:follows-{entity_type}s'
    follows_str = await redis.get(redis_key)
    follows = json.loads(follows_str) if follows_str else []
    if is_insert:
        # Avoid duplicate entries when a follow event fires more than once.
        if not any(e.get('id') == entity['id'] for e in follows):
            follows.append(entity)
    else:
        # Remove the entity from follows (use .get — cached entries may lack 'id').
        follows = [e for e in follows if e.get('id') != entity['id']]
    await redis.execute('SETEX', redis_key, ttl, json.dumps(follows))
async def update_followers_for_author(follower: Author, author: Author, is_insert: bool):
    """Add or remove *follower* in the cached followers list of *author*.

    The list lives at ``id:{author.id}:followers`` and is re-stored with a
    25-hour TTL.
    """
    ttl = 25 * 60 * 60
    redis_key = f'id:{author.id}:followers'
    followers_str = await redis.get(redis_key)
    followers = json.loads(followers_str) if followers_str else []
    if is_insert:
        # BUG FIX: store a JSON-serializable dict, not the ORM object, and keep
        # a single `followers` variable (the original appended to `followers`
        # but dumped an undefined `follows` — NameError on this path).
        if not any(e.get('id') == follower.id for e in followers):
            followers.append(follower.dict())
    else:
        # BUG FIX: remove the departing follower by *follower* id; the original
        # filtered by author.id, which never matches the follower entries.
        followers = [e for e in followers if e.get('id') != follower.id]
    await redis.execute('SETEX', redis_key, ttl, json.dumps(followers))
async def handle_author_follower_change(
    connection, author_id: int, follower_id: int, is_insert: bool
):
    """Sync Redis caches after an AuthorFollower row is inserted or deleted.

    Refreshes both cached profiles, the follower's "follows-authors" list,
    and the follower's per-entity follows entry.
    """
    author_query = select(Author).select_from(Author).filter(Author.id == author_id)
    authors = get_with_stat(author_query)
    author = authors[0] if authors else None  # get_with_stat returns a list
    follower_query = select(Author).select_from(Author).filter(Author.id == follower_id)
    followers = get_with_stat(follower_query)
    # BUG FIX: the original kept the whole list in `follower`, then called
    # follower.dict() below, which fails on a list — unpack the single row.
    follower = followers[0] if followers else None
    if follower and author:
        _ = asyncio.create_task(update_author_cache(author.dict()))
        follows_authors = await redis.execute('GET', f'author:{follower_id}:follows-authors')
        if follows_authors:
            follows_authors = json.loads(follows_authors)
            if not any(x.get('id') == author.id for x in follows_authors):
                follows_authors.append(author.dict())
                _ = asyncio.create_task(update_follows_authors_cache(follows_authors, follower_id))
        _ = asyncio.create_task(update_author_cache(follower.dict()))
        # BUG FIX: update_follows_for_author takes (follower, entity_type,
        # entity, is_insert) — the original also passed `connection`, shifting
        # every argument by one position.
        await update_follows_for_author(
            follower,
            'author',
            {
                'id': author.id,
                'name': author.name,
                'slug': author.slug,
                'pic': author.pic,
                'bio': author.bio,
                'stat': author.stat,
            },
            is_insert,
        )
async def handle_topic_follower_change(
    connection, topic_id: int, follower_id: int, is_insert: bool
):
    """Sync Redis caches after a TopicFollower row is inserted or deleted.

    Refreshes the follower's cached profile, their "follows-topics" list,
    and their per-entity follows entry.
    """
    q = select(Topic).filter(Topic.id == topic_id)
    topics = get_with_stat(q)
    topic = topics[0] if topics else None  # guard empty result (original indexed blindly)
    follower_query = select(Author).filter(Author.id == follower_id)
    followers = get_with_stat(follower_query)
    # BUG FIX: get_with_stat returns a list; the original passed the whole
    # list to follower.dict() below — unpack the single row.
    follower = followers[0] if followers else None
    if follower and topic:
        _ = asyncio.create_task(update_author_cache(follower.dict()))
        follows_topics = await redis.execute('GET', f'author:{follower_id}:follows-topics')
        if follows_topics:
            follows_topics = json.loads(follows_topics)
            if not any(x.get('id') == topic.id for x in follows_topics):
                # BUG FIX: cache a JSON-serializable dict, not the ORM object,
                # so update_follows_topics_cache can json.dumps the list.
                follows_topics.append(topic.dict())
                _ = asyncio.create_task(update_follows_topics_cache(follows_topics, follower_id))
        await update_follows_for_author(
            follower,
            'topic',
            {
                'id': topic.id,
                'title': topic.title,
                'slug': topic.slug,
                'body': topic.body,
                'stat': topic.stat,
            },
            is_insert,
        )

View File

@@ -1,210 +0,0 @@
import asyncio
from sqlalchemy import select, event
import json
from orm.author import Author, AuthorFollower
from orm.reaction import Reaction
from orm.shout import ShoutAuthor, Shout
from orm.topic import Topic, TopicFollower
from resolvers.stat import get_with_stat
from services.rediscache import redis
from services.logger import root_logger as logger
DEFAULT_FOLLOWS = {
'topics': [],
'authors': [],
'communities': [{'id': 1, 'name': 'Дискурс', 'slug': 'discours', 'pic': ''}],
}
async def update_author_cache(author: dict, ttl=25 * 60 * 60):
payload = json.dumps(author)
await redis.execute('SETEX', f'user:{author.get("user")}:author', ttl, payload)
await redis.execute('SETEX', f'id:{author.get("id")}:author', ttl, payload)
async def update_follows_topics_cache(follows, author_id: int, ttl=25 * 60 * 60):
try:
payload = json.dumps(follows)
await redis.execute('SETEX', f'author:{author_id}:follows-topics', ttl, payload)
except Exception:
import traceback
exc = traceback.format_exc()
logger.error(exc)
async def update_follows_authors_cache(follows, author_id: int, ttl=25 * 60 * 60):
try:
payload = json.dumps(follows)
await redis.execute('SETEX', f'author:{author_id}:follows-authors', ttl, payload)
except Exception:
import traceback
exc = traceback.format_exc()
logger.error(exc)
@event.listens_for(Shout, 'after_insert')
@event.listens_for(Shout, 'after_update')
def after_shouts_update(mapper, connection, shout: Shout):
# Main query to get authors associated with the shout through ShoutAuthor
authors_query = (
select(Author)
.select_from(ShoutAuthor) # Select from ShoutAuthor
.join(Author, Author.id == ShoutAuthor.author) # Join with Author
.where(ShoutAuthor.shout == shout.id) # Filter by shout.id
)
for author_with_stat in get_with_stat(authors_query):
asyncio.create_task(update_author_cache(author_with_stat.dict()))
@event.listens_for(Reaction, 'after_insert')
def after_reaction_insert(mapper, connection, reaction: Reaction):
try:
author_subquery = select(Author).where(Author.id == reaction.created_by)
replied_author_subquery = (
select(Author)
.join(Reaction, Author.id == Reaction.created_by)
.where(Reaction.id == reaction.reply_to)
)
author_query = select(
author_subquery.subquery().c.id,
author_subquery.subquery().c.slug,
author_subquery.subquery().c.created_at,
author_subquery.subquery().c.name,
).select_from(author_subquery.subquery()).union(
select(
replied_author_subquery.subquery().c.id,
)
.select_from(replied_author_subquery.subquery())
)
for author_with_stat in get_with_stat(author_query):
asyncio.create_task(update_author_cache(author_with_stat.dict()))
shout = connection.execute(select(Shout).select_from(Shout).where(Shout.id == reaction.shout)).first()
if shout:
after_shouts_update(mapper, connection, shout)
except Exception as exc:
logger.error(exc)
@event.listens_for(Author, 'after_insert')
@event.listens_for(Author, 'after_update')
def after_author_update(mapper, connection, author: Author):
q = select(Author).where(Author.id == author.id)
[author_with_stat] = get_with_stat(q)
asyncio.create_task(update_author_cache(author_with_stat.dict()))
@event.listens_for(TopicFollower, 'after_insert')
def after_topic_follower_insert(mapper, connection, target: TopicFollower):
asyncio.create_task(
handle_topic_follower_change(connection, target.topic, target.follower, True)
)
@event.listens_for(TopicFollower, 'after_delete')
def after_topic_follower_delete(mapper, connection, target: TopicFollower):
asyncio.create_task(
handle_topic_follower_change(connection, target.topic, target.follower, False)
)
@event.listens_for(AuthorFollower, 'after_insert')
def after_author_follower_insert(mapper, connection, target: AuthorFollower):
asyncio.create_task(
handle_author_follower_change(connection, target.author, target.follower, True)
)
@event.listens_for(AuthorFollower, 'after_delete')
def after_author_follower_delete(mapper, connection, target: AuthorFollower):
asyncio.create_task(
handle_author_follower_change(connection, target.author, target.follower, False)
)
async def update_follows_for_author(
connection, follower, entity_type, entity: dict, is_insert
):
ttl = 25 * 60 * 60
redis_key = f'id:{follower.id}:follows-{entity_type}s'
follows_str = await redis.get(redis_key)
follows = json.loads(follows_str) if follows_str else []
if is_insert:
follows[f'{entity_type}s'].append(entity)
else:
# Remove the entity from follows
follows[f'{entity_type}s'] = [
e for e in follows[f'{entity_type}s'] if e['id'] != entity['id']
]
await redis.execute('SETEX', redis_key, ttl, json.dumps(follows))
async def handle_author_follower_change(
connection, author_id: int, follower_id: int, is_insert: bool
):
author_query = select(Author).select_from(Author).filter(Author.id == author_id)
[author] = get_with_stat(author_query)
follower_query = select(Author).select_from(Author).filter(Author.id == follower_id)
follower = get_with_stat(follower_query)
if follower and author:
_ = asyncio.create_task(update_author_cache(author.dict()))
follows_authors = await redis.execute('GET', f'author:{follower_id}:follows-authors')
if follows_authors:
follows_authors = json.loads(follows_authors)
if not any(x.get('id') == author.id for x in follows_authors):
follows_authors.append(author.dict())
_ = asyncio.create_task(update_follows_authors_cache(follows_authors, follower_id))
_ = asyncio.create_task(update_author_cache(follower.dict()))
await update_follows_for_author(
connection,
follower,
'author',
{
'id': author.id,
'name': author.name,
'slug': author.slug,
'pic': author.pic,
'bio': author.bio,
'stat': author.stat,
},
is_insert,
)
async def handle_topic_follower_change(
connection, topic_id: int, follower_id: int, is_insert: bool
):
q = select(Topic).filter(Topic.id == topic_id)
topics = get_with_stat(q)
topic = topics[0]
follower_query = select(Author).filter(Author.id == follower_id)
follower = get_with_stat(follower_query)
if follower and topic:
_ = asyncio.create_task(update_author_cache(follower.dict()))
follows_topics = await redis.execute('GET', f'author:{follower_id}:follows-topics')
if follows_topics:
follows_topics = json.loads(follows_topics)
if not any(x.get('id') == topic.id for x in follows_topics):
follows_topics.append(topic)
_ = asyncio.create_task(update_follows_topics_cache(follows_topics, follower_id))
await update_follows_for_author(
connection,
follower,
'topic',
{
'id': topic.id,
'title': topic.title,
'slug': topic.slug,
'body': topic.body,
'stat': topic.stat,
},
is_insert,
)

26
services/memorycache.py Normal file
View File

@@ -0,0 +1,26 @@
from functools import wraps
from dogpile.cache import make_region
# Создание региона кэша с TTL 300 секунд
cache_region = make_region().configure('dogpile.cache.memory', expiration_time=300)
# Декоратор для кэширования методов
def cache_method(cache_key: str):
    """Decorator caching a function's result in ``cache_region`` (TTL 300s).

    ``cache_key`` is a ``str.format`` template filled with the call's
    positional and keyword arguments to build the cache key.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Build the cache key from the call arguments.
            key = cache_key.format(*args, **kwargs)
            # BUG FIX: dogpile's region.get() returns the NO_VALUE sentinel on
            # a miss, never None, so the original `if result is None` check
            # never refreshed the cache (and returned NO_VALUE to callers).
            # get_or_create() handles the miss — and dogpile's anti-stampede
            # lock — correctly.
            return cache_region.get_or_create(key, lambda: f(*args, **kwargs))
        return decorated_function
    return decorator