Files
core/services/viewed.py
Untone dcdb6c7b30
All checks were successful
Deploy on push / deploy (push) Successful in 2m54s
lesslogs2
2025-09-28 17:36:04 +03:00

427 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
import time
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import ClassVar
# ga
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import (
DateRange,
Dimension,
Metric,
RunReportRequest,
)
from google.analytics.data_v1beta.types import Filter as GAFilter
from orm.author import Author
from orm.shout import Shout, ShoutAuthor, ShoutTopic
from orm.topic import Topic
from storage.db import local_session
from storage.redis import redis
from utils.logger import root_logger as logger
GOOGLE_KEYFILE_PATH = os.environ.get("GOOGLE_KEYFILE_PATH", "/dump/google-service.json")
GOOGLE_PROPERTY_ID = os.environ.get("GOOGLE_PROPERTY_ID", "")
class ViewedStorage:
"""
Класс для хранения и доступа к данным о просмотрах.
Использует Redis в качестве основного хранилища и Google Analytics для сбора новых данных.
"""
lock = asyncio.Lock()
views_by_shout: ClassVar[dict] = {}
shouts_by_topic: ClassVar[dict] = {}
shouts_by_author: ClassVar[dict] = {}
views = None
period = 60 * 60 # каждый час
analytics_client: BetaAnalyticsDataClient | None = None
auth_result = None
running = False
redis_views_key = None
last_update_timestamp = 0
start_date = datetime.now(tz=UTC).strftime("%Y-%m-%d")
_background_task: asyncio.Task | None = None
@staticmethod
async def init() -> None:
"""Подключение к клиенту Google Analytics и загрузка данных о просмотрах из Redis"""
self = ViewedStorage
async with self.lock:
# Загрузка предварительно подсчитанных просмотров из Redis
await self.load_views_from_redis()
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", GOOGLE_KEYFILE_PATH)
if GOOGLE_KEYFILE_PATH and Path(GOOGLE_KEYFILE_PATH).is_file():
# Using a default constructor instructs the client to use the credentials
# specified in GOOGLE_APPLICATION_CREDENTIALS environment variable.
self.analytics_client = BetaAnalyticsDataClient()
logger.info(" * Google Analytics credentials accepted")
# Запуск фоновой задачи
task = asyncio.create_task(self.worker())
# Store reference to prevent garbage collection
self._background_task = task
else:
logger.warning(" * please, add Google Analytics credentials file")
self.running = False
@staticmethod
async def load_views_from_redis() -> None:
"""Загрузка предварительно подсчитанных просмотров из Redis"""
self = ViewedStorage
# Подключаемся к Redis если соединение не установлено
try:
if not await redis.ping():
await redis.connect()
except Exception as e:
logger.warning(f"Redis connection check failed: {e}")
# Try to connect anyway
await redis.connect()
# Логируем настройки Redis соединения
logger.info("* Redis connected")
# Получаем список всех ключей migrated_views_* и находим самый последний
keys = await redis.execute("KEYS", "migrated_views_*")
if keys is None:
keys = []
logger.warning("Redis KEYS command returned None, treating as empty list")
logger.info("Raw Redis result for 'KEYS migrated_views_*': %d", len(keys))
# Декодируем байтовые строки, если есть
if keys and isinstance(keys[0], bytes):
keys = [k.decode("utf-8") for k in keys]
logger.info("Decoded keys: %s", keys)
if not keys:
logger.info(" * No migrated_views keys found in Redis - views will be 0")
return
# Фильтруем только ключи timestamp формата (исключаем migrated_views_slugs)
timestamp_keys = [k for k in keys if k != "migrated_views_slugs"]
logger.info("Timestamp keys after filtering: %s", timestamp_keys)
if not timestamp_keys:
logger.info(" * No migrated_views timestamp keys found in Redis - views will be 0")
return
# Сортируем по времени создания (в названии ключа) и берем последний
timestamp_keys.sort()
latest_key = timestamp_keys[-1]
self.redis_views_key = latest_key
logger.info("Selected latest key: %s", latest_key)
# Получаем метку времени создания для установки start_date
timestamp = await redis.execute("HGET", latest_key, "_timestamp")
if timestamp:
self.last_update_timestamp = int(timestamp)
timestamp_dt = datetime.fromtimestamp(int(timestamp), tz=UTC)
self.start_date = timestamp_dt.strftime("%Y-%m-%d")
# Если данные сегодняшние, считаем их актуальными
now_date = datetime.now(tz=UTC).strftime("%Y-%m-%d")
if now_date == self.start_date:
logger.info(" * Views data is up to date!")
else:
logger.warning("Views data is from %s, may need update", self.start_date)
# 🔎 ЗАГРУЖАЕМ ДАННЫЕ из Redis в views_by_shout
logger.info("🔍 Loading views data from Redis key: %s", latest_key)
# Получаем все данные из hash
views_data = await redis.execute("HGETALL", latest_key)
if views_data and len(views_data) > 0:
# Преобразуем список [key1, value1, key2, value2] в словарь
views_dict = {}
try:
# Проверяем что views_data это словарь или список
if isinstance(views_data, dict):
# Если это уже словарь
for key, value in views_data.items():
key_str = key.decode("utf-8") if isinstance(key, bytes) else str(key)
value_str = value.decode("utf-8") if isinstance(value, bytes) else str(value)
if not key_str.startswith("_"):
try:
views_dict[key_str] = int(value_str)
except (ValueError, TypeError):
logger.warning(f"🔍 Invalid views value for {key_str}: {value_str}")
elif isinstance(views_data, list | tuple):
# Если это список [key1, value1, key2, value2]
for i in range(0, len(views_data), 2):
if i + 1 < len(views_data):
key = (
views_data[i].decode("utf-8")
if isinstance(views_data[i], bytes)
else str(views_data[i])
)
value = (
views_data[i + 1].decode("utf-8")
if isinstance(views_data[i + 1], bytes)
else str(views_data[i + 1])
)
# Пропускаем служебные ключи
if not key.startswith("_"):
try:
views_dict[key] = int(value)
except (ValueError, TypeError):
logger.warning(f"🔍 Invalid views value for {key}: {value}")
else:
logger.warning(f"🔍 Unexpected Redis data format: {type(views_data)}")
# Загружаем данные в класс
self.views_by_shout.update(views_dict)
logger.info("🔍 Loaded %d shouts with views from Redis", len(views_dict))
# Показываем образцы загруженных данных только если есть данные
if views_dict:
sample_items = list(views_dict.items())[:3]
logger.info("🔍 Sample loaded data: %s", sample_items)
else:
logger.debug("🔍 No valid views data found in Redis hash - views will be 0")
except Exception as e:
logger.warning(f"🔍 Error parsing Redis views data: {e} - views will be 0")
else:
logger.debug("🔍 Redis hash is empty for key: %s - views will be 0", latest_key)
# Выводим информацию о количестве загруженных записей
total_entries = await redis.execute("HGET", latest_key, "_total")
if total_entries:
logger.info("%s shouts with views loaded from Redis key: %s", total_entries, latest_key)
logger.info("Found migrated_views keys: %s", keys)
# noinspection PyTypeChecker
@staticmethod
async def update_pages() -> None:
"""Запрос всех страниц от Google Analytics, отсортированных по количеству просмотров"""
self = ViewedStorage
logger.info(" ⎧ views update from Google Analytics ---")
if self.running:
try:
start = time.time()
async with self.lock:
if self.analytics_client:
request = RunReportRequest(
property=f"properties/{GOOGLE_PROPERTY_ID}",
dimensions=[Dimension(name="pagePath")],
metrics=[Metric(name="screenPageViews")],
date_ranges=[DateRange(start_date=self.start_date, end_date="today")],
)
response = self.analytics_client.run_report(request)
if response and isinstance(response.rows, list):
slugs = set()
for row in response.rows:
print(
row.dimension_values[0].value,
row.metric_values[0].value,
)
# Извлечение путей страниц из ответа Google Analytics
if isinstance(row.dimension_values, list):
page_path = row.dimension_values[0].value
slug = page_path.split("discours.io/")[-1]
fresh_views = int(row.metric_values[0].value)
# Обновление данных в хранилище
self.views_by_shout[slug] = self.views_by_shout.get(slug, 0)
self.views_by_shout[slug] += fresh_views
self.update_topics(slug)
# Запись путей страниц для логирования
slugs.add(slug)
logger.info("collected pages: %d", len(slugs))
end = time.time()
logger.info("views update time: %.2fs", end - start)
except (ConnectionError, TimeoutError, ValueError) as error:
logger.error(error)
self.running = False
@staticmethod
def get_shout(shout_slug: str = "", shout_id: int = 0) -> int:
"""
🔎 Синхронное получение метрики просмотров shout по slug или id из кеша.
Использует кешированные данные из views_by_shout (in-memory кеш).
Для обновления данных используется асинхронный фоновый процесс.
Args:
shout_slug: Slug публикации
shout_id: ID публикации
Returns:
int: Количество просмотров из кеша
"""
self = ViewedStorage
# 🔍 DEBUG: Логируем только если кеш пустой и это первый запрос
cache_size = len(self.views_by_shout)
if cache_size == 0 and shout_slug:
logger.debug(f"🔍 ViewedStorage cache is empty for slug '{shout_slug}'")
# 🔎 Используем только in-memory кеш для быстрого доступа
if shout_slug:
views = self.views_by_shout.get(shout_slug, 0)
if views > 0:
# logger.debug(f"🔍 Found {views} views for slug '{shout_slug}'")
pass
return views
# 🔎 Для ID ищем slug в БД и затем получаем views_count
if shout_id:
try:
with local_session() as session:
from orm.shout import Shout
shout = session.query(Shout).where(Shout.id == shout_id).first()
if shout and shout.slug:
views = self.views_by_shout.get(shout.slug, 0)
logger.debug(f"🔍 Found slug '{shout.slug}' for id {shout_id}, views: {views}")
return views
logger.debug(f"🔍 No shout found with id {shout_id} or missing slug")
except Exception as e:
logger.warning(f"Failed to get shout slug for id {shout_id}: {e}")
return 0
logger.debug("🔍 get_shout called without slug or id")
return 0
@staticmethod
async def get_shout_media(shout_slug: str) -> dict[str, int]:
"""Получение метрики воспроизведения shout по slug."""
self = ViewedStorage
# TODO: get media plays from Google Analytics
return self.views_by_shout.get(shout_slug, 0)
@staticmethod
def get_topic(topic_slug: str) -> int:
"""Получение суммарного значения просмотров темы."""
self = ViewedStorage
views_count = 0
for shout_slug in self.shouts_by_topic.get(topic_slug, []):
views_count += self.get_shout(shout_slug=shout_slug)
return views_count
@staticmethod
def get_author(author_slug: str) -> int:
"""Получение суммарного значения просмотров автора."""
self = ViewedStorage
views_count = 0
for shout_slug in self.shouts_by_author.get(author_slug, []):
views_count += self.get_shout(shout_slug=shout_slug)
return views_count
@staticmethod
def update_topics(shout_slug: str) -> None:
"""Обновление счетчиков темы по slug shout"""
self = ViewedStorage
with local_session() as session:
# Определение вспомогательной функции для избежания повторения кода
def update_groups(dictionary: dict, key: str, value: str) -> None:
dictionary[key] = list({*dictionary.get(key, []), value})
# Обновление тем и авторов с использованием вспомогательной функции
for [_st, topic] in (
session.query(ShoutTopic, Topic).join(Topic).join(Shout).where(Shout.slug == shout_slug).all()
):
update_groups(self.shouts_by_topic, topic.slug, shout_slug)
for [_st, author] in (
session.query(ShoutAuthor, Author).join(Author).join(Shout).where(Shout.slug == shout_slug).all()
):
update_groups(self.shouts_by_author, author.slug, shout_slug)
@staticmethod
async def stop() -> None:
"""Остановка фоновой задачи"""
self = ViewedStorage
async with self.lock:
self.running = False
logger.info("ViewedStorage worker was stopped.")
@staticmethod
async def worker() -> None:
"""Асинхронная задача обновления"""
failed = 0
self = ViewedStorage
while self.running:
try:
await self.update_pages()
failed = 0
except (ConnectionError, TimeoutError, ValueError) as exc:
failed += 1
logger.debug(exc)
logger.info("update failed #%d, wait 10 secs", failed)
if failed > 3:
logger.info(" - views update failed, not trying anymore")
self.running = False
break
if failed == 0:
when = datetime.now(UTC) + timedelta(seconds=self.period)
t = format(when.astimezone().isoformat())
logger.info(" ⎩ next update: %s", t.split("T")[0] + " " + t.split("T")[1].split(".")[0])
await asyncio.sleep(self.period)
else:
await asyncio.sleep(10)
logger.info(" - try to update views again")
@staticmethod
async def update_slug_views(slug: str) -> int:
"""
Получает fresh статистику просмотров для указанного slug.
Args:
slug: Идентификатор страницы
Returns:
int: Количество просмотров
"""
self = ViewedStorage
if not self.analytics_client:
logger.warning("Google Analytics client not initialized")
return 0
try:
# Создаем фильтр для точного совпадения конца URL
request = RunReportRequest(
property=f"properties/{GOOGLE_PROPERTY_ID}",
date_ranges=[DateRange(start_date=self.start_date, end_date="today")],
dimensions=[Dimension(name="pagePath")],
dimension_filter=GAFilter(
field_name="pagePath",
string_filter=GAFilter.StringFilter(
value=f".*/{slug}$", # Используем регулярное выражение для точного совпадения конца URL
match_type=GAFilter.StringFilter.MatchType.FULL_REGEXP,
case_sensitive=False, # Включаем чувствительность к регистру для точности
),
),
metrics=[Metric(name="screenPageViews")],
)
response = self.analytics_client.run_report(request)
if not response.rows:
return 0
views = int(response.rows[0].metric_values[0].value)
except (ConnectionError, ValueError, AttributeError):
logger.exception("Google Analytics API Error")
return 0
else:
# Кэшируем результат
self.views_by_shout[slug] = views
return views