From 9bd458c47cb7eb5b73582833bcc9c853617692ac Mon Sep 17 00:00:00 2001 From: Untone Date: Mon, 22 Jan 2024 18:42:45 +0300 Subject: [PATCH] add --- resolvers/editor.py | 12 ++-- services/viewed.py | 168 +++++++++++++++++++++++++++++++++----------- 2 files changed, 134 insertions(+), 46 deletions(-) diff --git a/resolvers/editor.py b/resolvers/editor.py index 8852ac1c..2cbf6deb 100644 --- a/resolvers/editor.py +++ b/resolvers/editor.py @@ -48,16 +48,16 @@ async def create_shout(_, info, inp): current_time = int(time.time()) new_shout = Shout( **{ - "title": inp.get("title"), - "subtitle": inp.get("subtitle"), - "lead": inp.get("lead"), - "description": inp.get("description"), + "title": inp.get("title", ""), + "subtitle": inp.get("subtitle", ""), + "lead": inp.get("lead", ""), + "description": inp.get("description", ""), "body": inp.get("body", ""), - "layout": inp.get("layout"), + "layout": inp.get("layout", "article"), "created_by": author.id, "authors": [], "slug": inp.get("slug") or f"draft-{time.time()}", - "topics": inp.get("topics"), + "topics": inp.get("topics", []), "visibility": ShoutVisibility.AUTHORS.value, "created_at": current_time, # Set created_at as Unix timestamp } diff --git a/services/viewed.py b/services/viewed.py index d6d2172a..67bca042 100644 --- a/services/viewed.py +++ b/services/viewed.py @@ -1,4 +1,5 @@ import asyncio +import threading from logging import Logger import time from datetime import datetime, timedelta, timezone @@ -6,13 +7,19 @@ from os import environ import logging from gql import Client, gql from gql.transport.aiohttp import AIOHTTPTransport +from graphql import DocumentNode from orm.shout import Shout, ShoutTopic from orm.topic import Topic from services.db import local_session - -logging.basicConfig() +logging.basicConfig( + format="[%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s", + level=logging.DEBUG, + handlers=[ + logging.StreamHandler(), + ], +) logger = logging.getLogger("\t[services.viewed]\t") logger.setLevel(logging.DEBUG) @@ -46,8 +53,20 @@ load_pages = gql( } } """ ) -schema_str = open("schemas/ackee.graphql").read() +create_record_mutation_string = """ + createRecord(domainId: $domainId, input: $input) { + payload { + id + } + } +""" + +create_record_mutation = gql(f"mutation {{{create_record_mutation_string}}}") + +schema_str = open("schemas/stat.graphql").read() token = environ.get("ACKEE_TOKEN", "") +domain_id = environ.get("ACKEE_DOMAIN_ID", "") +ackee_site = environ.get("ACKEE_SITE", "https://testing.discours.io/") def create_client(headers=None, schema=None): @@ -62,10 +81,11 @@ class ViewedStorage: lock = asyncio.Lock() by_shouts = {} by_topics = {} - by_reactions = {} + by_authors = {} views = None pages = None domains = None + facts = None period = 60 * 60 # every hour client: Client | None = None auth_result = None @@ -93,59 +113,62 @@ class ViewedStorage: try: start = time.time() self = ViewedStorage - if self.client: - # Use asyncio.run to execute asynchronous code in the main entry point - self.pages = await asyncio.to_thread(self.client.execute, load_pages) - domains = self.pages.get("domains", []) - # logger.debug(f" | domains: {domains}") - for domain in domains: - pages = domain.get("statistics", {}).get("pages", []) - if pages: - # logger.debug(f" | pages: {pages}") - shouts = {} - for page in pages: - p = page["value"].split("?")[0] - slug = p.split("discours.io/")[-1] - shouts[slug] = page["count"] - for slug in shouts.keys(): - await ViewedStorage.increment(slug, shouts[slug]) - logger.info(" ⎪ %d pages collected " % len(shouts.keys())) + async with self.lock: + if self.client: + # Use asyncio.run to execute asynchronous code in the main entry point + self.pages = await asyncio.to_thread(self.client.execute, load_pages) + domains = self.pages.get("domains", []) + # logger.debug(f" | domains: {domains}") + for domain in domains: + pages = domain.get("statistics", {}).get("pages", []) + if pages: + # logger.debug(f" | pages: {pages}") + shouts = {} + for page in pages: + p = page["value"].split("?")[0] + slug = p.split("discours.io/")[-1] + shouts[slug] = page["count"] + for slug in shouts.keys(): + self.by_shouts[slug] = self.by_shouts.get(slug, 0) + 1 + self.update_topics(slug) + logger.info(" ⎪ %d pages collected " % len(shouts.keys())) end = time.time() logger.info(" ⎪ update_pages took %fs " % (end - start)) except Exception: import traceback + traceback.print_exc() @staticmethod async def get_facts(): self = ViewedStorage - facts = [] + self.facts = [] try: if self.client: async with self.lock: - facts = await self.client.execute(load_facts) + self.facts = await asyncio.to_thread(self.client.execute, load_pages) except Exception as er: logger.error(f" - get_facts error: {er}") - return facts or [] + return self.facts or [] @staticmethod - async def get_shout(shout_slug): + async def get_shout(shout_slug) -> int: """getting shout views metric by slug""" self = ViewedStorage async with self.lock: return self.by_shouts.get(shout_slug, 0) @staticmethod - async def get_reaction(shout_slug, reaction_id): - """getting reaction views metric by slug""" + async def get_shout_media(shout_slug) -> Dict[str, int]: + """getting shout plays metric by slug""" self = ViewedStorage async with self.lock: - return self.by_reactions.get(shout_slug, {}).get(reaction_id, 0) + return self.by_shouts.get(shout_slug, 0) @staticmethod - async def get_topic(topic_slug): + async def get_topic(topic_slug) -> int: """getting topic views value summed""" self = ViewedStorage topic_views = 0 @@ -154,11 +177,22 @@ class ViewedStorage: topic_views += self.by_topics[topic_slug].get(shout_slug, 0) return topic_views + @staticmethod + async def get_authors(author_slug) -> int: + """getting author views value summed""" + self = ViewedStorage + author_views = 0 + async with self.lock: + for shout_slug in self.by_authors.get(author_slug, {}).keys(): + author_views += self.by_authors[author_slug].get(shout_slug, 0) + return author_views + @staticmethod def update_topics(shout_slug): """updates topics counters by shout slug""" self = ViewedStorage with local_session() as session: + # grouped by topics for [_shout_topic, topic] in ( session.query(ShoutTopic, Topic).join(Topic).join(Shout).where(Shout.slug == shout_slug).all() ): @@ -166,21 +200,75 @@ class ViewedStorage: self.by_topics[topic.slug] = {} self.by_topics[topic.slug][shout_slug] = self.by_shouts[shout_slug] - @staticmethod - async def increment(shout_slug, amount=1, viewer="ackee"): - """the only way to change views counter""" - self = ViewedStorage - async with self.lock: - self.by_shouts[shout_slug] = self.by_shouts.get(shout_slug, 0) + amount - self.update_topics(shout_slug) + # grouped by authors + for [_shout_author, author] in ( + session.query(ShoutAuthor, Author).join(Author).join(Shout).where(Shout.slug == shout_slug).all() + ): + if not self.by_authors.get(author.slug): + self.by_authors[author.slug] = {} + self.by_authors[author.slug][shout_slug] = self.by_shouts[shout_slug] @staticmethod - async def increment_reaction(shout_slug, reaction_id, amount=1, viewer="ackee"): - """the only way to change views counter""" + async def increment(shout_slug): + """the proper way to change counter""" + resource = ackee_site + shout_slug self = ViewedStorage async with self.lock: - self.by_reactions[shout_slug][reaction_id] = self.by_reactions[shout_slug].get(reaction_id, 0) + amount + self.by_shouts[shout_slug] = self.by_shouts.get(shout_slug, 0) + 1 self.update_topics(shout_slug) + variables = {"domainId": domain_id, "input": {"siteLocation": resource}} + if self.client: + try: + await asyncio.to_thread(self.client.execute, create_record_mutation, variables) + except Exception as e: + logger.error(f"Error during threaded execution: {e}") + + @staticmethod + async def increment_amount(shout_slug, amount): + """the migration way to change counter with batching""" + resource = ackee_site + shout_slug + self = ViewedStorage + + gql_string = "" + batch_size = 100 + if not isinstance(amount, int): + try: + amount = int(amount) + if not isinstance(amount, int): + amount = 1 + except: + pass + + self.by_shouts[shout_slug] = self.by_shouts.get(shout_slug, 0) + amount + self.update_topics(shout_slug) + logger.info(f"{int(amount/100) + 1} requests") + for i in range(amount): + alias = f"mutation{i + 1}" + gql_string += f"{alias}: {create_record_mutation_string + .replace('$domainId', f'"{domain_id}"') + .replace('$input', f'{{siteLocation: "{resource}"}}') + }\n" + # Execute the batch every 100 records + if (i + 1) % batch_size == 0 or (i + 1) == amount: + await self.exec(f"mutation {{\n{gql_string}\n}}") + gql_string = "" # Reset the gql_string for the next batch + # Throttle the requests to 3 per second + await asyncio.sleep(1 / 3) + + + logger.info(f"Incremented {amount} records for shout_slug: {shout_slug}") + + + @staticmethod + async def exec(gql_string: str): + self = ViewedStorage + async with self.lock: + if self.client: + try: + await asyncio.to_thread(self.client.execute, gql(gql_string)) + except Exception as e: + logger.error(f"Error during threaded execution: {e}") + @staticmethod async def worker(): @@ -192,7 +280,7 @@ class ViewedStorage: while True: try: - logger.info(" - updating views...") + logger.info(" - updating records...") await self.update_pages() failed = 0 except Exception: