fmt
All checks were successful
Deploy on push / deploy (push) Successful in 22s

This commit is contained in:
Untone 2024-03-28 15:56:32 +03:00
parent 7f913050ee
commit 9bda7cef95
12 changed files with 273 additions and 177 deletions

View File

@ -9,7 +9,12 @@ from sqlalchemy_searchable import search
from orm.author import Author, AuthorFollower
from orm.shout import ShoutAuthor, ShoutTopic
from orm.topic import Topic
from resolvers.stat import get_authors_with_stat_cached, author_follows_authors, author_follows_topics, get_with_stat
from resolvers.stat import (
get_authors_with_stat_cached,
author_follows_authors,
author_follows_topics,
get_with_stat,
)
from services.cache import set_author_cache, update_author_followers_cache
from services.auth import login_required
from services.db import local_session
@ -53,7 +58,7 @@ async def get_author(_, _info, slug='', author_id=None):
author_dict = None
try:
if slug:
author_id = local_session().query(Author.id).filter(Author.slug == slug).scalar()
author_id = local_session().query(Author.id).filter(Author.slug == slug)
logger.debug(f'found @{slug} with id {author_id}')
if author_id:
cache_key = f'author:{author_id}'
@ -64,7 +69,6 @@ async def get_author(_, _info, slug='', author_id=None):
if cache and isinstance(cache, str):
author_dict = json.loads(cache)
else:
result = await get_authors_with_stat_cached(q)
if result:
[author] = result
@ -122,7 +126,7 @@ async def get_author_id(_, _info, user: str):
@query.field('load_authors_by')
def load_authors_by(_, _info, by, limit, offset):
cache_key = f"{json.dumps(by)}_{limit}_{offset}"
cache_key = f'{json.dumps(by)}_{limit}_{offset}'
@cache_region.cache_on_arguments(cache_key)
def _load_authors_by():
@ -175,19 +179,21 @@ async def get_author_follows(_, _info, slug='', user=None, author_id=0):
raise ValueError('One of slug, user, or author_id must be provided')
[result] = local_session().execute(author_query)
if len(result) > 0:
#logger.debug(result)
# logger.debug(result)
[author] = result
#logger.debug(author)
# logger.debug(author)
if author and isinstance(author, Author):
logger.debug(author.dict())
author_id = author.id.scalar()
author_id = author.id
rkey = f'author:{author_id}:follows-authors'
logger.debug(f'getting {author_id} follows authors')
cached = await redis.execute('GET', rkey)
if not cached:
authors = author_follows_authors(author_id)
prepared = [author.dict() for author in authors]
await redis.execute('SET', rkey, json.dumps(prepared, cls=CustomJSONEncoder))
await redis.execute(
'SET', rkey, json.dumps(prepared, cls=CustomJSONEncoder)
)
elif isinstance(cached, str):
authors = json.loads(cached)
@ -198,7 +204,9 @@ async def get_author_follows(_, _info, slug='', user=None, author_id=0):
if not cached:
topics = author_follows_topics(author_id)
prepared = [topic.dict() for topic in topics]
await redis.execute('SET', rkey, json.dumps(prepared, cls=CustomJSONEncoder))
await redis.execute(
'SET', rkey, json.dumps(prepared, cls=CustomJSONEncoder)
)
return {
'topics': topics,
'authors': authors,
@ -234,7 +242,9 @@ async def get_author_follows_topics(_, _info, slug='', user=None, author_id=None
if not cached:
topics = author_follows_topics(author_id)
prepared = [topic.dict() for topic in topics]
await redis.execute('SET', rkey, json.dumps(prepared, cls=CustomJSONEncoder))
await redis.execute(
'SET', rkey, json.dumps(prepared, cls=CustomJSONEncoder)
)
return topics
@ -258,7 +268,9 @@ async def get_author_follows_authors(_, _info, slug='', user=None, author_id=Non
if not authors:
authors = author_follows_authors(author_id)
prepared = [author.dict() for author in authors]
await redis.execute('SET', rkey, json.dumps(prepared, cls=CustomJSONEncoder))
await redis.execute(
'SET', rkey, json.dumps(prepared, cls=CustomJSONEncoder)
)
return authors
else:
raise ValueError('Author not found')
@ -287,11 +299,7 @@ async def get_author_followers(_, _info, slug: str):
try:
with local_session() as session:
author_alias = aliased(Author)
author_id = (
session.query(author_alias.id)
.filter(author_alias.slug == slug)
.scalar()
)
author_id = session.query(author_alias.id).filter(author_alias.slug == slug)
if author_id:
cached = await redis.execute('GET', f'author:{author_id}:followers')
if not cached:

View File

@ -22,17 +22,17 @@ from services.logger import root_logger as logger
@query.field('get_my_shout')
@login_required
async def get_my_shout(_, info, shout_id: int):
with (local_session() as session):
with local_session() as session:
user_id = info.context.get('user_id', '')
if not user_id:
return {'error': 'unauthorized', 'shout': None}
shout = session.query(Shout).filter(
Shout.id == shout_id
).options(
joinedload(Shout.authors), joinedload(Shout.topics)
).filter(and_(
Shout.deleted_at.is_(None),
Shout.published_at.is_(None))).first()
shout = (
session.query(Shout)
.filter(Shout.id == shout_id)
.options(joinedload(Shout.authors), joinedload(Shout.topics))
.filter(and_(Shout.deleted_at.is_(None), Shout.published_at.is_(None)))
.first()
)
if not shout:
return {'error': 'no shout found', 'shout': None}
if not shout.published_at:
@ -138,7 +138,12 @@ async def create_shout(_, info, inp):
def patch_main_topic(session, main_topic, shout):
with session.begin():
shout = session.query(Shout).options(joinedload(Shout.topics)).filter(Shout.id == shout.id).first()
shout = (
session.query(Shout)
.options(joinedload(Shout.topics))
.filter(Shout.id == shout.id)
.first()
)
if not shout:
return
old_main_topic = (
@ -153,12 +158,18 @@ def patch_main_topic(session, main_topic, shout):
new_main_topic = (
session.query(ShoutTopic)
.filter(
and_(ShoutTopic.shout == shout.id, ShoutTopic.topic == main_topic.id)
and_(
ShoutTopic.shout == shout.id, ShoutTopic.topic == main_topic.id
)
)
.first()
)
if old_main_topic and new_main_topic and old_main_topic is not new_main_topic:
if (
old_main_topic
and new_main_topic
and old_main_topic is not new_main_topic
):
ShoutTopic.update(old_main_topic, {'main': False})
session.add(old_main_topic)
@ -224,12 +235,16 @@ async def update_shout(_, info, shout_id: int, shout_input=None, publish=False):
if not shout_by_id:
return {'error': 'shout not found'}
if slug != shout_by_id.slug:
same_slug_shout = session.query(Shout).filter(Shout.slug == slug).first()
same_slug_shout = (
session.query(Shout).filter(Shout.slug == slug).first()
)
c = 1
while same_slug_shout is not None:
c += 1
slug += f'-{c}'
same_slug_shout = session.query(Shout).filter(Shout.slug == slug).first()
same_slug_shout = (
session.query(Shout).filter(Shout.slug == slug).first()
)
shout_input['slug'] = slug
if (

View File

@ -13,8 +13,12 @@ from orm.community import Community
from orm.reaction import Reaction
from orm.shout import Shout, ShoutReactionsFollower
from orm.topic import Topic, TopicFollower
from resolvers.stat import get_authors_with_stat_cached, author_follows_topics, author_follows_authors, \
get_topics_with_stat_cached
from resolvers.stat import (
get_authors_with_stat_cached,
author_follows_topics,
author_follows_authors,
get_topics_with_stat_cached,
)
from services.auth import login_required
from services.db import local_session
from services.cache import (
@ -35,29 +39,39 @@ async def follow(_, info, what, slug):
error = None
user_id = info.context.get('user_id')
if not user_id:
return {"error": "unauthorized"}
[follower] = await get_authors_with_stat_cached(select(Author).select_from(Author).filter(Author.user == user_id))
return {'error': 'unauthorized'}
[follower] = await get_authors_with_stat_cached(
select(Author).select_from(Author).filter(Author.user == user_id)
)
if not follower:
return {"error": "cant find follower"}
return {'error': 'cant find follower'}
if what == 'AUTHOR':
error = author_follow(follower.id, slug)
if not error:
logger.debug(f'@{follower.slug} followed @{slug}')
[author] = await get_authors_with_stat_cached(select(Author).select_from(Author).where(Author.slug == slug))
[author] = await get_authors_with_stat_cached(
select(Author).select_from(Author).where(Author.slug == slug)
)
if not author:
return {"error": "author is not found"}
follows = await update_follows_for_author(follower, 'author', author.dict(), True)
return {'error': 'author is not found'}
follows = await update_follows_for_author(
follower, 'author', author.dict(), True
)
_followers = await update_followers_for_author(follower, author, True)
await notify_follower(follower.dict(), author.id, 'unfollow')
elif what == 'TOPIC':
error = topic_follow(follower.id, slug)
if not error:
[topic] = await get_topics_with_stat_cached(select(Topic).where(Topic.slug == slug))
[topic] = await get_topics_with_stat_cached(
select(Topic).where(Topic.slug == slug)
)
if not topic:
return {"error": "topic is not found"}
follows = await update_follows_for_author(follower, 'topic', topic.dict(), True)
return {'error': 'topic is not found'}
follows = await update_follows_for_author(
follower, 'topic', topic.dict(), True
)
elif what == 'COMMUNITY':
follows = local_session().execute(select(Community))
@ -67,10 +81,12 @@ async def follow(_, info, what, slug):
if not error:
[shout] = local_session().execute(select(Shout).where(Shout.slug == slug))
if not shout:
return {"error": "cant find shout"}
follows = await update_follows_for_author(follower, 'shout', shout.dict(), True)
return {'error': 'cant find shout'}
follows = await update_follows_for_author(
follower, 'shout', shout.dict(), True
)
return {f'{what.lower()}s': follows, "error": error}
return {f'{what.lower()}s': follows, 'error': error}
@mutation.field('unfollow')
@ -80,31 +96,39 @@ async def unfollow(_, info, what, slug):
error = None
user_id = info.context.get('user_id')
if not user_id:
return {"error": "unauthorized"}
return {'error': 'unauthorized'}
follower_query = select(Author).filter(Author.user == user_id)
[follower] = await get_authors_with_stat_cached(follower_query)
if not follower:
return {"error": "follower profile is not found"}
return {'error': 'follower profile is not found'}
if what == 'AUTHOR':
error = author_unfollow(follower.id, slug)
if not error:
logger.info(f'@{follower.slug} unfollowing @{slug}')
[author] = await get_authors_with_stat_cached(select(Author).where(Author.slug == slug))
[author] = await get_authors_with_stat_cached(
select(Author).where(Author.slug == slug)
)
if not author:
return {"error": "cant find author"}
return {'error': 'cant find author'}
_followers = await update_followers_for_author(follower, author, False)
await notify_follower(follower.dict(), author.id, 'unfollow')
follows = await update_follows_for_author(follower, 'author', author.dict(), False)
follows = await update_follows_for_author(
follower, 'author', author.dict(), False
)
elif what == 'TOPIC':
error = topic_unfollow(follower.id, slug)
if not error:
logger.info(f'@{follower.slug} unfollowing §{slug}')
[topic] = await get_topics_with_stat_cached(select(Topic).where(Topic.slug == slug))
[topic] = await get_topics_with_stat_cached(
select(Topic).where(Topic.slug == slug)
)
if not topic:
return {"error": "cant find topic"}
follows = await update_follows_for_author(follower, 'topic', topic.dict(), False)
return {'error': 'cant find topic'}
follows = await update_follows_for_author(
follower, 'topic', topic.dict(), False
)
elif what == 'COMMUNITY':
follows = local_session().execute(select(Community))
@ -115,16 +139,18 @@ async def unfollow(_, info, what, slug):
logger.info(f'@{follower.slug} unfollowing §{slug}')
[shout] = local_session().execute(select(Shout).where(Shout.slug == slug))
if not shout:
return {"error": "cant find shout"}
return {'error': 'cant find shout'}
if not error:
follows = await update_follows_for_author(follower, 'shout', shout.dict(), False)
follows = await update_follows_for_author(
follower, 'shout', shout.dict(), False
)
return {'error': error, f'{what.lower()}s': follows}
async def get_follows_by_user_id(user_id: str):
if not user_id:
return {"error": "unauthorized"}
return {'error': 'unauthorized'}
author = await redis.execute('GET', f'user:{user_id}')
if isinstance(author, str):
author = json.loads(author)
@ -132,7 +158,7 @@ async def get_follows_by_user_id(user_id: str):
with local_session() as session:
author = session.query(Author).filter(Author.user == user_id).first()
if not author:
return {"error": "cant find author"}
return {'error': 'cant find author'}
author = author.dict()
last_seen = author.get('last_seen', 0) if isinstance(author, dict) else 0
follows = DEFAULT_FOLLOWS
@ -278,7 +304,9 @@ def author_unfollow(follower_id, slug):
flw = (
session.query(AuthorFollower)
.join(Author, Author.id == AuthorFollower.author)
.filter(and_(AuthorFollower.follower == follower_id, Author.slug == slug))
.filter(
and_(AuthorFollower.follower == follower_id, Author.slug == slug)
)
.first()
)
if flw:

View File

@ -122,9 +122,9 @@ def get_notifications_grouped(
if (groups_amount + offset) >= limit:
break
payload = json.loads(notification.payload.scalar())
payload = json.loads(str(notification.payload))
if notification.entity.scalar() == NotificationEntity.SHOUT.value:
if str(notification.entity) == NotificationEntity.SHOUT.value:
shout = payload
shout_id = shout.get('id')
author_id = shout.get('created_by')
@ -139,13 +139,13 @@ def get_notifications_grouped(
thread_id,
shout=shout,
authors=[author],
action=notification.action.scalar(),
entity=notification.entity.scalar(),
action=str(notification.action),
entity=str(notification.entity),
)
groups_by_thread[thread_id] = group
groups_amount += 1
elif notification.entity.scalar() == NotificationEntity.REACTION.value:
elif str(notification.entity) == NotificationEntity.REACTION.value:
reaction = payload
if not isinstance(shout, dict):
raise ValueError('reaction data is not consistent')
@ -153,7 +153,9 @@ def get_notifications_grouped(
author_id = shout.get('created_by', 0)
if shout_id and author_id:
with local_session() as session:
author = session.query(Author).filter(Author.id == author_id).first()
author = (
session.query(Author).filter(Author.id == author_id).first()
)
shout = session.query(Shout).filter(Shout.id == shout_id).first()
if shout and author:
author = author.dict()
@ -166,7 +168,9 @@ def get_notifications_grouped(
if existing_group:
existing_group['seen'] = False
existing_group['authors'].append(author_id)
existing_group['reactions'] = existing_group['reactions'] or []
existing_group['reactions'] = (
existing_group['reactions'] or []
)
existing_group['reactions'].append(reaction)
groups_by_thread[thread_id] = existing_group
else:
@ -175,21 +179,21 @@ def get_notifications_grouped(
authors=[author],
shout=shout,
reactions=[reaction],
entity=notification.entity.scalar(),
action=notification.action.scalar(),
entity=str(notification.entity),
action=str(notification.action),
)
if group:
groups_by_thread[thread_id] = group
groups_amount += 1
elif notification.entity.scalar() == 'follower':
elif str(notification.entity) == 'follower':
thread_id = 'followers'
follower = json.loads(payload)
group = groups_by_thread.get(thread_id)
if group:
if notification.action.scalar() == 'follow':
if str(notification.action) == 'follow':
group['authors'].append(follower)
elif notification.action.scalar() == 'unfollow':
elif str(notification.action) == 'unfollow':
follower_id = follower.get('id')
for author in group['authors']:
if author.get('id') == follower_id:
@ -199,8 +203,8 @@ def get_notifications_grouped(
group = group_notification(
thread_id,
authors=[follower],
entity=notification.entity.scalar(),
action=notification.action.scalar(),
entity=str(notification.entity),
action=str(notification.action),
)
groups_amount += 1
groups_by_thread[thread_id] = group
@ -305,11 +309,11 @@ async def notifications_seen_thread(_, info, thread: str, after: int):
)
exclude = set()
for nr in removed_reaction_notifications:
reaction = json.loads(nr.payload.scalar())
reaction = json.loads(str(nr.payload))
reaction_id = reaction.get('id')
exclude.add(reaction_id)
for n in new_reaction_notifications:
reaction = json.loads(n.payload.scalar())
reaction = json.loads(str(n.payload))
reaction_id = reaction.get('id')
if (
reaction_id not in exclude

View File

@ -430,8 +430,7 @@ async def reacted_shouts_updates(follower_id: int, limit=50, offset=0) -> List[S
.outerjoin(
Reaction,
and_(
Reaction.shout == Shout.id,
Reaction.created_by == follower_id,
Reaction.shout == Shout.id, Reaction.created_by == follower_id
),
)
.outerjoin(Author, Shout.authors.any(id=follower_id))

View File

@ -17,12 +17,11 @@ from services.logger import root_logger as logger
def query_shouts():
return select(Shout).options(joinedload(Shout.authors), joinedload(Shout.topics)).where(
and_(
Shout.published_at.is_not(None),
Shout.deleted_at.is_(None),
)
)
return (
select(Shout)
.options(joinedload(Shout.authors), joinedload(Shout.topics))
.where(and_(Shout.published_at.is_not(None), Shout.deleted_at.is_(None)))
)
def filter_my(info, session, q):
@ -104,7 +103,7 @@ async def get_shout(_, info, slug: str):
'reacted': reacted_stat,
'commented': commented_stat,
'rating': int(likes_stat or 0) - int(dislikes_stat or 0),
'last_comment': last_comment
'last_comment': last_comment,
}
for author_caption in (
@ -219,7 +218,7 @@ async def load_shouts_by(_, _info, options):
'reacted': reacted_stat,
'commented': commented_stat,
'rating': int(likes_stat) - int(dislikes_stat),
'last_comment': last_comment
'last_comment': last_comment,
}
shouts.append(shout)
@ -293,7 +292,7 @@ async def load_shouts_feed(_, info, options):
'reacted': reacted_stat,
'commented': commented_stat,
'rating': likes_stat - dislikes_stat,
'last_comment': last_comment
'last_comment': last_comment,
}
shouts.append(shout)
@ -313,7 +312,8 @@ async def load_shouts_search(_, _info, text, limit=50, offset=0):
@login_required
async def load_shouts_unrated(_, info, limit: int = 50, offset: int = 0):
q = query_shouts()
q = q.outerjoin(
q = (
q.outerjoin(
Reaction,
and_(
Reaction.shout == Shout.id,
@ -322,14 +322,16 @@ async def load_shouts_unrated(_, info, limit: int = 50, offset: int = 0):
[ReactionKind.LIKE.value, ReactionKind.DISLIKE.value]
),
),
).outerjoin(Author, Author.user == bindparam('user_id')).where(
)
.outerjoin(Author, Author.user == bindparam('user_id'))
.where(
and_(
Shout.deleted_at.is_(None),
Shout.layout.is_not(None),
or_(Author.id.is_(None), Reaction.created_by != Author.id),
)
)
)
# 3 or fewer votes is 0, 1, 2 or 3 votes (null, reaction id1, reaction id2, reaction id3)
q = q.having(func.count(distinct(Reaction.id)) <= 4)
@ -407,9 +409,9 @@ async def load_shouts_random_top(_, _info, options):
(aliased_reaction.kind == ReactionKind.LIKE.value, 1),
(aliased_reaction.kind == ReactionKind.DISLIKE.value, -1),
else_=0,
)
)
)
)
)
random_limit = options.get('random_limit', 100)
@ -432,7 +434,6 @@ async def load_shouts_random_top(_, _info, options):
return shouts
@query.field('load_shouts_random_topic')
async def load_shouts_random_topic(_, info, limit: int = 10):
[topic] = get_topics_random(None, None, 1)

View File

@ -20,26 +20,26 @@ def add_topic_stat_columns(q):
q = (
q.outerjoin(aliased_shout_topic, aliased_shout_topic.topic == Topic.id)
.add_columns(
func.count(distinct(aliased_shout_topic.shout)).label("shouts_stat")
func.count(distinct(aliased_shout_topic.shout)).label('shouts_stat')
)
.outerjoin(
aliased_shout_author,
aliased_shout_topic.shout == aliased_shout_author.shout,
)
.add_columns(
func.count(distinct(aliased_shout_author.author)).label("authors_stat")
func.count(distinct(aliased_shout_author.author)).label('authors_stat')
)
.outerjoin(aliased_topic_follower)
.add_columns(
func.count(distinct(aliased_topic_follower.follower)).label(
"followers_stat"
'followers_stat'
)
)
)
# Create a subquery for comments count
_sub_comments = (
select(
Shout.id, func.coalesce(func.count(Reaction.id), 0).label("comments_count")
Shout.id, func.coalesce(func.count(Reaction.id), 0).label('comments_count')
)
.join(
Reaction,
@ -70,23 +70,23 @@ def add_author_stat_columns(q):
q = q.outerjoin(aliased_shout_author, aliased_shout_author.author == Author.id)
q = q.add_columns(
func.count(distinct(aliased_shout_author.shout)).label("shouts_stat")
func.count(distinct(aliased_shout_author.shout)).label('shouts_stat')
)
q = q.outerjoin(aliased_authors, aliased_authors.follower == Author.id)
q = q.add_columns(
func.count(distinct(aliased_authors.author)).label("authors_stat")
func.count(distinct(aliased_authors.author)).label('authors_stat')
)
q = q.outerjoin(aliased_followers, aliased_followers.author == Author.id)
q = q.add_columns(
func.count(distinct(aliased_followers.follower)).label("followers_stat")
func.count(distinct(aliased_followers.follower)).label('followers_stat')
)
# Create a subquery for comments count
sub_comments = (
select(
Author.id, func.coalesce(func.count(Reaction.id), 0).label("comments_stat")
Author.id, func.coalesce(func.count(Reaction.id), 0).label('comments_stat')
)
.outerjoin(
Reaction,
@ -103,9 +103,7 @@ def add_author_stat_columns(q):
q = q.outerjoin(sub_comments, Author.id == sub_comments.c.id)
q = q.add_columns(sub_comments.c.comments_stat)
q = q.group_by(
Author.id, sub_comments.c.comments_stat
)
q = q.group_by(Author.id, sub_comments.c.comments_stat)
return q
@ -113,12 +111,43 @@ def add_author_stat_columns(q):
def add_author_ratings(q):
aliased_author = aliased(Author)
selection_list = [
aliased_author.id.label("author_id"),
func.count().filter(and_(Reaction.created_by == aliased_author.id,Reaction.kind == ReactionKind.COMMENT.value)).label("comments_count"),
func.sum(case((AuthorRating.plus == true(), 1), else_=0)).label("likes_count"),
func.sum(case((AuthorRating.plus != true(), 1), else_=0)).label("dislikes_count"),
func.sum(case((and_(Reaction.kind == ReactionKind.LIKE.value,Shout.authors.any(id=aliased_author.id)),1),else_=0)).label("shouts_likes"),
func.sum(case((and_(Reaction.kind == ReactionKind.DISLIKE.value,Shout.authors.any(id=aliased_author.id)),1),else_=0)).label("shouts_dislikes"),
aliased_author.id.label('author_id'),
func.count()
.filter(
and_(
Reaction.created_by == aliased_author.id,
Reaction.kind == ReactionKind.COMMENT.value,
)
)
.label('comments_count'),
func.sum(case((AuthorRating.plus == true(), 1), else_=0)).label('likes_count'),
func.sum(case((AuthorRating.plus != true(), 1), else_=0)).label(
'dislikes_count'
),
func.sum(
case(
(
and_(
Reaction.kind == ReactionKind.LIKE.value,
Shout.authors.any(id=aliased_author.id),
),
1,
),
else_=0,
)
).label('shouts_likes'),
func.sum(
case(
(
and_(
Reaction.kind == ReactionKind.DISLIKE.value,
Shout.authors.any(id=aliased_author.id),
),
1,
),
else_=0,
)
).label('shouts_dislikes'),
]
ratings_subquery = (
select(*selection_list)
@ -127,7 +156,7 @@ def add_author_ratings(q):
.outerjoin(Shout, Shout.authors.any(id=aliased_author.id))
.filter(Reaction.deleted_at.is_(None))
.group_by(aliased_author.id)
.alias("ratings_subquery")
.alias('ratings_subquery')
)
return q.join(ratings_subquery, Author.id == ratings_subquery.c.author_id)
@ -135,8 +164,8 @@ def add_author_ratings(q):
def get_with_stat(q):
try:
is_author = f"{q}".lower().startswith("select author")
is_topic = f"{q}".lower().startswith("select topic")
is_author = f'{q}'.lower().startswith('select author')
is_topic = f'{q}'.lower().startswith('select topic')
if is_author:
q = add_author_stat_columns(q)
# q = add_author_ratings(q) # TODO: move rating to cols down there
@ -149,11 +178,11 @@ def get_with_stat(q):
for cols in result:
entity = cols[0]
stat = dict()
stat["shouts"] = cols[1]
stat["authors"] = cols[2]
stat["followers"] = cols[3]
stat['shouts'] = cols[1]
stat['authors'] = cols[2]
stat['followers'] = cols[3]
if is_author:
stat["comments"] = cols[4]
stat['comments'] = cols[4]
# entity.stat['topics'] = cols[5]
# entity.stat['rating'] = cols[5] - cols[6]
# entity.stat['rating_shouts'] = cols[7] - cols[8]
@ -199,7 +228,7 @@ async def get_topics_with_stat_cached(q):
def author_follows_authors(author_id: int):
af = aliased(AuthorFollower, name="af")
af = aliased(AuthorFollower, name='af')
q = (
select(Author)
.select_from(join(Author, af, Author.id == af.author))

View File

@ -12,7 +12,7 @@ from services.memorycache import cache_region
@query.field('get_topics_all')
def get_topics_all(_, _info):
cache_key = "get_topics_all"
cache_key = 'get_topics_all'
@cache_region.cache_on_arguments(cache_key)
def _get_topics_all():
@ -23,7 +23,7 @@ def get_topics_all(_, _info):
@query.field('get_topics_by_community')
def get_topics_by_community(_, _info, community_id: int):
cache_key = f"get_topics_by_community_{community_id}"
cache_key = f'get_topics_by_community_{community_id}'
@cache_region.cache_on_arguments(cache_key)
def _get_topics_by_community():
@ -33,7 +33,6 @@ def get_topics_by_community(_, _info, community_id: int):
return _get_topics_by_community()
@query.field('get_topics_by_author')
async def get_topics_by_author(_, _info, author_id=0, slug='', user=''):
q = select(Topic)

View File

@ -14,21 +14,21 @@ from services.logger import root_logger as logger
DEFAULT_FOLLOWS = {
"topics": [],
"authors": [],
"communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}],
'topics': [],
'authors': [],
'communities': [{'id': 1, 'name': 'Дискурс', 'slug': 'discours', 'pic': ''}],
}
async def set_author_cache(author: dict):
payload = json.dumps(author, cls=CustomJSONEncoder)
await redis.execute("SET", f'user:{author.get("user")}', payload)
await redis.execute("SET", f'author:{author.get("id")}', payload)
await redis.execute('SET', f'user:{author.get("user")}', payload)
await redis.execute('SET', f'author:{author.get("id")}', payload)
async def set_topic_cache(topic: dict):
payload = json.dumps(topic, cls=CustomJSONEncoder)
await redis.execute("SET", f'topic:{topic.get("id")}', payload)
await redis.execute('SET', f'topic:{topic.get("id")}', payload)
async def update_author_followers_cache(author_id: int, followers):
@ -36,7 +36,7 @@ async def update_author_followers_cache(author_id: int, followers):
[f.dict() if isinstance(f, Author) else f for f in followers],
cls=CustomJSONEncoder,
)
await redis.execute("SET", f"author:{author_id}:followers", payload)
await redis.execute('SET', f'author:{author_id}:followers', payload)
async def set_follows_topics_cache(follows, author_id: int):
@ -45,7 +45,7 @@ async def set_follows_topics_cache(follows, author_id: int):
[a.dict() if isinstance(a, Author) else a for a in follows],
cls=CustomJSONEncoder,
)
await redis.execute("SET", f"author:{author_id}:follows-topics", payload)
await redis.execute('SET', f'author:{author_id}:follows-topics', payload)
except Exception as exc:
logger.error(exc)
import traceback
@ -60,7 +60,7 @@ async def set_follows_authors_cache(follows, author_id: int):
[a.dict() if isinstance(a, Author) else a for a in follows],
cls=CustomJSONEncoder,
)
await redis.execute("SET", f"author:{author_id}:follows-authors", payload)
await redis.execute('SET', f'author:{author_id}:follows-authors', payload)
except Exception as exc:
import traceback
@ -73,31 +73,31 @@ async def update_follows_for_author(
follower: Author, entity_type: str, entity: dict, is_insert: bool
):
follows = []
redis_key = f"author:{follower.id}:follows-{entity_type}s"
follows_str = await redis.execute("GET", redis_key)
redis_key = f'author:{follower.id}:follows-{entity_type}s'
follows_str = await redis.execute('GET', redis_key)
if isinstance(follows_str, str):
follows = json.loads(follows_str)
if is_insert:
follows.append(entity)
else:
entity_id = entity.get("id")
entity_id = entity.get('id')
if not entity_id:
raise Exception("wrong entity")
raise Exception('wrong entity')
# Remove the entity from follows
follows = [e for e in follows if e["id"] != entity_id]
follows = [e for e in follows if e['id'] != entity_id]
logger.debug(f'{entity['slug']} removed from what @{follower.slug} follows')
if entity_type == "topic":
await set_follows_topics_cache(follows, follower.id.scalar())
if entity_type == "author":
await set_follows_authors_cache(follows, follower.id.scalar())
if entity_type == 'topic':
await set_follows_topics_cache(follows, follower.id)
if entity_type == 'author':
await set_follows_authors_cache(follows, follower.id)
return follows
async def update_followers_for_author(
follower: Author, author: Author, is_insert: bool
):
redis_key = f"author:{author.id}:followers"
followers_str = await redis.execute("GET", redis_key)
redis_key = f'author:{author.id}:followers'
followers_str = await redis.execute('GET', redis_key)
followers = []
if isinstance(followers_str, str):
followers = json.loads(followers_str)
@ -105,8 +105,8 @@ async def update_followers_for_author(
followers.append(follower)
else:
# Remove the entity from followers
followers = [e for e in followers if e["id"] != author.id]
await update_author_followers_cache(author.id.scalar(), followers)
followers = [e for e in followers if e['id'] != author.id]
await update_author_followers_cache(author.id, followers)
return followers
@ -136,8 +136,9 @@ def after_reaction_update(mapper, connection, reaction: Reaction):
select(author_subquery.subquery())
.select_from(author_subquery.subquery())
.union(
select(replied_author_subquery.subquery())
.select_from(replied_author_subquery.subquery())
select(replied_author_subquery.subquery()).select_from(
replied_author_subquery.subquery()
)
)
)
@ -166,25 +167,25 @@ def after_author_update(_mapper, _connection, author: Author):
def after_topic_follower_insert(_mapper, _connection, target: TopicFollower):
asyncio.create_task(
handle_topic_follower_change(target.topic.scalar(), target.follower.scalar(), True)
handle_topic_follower_change(target.topic, target.follower, True)
)
def after_topic_follower_delete(_mapper, _connection, target: TopicFollower):
asyncio.create_task(
handle_topic_follower_change(target.topic.scalar(), target.follower.scalar(), False)
handle_topic_follower_change(target.topic, target.follower, False)
)
def after_author_follower_insert(_mapper, _connection, target: AuthorFollower):
asyncio.create_task(
handle_author_follower_change(target.author.scalar(), target.follower.scalar(), True)
handle_author_follower_change(target.author, target.follower, True)
)
def after_author_follower_delete(_mapper, _connection, target: AuthorFollower):
asyncio.create_task(
handle_author_follower_change(target.author.scalar(), target.follower.scalar(), False)
handle_author_follower_change(target.author, target.follower, False)
)
@ -198,24 +199,24 @@ async def handle_author_follower_change(
if follower and author:
_ = asyncio.create_task(set_author_cache(author.dict()))
follows_authors = await redis.execute(
"GET", f"author:{follower_id}:follows-authors"
'GET', f'author:{follower_id}:follows-authors'
)
if isinstance(follows_authors, str):
follows_authors = json.loads(follows_authors)
if not any(x.get("id") == author.id for x in follows_authors):
if not any(x.get('id') == author.id for x in follows_authors):
follows_authors.append(author.dict())
_ = asyncio.create_task(set_follows_authors_cache(follows_authors, follower_id))
_ = asyncio.create_task(set_author_cache(follower.dict()))
await update_follows_for_author(
follower,
"author",
'author',
{
"id": author.id,
"name": author.name,
"slug": author.slug,
"pic": author.pic,
"bio": author.bio,
"stat": author.stat,
'id': author.id,
'name': author.name,
'slug': author.slug,
'pic': author.pic,
'bio': author.bio,
'stat': author.stat,
},
is_insert,
)
@ -231,41 +232,41 @@ async def handle_topic_follower_change(
if follower and topic:
_ = asyncio.create_task(set_author_cache(follower.dict()))
follows_topics = await redis.execute(
"GET", f"author:{follower_id}:follows-topics"
'GET', f'author:{follower_id}:follows-topics'
)
if isinstance(follows_topics, str):
follows_topics = json.loads(follows_topics)
if not any(x.get("id") == topic.id for x in follows_topics):
if not any(x.get('id') == topic.id for x in follows_topics):
follows_topics.append(topic)
_ = asyncio.create_task(set_follows_topics_cache(follows_topics, follower_id))
await update_follows_for_author(
follower,
"topic",
'topic',
{
"id": topic.id,
"title": topic.title,
"slug": topic.slug,
"body": topic.body,
"stat": topic.stat,
'id': topic.id,
'title': topic.title,
'slug': topic.slug,
'body': topic.body,
'stat': topic.stat,
},
is_insert,
)
def events_register():
event.listen(Shout, "after_insert", after_shout_update)
event.listen(Shout, "after_update", after_shout_update)
event.listen(Shout, 'after_insert', after_shout_update)
event.listen(Shout, 'after_update', after_shout_update)
event.listen(Reaction, "after_insert", after_reaction_update)
event.listen(Reaction, "after_update", after_reaction_update)
event.listen(Reaction, 'after_insert', after_reaction_update)
event.listen(Reaction, 'after_update', after_reaction_update)
event.listen(Author, "after_insert", after_author_update)
event.listen(Author, "after_update", after_author_update)
event.listen(Author, 'after_insert', after_author_update)
event.listen(Author, 'after_update', after_author_update)
event.listen(AuthorFollower, "after_insert", after_author_follower_insert)
event.listen(AuthorFollower, "after_delete", after_author_follower_delete)
event.listen(AuthorFollower, 'after_insert', after_author_follower_insert)
event.listen(AuthorFollower, 'after_delete', after_author_follower_delete)
event.listen(TopicFollower, "after_insert", after_topic_follower_insert)
event.listen(TopicFollower, "after_delete", after_topic_follower_delete)
event.listen(TopicFollower, 'after_insert', after_topic_follower_insert)
event.listen(TopicFollower, 'after_delete', after_topic_follower_delete)
logger.info('cache events were registered!')

View File

@ -101,7 +101,11 @@ def after_cursor_execute(conn, cursor, statement, parameters, context, executema
if hasattr(conn, 'query_start_time'):
elapsed = time.time() - conn.query_start_time
conn.query_start_time = None
query = f'{statement} % {parameters}' if parameters else f'{statement}'.replace('\n', ' ')
query = (
f'{statement} % {parameters}'
if parameters
else f'{statement}'.replace('\n', ' ')
)
if elapsed > 1 and conn.executed_statement != conn.statement:
conn.executed_statement = conn.statement
logger.debug(f"\n{query}\n{'*' * math.floor(elapsed)} {elapsed:.3f} s\n")

View File

@ -122,7 +122,9 @@ class SearchService:
async def recreate_index(self):
if self.client:
async with self.lock:
self.client.indices.delete(index=self.index_name, ignore_unavailable=True)
self.client.indices.delete(
index=self.index_name, ignore_unavailable=True
)
await self.check_index()
def index(self, shout):
@ -146,7 +148,12 @@ class SearchService:
# Use Redis as cache with TTL
redis_key = f'search:{text}'
await redis.execute('SETEX', redis_key, REDIS_TTL, json.dumps(results, cls=CustomJSONEncoder))
await redis.execute(
'SETEX',
redis_key,
REDIS_TTL,
json.dumps(results, cls=CustomJSONEncoder),
)
return []

View File

@ -42,7 +42,6 @@ class ViewedStorage:
"""Подключение к клиенту Google Analytics с использованием аутентификации"""
self = ViewedStorage
async with self.lock:
# Загрузка предварительно подсчитанных просмотров из файла JSON
self.load_precounted_views()
@ -66,7 +65,9 @@ class ViewedStorage:
try:
if os.path.exists(VIEWS_FILEPATH):
self.file_modification_timestamp = os.path.getmtime(VIEWS_FILEPATH)
self.start_date = datetime.fromtimestamp(self.file_modification_timestamp).strftime('%Y-%m-%d')
self.start_date = datetime.fromtimestamp(
self.file_modification_timestamp
).strftime('%Y-%m-%d')
now_date = datetime.now().strftime('%Y-%m-%d')
if now_date == self.start_date:
@ -83,7 +84,7 @@ class ViewedStorage:
f' * {len(precounted_views)} публикаций с просмотрами успешно загружены.'
)
else:
logger.info(" * Файл просмотров не найден.")
logger.info(' * Файл просмотров не найден.')
except Exception as e:
logger.error(f'Ошибка загрузки предварительно подсчитанных просмотров: {e}')