reacted-storage-removed

This commit is contained in:
tonyrewin 2022-11-28 11:47:39 +03:00
parent 0ad7d28c00
commit deec1c8181
10 changed files with 73 additions and 244 deletions

View File

@ -16,7 +16,7 @@ from base.redis import redis
from base.resolvers import resolvers from base.resolvers import resolvers
from resolvers.auth import confirm_email_handler from resolvers.auth import confirm_email_handler
from services.main import storages_init from services.main import storages_init
from services.stat.reacted import ReactedStorage # from services.stat.reacted import ReactedStorage
from services.stat.topicstat import TopicStat from services.stat.topicstat import TopicStat
from services.stat.viewed import ViewedStorage from services.stat.viewed import ViewedStorage
from services.zine.topics import TopicStorage from services.zine.topics import TopicStorage
@ -41,8 +41,8 @@ async def start_up():
print(topics_random_work) print(topics_random_work)
views_stat_task = asyncio.create_task(ViewedStorage().worker()) views_stat_task = asyncio.create_task(ViewedStorage().worker())
print(views_stat_task) print(views_stat_task)
reacted_storage_task = asyncio.create_task(ReactedStorage.worker()) # reacted_storage_task = asyncio.create_task(ReactedStorage.worker())
print(reacted_storage_task) # print(reacted_storage_task)
shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker()) shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker())
print(shout_author_task) print(shout_author_task)
topic_stat_task = asyncio.create_task(TopicStat.worker()) topic_stat_task = asyncio.create_task(TopicStat.worker())

View File

@ -8,7 +8,7 @@ from orm.reaction import Reaction, ReactionKind
from orm.shout import ShoutReactionsFollower from orm.shout import ShoutReactionsFollower
from orm.topic import TopicFollower from orm.topic import TopicFollower
from orm.user import User from orm.user import User
from services.stat.reacted import ReactedStorage # from services.stat.reacted import ReactedStorage
ts = datetime.now(tz=timezone.utc) ts = datetime.now(tz=timezone.utc)
@ -77,7 +77,7 @@ async def migrate(entry, storage):
# creating reaction from old comment # creating reaction from old comment
reaction = Reaction.create(**reaction_dict) reaction = Reaction.create(**reaction_dict)
session.add(reaction) session.add(reaction)
await ReactedStorage.react(reaction) # await ReactedStorage.react(reaction)
# creating shout's reactions following for reaction author # creating shout's reactions following for reaction author
following1 = session.query( following1 = session.query(
@ -148,7 +148,7 @@ async def migrate(entry, storage):
) )
session.add(following2) session.add(following2)
session.add(rr) session.add(rr)
await ReactedStorage.react(rr) # await ReactedStorage.react(rr)
except Exception as e: except Exception as e:
print("[migration] comment rating error: %r" % re_reaction_dict) print("[migration] comment rating error: %r" % re_reaction_dict)

View File

@ -9,7 +9,7 @@ from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower
from orm.user import User from orm.user import User
from orm.topic import TopicFollower from orm.topic import TopicFollower
from services.stat.reacted import ReactedStorage # from services.stat.reacted import ReactedStorage
from services.stat.viewed import ViewedStorage from services.stat.viewed import ViewedStorage
OLD_DATE = "2016-03-05 22:22:00.350000" OLD_DATE = "2016-03-05 22:22:00.350000"
@ -373,7 +373,7 @@ async def content_ratings_to_reactions(entry, slug):
else: else:
rea = Reaction.create(**reaction_dict) rea = Reaction.create(**reaction_dict)
session.add(rea) session.add(rea)
await ReactedStorage.react(rea) # await ReactedStorage.react(rea)
# shout_dict['ratings'].append(reaction_dict) # shout_dict['ratings'].append(reaction_dict)
session.commit() session.commit()

View File

