batch-load-fix
Some checks failed
Deploy to core / deploy (push) Failing after 16m25s

Untone 2024-02-21 20:12:47 +03:00
parent 784f790b83
commit 1f0d5ae8e8
4 changed files with 112 additions and 65 deletions


@@ -11,14 +11,10 @@ from orm.author import Author, AuthorFollower
 from orm.reaction import Reaction
 from orm.shout import Shout, ShoutReactionsFollower
 from orm.topic import Topic, TopicFollower
-from resolvers.author import add_author_stat_columns
+from resolvers.stat import add_author_stat_columns
 from resolvers.community import community_follow, community_unfollow
-from resolvers.topic import (
-    topic_follow,
-    topic_unfollow,
-    add_topic_stat_columns,
-    get_topics_from_query,
-)
+from resolvers.topic import topic_follow, topic_unfollow
+from resolvers.stat import add_topic_stat_columns, get_topics_from_query
 from services.auth import login_required
 from services.db import local_session
 from services.notify import notify_follower

resolvers/stat.py (new file, 83 additions)

@@ -0,0 +1,83 @@
from sqlalchemy import func, distinct
from sqlalchemy.orm import aliased

from orm.author import Author, AuthorFollower
from orm.shout import ShoutAuthor, ShoutTopic
from orm.topic import Topic, TopicFollower
from services.db import local_session

# from services.viewed import ViewedStorage


def add_author_stat_columns(q):
    shout_author_aliased = aliased(ShoutAuthor)
    q = q.outerjoin(shout_author_aliased).add_columns(
        func.count(distinct(shout_author_aliased.shout)).label('shouts_stat')
    )

    followers_table = aliased(AuthorFollower)
    q = q.outerjoin(followers_table, followers_table.author == Author.id).add_columns(
        func.count(distinct(followers_table.follower)).label('followers_stat')
    )

    followings_table = aliased(AuthorFollower)
    q = q.outerjoin(
        followings_table, followings_table.follower == Author.id
    ).add_columns(func.count(distinct(followers_table.author)).label('followings_stat'))

    q = q.group_by(Author.id)
    return q


async def get_authors_from_query(q):
    authors = []
    with local_session() as session:
        for [author, shouts_stat, followers_stat, followings_stat] in session.execute(
            q
        ):
            author.stat = {
                'shouts': shouts_stat,
                'followers': followers_stat,
                'followings': followings_stat,
                # viewed
            }
            authors.append(author)
    return authors


def add_topic_stat_columns(q):
    aliased_shout_author = aliased(ShoutAuthor)
    aliased_topic_follower = aliased(TopicFollower)
    q = (
        q.outerjoin(ShoutTopic, Topic.id == ShoutTopic.topic)
        .add_columns(func.count(distinct(ShoutTopic.shout)).label('shouts_stat'))
        .outerjoin(aliased_shout_author, ShoutTopic.shout == aliased_shout_author.shout)
        .add_columns(
            func.count(distinct(aliased_shout_author.author)).label('authors_stat')
        )
        .outerjoin(aliased_topic_follower)
        .add_columns(
            func.count(distinct(aliased_topic_follower.follower)).label(
                'followers_stat'
            )
        )
    )
    q = q.group_by(Topic.id)
    return q


async def get_topics_from_query(q):
    topics = []
    with local_session() as session:
        for [topic, shouts_stat, authors_stat, followers_stat] in session.execute(q):
            topic.stat = {
                'shouts': shouts_stat,
                'authors': authors_stat,
                'followers': followers_stat,
                # 'viewed': await ViewedStorage.get_topic(topic.slug),
            }
            topics.append(topic)
    return topics
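
The new module consolidates the stat helpers that were previously duplicated in resolvers/author.py and resolvers/topic.py, so the follower, topic and cache code can all import them from one place. A minimal usage sketch follows, assuming the calling conventions visible in this commit; the resolver name load_authors_with_stat is illustrative and not part of the diff. (One detail that may deserve a second look: followings_stat counts followers_table.author rather than a column of followings_table.)

from sqlalchemy import select

from orm.author import Author
from resolvers.stat import add_author_stat_columns, get_authors_from_query


async def load_authors_with_stat(slugs):
    # Hypothetical caller: build a base query, let the shared helper attach the
    # aggregate columns, then materialize Author objects with .stat filled in.
    q = select(Author).where(Author.slug.in_(slugs))
    q = add_author_stat_columns(q)  # adds shouts_stat / followers_stat / followings_stat
    return await get_authors_from_query(q)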


@@ -1,55 +1,15 @@
 from sqlalchemy import and_, distinct, func, select
-from sqlalchemy.orm import aliased
 
 from orm.author import Author
-from orm.shout import ShoutAuthor, ShoutTopic
+from orm.shout import ShoutTopic
 from orm.topic import Topic, TopicFollower
+from resolvers.stat import add_topic_stat_columns, get_topics_from_query
 from services.auth import login_required
 from services.db import local_session
 from services.schema import mutation, query
-from services.viewed import ViewedStorage
 from services.logger import root_logger as logger
 
 
-def add_topic_stat_columns(q):
-    aliased_shout_author = aliased(ShoutAuthor)
-    aliased_topic_follower = aliased(TopicFollower)
-    q = (
-        q.outerjoin(ShoutTopic, Topic.id == ShoutTopic.topic)
-        .add_columns(func.count(distinct(ShoutTopic.shout)).label('shouts_stat'))
-        .outerjoin(aliased_shout_author, ShoutTopic.shout == aliased_shout_author.shout)
-        .add_columns(
-            func.count(distinct(aliased_shout_author.author)).label('authors_stat')
-        )
-        .outerjoin(aliased_topic_follower)
-        .add_columns(
-            func.count(distinct(aliased_topic_follower.follower)).label(
-                'followers_stat'
-            )
-        )
-    )
-    q = q.group_by(Topic.id)
-    return q
-
-
-async def get_topics_from_query(q):
-    topics = []
-    with local_session() as session:
-        for [topic, shouts_stat, authors_stat, followers_stat] in session.execute(q):
-            topic.stat = {
-                'shouts': shouts_stat,
-                'authors': authors_stat,
-                'followers': followers_stat,
-                'viewed': await ViewedStorage.get_topic(topic.slug),
-            }
-            topics.append(topic)
-    return topics
-
-
 @query.field('get_topics_all')
 async def get_topics_all(_, _info):
     q = select(Topic)


@@ -4,12 +4,12 @@ import json
 
 from orm.author import Author, AuthorFollower
 from orm.topic import Topic, TopicFollower
-from resolvers.author import add_author_stat_columns, get_author_follows
-from resolvers.topic import add_topic_stat_columns
+from resolvers.stat import add_author_stat_columns, add_topic_stat_columns
+from resolvers.author import get_author_follows
 from services.logger import root_logger as logger
 from services.db import local_session
 from services.rediscache import redis
-from services.viewed import ViewedStorage
+# from services.viewed import ViewedStorage
 
 
 @event.listens_for(Author, 'after_insert')
@@ -94,7 +94,7 @@ async def handle_author_follower_change(connection, author_id, follower_id, is_i
         ).first()
         author.stat = {
             'shouts': shouts_stat,
-            'viewed': await ViewedStorage.get_author(author.slug),
+            # 'viewed': await ViewedStorage.get_author(author.slug),
             'followers': followers_stat,
             'followings': followings_stat,
         }
@@ -122,14 +122,12 @@ async def handle_topic_follower_change(connection, topic_id, follower_id, is_ins
     q = select(Topic).filter(Topic.id == topic_id)
     q = add_topic_stat_columns(q)
     async with connection.begin() as conn:
-        [topic, shouts_stat, authors_stat, followers_stat] = await conn.execute(
-            q
-        ).first()
+        [topic, shouts_stat, authors_stat, followers_stat] = await conn.execute(q).first()
         topic.stat = {
             'shouts': shouts_stat,
             'authors': authors_stat,
             'followers': followers_stat,
-            'viewed': await ViewedStorage.get_topic(topic.slug),
+            # 'viewed': await ViewedStorage.get_topic(topic.slug),
         }
         follower = connection.execute(
             select(Author).filter(Author.id == follower_id)
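
A note on the hunk above: await conn.execute(q).first() applies .first() to the coroutine returned by conn.execute(q) before it is awaited, which raises AttributeError under SQLAlchemy's asyncio API. Assuming connection is (or wraps) an AsyncConnection, the usual pattern is to await the execution and then read the row from the Result; a sketch under that assumption, reusing the helpers from this commit:

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncConnection

from orm.topic import Topic
from resolvers.stat import add_topic_stat_columns


async def load_topic_with_stat(conn: AsyncConnection, topic_id: int):
    # Sketch only: mirrors the hunk above, but awaits execute() before calling .first().
    q = add_topic_stat_columns(select(Topic).filter(Topic.id == topic_id))
    result = await conn.execute(q)  # awaiting resolves to a Result
    return result.first()  # .first() is then an ordinary synchronous call on the Result
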
@@ -162,14 +160,24 @@ class FollowsCached:
             q = select(Author)
             q = add_author_stat_columns(q)
             authors = session.execute(q)
-            for i in range(0, len(authors), BATCH_SIZE):
-                batch_authors = authors[i : i + BATCH_SIZE]
-                await asyncio.gather(
-                    *[
-                        FollowsCached.update_author_cache(author)
-                        for author in batch_authors
-                    ]
-                )
+            while True:
+                batch = authors.fetchmany(BATCH_SIZE)
+                if not batch:
+                    break
+                else:
+                    for [author, shouts_stat, followers_stat, followings_stat] in batch:
+                        await redis.execute('SET', f'user:{author.user}:author', json.dumps({
+                            'id': author.id,
+                            'name': author.name,
+                            'slug': author.slug,
+                            'pic': author.pic,
+                            'bio': author.bio,
+                            'stat': {
+                                'followings': followings_stat,
+                                'shouts': shouts_stat,
+                                'followers': followers_stat
+                            },
+                        }))
 
     @staticmethod
     async def update_author_cache(author: Author):
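
The batch-load fix itself is in the last hunk: session.execute(q) returns a SQLAlchemy Result, which supports neither len() nor slicing, so the old range-and-slice loop could not work; Result.fetchmany(n) returns at most n rows per call and an empty list once the cursor is exhausted, which is what the new while loop relies on. A stripped-down sketch of the same pattern, independent of this repo's models (the in-memory engine and the query are placeholders):

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

BATCH_SIZE = 100  # illustrative; the real constant is defined elsewhere in the original file

engine = create_engine('sqlite://')  # in-memory stand-in engine for the sketch

with Session(engine) as session:
    result = session.execute(text('SELECT 1 AS n'))  # any SELECT; returns a Result
    while True:
        batch = result.fetchmany(BATCH_SIZE)  # at most BATCH_SIZE rows per call
        if not batch:
            break  # an empty list means the result set is exhausted
        for row in batch:
            pass  # process each row here, e.g. write it to the cache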