sync-viewed-stat
Some checks failed
Deploy on push / deploy (push) Failing after 10s

This commit is contained in:
Untone 2024-08-07 13:15:58 +03:00
parent 9c7a62c384
commit 57d25b637d
4 changed files with 34 additions and 27 deletions

View File

@ -275,6 +275,7 @@ async def update_reaction(_, info, reaction):
session.commit() session.commit()
r.stat = { r.stat = {
# FIXME: "viewed": ViewedStorage.get_shout(r.shuot), sure, it is possible to collect reaction vews
"commented": commented_stat, "commented": commented_stat,
"rating": rating_stat, "rating": rating_stat,
} }

View File

@ -21,6 +21,7 @@ from services.db import local_session
from utils.logger import root_logger as logger from utils.logger import root_logger as logger
from services.schema import query from services.schema import query
from services.search import search_text from services.search import search_text
from services.viewed import ViewedStorage
def query_shouts(): def query_shouts():
@ -94,10 +95,14 @@ def get_shouts_with_stats(q, limit, offset=0, author_id=None):
:return: Список публикаций с включенной статистикой. :return: Список публикаций с включенной статистикой.
""" """
# Основной запрос для получения публикаций и объединения их с подзапросами # Основной запрос для получения публикаций и объединения их с подзапросами
q = q.options( q = (
selectinload(Shout.authors), # Eagerly load authors q.options(
selectinload(Shout.topics) # Eagerly load topics selectinload(Shout.authors), # Eagerly load authors
).limit(limit).offset(offset) selectinload(Shout.topics), # Eagerly load topics
)
.limit(limit)
.offset(offset)
)
# Выполнение запроса и обработка результатов # Выполнение запроса и обработка результатов
with local_session() as session: with local_session() as session:
@ -109,7 +114,7 @@ def get_shouts_with_stats(q, limit, offset=0, author_id=None):
shout.authors = authors or [] shout.authors = authors or []
shout.topics = topics or [] shout.topics = topics or []
shout.stat = { shout.stat = {
"viewed": 0, # FIXME: use separate resolver "viewed": ViewedStorage.get_shout(shout.id),
"followers": 0, # FIXME: implement followers_stat "followers": 0, # FIXME: implement followers_stat
"rating": rating_stat or 0, "rating": rating_stat or 0,
"commented": comments_stat or 0, "commented": comments_stat or 0,
@ -215,6 +220,7 @@ async def get_shout(_, info, slug: str):
[shout, commented_stat, rating_stat, last_reaction_at, authors, topics] = results [shout, commented_stat, rating_stat, last_reaction_at, authors, topics] = results
shout.stat = { shout.stat = {
"viewed": ViewedStorage.get_shout(shout.id),
"commented": commented_stat, "commented": commented_stat,
"rating": rating_stat, "rating": rating_stat,
"last_reacted_at": last_reaction_at, "last_reacted_at": last_reaction_at,

View File

@ -94,7 +94,8 @@ type Shout {
type Stat { type Stat {
rating: Int rating: Int
commented: Int commented: Int
followers: Int viewed: Int
followed: Int
last_reacted_at: Int last_reacted_at: Int
} }

View File

@ -1,9 +1,9 @@
import asyncio
import json import json
import os import os
import time import time
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Dict from typing import Dict
from threading import Lock
# ga # ga
from google.analytics.data_v1beta import BetaAnalyticsDataClient from google.analytics.data_v1beta import BetaAnalyticsDataClient
@ -21,7 +21,7 @@ VIEWS_FILEPATH = "/dump/views.json"
class ViewedStorage: class ViewedStorage:
lock = asyncio.Lock() lock = Lock()
views_by_shout = {} views_by_shout = {}
shouts_by_topic = {} shouts_by_topic = {}
shouts_by_author = {} shouts_by_author = {}
@ -33,10 +33,10 @@ class ViewedStorage:
start_date = datetime.now().strftime("%Y-%m-%d") start_date = datetime.now().strftime("%Y-%m-%d")
@staticmethod @staticmethod
async def init(): def init():
"""Подключение к клиенту Google Analytics с использованием аутентификации""" """Подключение к клиенту Google Analytics с использованием аутентификации"""
self = ViewedStorage self = ViewedStorage
async with self.lock: with self.lock:
# Загрузка предварительно подсчитанных просмотров из файла JSON # Загрузка предварительно подсчитанных просмотров из файла JSON
self.load_precounted_views() self.load_precounted_views()
@ -48,7 +48,7 @@ class ViewedStorage:
logger.info(" * Клиент Google Analytics успешно авторизован") logger.info(" * Клиент Google Analytics успешно авторизован")
# Запуск фоновой задачи # Запуск фоновой задачи
_task = asyncio.create_task(self.worker()) self.worker()
else: else:
logger.info(" * Пожалуйста, добавьте ключевой файл Google Analytics") logger.info(" * Пожалуйста, добавьте ключевой файл Google Analytics")
self.disabled = True self.disabled = True
@ -78,16 +78,15 @@ class ViewedStorage:
except Exception as e: except Exception as e:
logger.error(f"Ошибка загрузки предварительно подсчитанных просмотров: {e}") logger.error(f"Ошибка загрузки предварительно подсчитанных просмотров: {e}")
# noinspection PyTypeChecker
@staticmethod @staticmethod
async def update_pages(): def update_pages():
"""Запрос всех страниц от Google Analytics, отсортированных по количеству просмотров""" """Запрос всех страниц от Google Analytics, отсортированных по количеству просмотров"""
self = ViewedStorage self = ViewedStorage
logger.info(" ⎧ Обновление данных просмотров от Google Analytics ---") logger.info(" ⎧ Обновление данных просмотров от Google Analytics ---")
if not self.disabled: if not self.disabled:
try: try:
start = time.time() start = time.time()
async with self.lock: with self.lock:
if self.analytics_client: if self.analytics_client:
request = RunReportRequest( request = RunReportRequest(
property=f"properties/{GOOGLE_PROPERTY_ID}", property=f"properties/{GOOGLE_PROPERTY_ID}",
@ -126,35 +125,35 @@ class ViewedStorage:
self.disabled = True self.disabled = True
@staticmethod @staticmethod
async def get_shout(shout_slug) -> int: def get_shout(shout_slug) -> int:
"""Получение метрики просмотров shout по slug""" """Получение метрики просмотров shout по slug"""
self = ViewedStorage self = ViewedStorage
async with self.lock: with self.lock:
return self.views_by_shout.get(shout_slug, 0) return self.views_by_shout.get(shout_slug, 0)
@staticmethod @staticmethod
async def get_shout_media(shout_slug) -> Dict[str, int]: def get_shout_media(shout_slug) -> Dict[str, int]:
"""Получение метрики воспроизведения shout по slug""" """Получение метрики воспроизведения shout по slug"""
self = ViewedStorage self = ViewedStorage
async with self.lock: with self.lock:
return self.views_by_shout.get(shout_slug, 0) return self.views_by_shout.get(shout_slug, 0)
@staticmethod @staticmethod
async def get_topic(topic_slug) -> int: def get_topic(topic_slug) -> int:
"""Получение суммарного значения просмотров темы""" """Получение суммарного значения просмотров темы"""
self = ViewedStorage self = ViewedStorage
topic_views = 0 topic_views = 0
async with self.lock: with self.lock:
for shout_slug in self.shouts_by_topic.get(topic_slug, []): for shout_slug in self.shouts_by_topic.get(topic_slug, []):
topic_views += self.views_by_shout.get(shout_slug, 0) topic_views += self.views_by_shout.get(shout_slug, 0)
return topic_views return topic_views
@staticmethod @staticmethod
async def get_author(author_slug) -> int: def get_author(author_slug) -> int:
"""Получение суммарного значения просмотров автора""" """Получение суммарного значения просмотров автора"""
self = ViewedStorage self = ViewedStorage
author_views = 0 author_views = 0
async with self.lock: with self.lock:
for shout_slug in self.shouts_by_author.get(author_slug, []): for shout_slug in self.shouts_by_author.get(author_slug, []):
author_views += self.views_by_shout.get(shout_slug, 0) author_views += self.views_by_shout.get(shout_slug, 0)
return author_views return author_views
@ -180,8 +179,8 @@ class ViewedStorage:
update_groups(self.shouts_by_author, author.slug, shout_slug) update_groups(self.shouts_by_author, author.slug, shout_slug)
@staticmethod @staticmethod
async def worker(): def worker():
"""Асинхронная задача обновления""" """Задача обновления"""
failed = 0 failed = 0
self = ViewedStorage self = ViewedStorage
if self.disabled: if self.disabled:
@ -189,7 +188,7 @@ class ViewedStorage:
while True: while True:
try: try:
await self.update_pages() self.update_pages()
failed = 0 failed = 0
except Exception as exc: except Exception as exc:
failed += 1 failed += 1
@ -204,7 +203,7 @@ class ViewedStorage:
logger.info( logger.info(
" ⎩ Следующее обновление: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0]) " ⎩ Следующее обновление: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])
) )
await asyncio.sleep(self.period) time.sleep(self.period)
else: else:
await asyncio.sleep(10) time.sleep(10)
logger.info(" - Попытка снова обновить данные") logger.info(" - Попытка снова обновить данные")