@ -29,7 +29,10 @@ def calc_reactions(q):
(Reaction.body.is_not(None), 1), (Reaction.body.is_not(None), 1),
else_=0 else_=0
) )
).label('commented') ).label('commented'),
sa.func.sum(
Reaction.id
).label('reacted')
) )
@ -72,7 +75,7 @@ async def load_shout(_, info, slug):
Shout.deletedAt.is_(None) Shout.deletedAt.is_(None)
).group_by(Shout.id) ).group_by(Shout.id)
[shout, rating, commented] = session.execute(q).unique().one() [shout, rating, commented, reacted] = session.execute(q).unique().one()
for a in shout.authors: for a in shout.authors:
a.caption = await ShoutAuthorStorage.get_author_caption(shout.slug, a.slug) a.caption = await ShoutAuthorStorage.get_author_caption(shout.slug, a.slug)
viewed = await ViewedStorage.get_shout(shout.slug) viewed = await ViewedStorage.get_shout(shout.slug)
@ -80,7 +83,7 @@ async def load_shout(_, info, slug):
"rating": rating, "rating": rating,
"viewed": viewed, "viewed": viewed,
"commented": commented, "commented": commented,
# "reacted": reacted "reacted": reacted
} }
return shout return shout
@ -146,12 +149,12 @@ async def load_shouts_by(_, info, options):
shouts = [] shouts = []
with local_session() as session: with local_session() as session:
for [shout, rating, commented] in session.execute(q).unique(): for [shout, rating, commented, reacted] in session.execute(q).unique():
shout.stat = { shout.stat = {
"rating": rating, "rating": rating,
"viewed": await ViewedStorage.get_shout(shout.slug), "viewed": await ViewedStorage.get_shout(shout.slug),
"commented": commented, "commented": commented,
# "reacted": reacted "reacted": reacted
} }
# NOTE: no need authors captions in arrays # NOTE: no need authors captions in arrays
# for author in shout.authors: # for author in shout.authors:

View File

@ -8,16 +8,8 @@ from base.resolvers import mutation, query
from orm.reaction import Reaction, ReactionKind from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout, ShoutReactionsFollower from orm.shout import Shout, ShoutReactionsFollower
from orm.user import User from orm.user import User
from services.stat.reacted import ReactedStorage # from services.stat.reacted import ReactedStorage
from resolvers.zine.load import calc_reactions
async def get_reaction_stat(reaction_id):
return {
# "viewed": await ViewedStorage.get_reaction(reaction_id),
"reacted": len(await ReactedStorage.get_reaction(reaction_id)),
"rating": await ReactedStorage.get_reaction_rating(reaction_id),
"commented": len(await ReactedStorage.get_reaction_comments(reaction_id)),
}
def reactions_follow(user: User, slug: str, auto=False): def reactions_follow(user: User, slug: str, auto=False):
@ -142,13 +134,17 @@ async def create_reaction(_, info, inp):
elif check_to_publish(session, user, reaction): elif check_to_publish(session, user, reaction):
set_published(session, reaction.shout, reaction.createdBy) set_published(session, reaction.shout, reaction.createdBy)
ReactedStorage.react(reaction) # ReactedStorage.react(reaction)
try: try:
reactions_follow(user, inp["shout"], True) reactions_follow(user, inp["shout"], True)
except Exception as e: except Exception as e:
print(f"[resolvers.reactions] error on reactions autofollowing: {e}") print(f"[resolvers.reactions] error on reactions autofollowing: {e}")
reaction.stat = await get_reaction_stat(reaction.id) reaction.stat = {
"commented": 0,
"reacted": 0,
"rating": 0
}
return {"reaction": reaction} return {"reaction": reaction}
@ -160,11 +156,16 @@ async def update_reaction(_, info, inp):
with local_session() as session: with local_session() as session:
user = session.query(User).where(User.id == user_id).first() user = session.query(User).where(User.id == user_id).first()
reaction = session.query(Reaction).filter(Reaction.id == inp.id).first() q = select(Reaction).filter(Reaction.id == inp.id)
q = calc_reactions(q)
[reaction, rating, commented, reacted] = session.execute(q).unique().one()
if not reaction: if not reaction:
return {"error": "invalid reaction id"} return {"error": "invalid reaction id"}
if reaction.createdBy != user.slug: if reaction.createdBy != user.slug:
return {"error": "access denied"} return {"error": "access denied"}
reaction.body = inp["body"] reaction.body = inp["body"]
reaction.updatedAt = datetime.now(tz=timezone.utc) reaction.updatedAt = datetime.now(tz=timezone.utc)
if reaction.kind != inp["kind"]: if reaction.kind != inp["kind"]:
@ -173,8 +174,11 @@ async def update_reaction(_, info, inp):
if inp.get("range"): if inp.get("range"):
reaction.range = inp.get("range") reaction.range = inp.get("range")
session.commit() session.commit()
reaction.stat = {
reaction.stat = await get_reaction_stat(reaction.id) "commented": commented,
"reacted": reacted,
"rating": rating
}
return {"reaction": reaction} return {"reaction": reaction}
@ -198,6 +202,7 @@ async def delete_reaction(_, info, rid):
def map_result_item(result_item): def map_result_item(result_item):
[user, shout, reaction] = result_item [user, shout, reaction] = result_item
print(reaction)
reaction.createdBy = user reaction.createdBy = user
reaction.shout = shout reaction.shout = shout
reaction.replyTo = reaction reaction.replyTo = reaction
@ -254,14 +259,22 @@ async def load_reactions_by(_, _info, by, limit=50, offset=0):
).order_by( ).order_by(
order_way(order_field) order_way(order_field)
) )
q = calc_reactions(q)
q = q.where(Reaction.deletedAt.is_(None)) q = q.where(Reaction.deletedAt.is_(None))
q = q.limit(limit).offset(offset) q = q.limit(limit).offset(offset)
reactions = []
with local_session() as session: with local_session() as session:
reactions = list(map(map_result_item, session.execute(q))) for [
for reaction in reactions: [reaction, rating, commented, reacted], shout, reply
reaction.stat = await get_reaction_stat(reaction.id) ] in list(map(map_result_item, session.execute(q))):
reaction.shout = shout
reaction.replyTo = reply
reaction.stat = {
"rating": rating,
"commented": commented,
"reacted": reacted
}
reactions.append(reaction)
if by.get("stat"): if by.get("stat"):
reactions.sort(lambda r: r.stat.get(by["stat"]) or r.createdAt) reactions.sort(lambda r: r.stat.get(by["stat"]) or r.createdAt)

