following-manager-upgrade
All checks were successful
Deploy to core / deploy (push) Successful in 1m42s

This commit is contained in:
Untone 2024-02-03 12:48:36 +03:00
parent 7f04eba208
commit 83390912e9
2 changed files with 91 additions and 16 deletions

View File

@ -13,6 +13,7 @@ from sentry_sdk.integrations.starlette import StarletteIntegration
from starlette.applications import Starlette from starlette.applications import Starlette
from starlette.routing import Route from starlette.routing import Route
from services.following import FollowingManager
from services.rediscache import redis from services.rediscache import redis
from services.schema import resolvers from services.schema import resolvers
from services.search import search_service from services.search import search_service
@ -35,6 +36,9 @@ async def start_up():
# start viewed service # start viewed service
await ViewedStorage.init() await ViewedStorage.init()
# preload following data
await FollowingManager.preload()
# start search service # start search service
search_service.info() search_service.info()

View File

@ -1,4 +1,18 @@
import asyncio import asyncio
import logging
import time
from orm.author import AuthorFollower
from orm.shout import ShoutReactionsFollower
from orm.topic import TopicFollower
from services.db import local_session
logger = logging.getLogger('[services.following] ')
logger.setLevel(logging.DEBUG)
MODEL_CLASSES = {'author': AuthorFollower, 'topic': TopicFollower, 'shout': ShoutReactionsFollower}
class FollowingResult: class FollowingResult:
@ -9,39 +23,96 @@ class FollowingResult:
class Following: class Following:
queue = asyncio.Queue()
def __init__(self, kind, uid): def __init__(self, kind, uid):
self.kind = kind # author topic shout community self.kind = kind # author, topic, shout
self.uid = uid self.uid = uid
self.queue = asyncio.Queue()
class FollowingManager: class FollowingManager:
lock = asyncio.Lock() lock = asyncio.Lock()
followers_by_kind = {} followers_by_kind = {'author': [], 'topic': [], 'shout': []}
data = {'author': [], 'topic': [], 'shout': [], 'community': []} authors_by_follower = {}
topics_by_follower = {}
shouts_by_follower = {}
@staticmethod
async def preload():
logger.info(' preloading started...')
ts = int(time.time())
async with FollowingManager.lock:
with local_session() as session:
# Load followers_by_kind
for kind in FollowingManager.followers_by_kind.keys():
model_class = MODEL_CLASSES[kind]
followers = session.query(model_class.follower).distinct().all()
FollowingManager.followers_by_kind[kind] = [follower[0] for follower in followers]
# Load authors_by_follower
for following in session.query(AuthorFollower).all():
FollowingManager.authors_by_follower[following.follower] = FollowingManager.authors_by_follower.get(
following.follower, []
)
FollowingManager.authors_by_follower[following.follower].append(following.author)
# Load topics_by_follower
for following in session.query(TopicFollower).all():
FollowingManager.topics_by_follower[following.follower] = FollowingManager.topics_by_follower.get(
following.follower, []
)
FollowingManager.topics_by_follower[following.follower].append(following.topic)
# Load shouts_by_follower
for following in session.query(ShoutReactionsFollower).all():
FollowingManager.shouts_by_follower[following.follower] = FollowingManager.shouts_by_follower.get(
following.follower, []
)
FollowingManager.shouts_by_follower[following.follower].append(following.shout)
logger.info(f' preloading finished at {(int(time.time()) - ts)/1000} secs')
@staticmethod @staticmethod
async def register(kind, uid): async def register(kind, uid):
async with FollowingManager.lock: async with FollowingManager.lock:
FollowingManager.followers_by_kind[kind] = FollowingManager.followers_by_kind.get(kind, []) if uid not in FollowingManager.followers_by_kind[kind]:
FollowingManager.followers_by_kind[kind].append(uid) FollowingManager.followers_by_kind[kind].append(uid)
@staticmethod @staticmethod
async def remove(kind, uid): async def remove(kind, uid):
async with FollowingManager.lock: async with FollowingManager.lock:
followings = FollowingManager.followers_by_kind.get(kind) FollowingManager.followers_by_kind[kind] = [
if followings: follower for follower in FollowingManager.followers_by_kind[kind] if follower != uid
followings.remove(uid) ]
FollowingManager.followers_by_kind[kind] = followings
@staticmethod @staticmethod
async def push(kind, payload): async def push(kind, payload):
try: try:
async with FollowingManager.lock: async with FollowingManager.lock:
entities = FollowingManager.followers_by_kind.get(kind, []) for entity in FollowingManager.followers_by_kind[kind]:
for entity in entities[:]: # Use a copy to iterate if payload.shout['created_by'] == entity:
if payload.shout['created_by'] == entity.uid: await entity.queue.put(payload)
entity.queue.put_nowait(payload)
except Exception as e: except Exception as e:
print(Exception(e)) print(f'Error in push method: {e}')
@staticmethod
async def get_followers_by_kind(kind, target_id=None):
async with FollowingManager.lock:
return (
FollowingManager.followers_by_kind[kind][target_id]
if target_id
else FollowingManager.followers_by_kind[kind]
)
@staticmethod
async def get_authors_for(follower_id):
async with FollowingManager.lock:
return FollowingManager.authors_by_follower.get(follower_id, [])
@staticmethod
async def get_topics_for(follower_id):
async with FollowingManager.lock:
return FollowingManager.topics_by_follower.get(follower_id, [])
@staticmethod
async def get_shouts_for(follower_id):
async with FollowingManager.lock:
return FollowingManager.shouts_by_follower.get(follower_id, [])