This commit is contained in:
Igor Lobanov 2022-11-25 15:59:10 +01:00
parent 6e073d5dd1
commit 96e9728d6b
11 changed files with 4 additions and 368 deletions

View File

@ -16,8 +16,6 @@ from base.redis import redis
from base.resolvers import resolvers from base.resolvers import resolvers
from resolvers.auth import confirm_email_handler from resolvers.auth import confirm_email_handler
from services.main import storages_init from services.main import storages_init
from services.stat.reacted import ReactedStorage
from services.stat.topicstat import TopicStat
from services.stat.viewed import ViewedStorage from services.stat.viewed import ViewedStorage
from services.zine.gittask import GitTask from services.zine.gittask import GitTask
from settings import DEV_SERVER_STATUS_FILE_NAME from settings import DEV_SERVER_STATUS_FILE_NAME

View File

@ -8,7 +8,6 @@ from orm.reaction import Reaction, ReactionKind
from orm.shout import ShoutReactionsFollower from orm.shout import ShoutReactionsFollower
from orm.topic import TopicFollower from orm.topic import TopicFollower
from orm.user import User from orm.user import User
from services.stat.reacted import ReactedStorage
ts = datetime.now(tz=timezone.utc) ts = datetime.now(tz=timezone.utc)

View File

@ -9,7 +9,6 @@ from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower
from orm.user import User from orm.user import User
from orm.topic import TopicFollower from orm.topic import TopicFollower
from services.stat.reacted import ReactedStorage
from services.stat.viewed import ViewedStorage from services.stat.viewed import ViewedStorage
OLD_DATE = "2016-03-05 22:22:00.350000" OLD_DATE = "2016-03-05 22:22:00.350000"

View File

@ -7,7 +7,6 @@ from base.resolvers import query
from orm import ViewedEntry from orm import ViewedEntry
from orm.shout import Shout, ShoutAuthor from orm.shout import Shout, ShoutAuthor
from orm.reaction import Reaction, ReactionKind from orm.reaction import Reaction, ReactionKind
from services.stat.reacted import ReactedStorage
def add_rating_column(q): def add_rating_column(q):
@ -135,17 +134,15 @@ async def load_shouts_by(_, info, options):
q = q.group_by(Shout.id).order_by(query_order_by).limit(limit).offset(offset) q = q.group_by(Shout.id).order_by(query_order_by).limit(limit).offset(offset)
with local_session() as session: with local_session() as session:
shouts = list(map(map_result_item, session.execute(q).unique())) results = session.execute(q).unique()
for [shout, rating] in results:
for shout in shouts: shout.stat = await ReactedStorage.get_shout_stat(shout.slug, rating)
shout.stat = await ReactedStorage.get_shout_stat(shout.slug, shout.rating)
del shout.rating
author_captions = {} author_captions = {}
if with_author_captions: if with_author_captions:
author_captions_result = session.query(ShoutAuthor).where( author_captions_result = session.query(ShoutAuthor).where(
ShoutAuthor.shout.in_(map(lambda s: s.slug, shouts))).all() ShoutAuthor.shout.in_(map(lambda result_item: result_item[0].slug, results))).all()
for author_captions_result_item in author_captions_result: for author_captions_result_item in author_captions_result:
if author_captions.get(author_captions_result_item.shout) is None: if author_captions.get(author_captions_result_item.shout) is None:

View File

@ -10,8 +10,6 @@ from orm.reaction import Reaction
from orm.shout import ShoutAuthor from orm.shout import ShoutAuthor
from orm.topic import Topic, TopicFollower from orm.topic import Topic, TopicFollower
from orm.user import AuthorFollower, Role, User, UserRating, UserRole from orm.user import AuthorFollower, Role, User, UserRating, UserRole
from services.stat.reacted import ReactedStorage
from services.stat.topicstat import TopicStat
# from .community import followed_communities # from .community import followed_communities
from resolvers.inbox.unread import get_total_unread_counter from resolvers.inbox.unread import get_total_unread_counter

View File