View File

@ -1,4 +1,4 @@
from services.stat.reacted import ReactedStorage # from services.stat.reacted import ReactedStorage
from services.auth.roles import RoleStorage from services.auth.roles import RoleStorage
from services.auth.users import UserStorage from services.auth.users import UserStorage
from services.zine.topics import TopicStorage from services.zine.topics import TopicStorage
@ -10,7 +10,7 @@ from base.orm import local_session
async def storages_init(): async def storages_init():
with local_session() as session: with local_session() as session:
print('[main] initialize storages') print('[main] initialize storages')
ReactedStorage.init(session) # ReactedStorage.init(session)
RoleStorage.init(session) RoleStorage.init(session)
UserStorage.init(session) UserStorage.init(session)
TopicStorage.init(session) TopicStorage.init(session)

View File

@ -1,191 +0,0 @@
import asyncio
import time
from base.orm import local_session
from orm.reaction import ReactionKind, Reaction
from services.zine.topics import TopicStorage
def kind_to_rate(kind) -> int:
if kind in [
ReactionKind.AGREE,
ReactionKind.LIKE,
ReactionKind.PROOF,
ReactionKind.ACCEPT,
]:
return 1
elif kind in [
ReactionKind.DISAGREE,
ReactionKind.DISLIKE,
ReactionKind.DISPROOF,
ReactionKind.REJECT,
]:
return -1
else:
return 0
class ReactedStorage:
reacted = {"shouts": {}, "topics": {}, "reactions": {}, "authors": {}}
rating = {"shouts": {}, "topics": {}, "reactions": {}}
reactions = []
to_flush = []
period = 30 * 60 # sec
lock = asyncio.Lock()
modified_shouts = set([])
@staticmethod
async def get_shout(shout_slug):
self = ReactedStorage
async with self.lock:
return self.reacted["shouts"].get(shout_slug, [])
@staticmethod
async def get_author(user_slug):
self = ReactedStorage
async with self.lock:
return self.reacted["authors"].get(user_slug, [])
@staticmethod
async def get_followed_reactions(user_slug):
self = ReactedStorage
async with self.lock:
author_reactions = self.reacted["authors"].get(user_slug, [])
shouts = []
for r in author_reactions:
if r.shout not in shouts:
shouts.append(r.shout)
return shouts
@staticmethod
async def get_topic(topic_slug):
self = ReactedStorage
async with self.lock:
return self.reacted["topics"].get(topic_slug, [])
@staticmethod
async def get_comments(shout_slug):
self = ReactedStorage
async with self.lock:
return list(
filter(lambda r: bool(r.body), self.reacted["shouts"].get(shout_slug, {}))
)
@staticmethod
async def get_topic_comments(topic_slug):
self = ReactedStorage
async with self.lock:
return list(
filter(lambda r: bool(r.body), self.reacted["topics"].get(topic_slug, []))
)
@staticmethod
async def get_reaction_comments(reaction_id):
self = ReactedStorage
async with self.lock:
return list(
filter(
lambda r: bool(r.body), self.reacted["reactions"].get(reaction_id, {})
)
)
@staticmethod
async def get_reaction(reaction_id):
self = ReactedStorage
async with self.lock:
return self.reacted["reactions"].get(reaction_id, [])
@staticmethod
async def get_rating(shout_slug):
self = ReactedStorage
rating = 0
async with self.lock:
for r in self.reacted["shouts"].get(shout_slug, []):
rating = rating + kind_to_rate(r.kind)
return rating
@staticmethod
async def get_topic_rating(topic_slug):
self = ReactedStorage
rating = 0
async with self.lock:
for r in self.reacted["topics"].get(topic_slug, []):
rating = rating + kind_to_rate(r.kind)
return rating
@staticmethod
async def get_reaction_rating(reaction_id):
self = ReactedStorage
rating = 0
async with self.lock:
for r in self.reacted["reactions"].get(reaction_id, []):
rating = rating + kind_to_rate(r.kind)
return rating
@staticmethod
async def react(reaction):
ReactedStorage.modified_shouts.add(reaction.shout)
@staticmethod
async def recount(reactions):
self = ReactedStorage
for r in reactions:
# renew reactions by shout
self.reacted["shouts"][r.shout] = self.reacted["shouts"].get(r.shout, [])
self.reacted["shouts"][r.shout].append(r)
# renew reactions by author
self.reacted["authors"][r.createdBy] = self.reacted["authors"].get(r.createdBy, [])
self.reacted["authors"][r.createdBy].append(r)
# renew reactions by topic
shout_topics = await TopicStorage.get_topics_by_slugs([r.shout, ])
for t in shout_topics:
self.reacted["topics"][t] = self.reacted["topics"].get(t, [])
self.reacted["topics"][t].append(r)
self.rating["topics"][t] = \
self.rating["topics"].get(t, 0) + kind_to_rate(r.kind)
if r.replyTo:
# renew reactions replies
self.reacted["reactions"][r.replyTo] = \
self.reacted["reactions"].get(r.replyTo, [])
self.reacted["reactions"][r.replyTo].append(r)
self.rating["reactions"][r.replyTo] = \
self.rating["reactions"].get(r.replyTo, 0) + kind_to_rate(r.kind)
else:
# renew shout rating
self.rating["shouts"][r.shout] = \
self.rating["shouts"].get(r.shout, 0) + kind_to_rate(r.kind)
@staticmethod
def init(session):
self = ReactedStorage
all_reactions = session.query(Reaction).all()
self.modified_shouts = list(set([r.shout for r in all_reactions]))
print("[stat.reacted] %d shouts with reactions" % len(self.modified_shouts))
@staticmethod
async def recount_changed(session):
self = ReactedStorage
sss = list(self.modified_shouts)
c = 0
for slug in sss:
siblings = session.query(Reaction).where(Reaction.shout == slug).all()
c += len(siblings)
await self.recount(siblings)
print("[stat.reacted] %d reactions recounted" % c)
print("[stat.reacted] %d shouts modified" % len(self.modified_shouts))
print("[stat.reacted] %d topics" % len(self.reacted["topics"].values()))
print("[stat.reacted] %d authors" % len(self.reacted["authors"].values()))
print("[stat.reacted] %d replies" % len(self.reacted["reactions"]))
self.modified_shouts = set([])
@staticmethod
async def worker():
while True:
try:
with local_session() as session:
ts = time.time()
await ReactedStorage.recount_changed(session)
print("[stat.reacted] recount_changed took %fs " % (time.time() - ts))
except Exception as err:
print("[stat.reacted] recount error %s" % (err))
await asyncio.sleep(ReactedStorage.period)

