bye-following-manageer
All checks were successful
Deploy to core / deploy (push) Successful in 1m43s

This commit is contained in:
Untone 2024-02-03 17:00:48 +03:00
parent 53ceac108f
commit 1b4315fcce
3 changed files with 22 additions and 211 deletions

View File

@ -13,7 +13,6 @@ from sentry_sdk.integrations.starlette import StarletteIntegration
from starlette.applications import Starlette from starlette.applications import Starlette
from starlette.routing import Route from starlette.routing import Route
from services.following import FollowingManager
from services.rediscache import redis from services.rediscache import redis
from services.schema import resolvers from services.schema import resolvers
from services.search import search_service from services.search import search_service
@ -36,8 +35,6 @@ async def start_up():
# start viewed service # start viewed service
await ViewedStorage.init() await ViewedStorage.init()
# preload following data
await FollowingManager.preload()
# start search service # start search service
search_service.info() search_service.info()

View File

@ -1,11 +1,10 @@
import logging import logging
from typing import List from typing import List
from sqlalchemy.orm import aliased from sqlalchemy.orm import selectinload
from sqlalchemy.sql import and_ from sqlalchemy.sql import and_
from orm.author import Author, AuthorFollower from orm.author import Author, AuthorFollower
from orm.community import Community
from orm.reaction import Reaction from orm.reaction import Reaction
from orm.shout import Shout, ShoutReactionsFollower from orm.shout import Shout, ShoutReactionsFollower
from orm.topic import Topic, TopicFollower from orm.topic import Topic, TopicFollower
@ -13,7 +12,6 @@ from resolvers.community import community_follow, community_unfollow
from resolvers.topic import topic_follow, topic_unfollow from resolvers.topic import topic_follow, topic_unfollow
from services.auth import login_required from services.auth import login_required
from services.db import local_session from services.db import local_session
from services.following import FollowingManager, FollowingResult
from services.notify import notify_follower from services.notify import notify_follower
from services.schema import mutation, query from services.schema import mutation, query
@ -34,23 +32,15 @@ async def follow(_, info, what, slug):
follower_id = actor.id follower_id = actor.id
if what == 'AUTHOR': if what == 'AUTHOR':
if author_follow(follower_id, slug): if author_follow(follower_id, slug):
result = FollowingResult('NEW', 'author', slug)
await FollowingManager.push('author', result)
author = session.query(Author.id).where(Author.slug == slug).one() author = session.query(Author.id).where(Author.slug == slug).one()
follower = session.query(Author).where(Author.id == follower_id).one() follower = session.query(Author).where(Author.id == follower_id).one()
await notify_follower(follower.dict(), author.id) await notify_follower(follower.dict(), author.id)
elif what == 'TOPIC': elif what == 'TOPIC':
if topic_follow(follower_id, slug): topic_follow(follower_id, slug)
result = FollowingResult('NEW', 'topic', slug)
await FollowingManager.push('topic', result)
elif what == 'COMMUNITY': elif what == 'COMMUNITY':
if community_follow(follower_id, slug): community_follow(follower_id, slug)
result = FollowingResult('NEW', 'community', slug)
await FollowingManager.push('community', result)
elif what == 'REACTIONS': elif what == 'REACTIONS':
if reactions_follow(follower_id, slug): reactions_follow(follower_id, slug)
result = FollowingResult('NEW', 'shout', slug)
await FollowingManager.push('shout', result)
except Exception as e: except Exception as e:
logger.debug(info, what, slug) logger.debug(info, what, slug)
logger.error(e) logger.error(e)
@ -70,23 +60,15 @@ async def unfollow(_, info, what, slug):
follower_id = actor.id follower_id = actor.id
if what == 'AUTHOR': if what == 'AUTHOR':
if author_unfollow(follower_id, slug): if author_unfollow(follower_id, slug):
result = FollowingResult('DELETED', 'author', slug)
await FollowingManager.push('author', result)
author = session.query(Author.id).where(Author.slug == slug).one() author = session.query(Author.id).where(Author.slug == slug).one()
follower = session.query(Author).where(Author.id == follower_id).one() follower = session.query(Author).where(Author.id == follower_id).one()
await notify_follower(follower.dict(), author.id, 'unfollow') await notify_follower(follower.dict(), author.id, 'unfollow')
elif what == 'TOPIC': elif what == 'TOPIC':
if topic_unfollow(follower_id, slug): topic_unfollow(follower_id, slug)
result = FollowingResult('DELETED', 'topic', slug)
await FollowingManager.push('topic', result)
elif what == 'COMMUNITY': elif what == 'COMMUNITY':
if community_unfollow(follower_id, slug): community_unfollow(follower_id, slug)
result = FollowingResult('DELETED', 'community', slug)
await FollowingManager.push('community', result)
elif what == 'REACTIONS': elif what == 'REACTIONS':
if reactions_unfollow(follower_id, slug): reactions_unfollow(follower_id, slug)
result = FollowingResult('DELETED', 'shout', slug)
await FollowingManager.push('shout', result)
except Exception as e: except Exception as e:
return {'error': str(e)} return {'error': str(e)}
@ -97,31 +79,35 @@ async def unfollow(_, info, what, slug):
@login_required @login_required
async def get_my_followed(_, info): async def get_my_followed(_, info):
user_id = info.context['user_id'] user_id = info.context['user_id']
topics = set()
authors = set()
communities = [] communities = []
with local_session() as session: with local_session() as session:
author = session.query(Author).filter(Author.user == user_id).first() author = session.query(Author).filter(Author.user == user_id).first()
if isinstance(author, Author): if isinstance(author, Author):
author_id = author.id author_id = author.id
aliased_author = aliased(Author)
# Using joinedload to eagerly load AuthorFollower and Author data
authors_query = ( authors_query = (
session.query(aliased_author, AuthorFollower) session.query(Author)
.join(AuthorFollower, AuthorFollower.follower == author_id) .join(AuthorFollower, AuthorFollower.follower == author_id)
.filter(AuthorFollower.author == aliased_author.id) .options(selectinload(Author.followers))
.all()
) )
# Using joinedload to eagerly load TopicFollower and Topic data
topics_query = ( topics_query = (
session.query(Topic, TopicFollower) session.query(Topic)
.join(TopicFollower, TopicFollower.follower == author_id) .join(TopicFollower, TopicFollower.follower == author_id)
.filter(TopicFollower.topic == Topic.id) .options(selectinload(Topic.followers))
.all()
) )
authors = set(session.execute(authors_query).scalars()) # No need for a separate query for communities as it's fetched directly
topics = set(session.execute(topics_query).scalars()) communities = author.communities
communities = session.query(Community).all()
return {'topics': topics_query, 'authors': authors_query, 'communities': communities}
return {'topics': list(topics), 'authors': list(authors), 'communities': communities}
@query.field('get_shout_followers') @query.field('get_shout_followers')

