topics+authors-reimplemented-cache
All checks were successful
Deploy on push / deploy (push) Successful in 5s

This commit is contained in:
2025-03-22 11:47:19 +03:00
parent 86ddb50cb8
commit 615f1fe468
11 changed files with 1127 additions and 521 deletions

240
cache/cache.py vendored
View File

@@ -1,6 +1,35 @@
"""
Caching system for the Discours platform
----------------------------------------
This module provides a comprehensive caching solution with these key components:
1. KEY NAMING CONVENTIONS:
- Entity-based keys: "entity:property:value" (e.g., "author:id:123")
- Collection keys: "entity:collection:params" (e.g., "authors:stats:limit=10:offset=0")
- Special case keys: Maintained for backwards compatibility (e.g., "topic_shouts_123")
2. CORE FUNCTIONS:
- cached_query(): High-level function for retrieving cached data or executing queries
3. ENTITY-SPECIFIC FUNCTIONS:
- cache_author(), cache_topic(): Cache entity data
- get_cached_author(), get_cached_topic(): Retrieve entity data from cache
- invalidate_cache_by_prefix(): Invalidate all keys with a specific prefix
4. CACHE INVALIDATION STRATEGY:
- Direct invalidation via invalidate_* functions for immediate changes
- Delayed invalidation via revalidation_manager for background processing
- Event-based triggers for automatic cache updates (see triggers.py)
To maintain consistency with the existing codebase, this module preserves
the original key naming patterns while providing a more structured approach
for new cache operations.
"""
import asyncio
import json
from typing import List
from typing import Any, Dict, List, Optional, Union
import orjson
from sqlalchemy import and_, join, select
@@ -20,8 +49,10 @@ DEFAULT_FOLLOWS = {
"communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}],
}
CACHE_TTL = 300 # 5 минут
CACHE_TTL = 300 # 5 minutes
# Key templates for common entity types
# These are used throughout the codebase and should be maintained for compatibility
CACHE_KEYS = {
"TOPIC_ID": "topic:id:{}",
"TOPIC_SLUG": "topic:slug:{}",
@@ -38,8 +69,8 @@ CACHE_KEYS = {
async def cache_topic(topic: dict):
payload = json.dumps(topic, cls=CustomJSONEncoder)
await asyncio.gather(
redis_operation("SET", f"topic:id:{topic['id']}", payload),
redis_operation("SET", f"topic:slug:{topic['slug']}", payload),
redis.execute("SET", f"topic:id:{topic['id']}", payload),
redis.execute("SET", f"topic:slug:{topic['slug']}", payload),
)
@@ -47,29 +78,29 @@ async def cache_topic(topic: dict):
async def cache_author(author: dict):
payload = json.dumps(author, cls=CustomJSONEncoder)
await asyncio.gather(
redis_operation("SET", f"author:user:{author['user'].strip()}", str(author["id"])),
redis_operation("SET", f"author:id:{author['id']}", payload),
redis.execute("SET", f"author:user:{author['user'].strip()}", str(author["id"])),
redis.execute("SET", f"author:id:{author['id']}", payload),
)
# Cache follows data
async def cache_follows(follower_id: int, entity_type: str, entity_id: int, is_insert=True):
key = f"author:follows-{entity_type}s:{follower_id}"
follows_str = await redis_operation("GET", key)
follows_str = await redis.execute("GET", key)
follows = orjson.loads(follows_str) if follows_str else DEFAULT_FOLLOWS[entity_type]
if is_insert:
if entity_id not in follows:
follows.append(entity_id)
else:
follows = [eid for eid in follows if eid != entity_id]
await redis_operation("SET", key, json.dumps(follows, cls=CustomJSONEncoder))
await redis.execute("SET", key, json.dumps(follows, cls=CustomJSONEncoder))
await update_follower_stat(follower_id, entity_type, len(follows))
# Update follower statistics
async def update_follower_stat(follower_id, entity_type, count):
follower_key = f"author:id:{follower_id}"
follower_str = await redis_operation("GET", follower_key)
follower_str = await redis.execute("GET", follower_key)
follower = orjson.loads(follower_str) if follower_str else None
if follower:
follower["stat"] = {f"{entity_type}s": count}
@@ -79,7 +110,7 @@ async def update_follower_stat(follower_id, entity_type, count):
# Get author from cache
async def get_cached_author(author_id: int, get_with_stat):
author_key = f"author:id:{author_id}"
result = await redis_operation("GET", author_key)
result = await redis.execute("GET", author_key)
if result:
return orjson.loads(result)
# Load from database if not found in cache
@@ -104,7 +135,7 @@ async def get_cached_topic(topic_id: int):
dict: Topic data or None if not found.
"""
topic_key = f"topic:id:{topic_id}"
cached_topic = await redis_operation("GET", topic_key)
cached_topic = await redis.execute("GET", topic_key)
if cached_topic:
return orjson.loads(cached_topic)
@@ -113,7 +144,7 @@ async def get_cached_topic(topic_id: int):
topic = session.execute(select(Topic).where(Topic.id == topic_id)).scalar_one_or_none()
if topic:
topic_dict = topic.dict()
await redis_operation("SET", topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder))
await redis.execute("SET", topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder))
return topic_dict
return None
@@ -122,7 +153,7 @@ async def get_cached_topic(topic_id: int):
# Get topic by slug from cache
async def get_cached_topic_by_slug(slug: str, get_with_stat):
topic_key = f"topic:slug:{slug}"
result = await redis_operation("GET", topic_key)
result = await redis.execute("GET", topic_key)
if result:
return orjson.loads(result)
# Load from database if not found in cache
@@ -139,7 +170,7 @@ async def get_cached_topic_by_slug(slug: str, get_with_stat):
async def get_cached_authors_by_ids(author_ids: List[int]) -> List[dict]:
# Fetch all author data concurrently
keys = [f"author:id:{author_id}" for author_id in author_ids]
results = await asyncio.gather(*(redis_operation("GET", key) for key in keys))
results = await asyncio.gather(*(redis.execute("GET", key) for key in keys))
authors = [orjson.loads(result) if result else None for result in results]
# Load missing authors from database and cache
missing_indices = [index for index, author in enumerate(authors) if author is None]
@@ -166,7 +197,7 @@ async def get_cached_topic_followers(topic_id: int):
"""
try:
cache_key = CACHE_KEYS["TOPIC_FOLLOWERS"].format(topic_id)
cached = await redis_operation("GET", cache_key)
cached = await redis.execute("GET", cache_key)
if cached:
followers_ids = orjson.loads(cached)
@@ -182,7 +213,7 @@ async def get_cached_topic_followers(topic_id: int):
.all()
]
await redis_operation("SETEX", cache_key, value=orjson.dumps(followers_ids), ttl=CACHE_TTL)
await redis.execute("SETEX", cache_key, CACHE_TTL, orjson.dumps(followers_ids))
followers = await get_cached_authors_by_ids(followers_ids)
logger.debug(f"Cached {len(followers)} followers for topic #{topic_id}")
return followers
@@ -195,7 +226,7 @@ async def get_cached_topic_followers(topic_id: int):
# Get cached author followers
async def get_cached_author_followers(author_id: int):
# Check cache for data
cached = await redis_operation("GET", f"author:followers:{author_id}")
cached = await redis.execute("GET", f"author:followers:{author_id}")
if cached:
followers_ids = orjson.loads(cached)
followers = await get_cached_authors_by_ids(followers_ids)
@@ -211,7 +242,7 @@ async def get_cached_author_followers(author_id: int):
.filter(AuthorFollower.author == author_id, Author.id != author_id)
.all()
]
await redis_operation("SET", f"author:followers:{author_id}", orjson.dumps(followers_ids))
await redis.execute("SET", f"author:followers:{author_id}", orjson.dumps(followers_ids))
followers = await get_cached_authors_by_ids(followers_ids)
return followers
@@ -219,7 +250,7 @@ async def get_cached_author_followers(author_id: int):
# Get cached follower authors
async def get_cached_follower_authors(author_id: int):
# Attempt to retrieve authors from cache
cached = await redis_operation("GET", f"author:follows-authors:{author_id}")
cached = await redis.execute("GET", f"author:follows-authors:{author_id}")
if cached:
authors_ids = orjson.loads(cached)
else:
@@ -233,7 +264,7 @@ async def get_cached_follower_authors(author_id: int):
.where(AuthorFollower.follower == author_id)
).all()
]
await redis_operation("SET", f"author:follows-authors:{author_id}", orjson.dumps(authors_ids))
await redis.execute("SET", f"author:follows-authors:{author_id}", orjson.dumps(authors_ids))
authors = await get_cached_authors_by_ids(authors_ids)
return authors
@@ -242,7 +273,7 @@ async def get_cached_follower_authors(author_id: int):
# Get cached follower topics
async def get_cached_follower_topics(author_id: int):
# Attempt to retrieve topics from cache
cached = await redis_operation("GET", f"author:follows-topics:{author_id}")
cached = await redis.execute("GET", f"author:follows-topics:{author_id}")
if cached:
topics_ids = orjson.loads(cached)
else:
@@ -255,11 +286,11 @@ async def get_cached_follower_topics(author_id: int):
.where(TopicFollower.follower == author_id)
.all()
]
await redis_operation("SET", f"author:follows-topics:{author_id}", orjson.dumps(topics_ids))
await redis.execute("SET", f"author:follows-topics:{author_id}", orjson.dumps(topics_ids))
topics = []
for topic_id in topics_ids:
topic_str = await redis_operation("GET", f"topic:id:{topic_id}")
topic_str = await redis.execute("GET", f"topic:id:{topic_id}")
if topic_str:
topic = orjson.loads(topic_str)
if topic and topic not in topics:
@@ -281,10 +312,10 @@ async def get_cached_author_by_user_id(user_id: str, get_with_stat):
dict: Dictionary with author data or None if not found.
"""
# Attempt to find author ID by user_id in Redis cache
author_id = await redis_operation("GET", f"author:user:{user_id.strip()}")
author_id = await redis.execute("GET", f"author:user:{user_id.strip()}")
if author_id:
# If ID is found, get full author data by ID
author_data = await redis_operation("GET", f"author:id:{author_id}")
author_data = await redis.execute("GET", f"author:id:{author_id}")
if author_data:
return orjson.loads(author_data)
@@ -296,8 +327,8 @@ async def get_cached_author_by_user_id(user_id: str, get_with_stat):
author = authors[0]
author_dict = author.dict()
await asyncio.gather(
redis_operation("SET", f"author:user:{user_id.strip()}", str(author.id)),
redis_operation("SET", f"author:id:{author.id}", orjson.dumps(author_dict)),
redis.execute("SET", f"author:user:{user_id.strip()}", str(author.id)),
redis.execute("SET", f"author:id:{author.id}", orjson.dumps(author_dict)),
)
return author_dict
@@ -318,7 +349,7 @@ async def get_cached_topic_authors(topic_id: int):
"""
# Attempt to get a list of author IDs from cache
rkey = f"topic:authors:{topic_id}"
cached_authors_ids = await redis_operation("GET", rkey)
cached_authors_ids = await redis.execute("GET", rkey)
if cached_authors_ids:
authors_ids = orjson.loads(cached_authors_ids)
else:
@@ -332,7 +363,7 @@ async def get_cached_topic_authors(topic_id: int):
)
authors_ids = [author_id for (author_id,) in session.execute(query).all()]
# Cache the retrieved author IDs
await redis_operation("SET", rkey, orjson.dumps(authors_ids))
await redis.execute("SET", rkey, orjson.dumps(authors_ids))
# Retrieve full author details from cached IDs
if authors_ids:
@@ -353,11 +384,11 @@ async def invalidate_shouts_cache(cache_keys: List[str]):
cache_key = f"shouts:{key}"
# Удаляем основной кэш
await redis_operation("DEL", cache_key)
await redis.execute("DEL", cache_key)
logger.debug(f"Invalidated cache key: {cache_key}")
# Добавляем ключ в список инвалидированных с TTL
await redis_operation("SETEX", f"{cache_key}:invalidated", value="1", ttl=CACHE_TTL)
await redis.execute("SETEX", f"{cache_key}:invalidated", CACHE_TTL, "1")
# Если это кэш темы, инвалидируем также связанные ключи
if key.startswith("topic_"):
@@ -369,7 +400,7 @@ async def invalidate_shouts_cache(cache_keys: List[str]):
f"topic:stats:{topic_id}",
]
for related_key in related_keys:
await redis_operation("DEL", related_key)
await redis.execute("DEL", related_key)
logger.debug(f"Invalidated related key: {related_key}")
except Exception as e:
@@ -380,13 +411,13 @@ async def cache_topic_shouts(topic_id: int, shouts: List[dict]):
"""Кэширует список публикаций для темы"""
key = f"topic_shouts_{topic_id}"
payload = json.dumps(shouts, cls=CustomJSONEncoder)
await redis_operation("SETEX", key, value=payload, ttl=CACHE_TTL)
await redis.execute("SETEX", key, CACHE_TTL, payload)
async def get_cached_topic_shouts(topic_id: int) -> List[dict]:
"""Получает кэшированный список публикаций для темы"""
key = f"topic_shouts_{topic_id}"
cached = await redis_operation("GET", key)
cached = await redis.execute("GET", key)
if cached:
return orjson.loads(cached)
return None
@@ -432,27 +463,7 @@ async def invalidate_shout_related_cache(shout: Shout, author_id: int):
await invalidate_shouts_cache(list(cache_keys))
async def redis_operation(operation: str, key: str, value=None, ttl=None):
"""
Унифицированная функция для работы с Redis
Args:
operation: 'GET', 'SET', 'DEL', 'SETEX'
key: ключ
value: значение (для SET/SETEX)
ttl: время жизни в секундах (для SETEX)
"""
try:
if operation == "GET":
return await redis.execute("GET", key)
elif operation == "SET":
await redis.execute("SET", key, value)
elif operation == "SETEX":
await redis.execute("SETEX", key, ttl or CACHE_TTL, value)
elif operation == "DEL":
await redis.execute("DEL", key)
except Exception as e:
logger.error(f"Redis {operation} error for key {key}: {e}")
# Function removed - direct Redis calls used throughout the module instead
async def get_cached_entity(entity_type: str, entity_id: int, get_method, cache_method):
@@ -466,7 +477,7 @@ async def get_cached_entity(entity_type: str, entity_id: int, get_method, cache_
cache_method: метод кэширования
"""
key = f"{entity_type}:id:{entity_id}"
cached = await redis_operation("GET", key)
cached = await redis.execute("GET", key)
if cached:
return orjson.loads(cached)
@@ -497,3 +508,120 @@ async def cache_by_id(entity, entity_id: int, cache_method):
d = x.dict()
await cache_method(d)
return d
# Универсальная функция для сохранения данных в кеш
async def cache_data(key: str, data: Any, ttl: Optional[int] = None) -> None:
"""
Сохраняет данные в кеш по указанному ключу.
Args:
key: Ключ кеша
data: Данные для сохранения
ttl: Время жизни кеша в секундах (None - бессрочно)
"""
try:
payload = json.dumps(data, cls=CustomJSONEncoder)
if ttl:
await redis.execute("SETEX", key, ttl, payload)
else:
await redis.execute("SET", key, payload)
logger.debug(f"Данные сохранены в кеш по ключу {key}")
except Exception as e:
logger.error(f"Ошибка при сохранении данных в кеш: {e}")
# Универсальная функция для получения данных из кеша
async def get_cached_data(key: str) -> Optional[Any]:
"""
Получает данные из кеша по указанному ключу.
Args:
key: Ключ кеша
Returns:
Any: Данные из кеша или None, если данных нет
"""
try:
cached_data = await redis.execute("GET", key)
if cached_data:
logger.debug(f"Данные получены из кеша по ключу {key}")
return orjson.loads(cached_data)
return None
except Exception as e:
logger.error(f"Ошибка при получении данных из кеша: {e}")
return None
# Универсальная функция для инвалидации кеша по префиксу
async def invalidate_cache_by_prefix(prefix: str) -> None:
"""
Инвалидирует все ключи кеша с указанным префиксом.
Args:
prefix: Префикс ключей кеша для инвалидации
"""
try:
keys = await redis.execute("KEYS", f"{prefix}:*")
if keys:
await redis.execute("DEL", *keys)
logger.debug(f"Удалено {len(keys)} ключей кеша с префиксом {prefix}")
except Exception as e:
logger.error(f"Ошибка при инвалидации кеша: {e}")
# Универсальная функция для получения и кеширования данных
async def cached_query(
cache_key: str,
query_func: callable,
ttl: Optional[int] = None,
force_refresh: bool = False,
use_key_format: bool = True,
**query_params,
) -> Any:
"""
Gets data from cache or executes query and saves result to cache.
Supports existing key formats for compatibility.
Args:
cache_key: Cache key or key template from CACHE_KEYS
query_func: Function to execute the query
ttl: Cache TTL in seconds (None - indefinite)
force_refresh: Force cache refresh
use_key_format: Whether to check if cache_key matches a key template in CACHE_KEYS
**query_params: Parameters to pass to the query function
Returns:
Any: Data from cache or query result
"""
# Check if cache_key matches a pattern in CACHE_KEYS
actual_key = cache_key
if use_key_format and "{}" in cache_key:
# Look for a template match in CACHE_KEYS
for key_name, key_format in CACHE_KEYS.items():
if cache_key == key_format:
# We have a match, now look for the id or value to format with
for param_name, param_value in query_params.items():
if param_name in ["id", "slug", "user", "topic_id", "author_id"]:
actual_key = cache_key.format(param_value)
break
# If not forcing refresh, try to get data from cache
if not force_refresh:
cached_result = await get_cached_data(actual_key)
if cached_result is not None:
return cached_result
# If data not in cache or refresh required, execute query
try:
result = await query_func(**query_params)
if result is not None:
# Save result to cache
await cache_data(actual_key, result, ttl)
return result
except Exception as e:
logger.error(f"Error executing query for caching: {e}")
# In case of error, return data from cache if not forcing refresh
if not force_refresh:
return await get_cached_data(actual_key)
raise

181
cache/memorycache.py vendored
View File

@@ -1,181 +0,0 @@
"""
Модуль для кеширования данных с использованием Redis.
Предоставляет API, совместимый с dogpile.cache для поддержки обратной совместимости.
"""
import functools
import hashlib
import inspect
import json
import logging
import pickle
from typing import Callable, Optional
import orjson
from services.redis import redis
from utils.encoders import CustomJSONEncoder
logger = logging.getLogger(__name__)
DEFAULT_TTL = 300 # время жизни кеша в секундах (5 минут)
class RedisCache:
"""
Класс, предоставляющий API, совместимый с dogpile.cache, но использующий Redis.
Примеры:
>>> cache_region = RedisCache()
>>> @cache_region.cache_on_arguments("my_key")
... def my_func(arg1, arg2):
... return arg1 + arg2
"""
def __init__(self, ttl: int = DEFAULT_TTL):
"""
Инициализация объекта кеша.
Args:
ttl: Время жизни кеша в секундах
"""
self.ttl = ttl
def cache_on_arguments(self, cache_key: Optional[str] = None) -> Callable:
"""
Декоратор для кеширования результатов функций с использованием Redis.
Args:
cache_key: Опциональный базовый ключ кеша. Если не указан, генерируется из сигнатуры функции.
Returns:
Декоратор для кеширования функции
Примеры:
>>> @cache_region.cache_on_arguments("users")
... def get_users():
... return db.query(User).all()
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Генерация ключа кеша
key = self._generate_cache_key(func, cache_key, *args, **kwargs)
# Попытка получить данные из кеша
cached_data = await redis.get(key)
if cached_data:
try:
return orjson.loads(cached_data)
except Exception:
# Если не удалось десериализовать как JSON, попробуем как pickle
return pickle.loads(cached_data.encode())
# Вызов оригинальной функции, если данных в кеше нет
result = func(*args, **kwargs)
# Сохранение результата в кеш
try:
# Пытаемся сериализовать как JSON
serialized = json.dumps(result, cls=CustomJSONEncoder)
except (TypeError, ValueError):
# Если не удалось, используем pickle
serialized = pickle.dumps(result).decode()
await redis.set(key, serialized, ex=self.ttl)
return result
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
# Для функций, которые не являются корутинами
# Генерация ключа кеша
key = self._generate_cache_key(func, cache_key, *args, **kwargs)
# Синхронная версия не использует await, поэтому результат всегда вычисляется
result = func(*args, **kwargs)
# Асинхронно записываем в кэш (будет выполнено позже)
try:
import asyncio
# Попытка сериализовать результат в JSON
try:
serialized = json.dumps(result, cls=CustomJSONEncoder)
except (TypeError, ValueError) as e:
logger.debug(f"JSON сериализация не удалась, используем pickle: {e}")
# Если не удалось сериализовать как JSON, используем pickle
serialized = pickle.dumps(result).decode()
asyncio.create_task(redis.set(key, serialized, ex=self.ttl))
except Exception as e:
logger.error(f"Ошибка при кешировании результата: {e}")
# Для отладки добавляем информацию о типе объекта
logger.debug(f"Тип результата: {type(result)}")
if hasattr(result, "__class__"):
logger.debug(f"Класс результата: {result.__class__.__name__}")
return result
# Возвращаем асинхронный или синхронный враппер в зависимости от типа функции
if inspect.iscoroutinefunction(func):
return wrapper
else:
return sync_wrapper
return decorator
def _generate_cache_key(self, func: Callable, base_key: Optional[str], *args, **kwargs) -> str:
"""
Генерирует ключ кеша на основе функции и её аргументов.
Args:
func: Кешируемая функция
base_key: Базовый ключ кеша
*args: Позиционные аргументы функции
**kwargs: Именованные аргументы функции
Returns:
Строковый ключ для кеша
"""
if base_key:
key_prefix = f"cache:{base_key}"
else:
key_prefix = f"cache:{func.__module__}.{func.__name__}"
# Создаем хеш аргументов
arg_hash = hashlib.md5()
# Добавляем позиционные аргументы
for arg in args:
try:
arg_hash.update(str(arg).encode())
except Exception:
arg_hash.update(str(id(arg)).encode())
# Добавляем именованные аргументы (сортируем для детерминированности)
for k in sorted(kwargs.keys()):
try:
arg_hash.update(f"{k}:{kwargs[k]}".encode())
except Exception:
arg_hash.update(f"{k}:{id(kwargs[k])}".encode())
return f"{key_prefix}:{arg_hash.hexdigest()}"
def invalidate(self, func: Callable, *args, **kwargs) -> None:
"""
Инвалидирует (удаляет) кеш для конкретной функции с конкретными аргументами.
Args:
func: Кешированная функция
*args: Позиционные аргументы функции
**kwargs: Именованные аргументы функции
"""
key = self._generate_cache_key(func, None, *args, **kwargs)
import asyncio
asyncio.create_task(redis.execute("DEL", key))
# Экземпляр класса RedisCache для использования в коде
cache_region = RedisCache()

15
cache/precache.py vendored
View File

@@ -1,7 +1,6 @@
import asyncio
import json
import orjson
from sqlalchemy import and_, join, select
from cache.cache import cache_author, cache_topic
@@ -87,11 +86,15 @@ async def precache_data():
# Преобразуем словарь в список аргументов для HSET
if value:
flattened = []
for field, val in value.items():
flattened.extend([field, val])
await redis.execute("HSET", key, *flattened)
# Если значение - словарь, преобразуем его в плоский список для HSET
if isinstance(value, dict):
flattened = []
for field, val in value.items():
flattened.extend([field, val])
await redis.execute("HSET", key, *flattened)
else:
# Предполагаем, что значение уже содержит список
await redis.execute("HSET", key, *value)
logger.info(f"redis hash '{key}' was restored")
with local_session() as session:

122
cache/revalidator.py vendored
View File

@@ -1,17 +1,26 @@
import asyncio
from cache.cache import cache_author, cache_topic, get_cached_author, get_cached_topic
from cache.cache import (
cache_author,
cache_topic,
get_cached_author,
get_cached_topic,
invalidate_cache_by_prefix,
)
from resolvers.stat import get_with_stat
from utils.logger import root_logger as logger
CACHE_REVALIDATION_INTERVAL = 300 # 5 minutes
class CacheRevalidationManager:
def __init__(self, interval=60):
def __init__(self, interval=CACHE_REVALIDATION_INTERVAL):
"""Инициализация менеджера с заданным интервалом проверки (в секундах)."""
self.interval = interval
self.items_to_revalidate = {"authors": set(), "topics": set(), "shouts": set(), "reactions": set()}
self.lock = asyncio.Lock()
self.running = True
self.MAX_BATCH_SIZE = 10 # Максимальное количество элементов для поштучной обработки
async def start(self):
"""Запуск фонового воркера для ревалидации кэша."""
@@ -32,22 +41,107 @@ class CacheRevalidationManager:
"""Обновление кэша для всех сущностей, требующих ревалидации."""
async with self.lock:
# Ревалидация кэша авторов
for author_id in self.items_to_revalidate["authors"]:
author = await get_cached_author(author_id, get_with_stat)
if author:
await cache_author(author)
self.items_to_revalidate["authors"].clear()
if self.items_to_revalidate["authors"]:
logger.debug(f"Revalidating {len(self.items_to_revalidate['authors'])} authors")
for author_id in self.items_to_revalidate["authors"]:
if author_id == "all":
await invalidate_cache_by_prefix("authors")
break
author = await get_cached_author(author_id, get_with_stat)
if author:
await cache_author(author)
self.items_to_revalidate["authors"].clear()
# Ревалидация кэша тем
for topic_id in self.items_to_revalidate["topics"]:
topic = await get_cached_topic(topic_id)
if topic:
await cache_topic(topic)
self.items_to_revalidate["topics"].clear()
if self.items_to_revalidate["topics"]:
logger.debug(f"Revalidating {len(self.items_to_revalidate['topics'])} topics")
for topic_id in self.items_to_revalidate["topics"]:
if topic_id == "all":
await invalidate_cache_by_prefix("topics")
break
topic = await get_cached_topic(topic_id)
if topic:
await cache_topic(topic)
self.items_to_revalidate["topics"].clear()
# Ревалидация шаутов (публикаций)
if self.items_to_revalidate["shouts"]:
shouts_count = len(self.items_to_revalidate["shouts"])
logger.debug(f"Revalidating {shouts_count} shouts")
# Проверяем наличие специального флага 'all'
if "all" in self.items_to_revalidate["shouts"]:
await invalidate_cache_by_prefix("shouts")
# Если элементов много, но не 'all', используем специфический подход
elif shouts_count > self.MAX_BATCH_SIZE:
# Инвалидируем только collections keys, которые затрагивают много сущностей
collection_keys = await asyncio.create_task(self._redis.execute("KEYS", "shouts:*"))
if collection_keys:
await self._redis.execute("DEL", *collection_keys)
logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей шаутов")
# Обновляем кеш каждого конкретного шаута
for shout_id in self.items_to_revalidate["shouts"]:
if shout_id != "all":
# Точечная инвалидация для каждого shout_id
specific_keys = [f"shout:id:{shout_id}"]
for key in specific_keys:
await self._redis.execute("DEL", key)
logger.debug(f"Удален ключ кеша {key}")
else:
# Если элементов немного, обрабатываем каждый
for shout_id in self.items_to_revalidate["shouts"]:
if shout_id != "all":
# Точечная инвалидация для каждого shout_id
specific_keys = [f"shout:id:{shout_id}"]
for key in specific_keys:
await self._redis.execute("DEL", key)
logger.debug(f"Удален ключ кеша {key}")
self.items_to_revalidate["shouts"].clear()
# Аналогично для реакций - точечная инвалидация
if self.items_to_revalidate["reactions"]:
reactions_count = len(self.items_to_revalidate["reactions"])
logger.debug(f"Revalidating {reactions_count} reactions")
if "all" in self.items_to_revalidate["reactions"]:
await invalidate_cache_by_prefix("reactions")
elif reactions_count > self.MAX_BATCH_SIZE:
# Инвалидируем только collections keys для реакций
collection_keys = await asyncio.create_task(self._redis.execute("KEYS", "reactions:*"))
if collection_keys:
await self._redis.execute("DEL", *collection_keys)
logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей реакций")
# Точечная инвалидация для каждой реакции
for reaction_id in self.items_to_revalidate["reactions"]:
if reaction_id != "all":
specific_keys = [f"reaction:id:{reaction_id}"]
for key in specific_keys:
await self._redis.execute("DEL", key)
logger.debug(f"Удален ключ кеша {key}")
else:
# Точечная инвалидация для каждой реакции
for reaction_id in self.items_to_revalidate["reactions"]:
if reaction_id != "all":
specific_keys = [f"reaction:id:{reaction_id}"]
for key in specific_keys:
await self._redis.execute("DEL", key)
logger.debug(f"Удален ключ кеша {key}")
self.items_to_revalidate["reactions"].clear()
def mark_for_revalidation(self, entity_id, entity_type):
"""Отметить сущность для ревалидации."""
self.items_to_revalidate[entity_type].add(entity_id)
if entity_id and entity_type:
self.items_to_revalidate[entity_type].add(entity_id)
def invalidate_all(self, entity_type):
"""Пометить для инвалидации все элементы указанного типа."""
logger.debug(f"Marking all {entity_type} for invalidation")
# Особый флаг для полной инвалидации
self.items_to_revalidate[entity_type].add("all")
async def stop(self):
"""Остановка фонового воркера."""
@@ -60,4 +154,4 @@ class CacheRevalidationManager:
pass
revalidation_manager = CacheRevalidationManager(interval=300) # Ревалидация каждые 5 минут
revalidation_manager = CacheRevalidationManager()