View File

@ -17,9 +17,12 @@ class TopicStat:
@staticmethod @staticmethod
async def load_stat(session): async def load_stat(session):
print("[stat.topics] ⎧ loading stat -------")
ts = time.time()
self = TopicStat self = TopicStat
shout_topics = session.query(ShoutTopic, Shout).join(Shout).all() shout_topics = session.query(ShoutTopic, Shout).join(Shout).all() # ~ 10 secs
print("[stat.topics] %d links for shouts" % len(shout_topics)) print("[stat.topics] ⎪ shout topics joined query took %fs " % (time.time() - ts))
print("[stat.topics] ⎪ indexing %d links..." % len(shout_topics))
for [shout_topic, shout] in shout_topics: for [shout_topic, shout] in shout_topics:
tpc = shout_topic.topic tpc = shout_topic.topic
self.shouts_by_topic[tpc] = self.shouts_by_topic.get(tpc, dict()) self.shouts_by_topic[tpc] = self.shouts_by_topic.get(tpc, dict())
@ -35,7 +38,7 @@ class TopicStat:
self.followers_by_topic = {} self.followers_by_topic = {}
followings = session.query(TopicFollower).all() followings = session.query(TopicFollower).all()
print("[stat.topics] %d followings by users" % len(followings)) print("[stat.topics] ⎪ indexing %d followings..." % len(followings))
for flw in followings: for flw in followings:
topic = flw.topic topic = flw.topic
userslug = flw.follower userslug = flw.follower
@ -58,7 +61,7 @@ class TopicStat:
ts = time.time() ts = time.time()
async with self.lock: async with self.lock:
await self.load_stat(session) await self.load_stat(session)
print("[stat.topicstat] load_stat took %fs " % (time.time() - ts)) print("[stat.topics] load_stat took %fs " % (time.time() - ts))
except Exception as err: except Exception as err:
raise Exception(err) raise Exception(err)
if first_run: if first_run:

