This commit is contained in:
parent
8f690af6ef
commit
033a8b6534
|
@ -1,8 +1,9 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
|
from logging import Logger
|
||||||
import time
|
import time
|
||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
from os import environ
|
from os import environ
|
||||||
|
import logging
|
||||||
from gql import Client, gql
|
from gql import Client, gql
|
||||||
from gql.transport.aiohttp import AIOHTTPTransport
|
from gql.transport.aiohttp import AIOHTTPTransport
|
||||||
|
|
||||||
|
@ -10,6 +11,12 @@ from orm.shout import Shout, ShoutTopic
|
||||||
from orm.topic import Topic
|
from orm.topic import Topic
|
||||||
from services.db import local_session
|
from services.db import local_session
|
||||||
|
|
||||||
|
|
||||||
|
logging.basicConfig()
|
||||||
|
logger = logging.getLogger("\t[services.viewed]\t")
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
load_facts = gql(
|
load_facts = gql(
|
||||||
""" query getDomains {
|
""" query getDomains {
|
||||||
domains {
|
domains {
|
||||||
|
@ -71,37 +78,39 @@ class ViewedStorage:
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
if token:
|
if token:
|
||||||
self.client = create_client({"Authorization": f"Bearer {token}"}, schema=schema_str)
|
self.client = create_client({"Authorization": f"Bearer {token}"}, schema=schema_str)
|
||||||
print("[services.viewed] * authorized permanently by ackee.discours.io: %s" % token)
|
logger.info(" * authorized permanently by ackee.discours.io: %s" % token)
|
||||||
|
|
||||||
views_stat_task = asyncio.create_task(self.worker())
|
views_stat_task = asyncio.create_task(self.worker())
|
||||||
print(views_stat_task)
|
logger.info(views_stat_task)
|
||||||
else:
|
else:
|
||||||
print("[services.viewed] * please set ACKEE_TOKEN")
|
logger.info(" * please set ACKEE_TOKEN")
|
||||||
self.disabled = True
|
self.disabled = True
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def update_pages():
|
async def update_pages():
|
||||||
"""query all the pages from ackee sorted by views count"""
|
"""query all the pages from ackee sorted by views count"""
|
||||||
print("[services.viewed] ⎧ updating ackee pages data ---")
|
logger.info(" ⎧ updating ackee pages data ---")
|
||||||
start = time.time()
|
start = time.time()
|
||||||
self = ViewedStorage
|
self = ViewedStorage
|
||||||
try:
|
if self.client:
|
||||||
if self.client:
|
self.pages = self.client.execute(load_pages)
|
||||||
self.pages = self.client.execute(load_pages)
|
domains = self.pages.get("domains", [])
|
||||||
self.pages = self.pages["domains"][0]["statistics"]["pages"]
|
logger.debug(f" | domains: {domains}")
|
||||||
shouts = {}
|
for domain in domains:
|
||||||
for page in self.pages:
|
pages = domain.get("statistics", {}).get("pages", [])
|
||||||
p = page["value"].split("?")[0]
|
if pages:
|
||||||
slug = p.split("discours.io/")[-1]
|
logger.debug(f" | pages: {pages}")
|
||||||
shouts[slug] = page["count"]
|
shouts = {}
|
||||||
for slug in shouts.keys():
|
for page in pages:
|
||||||
await ViewedStorage.increment(slug, shouts[slug])
|
p = page["value"].split("?")[0]
|
||||||
print("[services.viewed] ⎪ %d pages collected " % len(shouts.keys()))
|
slug = p.split("discours.io/")[-1]
|
||||||
except Exception as e:
|
shouts[slug] = page["count"]
|
||||||
raise Exception(e)
|
for slug in shouts.keys():
|
||||||
|
await ViewedStorage.increment(slug, shouts[slug])
|
||||||
|
logger.info(" ⎪ %d pages collected " % len(shouts.keys()))
|
||||||
|
|
||||||
end = time.time()
|
end = time.time()
|
||||||
print("[services.viewed] ⎪ update_pages took %fs " % (end - start))
|
logger.info(" ⎪ update_pages took %fs " % (end - start))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def get_facts():
|
async def get_facts():
|
||||||
|
@ -112,7 +121,7 @@ class ViewedStorage:
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
facts = self.client.execute(load_facts)
|
facts = self.client.execute(load_facts)
|
||||||
except Exception as er:
|
except Exception as er:
|
||||||
print(f"[services.viewed] get_facts error: {er}")
|
logger.error(f" - get_facts error: {er}")
|
||||||
return facts or []
|
return facts or []
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -177,20 +186,20 @@ class ViewedStorage:
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
print("[services.viewed] - updating views...")
|
logger.info(" - updating views...")
|
||||||
await self.update_pages()
|
await self.update_pages()
|
||||||
failed = 0
|
failed = 0
|
||||||
except Exception:
|
except Exception:
|
||||||
failed += 1
|
failed += 1
|
||||||
print("[services.viewed] - update failed #%d, wait 10 seconds" % failed)
|
logger.info(" - update failed #%d, wait 10 seconds" % failed)
|
||||||
if failed > 3:
|
if failed > 3:
|
||||||
print("[services.viewed] - not trying to update anymore")
|
logger.info(" - not trying to update anymore")
|
||||||
break
|
break
|
||||||
if failed == 0:
|
if failed == 0:
|
||||||
when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
|
when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
|
||||||
t = format(when.astimezone().isoformat())
|
t = format(when.astimezone().isoformat())
|
||||||
print("[services.viewed] ⎩ next update: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0]))
|
logger.info(" ⎩ next update: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0]))
|
||||||
await asyncio.sleep(self.period)
|
await asyncio.sleep(self.period)
|
||||||
else:
|
else:
|
||||||
await asyncio.sleep(10)
|
await asyncio.sleep(10)
|
||||||
print("[services.viewed] - trying to update data again")
|
logger.info(" - trying to update data again")
|
||||||
|
|
Loading…
Reference in New Issue
Block a user