From efc3531c3386d3b0d24ba256157539c9a78458ad Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Tue, 15 Nov 2022 12:25:04 +0300 Subject: [PATCH] views from ackee --- base/ackee.py | 45 ------------ main.py | 6 +- migration/tables/content_items.py | 4 +- orm/__init__.py | 4 +- orm/viewed.py | 12 ---- resolvers/reactions.py | 2 +- resolvers/topics.py | 4 +- schema.graphql | 28 ++++---- services/main.py | 4 +- services/stat/reacted.py | 4 +- services/stat/viewed.py | 110 ------------------------------ services/stat/views.py | 99 +++++++++++++++++++++++++++ 12 files changed, 126 insertions(+), 196 deletions(-) delete mode 100644 base/ackee.py delete mode 100644 orm/viewed.py delete mode 100644 services/stat/viewed.py create mode 100644 services/stat/views.py diff --git a/base/ackee.py b/base/ackee.py deleted file mode 100644 index c44f39de..00000000 --- a/base/ackee.py +++ /dev/null @@ -1,45 +0,0 @@ -from gql import gql, Client -from gql.transport.aiohttp import AIOHTTPTransport - -# Provide a GraphQL query -query_ackee_views = gql( - """ - query getDomainsFacts { - domains { - statistics { - views { - id - count - } - pages { - id - count - created - } - } - facts { - activeVisitors - # averageViews - # averageDuration - viewsToday - viewsMonth - viewsYear - } - } - } - """ -) - - -class GraphQLClient: - # Select your transport with a defined url endpoint - transport = AIOHTTPTransport(url="https://ackee.discours.io/") - - # Create a GraphQL client using the defined transport - client = Client(transport=transport, fetch_schema_from_transport=True) - - @staticmethod - def get_views_by_slug(slug): - # Execute the query on the transport - domains = GraphQLClient.client.execute(query_ackee_views) - print(domains) diff --git a/main.py b/main.py index a4d090f0..edccebc2 100644 --- a/main.py +++ b/main.py @@ -17,7 +17,7 @@ from resolvers.auth import confirm_email_handler from services.main import storages_init from services.stat.reacted import ReactedStorage from services.stat.topicstat import TopicStat -from services.stat.viewed import ViewedStorage +from services.stat.views import Stat from services.zine.gittask import GitTask from services.zine.shoutauthor import ShoutAuthorStorage import_module("resolvers") @@ -31,8 +31,8 @@ middleware = [ async def start_up(): await redis.connect() - viewed_storage_task = asyncio.create_task(ViewedStorage.worker()) - print(viewed_storage_task) + views_stat_task = asyncio.create_task(Stat.worker()) + print(views_stat_task) reacted_storage_task = asyncio.create_task(ReactedStorage.worker()) print(reacted_storage_task) shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker()) diff --git a/migration/tables/content_items.py b/migration/tables/content_items.py index 8ce210db..8d9b64fe 100644 --- a/migration/tables/content_items.py +++ b/migration/tables/content_items.py @@ -12,7 +12,7 @@ from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower from orm.user import User from orm.topic import TopicFollower from services.stat.reacted import ReactedStorage -from services.stat.viewed import ViewedStorage +from services.stat.views import Stat OLD_DATE = "2016-03-05 22:22:00.350000" ts = datetime.now() @@ -340,7 +340,7 @@ async def migrate(entry, storage): raise Exception("[migration] content_item.ratings error: \n%r" % content_rating) # shout views - await ViewedStorage.increment(shout_dict["slug"], amount=entry.get("views", 1)) + await Stat.increment(shout_dict["slug"], amount=entry.get("views", 1)) # del shout_dict['ratings'] shout_dict["oid"] = entry.get("_id") storage["shouts"]["by_oid"][entry["_id"]] = shout_dict diff --git a/orm/__init__.py b/orm/__init__.py index 8c9f6412..4ac7cee3 100644 --- a/orm/__init__.py +++ b/orm/__init__.py @@ -6,7 +6,6 @@ from orm.reaction import Reaction from orm.shout import Shout from orm.topic import Topic, TopicFollower from orm.user import User, UserRating -from orm.viewed import ViewedByDay __all__ = [ "User", @@ -19,8 +18,7 @@ __all__ = [ "TopicFollower", "Notification", "Reaction", - "UserRating", - "ViewedByDay" + "UserRating" ] Base.metadata.create_all(engine) diff --git a/orm/viewed.py b/orm/viewed.py deleted file mode 100644 index 7db97418..00000000 --- a/orm/viewed.py +++ /dev/null @@ -1,12 +0,0 @@ -from datetime import datetime -from sqlalchemy import Column, DateTime, ForeignKey, Integer -from base.orm import Base - - -class ViewedByDay(Base): - __tablename__ = "viewed_by_day" - - id = None - shout = Column(ForeignKey("shout.slug"), primary_key=True) - day = Column(DateTime, primary_key=True, default=datetime.now) - value = Column(Integer) diff --git a/resolvers/reactions.py b/resolvers/reactions.py index 1f083cd0..d7cd0fdd 100644 --- a/resolvers/reactions.py +++ b/resolvers/reactions.py @@ -13,7 +13,7 @@ from services.stat.reacted import ReactedStorage async def get_reaction_stat(reaction_id): return { - # "viewed": await ViewedStorage.get_reaction(reaction_id), + # "viewed": await Stat.get_reaction(reaction_id), "reacted": len(await ReactedStorage.get_reaction(reaction_id)), "rating": await ReactedStorage.get_reaction_rating(reaction_id), "commented": len(await ReactedStorage.get_reaction_comments(reaction_id)), diff --git a/resolvers/topics.py b/resolvers/topics.py index b5824fc0..de167e7b 100644 --- a/resolvers/topics.py +++ b/resolvers/topics.py @@ -9,7 +9,7 @@ from orm.topic import Topic, TopicFollower from services.zine.topics import TopicStorage from services.stat.reacted import ReactedStorage from services.stat.topicstat import TopicStat -# from services.stat.viewed import ViewedStorage +from services.stat.views import Stat async def get_topic_stat(slug): @@ -17,7 +17,7 @@ async def get_topic_stat(slug): "shouts": len(TopicStat.shouts_by_topic.get(slug, {}).keys()), "authors": len(TopicStat.authors_by_topic.get(slug, {}).keys()), "followers": len(TopicStat.followers_by_topic.get(slug, {}).keys()), - # "viewed": await ViewedStorage.get_topic(slug), + "viewed": await Stat.get_topic(slug), "reacted": len(await ReactedStorage.get_topic(slug)), "commented": len(await ReactedStorage.get_topic_comments(slug)), "rating": await ReactedStorage.get_topic_rating(slug) diff --git a/schema.graphql b/schema.graphql index 43437974..5a43479b 100644 --- a/schema.graphql +++ b/schema.graphql @@ -191,49 +191,49 @@ type Mutation { unfollow(what: FollowingEntity!, slug: String!): Result! } -input MessagesBy { +interface By { + order: String + days: Int + stat: String +} + +input MessagesBy implements By { author: String body: String chat: String - days: Int } -input AuthorsBy { +input AuthorsBy implements By{ lastSeen: DateTime createdAt: DateTime - stat: String slug: String name: String topic: String } -input ShoutsBy { +input ShoutsBy implements By { slug: String title: String body: String topic: String author: String - days: Int layout: String published: Boolean visibility: String - stat: String } -input ReactionBy { +input ReactionBy implements By { shout: String body: String topic: String author: String - days: Int - stat: String } ################################### Query type Query { # inbox loadChats(offset: Int, amount: Int): Result! # your chats - loadMessagesBy(by: MessagesBy!, amount: Int, offset: Int): Result! + loadMessagesBy(by: By & MessagesBy!, amount: Int, offset: Int): Result! searchUsers(query: String!, amount: Int, offset: Int): Result! # auth @@ -242,9 +242,9 @@ type Query { signOut: AuthResult! # zine - loadAuthorsBy(by: AuthorsBy, amount: Int, offset: Int): [Author]! - loadShoutsBy(by: ShoutsBy, amount: Int, offset: Int): [Shout]! - loadReactionsBy(by: ReactionBy!, amount: Int, limit: Int): [Reaction]! + loadAuthorsBy(by: By & AuthorsBy, amount: Int, offset: Int): [Author]! + loadShoutsBy(by: By & ShoutsBy, amount: Int, offset: Int): [Shout]! + loadReactionsBy(by: By & ReactionBy!, amount: Int, limit: Int): [Reaction]! userFollowers(slug: String!): [Author]! userFollowedAuthors(slug: String!): [Author]! userFollowedTopics(slug: String!): [Topic]! diff --git a/services/main.py b/services/main.py index 3783b55f..bfe14663 100644 --- a/services/main.py +++ b/services/main.py @@ -1,4 +1,4 @@ -from services.stat.viewed import ViewedStorage +from services.stat.views import Stat from services.stat.reacted import ReactedStorage from services.auth.roles import RoleStorage from services.auth.users import UserStorage @@ -10,7 +10,7 @@ from base.orm import local_session async def storages_init(): with local_session() as session: print('[main] initialize storages') - ViewedStorage.init(session) + await Stat.update() ReactedStorage.init(session) RoleStorage.init(session) UserStorage.init(session) diff --git a/services/stat/reacted.py b/services/stat/reacted.py index de854f6a..5d772343 100644 --- a/services/stat/reacted.py +++ b/services/stat/reacted.py @@ -2,6 +2,7 @@ import asyncio from base.orm import local_session from orm.reaction import ReactionKind, Reaction from services.zine.topics import TopicStorage +from services.stat.views import Stat def kind_to_rate(kind) -> int: @@ -35,8 +36,7 @@ class ReactedStorage: @staticmethod async def get_shout_stat(slug): return { - # TODO: use ackee as datasource - "viewed": 0, # await ViewedStorage.get_shout(slug), + "viewed": await Stat.get_shout(slug), "reacted": len(await ReactedStorage.get_shout(slug)), "commented": len(await ReactedStorage.get_comments(slug)), "rating": await ReactedStorage.get_rating(slug), diff --git a/services/stat/viewed.py b/services/stat/viewed.py deleted file mode 100644 index 70f1fc30..00000000 --- a/services/stat/viewed.py +++ /dev/null @@ -1,110 +0,0 @@ -import asyncio -from datetime import datetime - -from base.orm import local_session - -from sqlalchemy.orm.attributes import flag_modified - -from orm.shout import ShoutTopic -from orm.viewed import ViewedByDay - - -class ViewedStorage: - viewed = {"shouts": {}, "topics": {}, "reactions": {}} - this_day_views = {} - to_flush = [] - period = 30 * 60 # sec - lock = asyncio.Lock() - - @staticmethod - def init(session): - self = ViewedStorage - views = session.query(ViewedByDay).all() - - for view in views: - shout = view.shout - topics = ( - session.query(ShoutTopic.topic).filter(ShoutTopic.shout == shout).all() - ) - value = view.value - if shout: - old_value = self.viewed["shouts"].get(shout, 0) - self.viewed["shouts"][shout] = old_value + value - for t in topics: - old_topic_value = self.viewed["topics"].get(t, 0) - self.viewed["topics"][t] = old_topic_value + value - if shout not in self.this_day_views: - self.this_day_views[shout] = view - this_day_view = self.this_day_views[shout] - if this_day_view.day < view.day: - self.this_day_views[shout] = view - - print("[stat.viewed] %d shouts viewed" % len(self.viewed['shouts'])) - - @staticmethod - async def get_shout(shout_slug): - self = ViewedStorage - async with self.lock: - return self.viewed["shouts"].get(shout_slug, 0) - - @staticmethod - async def get_topic(topic_slug): - self = ViewedStorage - async with self.lock: - return self.viewed["topics"].get(topic_slug, 0) - - @staticmethod - async def get_reaction(reaction_id): - self = ViewedStorage - async with self.lock: - return self.viewed["reactions"].get(reaction_id, 0) - - @staticmethod - async def increment(shout_slug, amount=1): - self = ViewedStorage - async with self.lock: - this_day_view = self.this_day_views.get(shout_slug) - day_start = datetime.now().replace(hour=0, minute=0, second=0) - if not this_day_view or this_day_view.day < day_start: - if this_day_view and getattr(this_day_view, "modified", False): - self.to_flush.append(this_day_view) - this_day_view = ViewedByDay.create(shout=shout_slug, value=1) - self.this_day_views[shout_slug] = this_day_view - else: - this_day_view.value = this_day_view.value + amount - this_day_view.modified = True - self.viewed["shouts"][shout_slug] = (self.viewed["shouts"].get(shout_slug, 0) + amount) - with local_session() as session: - topics = ( - session.query(ShoutTopic.topic) - .where(ShoutTopic.shout == shout_slug) - .all() - ) - for t in topics: - self.viewed["topics"][t] = self.viewed["topics"].get(t, 0) + amount - flag_modified(this_day_view, "value") - - @staticmethod - async def flush_changes(session): - self = ViewedStorage - async with self.lock: - for view in self.this_day_views.values(): - if getattr(view, "modified", False): - session.add(view) - flag_modified(view, "value") - view.modified = False - for view in self.to_flush: - session.add(view) - self.to_flush.clear() - session.commit() - - @staticmethod - async def worker(): - while True: - try: - with local_session() as session: - await ViewedStorage.flush_changes(session) - print("[stat.viewed] periodical flush") - except Exception as err: - print("[stat.viewed] : %s" % (err)) - await asyncio.sleep(ViewedStorage.period) diff --git a/services/stat/views.py b/services/stat/views.py new file mode 100644 index 00000000..7f999881 --- /dev/null +++ b/services/stat/views.py @@ -0,0 +1,99 @@ +from gql import gql, Client +from gql.transport.aiohttp import AIOHTTPTransport +import asyncio + +from services.zine.topics import TopicStorage + +query_ackee_views = gql( + """ + query getDomainsFacts { + domains { + statistics { + views { + id + count + } + pages { + id + count + created + } + } + facts { + activeVisitors + # averageViews + # averageDuration + viewsToday + viewsMonth + viewsYear + } + } + } + """ +) + + +class Stat: + lock = asyncio.Lock() + by_slugs = {} + by_topics = {} + period = 30 * 60 # 30 minutes + transport = AIOHTTPTransport(url="https://ackee.discours.io/") + client = Client(transport=transport, fetch_schema_from_transport=True) + + @staticmethod + async def load_views(): + # TODO: when the struture of paylod will be transparent + # TODO: perhaps ackee token getting here + + self = Stat + async with self.lock: + domains = self.client.execute(query_ackee_views) + print("[stat.ackee] loaded domains") + print(domains) + + print('\n\n# TODO: something here...\n\n') + + @staticmethod + async def get_shout(shout_slug): + self = Stat + async with self.lock: + return self.by_slugs.get(shout_slug) or 0 + + @staticmethod + async def get_topic(topic_slug): + self = Stat + async with self.lock: + shouts = self.by_topics.get(topic_slug) + topic_views = 0 + for v in shouts.values(): + topic_views += v + return topic_views + + @staticmethod + async def increment(shout_slug, amount=1): + self = Stat + async with self.lock: + self.by_slugs[shout_slug] = self.by_slugs.get(shout_slug) or 0 + self.by_slugs[shout_slug] += amount + shout_topics = await TopicStorage.get_topics_by_slugs([shout_slug, ]) + for t in shout_topics: + self.by_topics[t] = self.by_topics.get(t) or {} + self.by_topics[t][shout_slug] = self.by_topics[t].get(shout_slug) or 0 + self.by_topics[t][shout_slug] += amount + + @staticmethod + async def update(): + self = Stat + async with self.lock: + self.load_views() + + @staticmethod + async def worker(): + while True: + try: + await Stat.update() + except Exception as err: + print("[stat.ackee] : %s" % (err)) + print("[stat.ackee] renew period: %d minutes" % (Stat.period / 60)) + await asyncio.sleep(Stat.period)