View File

@ -76,9 +76,9 @@ class ViewedStorage:
self.client = create_client({ self.client = create_client({
"Authorization": "Bearer %s" % str(token) "Authorization": "Bearer %s" % str(token)
}, schema=schema_str) }, schema=schema_str)
print("[stat.viewed] authorized permanentely by ackee.discours.io: %s" % token) print("[stat.viewed] * authorized permanentely by ackee.discours.io: %s" % token)
else: else:
print("[stat.viewed] please set ACKEE_TOKEN") print("[stat.viewed] * please set ACKEE_TOKEN")
self.disabled = True self.disabled = True
@staticmethod @staticmethod
@ -89,7 +89,7 @@ class ViewedStorage:
try: try:
self.pages = await self.client.execute_async(load_pages) self.pages = await self.client.execute_async(load_pages)
self.pages = self.pages["domains"][0]["statistics"]["pages"] self.pages = self.pages["domains"][0]["statistics"]["pages"]
print("[stat.viewed] ackee pages updated") print("[stat.viewed] ackee pages updated")
shouts = {} shouts = {}
try: try:
for page in self.pages: for page in self.pages:
@ -100,12 +100,12 @@ class ViewedStorage:
await ViewedStorage.increment(slug, v) await ViewedStorage.increment(slug, v)
except Exception: except Exception:
pass pass
print("[stat.viewed] %d pages collected " % len(shouts.keys())) print("[stat.viewed] %d pages collected " % len(shouts.keys()))
except Exception as e: except Exception as e:
raise e raise e
end = time.time() end = time.time()
print("[stat.viewed] update_pages took %fs " % (end - start)) print("[stat.viewed] update_pages took %fs " % (end - start))
@staticmethod @staticmethod
async def get_facts(): async def get_facts():
@ -179,21 +179,22 @@ class ViewedStorage:
async with self.lock: async with self.lock:
while True: while True:
try: try:
print("[stat.viewed] ⎧ updating views...")
await self.update_pages() await self.update_pages()
failed = 0 failed = 0
except Exception: except Exception:
failed += 1 failed += 1
print("[stat.viewed] update failed #%d, wait 10 seconds" % failed) print("[stat.viewed] update failed #%d, wait 10 seconds" % failed)
if failed > 3: if failed > 3:
print("[stat.viewed] not trying to update anymore") print("[stat.viewed] not trying to update anymore")
break break
if failed == 0: if failed == 0:
when = datetime.now(timezone.utc) + timedelta(seconds=self.period) when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
t = format(when.astimezone().isoformat()) t = format(when.astimezone().isoformat())
print("[stat.viewed] next update: %s" % ( print("[stat.viewed] next update: %s" % (
t.split("T")[0] + " " + t.split("T")[1].split(".")[0] t.split("T")[0] + " " + t.split("T")[1].split(".")[0]
)) ))
await asyncio.sleep(self.period) await asyncio.sleep(self.period)
else: else:
await asyncio.sleep(10) await asyncio.sleep(10)
print("[stat.viewed] trying to update data again...") print("[stat.viewed] trying to update data again...")

View File

@ -16,7 +16,7 @@ class ShoutAuthorStorage:
for sa in sas: for sa in sas:
self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, {}) self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, {})
self.authors_by_shout[sa.shout][sa.user] = sa.caption self.authors_by_shout[sa.shout][sa.user] = sa.caption
print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout)) print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout))
@staticmethod @staticmethod
async def get_author_caption(shout, author): async def get_author_caption(shout, author):
@ -43,7 +43,7 @@ class ShoutAuthorStorage:
with local_session() as session: with local_session() as session:
ts = time.time() ts = time.time()
await self.load_captions(session) await self.load_captions(session)
print("[zine.authors] load_captions took %fs " % (time.time() - ts)) print("[zine.authors] load_captions took %fs " % (time.time() - ts))
except Exception as err: except Exception as err:
print("[zine.authors] error indexing by author: %s" % (err)) print("[zine.authors] error indexing by author: %s" % (err))
# await asyncio.sleep(self.period) # await asyncio.sleep(self.period)