loop-fix-4

This commit is contained in:
Untone 2024-08-07 08:42:59 +03:00
parent 8b1e42de1c
commit 1d4fa4b977
3 changed files with 51 additions and 45 deletions

View File

@ -15,6 +15,7 @@ from services.search import search_service
from services.sentry import start_sentry from services.sentry import start_sentry
from services.viewed import ViewedStorage from services.viewed import ViewedStorage
from services.webhook import WebhookEndpoint from services.webhook import WebhookEndpoint
from services.revalidator import revalidation_manager
from settings import DEV_SERVER_PID_FILE_NAME, MODE from settings import DEV_SERVER_PID_FILE_NAME, MODE
import_module("resolvers") import_module("resolvers")
@ -43,6 +44,7 @@ app = Starlette(
search_service.info, search_service.info,
start_sentry, start_sentry,
start, start,
revalidation_manager.start,
], ],
on_shutdown=[redis.disconnect], on_shutdown=[redis.disconnect],
debug=True, debug=True,

48
services/revalidator.py Normal file
View File

@ -0,0 +1,48 @@
import asyncio
from services.logger import root_logger as logger
from services.cache import get_cached_author, cache_author, cache_topic, get_cached_topic
class CacheRevalidationManager:
"""Управление периодической ревалидацией кэша."""
def __init__(self):
self.items_to_revalidate = {"authors": set(), "topics": set()}
self.revalidation_interval = 60 # Интервал ревалидации в секундах
self.loop = None
def start(self):
self.loop = asyncio.get_event_loop()
self.loop.run_until_complete(self.revalidate_cache())
self.loop.run_forever()
logger.info("[services.revalidator] started infinite loop")
async def revalidate_cache(self):
"""Периодическая ревалидация кэша."""
while True:
await asyncio.sleep(self.revalidation_interval)
await self.process_revalidation()
async def process_revalidation(self):
"""Ревалидация кэша для отмеченных сущностей."""
for entity_type, ids in self.items_to_revalidate.items():
for entity_id in ids:
if entity_type == "authors":
# Ревалидация кэша автора
author = await get_cached_author(entity_id)
if author:
await cache_author(author)
elif entity_type == "topics":
# Ревалидация кэша темы
topic = await get_cached_topic(entity_id)
if topic:
await cache_topic(topic)
ids.clear()
def mark_for_revalidation(self, entity_id, entity_type):
"""Отметить сущность для ревалидации."""
self.items_to_revalidate[entity_type].add(entity_id)
# Инициализация менеджера ревалидации
revalidation_manager = CacheRevalidationManager()

View File

@ -1,56 +1,12 @@
import asyncio
from sqlalchemy import event from sqlalchemy import event
from orm.author import Author, AuthorFollower from orm.author import Author, AuthorFollower
from orm.reaction import Reaction from orm.reaction import Reaction
from orm.shout import Shout, ShoutAuthor from orm.shout import Shout, ShoutAuthor
from orm.topic import Topic, TopicFollower from orm.topic import Topic, TopicFollower
from services.cache import cache_author, get_cached_author, cache_topic, get_cached_topic from services.revalidator import revalidation_manager
from services.logger import root_logger as logger from services.logger import root_logger as logger
class CacheRevalidationManager:
"""Управление периодической ревалидацией кэша."""
def __init__(self):
self.items_to_revalidate = {"authors": set(), "topics": set()}
self.revalidation_interval = 60 # Интервал ревалидации в секундах
def start(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self.revalidate_cache())
loop.run_forever()
async def revalidate_cache(self):
"""Периодическая ревалидация кэша."""
while True:
await asyncio.sleep(self.revalidation_interval)
await self.process_revalidation()
async def process_revalidation(self):
"""Ревалидация кэша для отмеченных сущностей."""
for entity_type, ids in self.items_to_revalidate.items():
for entity_id in ids:
if entity_type == "authors":
# Ревалидация кэша автора
author = await get_cached_author(entity_id)
if author:
await cache_author(author)
elif entity_type == "topics":
# Ревалидация кэша темы
topic = await get_cached_topic(entity_id)
if topic:
await cache_topic(topic)
ids.clear()
def mark_for_revalidation(self, entity_id, entity_type):
"""Отметить сущность для ревалидации."""
self.items_to_revalidate[entity_type].add(entity_id)
# Инициализация и запуск менеджера ревалидации
revalidation_manager = CacheRevalidationManager()
revalidation_manager.start()
def after_update_handler(mapper, connection, target): def after_update_handler(mapper, connection, target):
"""Обработчик обновления сущности.""" """Обработчик обновления сущности."""
entity_type = "authors" if isinstance(target, Author) else "topics" if isinstance(target, Topic) else "shouts" entity_type = "authors" if isinstance(target, Author) else "topics" if isinstance(target, Topic) else "shouts"