core/cache/revalidator.py

158 lines
8.3 KiB
Python
Raw Permalink Normal View History

2024-08-07 05:42:59 +00:00
import asyncio
2024-08-09 06:37:06 +00:00
2025-03-22 08:47:19 +00:00
from cache.cache import (
cache_author,
cache_topic,
get_cached_author,
get_cached_topic,
invalidate_cache_by_prefix,
)
2024-11-02 08:56:47 +00:00
from resolvers.stat import get_with_stat
2024-08-07 05:57:56 +00:00
from utils.logger import root_logger as logger
2024-08-07 05:42:59 +00:00
2025-03-22 08:47:19 +00:00
CACHE_REVALIDATION_INTERVAL = 300 # 5 minutes
2024-08-07 05:42:59 +00:00
class CacheRevalidationManager:
2025-03-22 08:47:19 +00:00
def __init__(self, interval=CACHE_REVALIDATION_INTERVAL):
2024-08-07 06:51:09 +00:00
"""Инициализация менеджера с заданным интервалом проверки (в секундах)."""
self.interval = interval
self.items_to_revalidate = {"authors": set(), "topics": set(), "shouts": set(), "reactions": set()}
self.lock = asyncio.Lock()
self.running = True
2025-03-22 08:47:19 +00:00
self.MAX_BATCH_SIZE = 10 # Максимальное количество элементов для поштучной обработки
2024-08-07 05:42:59 +00:00
2024-08-07 06:51:09 +00:00
async def start(self):
"""Запуск фонового воркера для ревалидации кэша."""
2024-11-01 21:26:57 +00:00
self.task = asyncio.create_task(self.revalidate_cache())
2024-08-07 05:42:59 +00:00
async def revalidate_cache(self):
2024-08-07 06:51:09 +00:00
"""Циклическая проверка и ревалидация кэша каждые self.interval секунд."""
try:
while self.running:
await asyncio.sleep(self.interval)
await self.process_revalidation()
except asyncio.CancelledError:
logger.info("Revalidation worker was stopped.")
except Exception as e:
logger.error(f"An error occurred in the revalidation worker: {e}")
2024-08-07 05:42:59 +00:00
async def process_revalidation(self):
2024-08-07 06:51:09 +00:00
"""Обновление кэша для всех сущностей, требующих ревалидации."""
async with self.lock:
# Ревалидация кэша авторов
2025-03-22 08:47:19 +00:00
if self.items_to_revalidate["authors"]:
logger.debug(f"Revalidating {len(self.items_to_revalidate['authors'])} authors")
for author_id in self.items_to_revalidate["authors"]:
if author_id == "all":
await invalidate_cache_by_prefix("authors")
break
author = await get_cached_author(author_id, get_with_stat)
if author:
await cache_author(author)
self.items_to_revalidate["authors"].clear()
2024-08-07 06:51:09 +00:00
# Ревалидация кэша тем
2025-03-22 08:47:19 +00:00
if self.items_to_revalidate["topics"]:
logger.debug(f"Revalidating {len(self.items_to_revalidate['topics'])} topics")
for topic_id in self.items_to_revalidate["topics"]:
if topic_id == "all":
await invalidate_cache_by_prefix("topics")
break
topic = await get_cached_topic(topic_id)
if topic:
await cache_topic(topic)
self.items_to_revalidate["topics"].clear()
# Ревалидация шаутов (публикаций)
if self.items_to_revalidate["shouts"]:
shouts_count = len(self.items_to_revalidate["shouts"])
logger.debug(f"Revalidating {shouts_count} shouts")
# Проверяем наличие специального флага 'all'
if "all" in self.items_to_revalidate["shouts"]:
await invalidate_cache_by_prefix("shouts")
# Если элементов много, но не 'all', используем специфический подход
elif shouts_count > self.MAX_BATCH_SIZE:
# Инвалидируем только collections keys, которые затрагивают много сущностей
collection_keys = await asyncio.create_task(self._redis.execute("KEYS", "shouts:*"))
if collection_keys:
await self._redis.execute("DEL", *collection_keys)
logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей шаутов")
# Обновляем кеш каждого конкретного шаута
for shout_id in self.items_to_revalidate["shouts"]:
if shout_id != "all":
# Точечная инвалидация для каждого shout_id
specific_keys = [f"shout:id:{shout_id}"]
for key in specific_keys:
await self._redis.execute("DEL", key)
logger.debug(f"Удален ключ кеша {key}")
else:
# Если элементов немного, обрабатываем каждый
for shout_id in self.items_to_revalidate["shouts"]:
if shout_id != "all":
# Точечная инвалидация для каждого shout_id
specific_keys = [f"shout:id:{shout_id}"]
for key in specific_keys:
await self._redis.execute("DEL", key)
logger.debug(f"Удален ключ кеша {key}")
self.items_to_revalidate["shouts"].clear()
# Аналогично для реакций - точечная инвалидация
if self.items_to_revalidate["reactions"]:
reactions_count = len(self.items_to_revalidate["reactions"])
logger.debug(f"Revalidating {reactions_count} reactions")
if "all" in self.items_to_revalidate["reactions"]:
await invalidate_cache_by_prefix("reactions")
elif reactions_count > self.MAX_BATCH_SIZE:
# Инвалидируем только collections keys для реакций
collection_keys = await asyncio.create_task(self._redis.execute("KEYS", "reactions:*"))
if collection_keys:
await self._redis.execute("DEL", *collection_keys)
logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей реакций")
# Точечная инвалидация для каждой реакции
for reaction_id in self.items_to_revalidate["reactions"]:
if reaction_id != "all":
specific_keys = [f"reaction:id:{reaction_id}"]
for key in specific_keys:
await self._redis.execute("DEL", key)
logger.debug(f"Удален ключ кеша {key}")
else:
# Точечная инвалидация для каждой реакции
for reaction_id in self.items_to_revalidate["reactions"]:
if reaction_id != "all":
specific_keys = [f"reaction:id:{reaction_id}"]
for key in specific_keys:
await self._redis.execute("DEL", key)
logger.debug(f"Удален ключ кеша {key}")
self.items_to_revalidate["reactions"].clear()
2024-08-07 05:42:59 +00:00
def mark_for_revalidation(self, entity_id, entity_type):
"""Отметить сущность для ревалидации."""
2025-03-22 08:47:19 +00:00
if entity_id and entity_type:
self.items_to_revalidate[entity_type].add(entity_id)
def invalidate_all(self, entity_type):
"""Пометить для инвалидации все элементы указанного типа."""
logger.debug(f"Marking all {entity_type} for invalidation")
# Особый флаг для полной инвалидации
self.items_to_revalidate[entity_type].add("all")
2024-08-07 05:42:59 +00:00
2024-11-01 21:26:57 +00:00
async def stop(self):
2024-08-07 06:51:09 +00:00
"""Остановка фонового воркера."""
self.running = False
2024-11-02 09:09:24 +00:00
if hasattr(self, "task"):
2024-11-01 21:26:57 +00:00
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
pass
2024-08-07 06:51:09 +00:00
2024-08-07 05:42:59 +00:00
2025-03-22 08:47:19 +00:00
revalidation_manager = CacheRevalidationManager()