core/services/viewed.py

import asyncio
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional
# Google Analytics
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import (
    DateRange,
    Dimension,
    FilterExpression,
    Metric,
    RunReportRequest,
)
from google.analytics.data_v1beta.types import Filter as GAFilter
from orm.author import Author
from orm.shout import Shout, ShoutAuthor, ShoutTopic
from orm.topic import Topic
from services.db import local_session
from services.redis import redis
from utils.logger import root_logger as logger
GOOGLE_KEYFILE_PATH = os.environ.get("GOOGLE_KEYFILE_PATH", "/dump/google-service.json")
GOOGLE_PROPERTY_ID = os.environ.get("GOOGLE_PROPERTY_ID", "")


class ViewedStorage:
    """
    Class for storing and accessing view-count data.
    Uses Redis as the primary storage and Google Analytics to collect fresh data.
    """

    lock = asyncio.Lock()
    views_by_shout = {}
    shouts_by_topic = {}
    shouts_by_author = {}
    views = None
    period = 60 * 60  # every hour
    analytics_client: Optional[BetaAnalyticsDataClient] = None
    auth_result = None
    running = False
    redis_views_key = None
    last_update_timestamp = 0
    start_date = datetime.now().strftime("%Y-%m-%d")

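    # Redis layout assumed by this class (inferred from the accessors below; the
    # precounted hashes are written by an external views migration, not by this module):
    #   migrated_views_<timestamp>  - hash: shout id -> precounted views,
    #                                 plus service fields "_timestamp" and "_total"
    #   migrated_views_slugs        - hash: shout slug -> precounted views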
@staticmethod
    async def init():
        """Connect the Google Analytics client and load view data from Redis."""
        self = ViewedStorage
        async with self.lock:
            # Load precounted views from Redis
            await self.load_views_from_redis()

            os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", GOOGLE_KEYFILE_PATH)
            if GOOGLE_KEYFILE_PATH and os.path.isfile(GOOGLE_KEYFILE_PATH):
                # Using a default constructor instructs the client to use the credentials
                # specified in the GOOGLE_APPLICATION_CREDENTIALS environment variable.
                self.analytics_client = BetaAnalyticsDataClient()
                logger.info(" * Google Analytics credentials accepted")

                # Start the background update task (the worker loop runs while self.running is True)
                self.running = True
                _task = asyncio.create_task(self.worker())
            else:
                logger.warning(" * please, add Google Analytics credentials file")
                self.running = False
@staticmethod
    async def load_views_from_redis():
        """Load precounted views from Redis."""
        self = ViewedStorage

        # Connect to Redis if no connection has been established yet
        if not redis._client:
            await redis.connect()

        # Fetch all migrated_views_* keys and find the most recent one
        keys = await redis.execute("KEYS", "migrated_views_*")
        if not keys:
            logger.warning(" * No migrated_views keys found in Redis")
            return

        # Keep only the timestamp-format keys (exclude migrated_views_slugs)
        timestamp_keys = [k for k in keys if k != "migrated_views_slugs"]
        if not timestamp_keys:
            logger.warning(" * No migrated_views timestamp keys found in Redis")
            return

        # Sort by creation time (encoded in the key name) and take the latest
        timestamp_keys.sort()
        latest_key = timestamp_keys[-1]
        self.redis_views_key = latest_key

        # Read the creation timestamp to set start_date
        timestamp = await redis.execute("HGET", latest_key, "_timestamp")
        if timestamp:
            self.last_update_timestamp = int(timestamp)
            timestamp_dt = datetime.fromtimestamp(int(timestamp))
            self.start_date = timestamp_dt.strftime("%Y-%m-%d")

        # If the data is from today, consider it up to date
        now_date = datetime.now().strftime("%Y-%m-%d")
        if now_date == self.start_date:
            logger.info(" * Views data is up to date!")
        else:
            logger.warning(f" * Views data is from {self.start_date}, may need update")

        # Log how many entries were loaded
        total_entries = await redis.execute("HGET", latest_key, "_total")
        if total_entries:
            logger.info(f" * {total_entries} shouts with views loaded from Redis key: {latest_key}")
# noinspection PyTypeChecker
@staticmethod
    async def update_pages():
        """Query Google Analytics for all pages, ordered by view count."""
        self = ViewedStorage
        logger.info(" ⎧ views update from Google Analytics ---")
        if self.running:
            try:
                start = time.time()
                async with self.lock:
                    if self.analytics_client:
                        request = RunReportRequest(
                            property=f"properties/{GOOGLE_PROPERTY_ID}",
                            dimensions=[Dimension(name="pagePath")],
                            metrics=[Metric(name="screenPageViews")],
                            date_ranges=[DateRange(start_date=self.start_date, end_date="today")],
                        )
                        response = self.analytics_client.run_report(request)
                        if response and isinstance(response.rows, list):
                            slugs = set()
                            for row in response.rows:
                                logger.debug(
                                    "%s %s",
                                    row.dimension_values[0].value,
                                    row.metric_values[0].value,
                                )
                                # Extract the page path from the Google Analytics response
                                if isinstance(row.dimension_values, list):
                                    page_path = row.dimension_values[0].value
                                    slug = page_path.split("discours.io/")[-1]
                                    fresh_views = int(row.metric_values[0].value)

                                    # Update the in-memory store
                                    self.views_by_shout[slug] = self.views_by_shout.get(slug, 0) + fresh_views
                                    self.update_topics(slug)

                                    # Remember the page path for logging
                                    slugs.add(slug)
                            logger.info(f" ⎪ collected pages: {len(slugs)} ")
                        end = time.time()
                        logger.info(" ⎪ views update time: %fs " % (end - start))
            except Exception as error:
                logger.error(error)
                self.running = False
@staticmethod
    async def get_shout(shout_slug="", shout_id=0) -> int:
        """
        Get the view count for a shout by slug or id.

        Args:
            shout_slug: Slug of the publication
            shout_id: ID of the publication

        Returns:
            int: Number of views
        """
        self = ViewedStorage

        # Make sure the Redis connection is available for the new storage scheme
        if not redis._client:
            await redis.connect()

        fresh_views = self.views_by_shout.get(shout_slug, 0)

        # If an id is given, try the precounted data in the migrated_views_<timestamp> key
        if shout_id and self.redis_views_key:
            precounted_views = await redis.execute("HGET", self.redis_views_key, str(shout_id))
            if precounted_views:
                return fresh_views + int(precounted_views)

        # If there is no id or no data, fall back to the slug in the separate hash
        precounted_views = await redis.execute("HGET", "migrated_views_slugs", shout_slug)
        if precounted_views:
            return fresh_views + int(precounted_views)

        return fresh_views

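    # Example (hypothetical slug and id): `await ViewedStorage.get_shout(shout_slug="example-slug", shout_id=1)`
    # returns fresh Google Analytics views plus the precounted total found in Redis, if any.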
@staticmethod
    async def get_shout_media(shout_slug) -> int:
        """Get the media playback metric for a shout by slug."""
        self = ViewedStorage
        # TODO: get media plays from Google Analytics; for now this returns the regular view count
        return self.views_by_shout.get(shout_slug, 0)
@staticmethod
    async def get_topic(topic_slug) -> int:
        """Get the total number of views for a topic."""
        self = ViewedStorage
        views_count = 0
        for shout_slug in self.shouts_by_topic.get(topic_slug, []):
            views_count += await self.get_shout(shout_slug=shout_slug)
        return views_count
@staticmethod
    async def get_author(author_slug) -> int:
        """Get the total number of views for an author."""
        self = ViewedStorage
        views_count = 0
        for shout_slug in self.shouts_by_author.get(author_slug, []):
            views_count += await self.get_shout(shout_slug=shout_slug)
        return views_count
@staticmethod
    def update_topics(shout_slug):
        """Update topic and author counters for the given shout slug."""
        self = ViewedStorage
        with local_session() as session:
            # Helper function to avoid repeating the same update logic
            def update_groups(dictionary, key, value):
                dictionary[key] = list(set(dictionary.get(key, []) + [value]))

            # Update topics and authors using the helper
            for [_st, topic] in (
                session.query(ShoutTopic, Topic).join(Topic).join(Shout).where(Shout.slug == shout_slug).all()
            ):
                update_groups(self.shouts_by_topic, topic.slug, shout_slug)

            for [_st, author] in (
                session.query(ShoutAuthor, Author).join(Author).join(Shout).where(Shout.slug == shout_slug).all()
            ):
                update_groups(self.shouts_by_author, author.slug, shout_slug)
@staticmethod
async def stop():
"""Остановка фоновой задачи"""
self = ViewedStorage
async with self.lock:
self.running = False
logger.info("ViewedStorage worker was stopped.")
@staticmethod
    async def worker():
        """Asynchronous update task."""
        failed = 0
        self = ViewedStorage

        while self.running:
            try:
                await self.update_pages()
                failed = 0
            except Exception as exc:
                failed += 1
                logger.debug(exc)
                logger.info(" - update failed #%d, wait 10 secs" % failed)
                if failed > 3:
                    logger.info(" - views update failed, not trying anymore")
                    self.running = False
                    break
            if failed == 0:
                when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
                t = when.astimezone().isoformat()
                logger.info(" ⎩ next update: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0]))
                await asyncio.sleep(self.period)
            else:
                await asyncio.sleep(10)
                logger.info(" - try to update views again")
@staticmethod
async def update_slug_views(slug: str) -> int:
"""
Получает fresh статистику просмотров для указанного slug.
Args:
slug: Идентификатор страницы
Returns:
int: Количество просмотров
"""
self = ViewedStorage
if not self.analytics_client:
logger.warning("Google Analytics client not initialized")
return 0
try:
# Создаем фильтр для точного совпадения конца URL
request = RunReportRequest(
property=f"properties/{GOOGLE_PROPERTY_ID}",
date_ranges=[DateRange(start_date=self.start_date, end_date="today")],
dimensions=[Dimension(name="pagePath")],
dimension_filter=GAFilter(
field_name="pagePath",
string_filter=GAFilter.StringFilter(
value=f".*/{slug}$", # Используем регулярное выражение для точного совпадения конца URL
match_type=GAFilter.StringFilter.MatchType.FULL_REGEXP,
case_sensitive=False, # Включаем чувствительность к регистру для точности
),
),
metrics=[Metric(name="screenPageViews")],
)
response = self.analytics_client.run_report(request)
if not response.rows:
return 0
views = int(response.rows[0].metric_values[0].value)
# Кэшируем результат
self.views_by_shout[slug] = views
return views
except Exception as e:
logger.error(f"Google Analytics API Error: {e}")
2025-04-15 17:16:01 +00:00
return 0
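
# A minimal usage sketch (hypothetical; the real wiring lives in the application's
# startup code, not in this module):
#
#     await ViewedStorage.init()                 # connect GA client, load Redis data, start worker
#     views = await ViewedStorage.get_shout(shout_slug="example-slug", shout_id=1)
#     fresh = await ViewedStorage.update_slug_views("example-slug")
#     await ViewedStorage.stop()                 # stop the background worker
#
# "example-slug" and the id 1 are placeholder values.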