View File

@ -1,172 +0,0 @@
import asyncio
import logging
import time
from sqlalchemy import and_, joinedload
from orm.author import Author, AuthorFollower
from orm.shout import Shout, ShoutReactionsFollower
from orm.topic import Topic, TopicFollower
from services.db import local_session
logger = logging.getLogger('[services.following] ')
logger.setLevel(logging.DEBUG)
MODEL_CLASSES = {'author': AuthorFollower, 'topic': TopicFollower, 'shout': ShoutReactionsFollower}
class FollowingResult:
def __init__(self, event, kind, payload):
self.event = event
self.kind = kind
self.payload = payload
class Following:
def __init__(self, kind, uid):
self.kind = kind # author, topic, shout
self.uid = uid
self.queue = asyncio.Queue()
class FollowingManager:
lock = asyncio.Lock()
followers_by_kind = None
authors_by_follower = None
topics_by_follower = None
shouts_by_follower = None
authors_by_id = None
shouts_by_id = None
topics_by_id = None
@staticmethod
async def preload():
ts = time.time()
async with FollowingManager.lock:
followers_by_kind = {'author': {}, 'topic': {}, 'shout': {}}
authors_by_follower = {}
topics_by_follower = {}
shouts_by_follower = {}
authors_by_id = {}
topics_by_id = {}
shouts_by_id = {}
with local_session() as session:
all_authors = session.query(Author).all()
for author in all_authors:
authors_by_id[author.id] = author
all_topics = session.query(Topic).all()
for topic in all_topics:
topics_by_id[topic.id] = topic
all_shouts = session.query(Shout).filter(and_(Shout.published_at.is_not(None), Shout.deleted_at.is_(None))).all()
for shout in all_shouts:
shouts_by_id[shout.id] = shout
for kind in followers_by_kind.keys():
model_class = MODEL_CLASSES[kind]
followings = (
session.query(model_class.follower)
.distinct()
.options(joinedload(model_class.follower))
.all()
)
for following in followings:
if kind == 'topic':
followers_by_kind[kind][following.topic] = followers_by_kind[kind].get(following.topic, set())
followers_by_kind[kind][following.topic].add(following.follower)
elif kind == 'author':
followers_by_kind[kind][following.author] = followers_by_kind[kind].get(following.author, set())
followers_by_kind[kind][following.author].add(following.follower)
elif kind == 'shout':
followers_by_kind[kind][following.shout] = followers_by_kind[kind].get(following.shout, set())
followers_by_kind[kind][following.shout].add(following.follower)
# Load authors_by_follower, topics_by_follower, and shouts_by_follower
for entity_kind in followers_by_kind.keys():
followers_dict = followers_by_kind[entity_kind]
if followers_dict:
entity_class = MODEL_CLASSES[entity_kind]
followings = (
session.query(entity_class)
.options(joinedload(entity_class.follower), joinedload(entity_class.entity))
.all()
)
for following in followings:
follower_id = following.follower.id
entity_id = following.entity.id
followers_dict.setdefault(follower_id, set()).add(entity_id)
# Assign the loaded dictionaries to the class attributes
FollowingManager.authors_by_follower = authors_by_follower
FollowingManager.topics_by_follower = topics_by_follower
FollowingManager.shouts_by_follower = shouts_by_follower
FollowingManager.authors_by_id = authors_by_id
FollowingManager.topics_by_id = topics_by_id
FollowingManager.shouts_by_id = shouts_by_id
logger.info(f' preloaded in {time.time() - ts} msec')
@staticmethod
async def register(entity_kind, entity_id, follower_id):
self = FollowingManager
try:
async with self.lock:
if isinstance(self.authors_by_id, dict):
follower = self.authors_by_id.get(follower_id)
if follower and self.followers_by_kind:
self.followers_by_kind[entity_kind][entity_id] = self.followers_by_kind[entity_kind].get(entity_id, set())
self.followers_by_kind[entity_kind][entity_id].add(follower)
if entity_kind == 'author' and self.authors_by_follower and self.authors_by_id:
author = self.authors_by_id.get(entity_id)
self.authors_by_follower.setdefault(follower_id, set()).add(author)
if entity_kind == 'topic' and self.topics_by_follower and self.topics_by_id:
topic = self.topics_by_id.get(entity_id)
self.topics_by_follower.setdefault(follower_id, set()).add(topic)
if entity_kind == 'shout' and self.shouts_by_follower and self.shouts_by_id:
shout = self.shouts_by_id.get(entity_id)
self.shouts_by_follower.setdefault(follower_id, set()).add(shout)
except Exception as exc:
logger.warn(exc)
@staticmethod
async def remove(entity_kind, entity_id, follower_id):
self = FollowingManager
async with self.lock:
if self.followers_by_kind and entity_kind in self.followers_by_kind and entity_id in self.followers_by_kind[entity_kind]:
try:
del self.followers_by_kind[entity_kind][entity_id]
if entity_kind == 'author' and self.authors_by_follower:
del self.authors_by_follower[follower_id][entity_id]
elif entity_kind == 'topic' and self.topics_by_follower:
del self.topics_by_follower[follower_id][entity_id]
elif entity_kind == 'shout' and self.shouts_by_follower:
del self.shouts_by_follower[follower_id][entity_id]
except Exception as exc:
logger.warn(exc)
if isinstance(self.authors_by_id, dict):
follower = self.authors_by_id.get(follower_id)
if follower:
self.followers_by_kind[entity_kind][entity_id].remove(follower)
@staticmethod
async def get_followers_by_kind(kind, target_id=None):
async with FollowingManager.lock:
if FollowingManager.followers_by_kind:
return (
FollowingManager.followers_by_kind[kind] if target_id is None else {target_id}
)
@staticmethod
async def get_authors_for(follower_id):
async with FollowingManager.lock:
if FollowingManager.authors_by_follower:
return FollowingManager.authors_by_follower.get(follower_id, set())
@staticmethod
async def get_topics_for(follower_id):
async with FollowingManager.lock:
if FollowingManager.topics_by_follower:
return FollowingManager.topics_by_follower.get(follower_id, set())
@staticmethod
async def get_shouts_for(follower_id):
async with FollowingManager.lock:
if FollowingManager.shouts_by_follower:
return FollowingManager.shouts_by_follower.get(follower_id, set())