diff --git a/cache/revalidator.py b/cache/revalidator.py index 46f5193b..64fab6c3 100644 --- a/cache/revalidator.py +++ b/cache/revalidator.py @@ -14,7 +14,7 @@ class CacheRevalidationManager: async def start(self): """Запуск фонового воркера для ревалидации кэша.""" - asyncio.create_task(self.revalidate_cache()) + self.task = asyncio.create_task(self.revalidate_cache()) async def revalidate_cache(self): """Циклическая проверка и ревалидация кэша каждые self.interval секунд.""" @@ -48,9 +48,15 @@ class CacheRevalidationManager: """Отметить сущность для ревалидации.""" self.items_to_revalidate[entity_type].add(entity_id) - def stop(self): + async def stop(self): """Остановка фонового воркера.""" self.running = False + if hasattr(self, 'task'): + self.task.cancel() + try: + await self.task + except asyncio.CancelledError: + pass revalidation_manager = CacheRevalidationManager(interval=300) # Ревалидация каждые 5 минут diff --git a/main.py b/main.py index 6e32879b..af396e79 100644 --- a/main.py +++ b/main.py @@ -74,19 +74,24 @@ async def create_all_tables_async(): async def lifespan(app): - # Запуск всех сервисов при старте приложения - await asyncio.gather( - create_all_tables_async(), - redis.connect(), - precache_data(), - ViewedStorage.init(), - search_service.info(), - start(), - revalidation_manager.start(), - ) - yield - # Остановка сервисов при завершении работы приложения - await redis.disconnect() + try: + await asyncio.gather( + create_all_tables_async(), + redis.connect(), + precache_data(), + ViewedStorage.init(), + search_service.info(), + start(), + revalidation_manager.start(), + ) + yield + finally: + tasks = [ + redis.disconnect(), + ViewedStorage.stop(), + revalidation_manager.stop() + ] + await asyncio.gather(*tasks, return_exceptions=True) # Создаем экземпляр GraphQL diff --git a/server.py b/server.py index 8cc35392..5d3ac508 100644 --- a/server.py +++ b/server.py @@ -1,5 +1,3 @@ -import subprocess - from granian.constants import Interfaces from granian.log import LogLevels from granian.server import Granian @@ -8,23 +6,24 @@ from settings import PORT from utils.logger import root_logger as logger -def is_docker_container_running(name): - cmd = ["docker", "ps", "-f", f"name={name}"] - output = subprocess.run(cmd, capture_output=True, text=True).stdout - logger.info(output) - return name in output - - if __name__ == "__main__": logger.info("started") - granian_instance = Granian( - "main:app", - address="0.0.0.0", # noqa S104 - port=PORT, - interface=Interfaces.ASGI, - threads=4, - websockets=False, - log_level=LogLevels.debug, - ) - granian_instance.serve() + try: + granian_instance = Granian( + "main:app", + address="0.0.0.0", + port=PORT, + interface=Interfaces.ASGI, + threads=4, + websockets=False, + log_level=LogLevels.debug, + backlog=2048, + ) + + granian_instance.serve() + except Exception as error: + logger.error(f"Granian error: {error}", exc_info=True) + raise + finally: + logger.info("stopped") diff --git a/services/db.py b/services/db.py index 8a8822d6..edbe7949 100644 --- a/services/db.py +++ b/services/db.py @@ -22,7 +22,11 @@ if DB_URL.startswith("postgres"): max_overflow=20, pool_timeout=30, # Время ожидания свободного соединения pool_recycle=1800, # Время жизни соединения - connect_args={"sslmode": "disable"}, + pool_pre_ping=True, # Добавить проверку соединений + connect_args={ + "sslmode": "disable", + "connect_timeout": 40 # Добавить таймаут подключения + } ) else: engine = create_engine(DB_URL, echo=False, connect_args={"check_same_thread": False}) diff --git a/services/search.py b/services/search.py index e1e9e0de..65054cec 100644 --- a/services/search.py +++ b/services/search.py @@ -166,7 +166,19 @@ class SearchService: async def perform_index(self, shout, index_body): if self.client: - self.client.index(index=self.index_name, id=str(shout.id), body=index_body) + try: + await asyncio.wait_for( + self.client.index( + index=self.index_name, + id=str(shout.id), + body=index_body + ), + timeout=40.0 + ) + except asyncio.TimeoutError: + logger.error(f"Indexing timeout for shout {shout.id}") + except Exception as e: + logger.error(f"Indexing error for shout {shout.id}: {e}") async def search(self, text, limit, offset): logger.info(f"Ищем: {text} {offset}+{limit}") diff --git a/services/viewed.py b/services/viewed.py index c9d8f5be..3ed0018f 100644 --- a/services/viewed.py +++ b/services/viewed.py @@ -37,6 +37,12 @@ class ViewedStorage: auth_result = None disabled = False start_date = datetime.now().strftime("%Y-%m-%d") + running = True + + @staticmethod + async def stop(): + self = ViewedStorage + self.running = False @staticmethod async def init(): @@ -196,22 +202,26 @@ class ViewedStorage: if self.disabled: return - while True: - try: - await self.update_pages() - failed = 0 - except Exception as exc: - failed += 1 - logger.debug(exc) - logger.info(" - update failed #%d, wait 10 secs" % failed) - if failed > 3: - logger.info(" - views update failed, not trying anymore") - break - if failed == 0: - when = datetime.now(timezone.utc) + timedelta(seconds=self.period) - t = format(when.astimezone().isoformat()) - logger.info(" ⎩ next update: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])) - await asyncio.sleep(self.period) - else: - await asyncio.sleep(10) - logger.info(" - try to update views again") + try: + while self.running: + try: + await self.update_pages() + failed = 0 + except Exception as exc: + failed += 1 + logger.debug(exc) + logger.warning(" - update failed #%d, wait 10 secs" % failed) + if failed > 3 or isinstance(exc, asyncio.CancelledError): + logger.error("ViewedStorage worker cancelled") + break + finally: + self.running = False + + if failed == 0: + when = datetime.now(timezone.utc) + timedelta(seconds=self.period) + t = format(when.astimezone().isoformat()) + logger.info(" ⎩ next update: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])) + await asyncio.sleep(self.period) + else: + await asyncio.sleep(10) + logger.info(" - try to update views again")