@ -8,7 +8,6 @@ from base.resolvers import mutation, query
from orm.reaction import Reaction, ReactionKind from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout, ShoutReactionsFollower from orm.shout import Shout, ShoutReactionsFollower
from orm.user import User from orm.user import User
from services.stat.reacted import ReactedStorage
async def get_reaction_stat(reaction_id): async def get_reaction_stat(reaction_id):

View File

@ -5,9 +5,7 @@ from base.orm import local_session
from base.resolvers import mutation, query from base.resolvers import mutation, query
from orm import Shout from orm import Shout
from orm.topic import Topic, TopicFollower from orm.topic import Topic, TopicFollower
from services.zine.topics import TopicStorage
# from services.stat.reacted import ReactedStorage # from services.stat.reacted import ReactedStorage
from services.stat.topicstat import TopicStat
# from services.stat.viewed import ViewedStorage # from services.stat.viewed import ViewedStorage

View File

@ -1,7 +1,5 @@
from services.stat.reacted import ReactedStorage
from services.auth.roles import RoleStorage from services.auth.roles import RoleStorage
from services.auth.users import UserStorage from services.auth.users import UserStorage
from services.zine.topics import TopicStorage
from services.search import SearchService from services.search import SearchService
from services.stat.viewed import ViewedStorage from services.stat.viewed import ViewedStorage
from base.orm import local_session from base.orm import local_session
@ -10,10 +8,8 @@ from base.orm import local_session
async def storages_init(): async def storages_init():
with local_session() as session: with local_session() as session:
print('[main] initialize storages') print('[main] initialize storages')
ReactedStorage.init(session)
RoleStorage.init(session) RoleStorage.init(session)
UserStorage.init(session) UserStorage.init(session)
TopicStorage.init(session)
await SearchService.init(session) await SearchService.init(session)
session.commit() session.commit()
await ViewedStorage.init() await ViewedStorage.init()

View File

