2024-08-07 05:42:59 +00:00
|
|
|
|
import asyncio
|
2024-08-09 06:37:06 +00:00
|
|
|
|
|
2025-03-22 08:47:19 +00:00
|
|
|
|
from cache.cache import (
|
|
|
|
|
cache_author,
|
|
|
|
|
cache_topic,
|
|
|
|
|
get_cached_author,
|
|
|
|
|
get_cached_topic,
|
|
|
|
|
invalidate_cache_by_prefix,
|
|
|
|
|
)
|
2024-11-02 08:56:47 +00:00
|
|
|
|
from resolvers.stat import get_with_stat
|
2024-08-07 05:57:56 +00:00
|
|
|
|
from utils.logger import root_logger as logger
|
2024-08-07 05:42:59 +00:00
|
|
|
|
|
2025-03-22 08:47:19 +00:00
|
|
|
|
CACHE_REVALIDATION_INTERVAL = 300 # 5 minutes
|
|
|
|
|
|
2024-08-07 05:42:59 +00:00
|
|
|
|
|
|
|
|
|
class CacheRevalidationManager:
|
2025-03-22 08:47:19 +00:00
|
|
|
|
def __init__(self, interval=CACHE_REVALIDATION_INTERVAL):
|
2024-08-07 06:51:09 +00:00
|
|
|
|
"""Инициализация менеджера с заданным интервалом проверки (в секундах)."""
|
|
|
|
|
self.interval = interval
|
|
|
|
|
self.items_to_revalidate = {"authors": set(), "topics": set(), "shouts": set(), "reactions": set()}
|
|
|
|
|
self.lock = asyncio.Lock()
|
|
|
|
|
self.running = True
|
2025-03-22 08:47:19 +00:00
|
|
|
|
self.MAX_BATCH_SIZE = 10 # Максимальное количество элементов для поштучной обработки
|
2024-08-07 05:42:59 +00:00
|
|
|
|
|
2024-08-07 06:51:09 +00:00
|
|
|
|
async def start(self):
|
|
|
|
|
"""Запуск фонового воркера для ревалидации кэша."""
|
2024-11-01 21:26:57 +00:00
|
|
|
|
self.task = asyncio.create_task(self.revalidate_cache())
|
2024-08-07 05:42:59 +00:00
|
|
|
|
|
|
|
|
|
async def revalidate_cache(self):
|
2024-08-07 06:51:09 +00:00
|
|
|
|
"""Циклическая проверка и ревалидация кэша каждые self.interval секунд."""
|
|
|
|
|
try:
|
|
|
|
|
while self.running:
|
|
|
|
|
await asyncio.sleep(self.interval)
|
|
|
|
|
await self.process_revalidation()
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
logger.info("Revalidation worker was stopped.")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"An error occurred in the revalidation worker: {e}")
|
2024-08-07 05:42:59 +00:00
|
|
|
|
|
|
|
|
|
async def process_revalidation(self):
|
2024-08-07 06:51:09 +00:00
|
|
|
|
"""Обновление кэша для всех сущностей, требующих ревалидации."""
|
|
|
|
|
async with self.lock:
|
|
|
|
|
# Ревалидация кэша авторов
|
2025-03-22 08:47:19 +00:00
|
|
|
|
if self.items_to_revalidate["authors"]:
|
|
|
|
|
logger.debug(f"Revalidating {len(self.items_to_revalidate['authors'])} authors")
|
|
|
|
|
for author_id in self.items_to_revalidate["authors"]:
|
|
|
|
|
if author_id == "all":
|
|
|
|
|
await invalidate_cache_by_prefix("authors")
|
|
|
|
|
break
|
|
|
|
|
author = await get_cached_author(author_id, get_with_stat)
|
|
|
|
|
if author:
|
|
|
|
|
await cache_author(author)
|
|
|
|
|
self.items_to_revalidate["authors"].clear()
|
2024-08-07 06:51:09 +00:00
|
|
|
|
|
|
|
|
|
# Ревалидация кэша тем
|
2025-03-22 08:47:19 +00:00
|
|
|
|
if self.items_to_revalidate["topics"]:
|
|
|
|
|
logger.debug(f"Revalidating {len(self.items_to_revalidate['topics'])} topics")
|
|
|
|
|
for topic_id in self.items_to_revalidate["topics"]:
|
|
|
|
|
if topic_id == "all":
|
|
|
|
|
await invalidate_cache_by_prefix("topics")
|
|
|
|
|
break
|
|
|
|
|
topic = await get_cached_topic(topic_id)
|
|
|
|
|
if topic:
|
|
|
|
|
await cache_topic(topic)
|
|
|
|
|
self.items_to_revalidate["topics"].clear()
|
|
|
|
|
|
|
|
|
|
# Ревалидация шаутов (публикаций)
|
|
|
|
|
if self.items_to_revalidate["shouts"]:
|
|
|
|
|
shouts_count = len(self.items_to_revalidate["shouts"])
|
|
|
|
|
logger.debug(f"Revalidating {shouts_count} shouts")
|
|
|
|
|
|
|
|
|
|
# Проверяем наличие специального флага 'all'
|
|
|
|
|
if "all" in self.items_to_revalidate["shouts"]:
|
|
|
|
|
await invalidate_cache_by_prefix("shouts")
|
|
|
|
|
# Если элементов много, но не 'all', используем специфический подход
|
|
|
|
|
elif shouts_count > self.MAX_BATCH_SIZE:
|
|
|
|
|
# Инвалидируем только collections keys, которые затрагивают много сущностей
|
|
|
|
|
collection_keys = await asyncio.create_task(self._redis.execute("KEYS", "shouts:*"))
|
|
|
|
|
if collection_keys:
|
|
|
|
|
await self._redis.execute("DEL", *collection_keys)
|
|
|
|
|
logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей шаутов")
|
|
|
|
|
|
|
|
|
|
# Обновляем кеш каждого конкретного шаута
|
|
|
|
|
for shout_id in self.items_to_revalidate["shouts"]:
|
|
|
|
|
if shout_id != "all":
|
|
|
|
|
# Точечная инвалидация для каждого shout_id
|
|
|
|
|
specific_keys = [f"shout:id:{shout_id}"]
|
|
|
|
|
for key in specific_keys:
|
|
|
|
|
await self._redis.execute("DEL", key)
|
|
|
|
|
logger.debug(f"Удален ключ кеша {key}")
|
|
|
|
|
else:
|
|
|
|
|
# Если элементов немного, обрабатываем каждый
|
|
|
|
|
for shout_id in self.items_to_revalidate["shouts"]:
|
|
|
|
|
if shout_id != "all":
|
|
|
|
|
# Точечная инвалидация для каждого shout_id
|
|
|
|
|
specific_keys = [f"shout:id:{shout_id}"]
|
|
|
|
|
for key in specific_keys:
|
|
|
|
|
await self._redis.execute("DEL", key)
|
|
|
|
|
logger.debug(f"Удален ключ кеша {key}")
|
|
|
|
|
|
|
|
|
|
self.items_to_revalidate["shouts"].clear()
|
|
|
|
|
|
|
|
|
|
# Аналогично для реакций - точечная инвалидация
|
|
|
|
|
if self.items_to_revalidate["reactions"]:
|
|
|
|
|
reactions_count = len(self.items_to_revalidate["reactions"])
|
|
|
|
|
logger.debug(f"Revalidating {reactions_count} reactions")
|
|
|
|
|
|
|
|
|
|
if "all" in self.items_to_revalidate["reactions"]:
|
|
|
|
|
await invalidate_cache_by_prefix("reactions")
|
|
|
|
|
elif reactions_count > self.MAX_BATCH_SIZE:
|
|
|
|
|
# Инвалидируем только collections keys для реакций
|
|
|
|
|
collection_keys = await asyncio.create_task(self._redis.execute("KEYS", "reactions:*"))
|
|
|
|
|
if collection_keys:
|
|
|
|
|
await self._redis.execute("DEL", *collection_keys)
|
|
|
|
|
logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей реакций")
|
|
|
|
|
|
|
|
|
|
# Точечная инвалидация для каждой реакции
|
|
|
|
|
for reaction_id in self.items_to_revalidate["reactions"]:
|
|
|
|
|
if reaction_id != "all":
|
|
|
|
|
specific_keys = [f"reaction:id:{reaction_id}"]
|
|
|
|
|
for key in specific_keys:
|
|
|
|
|
await self._redis.execute("DEL", key)
|
|
|
|
|
logger.debug(f"Удален ключ кеша {key}")
|
|
|
|
|
else:
|
|
|
|
|
# Точечная инвалидация для каждой реакции
|
|
|
|
|
for reaction_id in self.items_to_revalidate["reactions"]:
|
|
|
|
|
if reaction_id != "all":
|
|
|
|
|
specific_keys = [f"reaction:id:{reaction_id}"]
|
|
|
|
|
for key in specific_keys:
|
|
|
|
|
await self._redis.execute("DEL", key)
|
|
|
|
|
logger.debug(f"Удален ключ кеша {key}")
|
|
|
|
|
|
|
|
|
|
self.items_to_revalidate["reactions"].clear()
|
2024-08-07 05:42:59 +00:00
|
|
|
|
|
|
|
|
|
def mark_for_revalidation(self, entity_id, entity_type):
|
|
|
|
|
"""Отметить сущность для ревалидации."""
|
2025-03-22 08:47:19 +00:00
|
|
|
|
if entity_id and entity_type:
|
|
|
|
|
self.items_to_revalidate[entity_type].add(entity_id)
|
|
|
|
|
|
|
|
|
|
def invalidate_all(self, entity_type):
|
|
|
|
|
"""Пометить для инвалидации все элементы указанного типа."""
|
|
|
|
|
logger.debug(f"Marking all {entity_type} for invalidation")
|
|
|
|
|
# Особый флаг для полной инвалидации
|
|
|
|
|
self.items_to_revalidate[entity_type].add("all")
|
2024-08-07 05:42:59 +00:00
|
|
|
|
|
2024-11-01 21:26:57 +00:00
|
|
|
|
async def stop(self):
|
2024-08-07 06:51:09 +00:00
|
|
|
|
"""Остановка фонового воркера."""
|
|
|
|
|
self.running = False
|
2024-11-02 09:09:24 +00:00
|
|
|
|
if hasattr(self, "task"):
|
2024-11-01 21:26:57 +00:00
|
|
|
|
self.task.cancel()
|
|
|
|
|
try:
|
|
|
|
|
await self.task
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
pass
|
2024-08-07 06:51:09 +00:00
|
|
|
|
|
2024-08-07 05:42:59 +00:00
|
|
|
|
|
2025-03-22 08:47:19 +00:00
|
|
|
|
revalidation_manager = CacheRevalidationManager()
|