precounted-views-import

This commit is contained in:
Untone 2024-01-22 19:17:39 +03:00
parent f08a00e3c2
commit ff6637a51e

View File

@ -3,6 +3,7 @@ import threading
from typing import Dict
from logging import Logger
import time
import json
from datetime import datetime, timedelta, timezone
from os import environ
import logging
@ -102,12 +103,26 @@ class ViewedStorage:
self.client = create_client({"Authorization": f"Bearer {token}"}, schema=schema_str)
logger.info(" * authorized permanently by ackee.discours.io: %s" % token)
# Load pre-counted views from the JSON file
self.load_precounted_views()
views_stat_task = asyncio.create_task(self.worker())
logger.info(views_stat_task)
else:
logger.info(" * please set ACKEE_TOKEN")
self.disabled = True
@staticmethod
def load_precounted_views():
    """Seed ViewedStorage.by_shouts with view counts from /dump/views.json.

    Best-effort startup step: a missing or malformed dump file is logged
    and ignored so the service can start with empty counters.
    """
    self = ViewedStorage
    try:
        # Explicit encoding; the dump is a JSON mapping merged into by_shouts.
        with open("/dump/views.json", "r", encoding="utf-8") as file:
            precounted_views = json.load(file)
            self.by_shouts.update(precounted_views)
            logger.info(f" * {len(precounted_views)} pre-counted views loaded successfully.")
    except (OSError, json.JSONDecodeError) as e:
        # Narrowed from a blanket Exception: only I/O and parse failures
        # are expected here; anything else should surface loudly.
        logger.error(f"Error loading pre-counted views: {e}")
@staticmethod
async def update_pages():
"""query all the pages from ackee sorted by views count"""
@ -225,53 +240,6 @@ class ViewedStorage:
except Exception as e:
logger.error(f"Error during threaded execution: {e}")
@staticmethod
async def increment_amount(shout_slug, amount):
    """the migration way to change counter with batching

    Sends one Ackee create-record mutation per view for *shout_slug*,
    batched `batch_size` per request and throttled to ~3 requests/second.
    Also bumps the local by_shouts counter and topic aggregates up front.
    """
    resource = ackee_site + shout_slug
    self = ViewedStorage
    batch_size = 100
    # Coerce amount to an int, defaulting to a single view on bad input.
    # (The previous bare `except: pass` could leave a str here and crash
    # range(amount) below; the isinstance re-check after int() was dead code.)
    try:
        amount = int(amount)
    except (TypeError, ValueError):
        amount = 1
    self.by_shouts[shout_slug] = self.by_shouts.get(shout_slug, 0) + amount
    self.update_topics(shout_slug)
    # Ceiling division by batch_size gives the true request count
    # (the old `int(amount/100) + 1` over-counted exact multiples).
    logger.info(f"{(amount - 1) // batch_size + 1} requests")
    gql_string = ""
    for i in range(amount):
        alias = f"mutation{i + 1}"
        # Build each record mutation; kept as plain statements instead of a
        # multiline-expression f-string, which requires Python 3.12+.
        mutation = create_record_mutation_string.replace(
            "$domainId", f'"{domain_id}"'
        ).replace("$input", f'{{siteLocation: "{resource}"}}')
        gql_string += f"{alias}: {mutation}\n"
        # Execute the batch every batch_size records (and flush the tail)
        if (i + 1) % batch_size == 0 or (i + 1) == amount:
            await self.exec(f"mutation {{\n{gql_string}\n}}")
            gql_string = ""  # Reset the gql_string for the next batch
            # Throttle the requests to 3 per second
            await asyncio.sleep(1 / 3)
    logger.info(f"Incremented {amount} records for shout_slug: {shout_slug}")
@staticmethod
async def exec(gql_string: str):
    """Run a raw GraphQL mutation string against the Ackee client.

    Serialized under the class-level lock; a silent no-op when the client
    was never authorized. Execution errors are logged, never raised.
    """
    storage = ViewedStorage
    async with storage.lock:
        if not storage.client:
            return
        try:
            # gql execution is blocking, so push it off the event loop
            await asyncio.to_thread(storage.client.execute, gql(gql_string))
        except Exception as e:
            logger.error(f"Error during threaded execution: {e}")
@staticmethod
async def worker():
"""async task worker"""