@ -1,207 +0,0 @@
import asyncio
import time
from base.orm import local_session
from orm.reaction import ReactionKind, Reaction
from services.zine.topics import TopicStorage
from services.stat.viewed import ViewedStorage
def kind_to_rate(kind) -> int:
if kind in [
ReactionKind.AGREE,
ReactionKind.LIKE,
ReactionKind.PROOF,
ReactionKind.ACCEPT,
]:
return 1
elif kind in [
ReactionKind.DISAGREE,
ReactionKind.DISLIKE,
ReactionKind.DISPROOF,
ReactionKind.REJECT,
]:
return -1
else:
return 0
class ReactedStorage:
reacted = {"shouts": {}, "topics": {}, "reactions": {}, "authors": {}}
rating = {"shouts": {}, "topics": {}, "reactions": {}}
reactions = []
to_flush = []
period = 30 * 60 # sec
lock = asyncio.Lock()
modified_shouts = set([])
@staticmethod
async def get_shout_stat(slug, rating):
viewed = int(await ViewedStorage.get_shout(slug))
# print(viewed)
return {
"viewed": viewed,
"reacted": len(await ReactedStorage.get_shout(slug)),
"commented": len(await ReactedStorage.get_comments(slug)),
# "rating": await ReactedStorage.get_rating(slug),
"rating": rating
}
@staticmethod
async def get_shout(shout_slug):
self = ReactedStorage
async with self.lock:
return self.reacted["shouts"].get(shout_slug, [])
@staticmethod
async def get_author(user_slug):
self = ReactedStorage
async with self.lock:
return self.reacted["authors"].get(user_slug, [])
@staticmethod
async def get_shouts_by_author(user_slug):
self = ReactedStorage
async with self.lock:
author_reactions = self.reacted["authors"].get(user_slug, [])
shouts = []
for r in author_reactions:
if r.shout not in shouts:
shouts.append(r.shout)
return shouts
@staticmethod
async def get_topic(topic_slug):
self = ReactedStorage
async with self.lock:
return self.reacted["topics"].get(topic_slug, [])
@staticmethod
async def get_comments(shout_slug):
self = ReactedStorage
async with self.lock:
return list(
filter(lambda r: bool(r.body), self.reacted["shouts"].get(shout_slug, {}))
)
@staticmethod
async def get_topic_comments(topic_slug):
self = ReactedStorage
async with self.lock:
return list(
filter(lambda r: bool(r.body), self.reacted["topics"].get(topic_slug, []))
)
@staticmethod
async def get_reaction_comments(reaction_id):
self = ReactedStorage
async with self.lock:
return list(
filter(
lambda r: bool(r.body), self.reacted["reactions"].get(reaction_id, {})
)
)
@staticmethod
async def get_reaction(reaction_id):
self = ReactedStorage
async with self.lock:
return self.reacted["reactions"].get(reaction_id, [])
@staticmethod
async def get_rating(shout_slug):
self = ReactedStorage
rating = 0
async with self.lock:
for r in self.reacted["shouts"].get(shout_slug, []):
rating = rating + kind_to_rate(r.kind)
return rating
@staticmethod
async def get_topic_rating(topic_slug):
self = ReactedStorage
rating = 0
async with self.lock:
for r in self.reacted["topics"].get(topic_slug, []):
rating = rating + kind_to_rate(r.kind)
return rating
@staticmethod
async def get_reaction_rating(reaction_id):
self = ReactedStorage
rating = 0
async with self.lock:
for r in self.reacted["reactions"].get(reaction_id, []):
rating = rating + kind_to_rate(r.kind)
return rating
@staticmethod
async def react(reaction):
ReactedStorage.modified_shouts.add(reaction.shout)
@staticmethod
async def recount(reactions):
self = ReactedStorage
for r in reactions:
# renew reactions by shout
self.reacted["shouts"][r.shout] = self.reacted["shouts"].get(r.shout, [])
self.reacted["shouts"][r.shout].append(r)
# renew reactions by author
self.reacted["authors"][r.createdBy] = self.reacted["authors"].get(r.createdBy, [])
self.reacted["authors"][r.createdBy].append(r)
# renew reactions by topic
shout_topics = await TopicStorage.get_topics_by_slugs([r.shout, ])
for t in shout_topics:
self.reacted["topics"][t] = self.reacted["topics"].get(t, [])
self.reacted["topics"][t].append(r)
self.rating["topics"][t] = \
self.rating["topics"].get(t, 0) + kind_to_rate(r.kind)
if r.replyTo:
# renew reactions replies
self.reacted["reactions"][r.replyTo] = \
self.reacted["reactions"].get(r.replyTo, [])
self.reacted["reactions"][r.replyTo].append(r)
self.rating["reactions"][r.replyTo] = \
self.rating["reactions"].get(r.replyTo, 0) + kind_to_rate(r.kind)
else:
# renew shout rating
self.rating["shouts"][r.shout] = \
self.rating["shouts"].get(r.shout, 0) + kind_to_rate(r.kind)
@staticmethod
def init(session):
self = ReactedStorage
all_reactions = session.query(Reaction).all()
self.modified_shouts = list(set([r.shout for r in all_reactions]))
print("[stat.reacted] %d shouts with reactions" % len(self.modified_shouts))
@staticmethod
async def recount_changed(session):
start = time.time()
self = ReactedStorage
async with self.lock:
sss = list(self.modified_shouts)
c = 0
for slug in sss:
siblings = session.query(Reaction).where(Reaction.shout == slug).all()
c += len(siblings)
await self.recount(siblings)
print("[stat.reacted] %d reactions recounted" % c)
print("[stat.reacted] %d shouts modified" % len(self.modified_shouts))
print("[stat.reacted] %d topics" % len(self.reacted["topics"].values()))
print("[stat.reacted] %d authors" % len(self.reacted["authors"].values()))
print("[stat.reacted] %d replies" % len(self.reacted["reactions"]))
self.modified_shouts = set([])
end = time.time()
print("[stat.reacted] recount_changed took %fs " % (end - start))
@staticmethod
async def worker():
while True:
try:
with local_session() as session:
await ReactedStorage.recount_changed(session)
except Exception as err:
print("[stat.reacted] recount error %s" % (err))
await asyncio.sleep(ReactedStorage.period)

View File

@ -1,73 +0,0 @@
import asyncio
import time
from base.orm import local_session
from orm.shout import Shout, ShoutTopic, ShoutAuthor
from orm.topic import TopicFollower
from sqlalchemy.sql.expression import select
class TopicStat:
# by slugs
shouts_by_topic = {} # Shout object stored
authors_by_topic = {} # User
followers_by_topic = {} # User
#
lock = asyncio.Lock()
period = 30 * 60 # sec
@staticmethod
async def load_stat(session):
start = time.time()
self = TopicStat
shout_topics = session.query(ShoutTopic, Shout).join(Shout).all()
all_shout_authors = session.query(ShoutAuthor).all()
print("[stat.topics] %d links for shouts" % len(shout_topics))
for [shout_topic, shout] in shout_topics:
tpc = shout_topic.topic
# shouts by topics
# shout = session.query(Shout).where(Shout.slug == shout_topic.shout).first()
self.shouts_by_topic[tpc] = self.shouts_by_topic.get(tpc, dict())
self.shouts_by_topic[tpc][shout.slug] = shout
# authors by topics
shout_authors = filter(lambda asa: asa.shout == shout.slug, all_shout_authors)
self.authors_by_topic[tpc] = self.authors_by_topic.get(tpc, dict())
for sa in shout_authors:
self.authors_by_topic[tpc][sa.shout] = sa.caption
self.followers_by_topic = {}
followings = session.query(TopicFollower).all()
print("[stat.topics] %d followings by users" % len(followings))
for flw in followings:
topic = flw.topic
userslug = flw.follower
self.followers_by_topic[topic] = self.followers_by_topic.get(topic, dict())
self.followers_by_topic[topic][userslug] = userslug
end = time.time()
print("[stat.topics] load_stat took %fs " % (end - start))
@staticmethod
async def get_shouts(topic):
self = TopicStat
async with self.lock:
return self.shouts_by_topic.get(topic, dict())
@staticmethod
async def worker():
self = TopicStat
first_run = True
while True:
try:
with local_session() as session:
async with self.lock:
await self.load_stat(session)
except Exception as err:
raise Exception(err)
if first_run:
# sleep for period + 1 min after first run
# to distribute load on server by workers with the same period
await asyncio.sleep(60)
first_run = False
await asyncio.sleep(self.period)

View File

@ -1,68 +0,0 @@
import asyncio
from orm.topic import Topic
class TopicStorage:
topics = {}
lock = asyncio.Lock()
@staticmethod
def init(session):
self = TopicStorage
topics = session.query(Topic)
self.topics = dict([(topic.slug, topic) for topic in topics])
for tpc in self.topics.values():
# self.load_parents(tpc)
pass
print("[zine.topics] %d precached" % len(self.topics.keys()))
# @staticmethod
# def load_parents(topic):
# self = TopicStorage
# parents = []
# for parent in self.topics.values():
# if topic.slug in parent.children:
# parents.append(parent.slug)
# topic.parents = parents
# return topic
@staticmethod
async def get_topics_all():
self = TopicStorage
async with self.lock:
return list(self.topics.values())
@staticmethod
async def get_topics_by_slugs(slugs):
self = TopicStorage
async with self.lock:
if not slugs:
return self.topics.values()
topics = filter(lambda topic: topic.slug in slugs, self.topics.values())
return list(topics)
@staticmethod
async def get_topics_by_community(community):
self = TopicStorage
async with self.lock:
topics = filter(
lambda topic: topic.community == community, self.topics.values()
)
return list(topics)
@staticmethod
async def get_topics_by_author(author):
self = TopicStorage
async with self.lock:
topics = filter(
lambda topic: topic.community == author, self.topics.values()
)
return list(topics)
@staticmethod
async def update_topic(topic):
self = TopicStorage
async with self.lock:
self.topics[topic.slug] = topic
# self.load_parents(topic)