From f0b625af53a17811d021b2cf9bdd5d2e01768374 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Sat, 13 Aug 2022 12:48:07 +0300 Subject: [PATCH] -reactions.storage, +collectionShouts.query, fixes --- main.py | 4 +- migration/tables/comments.py | 1 - orm/__init__.py | 2 + orm/reaction.py | 42 +++++++-- orm/shout.py | 112 ++++++++++++------------ resolvers/collection.py | 4 +- resolvers/reactions.py | 46 ++++++---- resolvers/zine.py | 13 +++ schema.graphql | 8 +- services/auth/roles.py | 2 +- services/auth/users.py | 2 +- services/stat/reacted.py | 142 +++++++++++++++---------------- services/stat/topicstat.py | 18 ++-- services/stat/viewed.py | 55 +++++------- services/zine/gittask.py | 4 +- services/zine/reactions.py | 159 ----------------------------------- services/zine/shoutauthor.py | 7 +- services/zine/shoutscache.py | 33 ++++---- services/zine/topics.py | 4 +- 19 files changed, 277 insertions(+), 381 deletions(-) delete mode 100644 services/zine/reactions.py diff --git a/main.py b/main.py index 1c1cb9ed..347b1227 100644 --- a/main.py +++ b/main.py @@ -12,11 +12,11 @@ from auth.email import email_authorize from base.redis import redis from base.resolvers import resolvers from resolvers.zine import ShoutsCache +from services.stat.reacted import ReactedStorage from services.stat.viewed import ViewedStorage from services.zine.gittask import GitTask from services.stat.topicstat import TopicStat from services.zine.shoutauthor import ShoutAuthorStorage -from services.zine.reactions import ReactionsStorage import asyncio import_module('resolvers') @@ -30,8 +30,8 @@ middleware = [ async def start_up(): await redis.connect() viewed_storage_task = asyncio.create_task(ViewedStorage.worker()) + reacted_storage_task = asyncio.create_task(ReactedStorage.worker()) shouts_cache_task = asyncio.create_task(ShoutsCache.worker()) - reaction_stat_task = asyncio.create_task(ReactionsStorage.worker()) shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker()) topic_stat_task = asyncio.create_task(TopicStat.worker()) git_task = asyncio.create_task(GitTask.git_task_worker()) diff --git a/migration/tables/comments.py b/migration/tables/comments.py index 648170b2..df07da3c 100644 --- a/migration/tables/comments.py +++ b/migration/tables/comments.py @@ -1,7 +1,6 @@ from datetime import datetime from dateutil.parser import parse as date_parse from orm import Reaction, User -from orm import reaction from base.orm import local_session from migration.html2text import html2text from orm.reaction import ReactionKind diff --git a/orm/__init__.py b/orm/__init__.py index 7d31e530..e8c3c297 100644 --- a/orm/__init__.py +++ b/orm/__init__.py @@ -6,6 +6,7 @@ from orm.topic import Topic, TopicFollower from orm.notification import Notification from orm.shout import Shout from orm.reaction import Reaction +from services.stat.reacted import ReactedStorage from services.zine.topics import TopicStorage from services.auth.users import UserStorage from services.stat.viewed import ViewedStorage @@ -24,6 +25,7 @@ Role.init_table() with local_session() as session: ViewedStorage.init(session) + ReactedStorage.init(session) RoleStorage.init(session) UserStorage.init(session) TopicStorage.init(session) diff --git a/orm/reaction.py b/orm/reaction.py index fadb9f1c..d862b23f 100644 --- a/orm/reaction.py +++ b/orm/reaction.py @@ -10,16 +10,42 @@ class ReactionKind(enum.Enum): DISAGREE = 2 # -1 PROOF = 3 # +1 DISPROOF = 4 # -1 - ASK = 5 # +0 + ASK = 5 # +0 bookmark PROPOSE = 6 # +0 - QUOTE = 7 # +0 + QUOTE = 7 # +0 bookmark COMMENT = 8 # +0 ACCEPT = 9 # +1 REJECT = 0 # -1 LIKE = 11 # +1 DISLIKE = 12 # -1 - # TYPE = # rating change guess + # TYPE = # rating diff +def kind_to_rate(kind) -> int: + if kind in [ + ReactionKind.AGREE, + ReactionKind.LIKE, + ReactionKind.PROOF, + ReactionKind.ACCEPT + ]: return 1 + elif kind in [ + ReactionKind.DISAGREE, + ReactionKind.DISLIKE, + ReactionKind.DISPROOF, + ReactionKind.REJECT + ]: return -1 + else: return 0 + +def get_bookmarked(reactions): + c = 0 + for r in reactions: + c += 1 if r.kind in [ ReactionKind.QUOTE, ReactionKind.ASK] else 0 + return c + +def get_rating(reactions): + rating = 0 + for r in reactions: + rating += kind_to_rate(r.kind) + return rating class Reaction(Base): __tablename__ = 'reaction' @@ -38,13 +64,15 @@ class Reaction(Base): @property async def stat(self): - reacted = 0 + reacted = [] try: with local_session() as session: - reacted = session.query(Reaction).filter(Reaction.replyTo == self.id).count() + reacted = session.query(Reaction).filter(Reaction.replyTo == self.id).all() except Exception as e: print(e) return { - "viewed": await ViewedStorage.get_reaction(self.slug), - "reacted": reacted + "viewed": await ViewedStorage.get_reaction(self.id), + "reacted": reacted.count(), + "rating": get_rating(reacted), + "bookmarked": get_bookmarked(reacted) } \ No newline at end of file diff --git a/orm/shout.py b/orm/shout.py index 529848a9..cc4d0cf0 100644 --- a/orm/shout.py +++ b/orm/shout.py @@ -3,67 +3,75 @@ from sqlalchemy import Column, Integer, String, ForeignKey, DateTime, Boolean from sqlalchemy.orm import relationship from orm.user import User from orm.topic import Topic, ShoutTopic -from orm.reaction import Reaction -from services.zine.reactions import ReactionsStorage +from orm.reaction import Reaction, get_bookmarked +from services.stat.reacted import ReactedStorage from services.stat.viewed import ViewedStorage -from base.orm import Base +from base.orm import Base, local_session class ShoutReactionsFollower(Base): - __tablename__ = "shout_reactions_followers" - - id = None - follower = Column(ForeignKey('user.slug'), primary_key = True) - shout = Column(ForeignKey('shout.slug'), primary_key = True) - auto = Column(Boolean, nullable=False, default = False) - createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") - deletedAt: str = Column(DateTime, nullable=True) + __tablename__ = "shout_reactions_followers" + + id = None + follower = Column(ForeignKey('user.slug'), primary_key = True) + shout = Column(ForeignKey('shout.slug'), primary_key = True) + auto = Column(Boolean, nullable=False, default = False) + createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") + deletedAt: str = Column(DateTime, nullable=True) class ShoutAuthor(Base): - __tablename__ = "shout_author" - - id = None - shout = Column(ForeignKey('shout.slug'), primary_key = True) - user = Column(ForeignKey('user.slug'), primary_key = True) - caption: str = Column(String, nullable = True, default = "") - + __tablename__ = "shout_author" + + id = None + shout = Column(ForeignKey('shout.slug'), primary_key = True) + user = Column(ForeignKey('user.slug'), primary_key = True) + caption: str = Column(String, nullable = True, default = "") + class ShoutAllowed(Base): - __tablename__ = "shout_allowed" - - id = None - shout = Column(ForeignKey('shout.slug'), primary_key = True) - user = Column(ForeignKey('user.id'), primary_key = True) + __tablename__ = "shout_allowed" + + id = None + shout = Column(ForeignKey('shout.slug'), primary_key = True) + user = Column(ForeignKey('user.id'), primary_key = True) class Shout(Base): - __tablename__ = 'shout' + __tablename__ = 'shout' - id = None + id = None - slug: str = Column(String, primary_key=True) - community: str = Column(Integer, ForeignKey("community.id"), nullable=False, comment="Community") - body: str = Column(String, nullable=False, comment="Body") - createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") - updatedAt: str = Column(DateTime, nullable=True, comment="Updated at") - replyTo: int = Column(ForeignKey("shout.slug"), nullable=True) - versionOf: int = Column(ForeignKey("shout.slug"), nullable=True) - tags: str = Column(String, nullable=True) - publishedBy: int = Column(ForeignKey("user.id"), nullable=True) - publishedAt: str = Column(DateTime, nullable=True) - cover: str = Column(String, nullable = True) - title: str = Column(String, nullable = True) - subtitle: str = Column(String, nullable = True) - layout: str = Column(String, nullable = True) - reactions = relationship(lambda: Reaction) - authors = relationship(lambda: User, secondary=ShoutAuthor.__tablename__) - topics = relationship(lambda: Topic, secondary=ShoutTopic.__tablename__) - mainTopic = Column(ForeignKey("topic.slug"), nullable=True) - visibleFor = relationship(lambda: User, secondary=ShoutAllowed.__tablename__) - draft: bool = Column(Boolean, default=True) - oid: str = Column(String, nullable=True) + slug: str = Column(String, primary_key=True) + community: str = Column(Integer, ForeignKey("community.id"), nullable=False, comment="Community") + body: str = Column(String, nullable=False, comment="Body") + createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") + updatedAt: str = Column(DateTime, nullable=True, comment="Updated at") + replyTo: int = Column(ForeignKey("shout.slug"), nullable=True) + versionOf: int = Column(ForeignKey("shout.slug"), nullable=True) + tags: str = Column(String, nullable=True) + publishedBy: int = Column(ForeignKey("user.id"), nullable=True) + publishedAt: str = Column(DateTime, nullable=True) + cover: str = Column(String, nullable = True) + title: str = Column(String, nullable = True) + subtitle: str = Column(String, nullable = True) + layout: str = Column(String, nullable = True) + reactions = relationship(lambda: Reaction) + authors = relationship(lambda: User, secondary=ShoutAuthor.__tablename__) + topics = relationship(lambda: Topic, secondary=ShoutTopic.__tablename__) + mainTopic = Column(ForeignKey("topic.slug"), nullable=True) + visibleFor = relationship(lambda: User, secondary=ShoutAllowed.__tablename__) + draft: bool = Column(Boolean, default=True) + oid: str = Column(String, nullable=True) - @property - async def stat(self): - return { - "viewed": await ViewedStorage.get_shout(self.slug), - "reacted": await ReactionsStorage.by_shout(self.slug) - } + @property + async def stat(self): + reacted = [] + try: + with local_session() as session: + reacted = session.query(Reaction).where(Reaction.shout == self.slug).all() + except Exception as e: + print(e) + return { + "viewed": await ViewedStorage.get_shout(self.slug), + "reacted": await ReactedStorage.get_shout(self.slug), + "rating": await ReactedStorage.get_rating(self.slug), + "bookmarked": get_bookmarked(reacted) + } diff --git a/resolvers/collection.py b/resolvers/collection.py index c00c3ac5..b8bb06f5 100644 --- a/resolvers/collection.py +++ b/resolvers/collection.py @@ -1,10 +1,10 @@ -from orm.collection import Collection, CollectionFollower +from orm.collection import Collection from base.orm import local_session from orm.user import User from base.resolvers import mutation, query from auth.authenticate import login_required from datetime import datetime -from typing import Collection, List +from typing import Collection from sqlalchemy import and_ @mutation.field("createCollection") diff --git a/resolvers/reactions.py b/resolvers/reactions.py index 2e68c7c1..8c9f742a 100644 --- a/resolvers/reactions.py +++ b/resolvers/reactions.py @@ -1,3 +1,4 @@ +from sqlalchemy import desc from orm.reaction import Reaction from base.orm import local_session from orm.shout import ShoutReactionsFollower @@ -5,8 +6,7 @@ from orm.user import User from base.resolvers import mutation, query from auth.authenticate import login_required from datetime import datetime -from services.zine.reactions import ReactionsStorage -from services.stat.viewed import ViewedStorage +from services.stat.reacted import ReactedStorage def reactions_follow(user, slug, auto=False): with local_session() as session: @@ -48,9 +48,11 @@ def reactions_unfollow(user, slug): @login_required async def create_reaction(_, info, inp): user = info.context["request"].user - + + # TODO: filter allowed reaction kinds + reaction = Reaction.create(**inp) - + ReactedStorage.increment(reaction.shout, reaction.replyTo) try: reactions_follow(user, inp['shout'], True) except Exception as e: @@ -101,26 +103,34 @@ async def delete_reaction(_, info, id): return {} @query.field("reactionsByShout") -def get_shout_reactions(_, info, slug): - #offset = page * size - #end = offset + size - return ReactionsStorage.reactions_by_shout.get(slug, []) #[offset:end] +def get_shout_reactions(_, info, slug, page, size): + offset = page * size + reactions = [] + with local_session() as session: + reactions = session.query(Reaction).filter(Reaction.shout == slug).limit(size).offset(offset).all() + return reactions @query.field("reactionsAll") def get_all_reactions(_, info, page=1, size=10): offset = page * size - end = offset + size - return ReactionsStorage.reactions[offset:end] + reactions = [] + with local_session() as session: + stmt = session.query(Reaction).\ + filter(Reaction.deletedAt == None).\ + order_by(desc("createdAt")).\ + offset(offset).limit(size) + reactions = [] + for row in session.execute(stmt): + reaction = row.Reaction + reactions.append(reaction) + reactions.sort(key=lambda x: x.createdAt, reverse=True) + return reactions @query.field("reactionsByAuthor") def get_reactions_by_author(_, info, slug, page=1, size=50): offset = page * size - end = offset + size - return ReactionsStorage.reactions_by_author.get(slug, [])[offset:end] - - -@mutation.field("viewReaction") -async def view_reaction(_, info, reaction): - await ViewedStorage.inc_reaction(reaction) - return {"error" : ""} \ No newline at end of file + reactions = [] + with local_session() as session: + reactions = session.query(Reaction).filter(Reaction.createdBy == slug).limit(size).offset(offset).all() + return reactions \ No newline at end of file diff --git a/resolvers/zine.py b/resolvers/zine.py index 1bce3936..530bc365 100644 --- a/resolvers/zine.py +++ b/resolvers/zine.py @@ -1,3 +1,4 @@ +from orm.collection import ShoutCollection from orm.shout import Shout, ShoutAuthor, ShoutTopic from orm.topic import Topic from base.orm import local_session @@ -81,6 +82,18 @@ async def shouts_by_topics(_, info, slugs, page, size): offset(page * size) return shouts +@query.field("shoutsByCollection") +async def shouts_by_topics(_, info, collection, page, size): + page = page - 1 + with local_session() as session: + shouts = session.query(Shout).\ + join(ShoutCollection, ShoutCollection.collection == collection).\ + where(and_(ShoutCollection.shout == Shout.slug, Shout.publishedAt != None)).\ + order_by(desc(Shout.publishedAt)).\ + limit(size).\ + offset(page * size) + return shouts + @query.field("shoutsByAuthors") async def shouts_by_authors(_, info, slugs, page, size): page = page - 1 diff --git a/schema.graphql b/schema.graphql index 90cb39c0..97338152 100644 --- a/schema.graphql +++ b/schema.graphql @@ -254,6 +254,7 @@ type Query { # collection getCollection(author: String!, slug: String!): Collection! + shoutsByCollection(collection: String, page: Int, size: Int): [Shout]! # communities getCommunity(slug: String): Community! @@ -408,8 +409,10 @@ type Shout { } type Stat { - viewed: Int! - reacted: Int! + viewed: Int + reacted: Int + rating: Int + bookmarked: Int } type Community { @@ -435,6 +438,7 @@ type TopicStat { followers: Int! authors: Int! viewed: Int! + reacted: Int! } type Topic { diff --git a/services/auth/roles.py b/services/auth/roles.py index b3e1e43c..531c8050 100644 --- a/services/auth/roles.py +++ b/services/auth/roles.py @@ -13,7 +13,7 @@ class RoleStorage: roles = session.query(Role).\ options(selectinload(Role.permissions)).all() self.roles = dict([(role.id, role) for role in roles]) - print('[service.auth] %d roles' % len(roles)) + print('[auth.roles] %d precached' % len(roles)) @staticmethod diff --git a/services/auth/users.py b/services/auth/users.py index 5ff6d4c4..b879d27b 100644 --- a/services/auth/users.py +++ b/services/auth/users.py @@ -13,7 +13,7 @@ class UserStorage: users = session.query(User).\ options(selectinload(User.roles), selectinload(User.ratings)).all() self.users = dict([(user.id, user) for user in users]) - print('[service.auth] %d users' % len(self.users)) + print('[auth.users] %d precached' % len(self.users)) @staticmethod async def get_user(id): diff --git a/services/stat/reacted.py b/services/stat/reacted.py index 87ceecda..e7576066 100644 --- a/services/stat/reacted.py +++ b/services/stat/reacted.py @@ -1,9 +1,12 @@ import asyncio from datetime import datetime +from typing_extensions import Self +from sqlalchemy.types import Enum from sqlalchemy import Column, DateTime, ForeignKey, Integer from sqlalchemy.orm.attributes import flag_modified from base.orm import Base, local_session -from orm.reaction import ReactionKind +from orm.reaction import ReactionKind, kind_to_rate +from orm.topic import ShoutTopic class ReactedByDay(Base): __tablename__ = "reacted_by_day" @@ -12,42 +15,50 @@ class ReactedByDay(Base): reaction = Column(ForeignKey("reaction.id"), primary_key = True) shout = Column(ForeignKey('shout.slug'), primary_key=True) reply = Column(ForeignKey('reaction.id'), primary_key=True, nullable=True) - kind = Column(ReactionKind, primary_key=True) + kind: int = Column(Enum(ReactionKind), nullable=False, comment="Reaction kind") day = Column(DateTime, primary_key=True, default=datetime.now) class ReactedStorage: reacted = { - 'shouts': { - 'total': 0, - 'today': 0, - 'month': 0, - # TODO: need an opionated metrics list - }, - 'topics': {} # TODO: get sum reactions for all shouts in topic + 'shouts': {}, + 'topics': {}, + 'reactions': {} } - this_day_reactions = {} + rating = { + 'shouts': {}, + 'topics': {}, + 'reactions': {} + } + reactions = [] to_flush = [] period = 30*60 # sec lock = asyncio.Lock() @staticmethod - def init(session): + def prepare(session): self = ReactedStorage - reactions = session.query(ReactedByDay).all() - - for reaction in reactions: + all_reactions = session.query(ReactedByDay).all() + day_start = datetime.now().replace(hour=0, minute=0, second=0) + for reaction in all_reactions: + day = reaction.day shout = reaction.shout - value = reaction.value - if shout: - old_value = self.reacted['shouts'].get(shout, 0) - self.reacted['shouts'][shout] = old_value + value - if not shout in self.this_day_reactions: - self.this_day_reactions[shout] = reaction - this_day_reaction = self.this_day_reactions[shout] - if this_day_reaction.day < reaction.day: - self.this_day_reactions[shout] = reaction + topics = session.query(ShoutTopic.topic).where(ShoutTopic.shout == shout).all() + kind = reaction.kind + + self.reacted['shouts'][shout] = self.reacted['shouts'].get(shout, 0) + 1 + self.rating['shouts'][shout] = self.rating['shouts'].get(shout, 0) + kind_to_rate(kind) + + for t in topics: + self.reacted['topics'][t] = self.reacted['topics'].get(t, 0) + 1 # reactions amount + self.rating['topics'][t] = self.rating['topics'].get(t, 0) + kind_to_rate(kind) # rating + + if reaction.reply: + self.reacted['reactions'][reaction.reply] = self.reacted['reactions'].get(reaction.reply, 0) + 1 + self.rating['reactions'][reaction.reply] = self.rating['reactions'].get(reaction.reply, 0) + kind_to_rate(reaction.kind) + + print('[stat.reacted] %d shouts reacted' % len(self.reacted['shouts'])) + print('[stat.reacted] %d reactions reacted' % len(self.reacted['reactions'])) - print('[service.reacted] watching %d shouts' % len(reactions)) @staticmethod async def get_shout(shout_slug): @@ -55,7 +66,24 @@ class ReactedStorage: async with self.lock: return self.reacted['shouts'].get(shout_slug, 0) - # NOTE: this method is never called + @staticmethod + async def get_topic(topic_slug): + self = ReactedStorage + async with self.lock: + return self.reacted['topics'].get(topic_slug, 0) + + @staticmethod + async def get_rating(shout_slug): + self = ReactedStorage + async with self.lock: + return self.reacted['shouts'].get(shout_slug, 0) + + @staticmethod + async def get_topic_rating(topic_slug): + self = ReactedStorage + async with self.lock: + return self.rating['topics'].get(topic_slug, 0) + @staticmethod async def get_reaction(reaction_id): self = ReactedStorage @@ -63,63 +91,35 @@ class ReactedStorage: return self.reacted['reactions'].get(reaction_id, 0) @staticmethod - async def inc_shout(shout_slug): + async def get_reaction_rating(reaction_id): self = ReactedStorage async with self.lock: - this_day_reaction = self.this_day_reactions.get(shout_slug) - day_start = datetime.now().replace(hour=0, minute=0, second=0) - if not this_day_reaction or this_day_reaction.day < day_start: - if this_day_reaction and getattr(this_day_reaction, "modified", False): - self.to_flush.append(this_day_reaction) - this_day_reaction = ReactedByDay.create(shout=shout_slug, value=1) - self.this_day_reactions[shout_slug] = this_day_reaction - else: - this_day_reaction.value = this_day_reaction.value + 1 - this_day_reaction.modified = True - old_value = self.reacted['shouts'].get(shout_slug, 0) - self.reacted['shotus'][shout_slug] = old_value + 1 + return self.rating['reactions'].get(reaction_id, 0) @staticmethod - async def inc_reaction(shout_slug, reaction_id): + async def increment(shout_slug, kind, reply_id = None): self = ReactedStorage + reaction: ReactedByDay = None async with self.lock: - this_day_reaction = self.this_day_reactions.get(reaction_id) - day_start = datetime.now().replace(hour=0, minute=0, second=0) - if not this_day_reaction or this_day_reaction.day < day_start: - if this_day_reaction and getattr(this_day_reaction, "modified", False): - self.to_flush.append(this_day_reaction) - this_day_reaction = ReactedByDay.create( - shout=shout_slug, reaction=reaction_id, value=1) - self.this_day_reactions[shout_slug] = this_day_reaction - else: - this_day_reaction.value = this_day_reaction.value + 1 - this_day_reaction.modified = True - old_value = self.reacted['shouts'].get(shout_slug, 0) - self.reacted['shouts'][shout_slug] = old_value + 1 - old_value = self.reacted['reactions'].get(shout_slug, 0) - self.reacted['reaction'][reaction_id] = old_value + 1 - - @staticmethod - async def flush_changes(session): - self = ReactedStorage - async with self.lock: - for reaction in self.this_day_reactions.values(): - if getattr(reaction, "modified", False): - session.add(reaction) - flag_modified(reaction, "value") - reaction.modified = False - for reaction in self.to_flush: - session.add(reaction) - self.to_flush.clear() - session.commit() + reaction = ReactedByDay.create(shout=shout_slug, kind=kind, reply=reply_id) + self.reacted['shouts'][shout_slug] = self.reacted['shouts'].get(shout_slug, []) + self.reacted['shouts'][shout_slug].append(reaction) + if reply_id: + self.reacted['reaction'][reply_id] = self.reacted['reactions'].get(shout_slug, []) + self.reacted['reaction'][reply_id].append(reaction) @staticmethod async def worker(): while True: try: with local_session() as session: - await ReactedStorage.flush_changes(session) - print("[service.reacted] service flushed changes") + ReactedStorage.prepare(session) + print("[stat.reacted] updated") except Exception as err: - print("[service.reacted] errror: %s" % (err)) + print("[stat.reacted] error: %s" % (err)) + raise err await asyncio.sleep(ReactedStorage.period) + + @staticmethod + def init(session): + ReactedStorage.prepare(session) \ No newline at end of file diff --git a/services/stat/topicstat.py b/services/stat/topicstat.py index 46c9d69b..2bb35c16 100644 --- a/services/stat/topicstat.py +++ b/services/stat/topicstat.py @@ -1,5 +1,7 @@ import asyncio from base.orm import local_session +from services.stat.reacted import ReactedStorage +from services.stat.viewed import ViewedStorage from services.zine.shoutauthor import ShoutAuthorStorage from orm.topic import ShoutTopic, TopicFollower from typing import Dict @@ -8,7 +10,6 @@ class TopicStat: shouts_by_topic = {} authors_by_topic = {} followers_by_topic = {} - reactions_by_topic = {} lock = asyncio.Lock() period = 30*60 #sec @@ -32,8 +33,8 @@ class TopicStat: else: self.authors_by_topic[topic] = set(authors) - print('[service.topicstat] authors sorted') - print('[service.topicstat] shouts sorted') + print('[stat.topics] authors sorted') + print('[stat.topics] shouts sorted') self.followers_by_topic = {} followings = session.query(TopicFollower) @@ -44,7 +45,7 @@ class TopicStat: self.followers_by_topic[topic].append(user) else: self.followers_by_topic[topic] = [user] - print('[service.topicstat] followers sorted') + print('[stat.topics] followers sorted') @staticmethod async def get_shouts(topic): @@ -59,13 +60,14 @@ class TopicStat: shouts = self.shouts_by_topic.get(topic, []) followers = self.followers_by_topic.get(topic, []) authors = self.authors_by_topic.get(topic, []) - reactions = self.reactions_by_topic.get(topic, []) return { "shouts" : len(shouts), "authors" : len(authors), "followers" : len(followers), - "reactions" : len(reactions) + "viewed": ViewedStorage.get_topic(topic), + "reacted" : ReactedStorage.get_topic(topic), + "rating" : ReactedStorage.get_topic_rating(topic), } @staticmethod @@ -76,8 +78,8 @@ class TopicStat: with local_session() as session: async with self.lock: await self.load_stat(session) - print("[service.topicstat] updated") + print("[stat.topics] updated") except Exception as err: - print("[service.topicstat] errror: %s" % (err)) + print("[stat.topics] errror: %s" % (err)) await asyncio.sleep(self.period) diff --git a/services/stat/viewed.py b/services/stat/viewed.py index a0ac5956..4ef3ee9e 100644 --- a/services/stat/viewed.py +++ b/services/stat/viewed.py @@ -3,6 +3,7 @@ from datetime import datetime from sqlalchemy import Column, DateTime, ForeignKey, Integer from sqlalchemy.orm.attributes import flag_modified from base.orm import Base, local_session +from orm.topic import ShoutTopic class ViewedByDay(Base): @@ -17,7 +18,7 @@ class ViewedByDay(Base): class ViewedStorage: viewed = { 'shouts': {}, - 'topics': {} # TODO: get sum views for all shouts in topic + 'topics': {} } this_day_views = {} to_flush = [] @@ -31,33 +32,36 @@ class ViewedStorage: for view in views: shout = view.shout + topics = session.query(ShoutTopic.topic).filter(ShoutTopic.shout == shout).all() value = view.value if shout: old_value = self.viewed['shouts'].get(shout, 0) self.viewed['shouts'][shout] = old_value + value + for t in topics: + old_topic_value = self.viewed['topics'].get(t, 0) + self.viewed['topics'][t] = old_topic_value + value if not shout in self.this_day_views: self.this_day_views[shout] = view this_day_view = self.this_day_views[shout] if this_day_view.day < view.day: self.this_day_views[shout] = view - print('[service.viewed] watching %d shouts' % len(views)) + print('[stat.viewed] watching %d shouts' % len(views)) @staticmethod async def get_shout(shout_slug): self = ViewedStorage async with self.lock: return self.viewed['shouts'].get(shout_slug, 0) - - # NOTE: this method is never called - @staticmethod - async def get_reaction(reaction_id): - self = ViewedStorage - async with self.lock: - return self.viewed['reactions'].get(reaction_id, 0) @staticmethod - async def inc_shout(shout_slug): + async def get_topic(topic_slug): + self = ViewedStorage + async with self.lock: + return self.viewed['topics'].get(topic_slug, 0) + + @staticmethod + async def increment(shout_slug): self = ViewedStorage async with self.lock: this_day_view = self.this_day_views.get(shout_slug) @@ -71,27 +75,14 @@ class ViewedStorage: this_day_view.value = this_day_view.value + 1 this_day_view.modified = True old_value = self.viewed['shouts'].get(shout_slug, 0) - self.viewed['shotus'][shout_slug] = old_value + 1 - - @staticmethod - async def inc_reaction(shout_slug, reaction_id): - self = ViewedStorage - async with self.lock: - this_day_view = self.this_day_views.get(reaction_id) - day_start = datetime.now().replace(hour=0, minute=0, second=0) - if not this_day_view or this_day_view.day < day_start: - if this_day_view and getattr(this_day_view, "modified", False): - self.to_flush.append(this_day_view) - this_day_view = ViewedByDay.create( - shout=shout_slug, reaction=reaction_id, value=1) - self.this_day_views[shout_slug] = this_day_view - else: - this_day_view.value = this_day_view.value + 1 - this_day_view.modified = True - old_value = self.viewed['shouts'].get(shout_slug, 0) self.viewed['shouts'][shout_slug] = old_value + 1 - old_value = self.viewed['reactions'].get(shout_slug, 0) - self.viewed['reaction'][reaction_id] = old_value + 1 + with local_session() as session: + topics = session.query(ShoutTopic.topic).where(ShoutTopic.shout == shout_slug).all() + for t in topics: + old_topic_value = self.viewed['topics'].get(t, 0) + self.viewed['topics'][t] = old_topic_value + 1 + flag_modified(this_day_view, "value") + @staticmethod async def flush_changes(session): @@ -113,7 +104,7 @@ class ViewedStorage: try: with local_session() as session: await ViewedStorage.flush_changes(session) - print("[service.viewed] service flushed changes") + print("[stat.viewed] service flushed changes") except Exception as err: - print("[service.viewed] errror: %s" % (err)) + print("[stat.viewed] errror: %s" % (err)) await asyncio.sleep(ViewedStorage.period) diff --git a/services/zine/gittask.py b/services/zine/gittask.py index ecf0c692..5dd8e541 100644 --- a/services/zine/gittask.py +++ b/services/zine/gittask.py @@ -53,10 +53,10 @@ class GitTask: @staticmethod async def git_task_worker(): - print("[resolvers.git] worker start") + print("[service.git] worker start") while True: task = await GitTask.queue.get() try: task.execute() except Exception as err: - print("[resolvers.git] worker error: %s" % (err)) + print("[service.git] worker error: %s" % (err)) diff --git a/services/zine/reactions.py b/services/zine/reactions.py deleted file mode 100644 index 134e5dde..00000000 --- a/services/zine/reactions.py +++ /dev/null @@ -1,159 +0,0 @@ -import asyncio -from sqlalchemy import and_, desc, func -from sqlalchemy.orm import joinedload -from base.orm import local_session -from orm.reaction import Reaction, ReactionKind -from orm.topic import ShoutTopic - - -def kind_to_rate(kind) -> int: - if kind in [ - ReactionKind.AGREE, - ReactionKind.LIKE, - ReactionKind.PROOF, - ReactionKind.ACCEPT - ]: return 1 - elif kind in [ - ReactionKind.DISAGREE, - ReactionKind.DISLIKE, - ReactionKind.DISPROOF, - ReactionKind.REJECT - ]: return -1 - else: return 0 - -class ReactionsStorage: - limit = 200 - reactions = [] - rating_by_shout = {} - reactions_by_shout = {} - reactions_by_topic = {} # TODO: get sum reactions for all shouts in topic - reactions_by_author = {} - lock = asyncio.Lock() - period = 3*60 # 3 mins - - @staticmethod - async def prepare_all(session): - stmt = session.query(Reaction).\ - filter(Reaction.deletedAt == None).\ - order_by(desc("createdAt")).\ - limit(ReactionsStorage.limit) - reactions = [] - for row in session.execute(stmt): - reaction = row.Reaction - reactions.append(reaction) - reactions.sort(key=lambda x: x.createdAt, reverse=True) - async with ReactionsStorage.lock: - print("[service.reactions] %d recently published reactions " % len(reactions)) - ReactionsStorage.reactions = reactions - - @staticmethod - async def prepare_by_author(session): - try: - by_authors = session.query(Reaction.createdBy, func.count('*').label("count")).\ - where(and_(Reaction.deletedAt == None)).\ - group_by(Reaction.createdBy).all() - except Exception as e: - print(e) - by_authors = {} - async with ReactionsStorage.lock: - ReactionsStorage.reactions_by_author = dict([stat for stat in by_authors]) - print("[service.reactions] %d reacted users" % len(by_authors)) - - @staticmethod - async def prepare_by_shout(session): - try: - by_shouts = session.query(Reaction.shout, func.count('*').label("count")).\ - where(and_(Reaction.deletedAt == None)).\ - group_by(Reaction.shout).all() - except Exception as e: - print(e) - by_shouts = {} - async with ReactionsStorage.lock: - ReactionsStorage.reactions_by_shout = dict([stat for stat in by_shouts]) - print("[service.reactions] %d reacted shouts" % len(by_shouts)) - - @staticmethod - async def calc_ratings(session): - rating_by_shout = {} - for shout in ReactionsStorage.reactions_by_shout.keys(): - rating_by_shout[shout] = 0 - shout_reactions_by_kinds = session.query(Reaction).\ - where(and_(Reaction.deletedAt == None, Reaction.shout == shout)).\ - group_by(Reaction.kind, Reaction.id).all() - for reaction in shout_reactions_by_kinds: - rating_by_shout[shout] += kind_to_rate(reaction.kind) - async with ReactionsStorage.lock: - ReactionsStorage.rating_by_shout = rating_by_shout - - @staticmethod - async def prepare_by_topic(session): - # TODO: optimize - by_topics = session.query(Reaction, func.count('*').label("count")).\ - options( - joinedload(ShoutTopic), - joinedload(Reaction.shout) - ).\ - join(ShoutTopic, ShoutTopic.shout == Reaction.shout).\ - filter(Reaction.deletedAt == None).\ - group_by(ShoutTopic.topic).all() - reactions_by_topic = {} - for t, reactions in by_topics: - if not reactions_by_topic.get(t): - reactions_by_topic[t] = 0 - for r in reactions: - reactions_by_topic[t] += r.count - async with ReactionsStorage.lock: - ReactionsStorage.reactions_by_topic = reactions_by_topic - - @staticmethod - async def recent(): - async with ReactionsStorage.lock: - return ReactionsStorage.reactions - - @staticmethod - async def total(): - async with ReactionsStorage.lock: - return len(ReactionsStorage.reactions) - - @staticmethod - async def by_shout(shout): - async with ReactionsStorage.lock: - stat = ReactionsStorage.reactions_by_shout.get(shout) - stat = stat if stat else 0 - return stat - - @staticmethod - async def shout_rating(shout): - async with ReactionsStorage.lock: - return ReactionsStorage.rating_by_shout.get(shout) - - @staticmethod - async def by_author(slug): - async with ReactionsStorage.lock: - stat = ReactionsStorage.reactions_by_author.get(slug) - stat = stat if stat else 0 - return stat - - @staticmethod - async def by_topic(topic): - async with ReactionsStorage.lock: - stat = ReactionsStorage.reactions_by_topic.get(topic) - stat = stat if stat else 0 - return stat - - @staticmethod - async def worker(): - while True: - try: - with local_session() as session: - await ReactionsStorage.prepare_all(session) - print("[service.reactions] all reactions prepared") - await ReactionsStorage.prepare_by_shout(session) - print("[service.reactions] reactions by shouts prepared") - await ReactionsStorage.calc_ratings(session) - print("[service.reactions] reactions ratings prepared") - await ReactionsStorage.prepare_by_topic(session) - print("[service.reactions] reactions topics prepared") - except Exception as err: - print("[service.reactions] errror: %s" % (err)) - await asyncio.sleep(ReactionsStorage.period) diff --git a/services/zine/shoutauthor.py b/services/zine/shoutauthor.py index 99787637..4ae20405 100644 --- a/services/zine/shoutauthor.py +++ b/services/zine/shoutauthor.py @@ -20,8 +20,7 @@ class ShoutAuthorStorage: self.authors_by_shout[shout].append(user) else: self.authors_by_shout[shout] = [user] - print('[service.shoutauthor] %d authors ' % len(self.authors_by_shout)) - # FIXME: [service.shoutauthor] 4251 authors + print('[zine.authors] %d shouts preprocessed' % len(self.authors_by_shout)) @staticmethod async def get_authors(shout): @@ -37,7 +36,7 @@ class ShoutAuthorStorage: with local_session() as session: async with self.lock: await self.load(session) - print("[service.shoutauthor] updated") + print("[zine.authors] state updated") except Exception as err: - print("[service.shoutauthor] errror: %s" % (err)) + print("[zine.authors] errror: %s" % (err)) await asyncio.sleep(self.period) \ No newline at end of file diff --git a/services/zine/shoutscache.py b/services/zine/shoutscache.py index 56fb1adc..7bae2586 100644 --- a/services/zine/shoutscache.py +++ b/services/zine/shoutscache.py @@ -2,12 +2,11 @@ import asyncio from datetime import datetime, timedelta from sqlalchemy import and_, desc, func, select -from sqlalchemy.orm import selectinload, joinedload +from sqlalchemy.orm import selectinload from base.orm import local_session from orm.reaction import Reaction from orm.shout import Shout -from orm.topic import Topic -from services.zine.reactions import ReactionsStorage +from services.stat.reacted import ReactedStorage from services.stat.viewed import ViewedByDay @@ -27,11 +26,11 @@ class ShoutsCache: shouts = [] for row in session.execute(stmt): shout = row.Shout - shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0 + shout.rating = await ReactedStorage.get_rating(shout.slug) or 0 shouts.append(shout) async with ShoutsCache.lock: ShoutsCache.recent_published = shouts - print("[service.shoutscache] %d recently published shouts " % len(shouts)) + print("[zine.cache] %d recently published shouts " % len(shouts)) @staticmethod async def prepare_recent_all(): @@ -47,11 +46,11 @@ class ShoutsCache: for row in session.execute(stmt): shout = row.Shout # shout.topics = [t.slug for t in shout.topics] - shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0 + shout.rating = await ReactedStorage.get_rating(shout.slug) or 0 shouts.append(shout) async with ShoutsCache.lock: ShoutsCache.recent_all = shouts - print("[service.shoutscache] %d recently created shouts " % len(shouts)) + print("[zine.cache] %d recently created shouts " % len(shouts)) @staticmethod async def prepare_recent_reacted(): @@ -70,11 +69,11 @@ class ShoutsCache: for row in session.execute(stmt): shout = row.Shout # shout.topics = [t.slug for t in shout.topics] - shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0 + shout.rating = await ReactedStorage.get_rating(shout.slug) or 0 shouts.append(shout) async with ShoutsCache.lock: ShoutsCache.recent_reacted = shouts - print("[service.shoutscache] %d recently reacted shouts " % len(shouts)) + print("[zine.cache] %d recently reacted shouts " % len(shouts)) @staticmethod @@ -93,11 +92,11 @@ class ShoutsCache: # with rating synthetic counter for row in session.execute(stmt): shout = row.Shout - shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0 + shout.rating = await ReactedStorage.get_rating(shout.slug) or 0 shouts.append(shout) shouts.sort(key = lambda shout: shout.rating, reverse = True) async with ShoutsCache.lock: - print("[service.shoutscache] %d top shouts " % len(shouts)) + print("[zine.cache] %d top shouts " % len(shouts)) ShoutsCache.top_overall = shouts @staticmethod @@ -114,11 +113,11 @@ class ShoutsCache: shouts = [] for row in session.execute(stmt): shout = row.Shout - shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0 + shout.rating = await ReactedStorage.get_rating(shout.slug) or 0 shouts.append(shout) shouts.sort(key = lambda shout: shout.rating, reverse = True) async with ShoutsCache.lock: - print("[service.shoutscache] %d top month shouts " % len(shouts)) + print("[zine.cache] %d top month shouts " % len(shouts)) ShoutsCache.top_month = shouts @staticmethod @@ -135,11 +134,11 @@ class ShoutsCache: shouts = [] for row in session.execute(stmt): shout = row.Shout - shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0 + shout.rating = await ReactedStorage.get_rating(shout.slug) or 0 shouts.append(shout) # shouts.sort(key = lambda shout: shout.viewed, reverse = True) async with ShoutsCache.lock: - print("[service.shoutscache] %d top viewed shouts " % len(shouts)) + print("[zine.cache] %d top viewed shouts " % len(shouts)) ShoutsCache.top_viewed = shouts @staticmethod @@ -152,8 +151,8 @@ class ShoutsCache: await ShoutsCache.prepare_recent_published() await ShoutsCache.prepare_recent_all() await ShoutsCache.prepare_recent_reacted() - print("[service.shoutscache] updated") + print("[zine.cache] updated") except Exception as err: - print("[service.shoutscache] error: %s" % (err)) + print("[zine.cache] error: %s" % (err)) raise err await asyncio.sleep(ShoutsCache.period) diff --git a/services/zine/topics.py b/services/zine/topics.py index 0f85ed7d..a23d557f 100644 --- a/services/zine/topics.py +++ b/services/zine/topics.py @@ -12,9 +12,9 @@ class TopicStorage: topics = session.query(Topic) self.topics = dict([(topic.slug, topic) for topic in topics]) for topic in self.topics.values(): - self.load_parents(topic) # TODO: test + self.load_parents(topic) - print('[service.topics] %d ' % len(self.topics.keys())) + print('[zine.topics] %d ' % len(self.topics.keys())) @staticmethod def load_parents(topic):