From f249752db5b53c5ee359760cde7263cdf0ba7203 Mon Sep 17 00:00:00 2001 From: Stepan Vladovskiy Date: Wed, 12 Mar 2025 12:06:09 -0300 Subject: [PATCH] feat: moved txtai and search procedure in different instance --- main.py | 10 +++- requirements.txt | 3 +- server.py | 7 +-- services/search.py | 144 ++++++++++++--------------------------------- 4 files changed, 48 insertions(+), 116 deletions(-) diff --git a/main.py b/main.py index 7c4a722f..6f88b382 100644 --- a/main.py +++ b/main.py @@ -35,6 +35,14 @@ async def start(): f.write(str(os.getpid())) print(f"[main] process started in {MODE} mode") +async def check_search_service(): + """Check if search service is available and log result""" + info = await search_service.info() + if info.get("status") in ["error", "unavailable"]: + print(f"[WARNING] Search service unavailable: {info.get('message', 'unknown reason')}") + else: + print(f"[INFO] Search service is available: {info}") + async def lifespan(_app): try: @@ -44,7 +52,7 @@ async def lifespan(_app): precache_data(), ViewedStorage.init(), create_webhook_endpoint(), - search_service.info(), + check_search_service(), start(), revalidation_manager.start(), ) diff --git a/requirements.txt b/requirements.txt index ccab19f3..40212b99 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,8 +18,7 @@ ariadne granian # NLP and search -txtai[embeddings] -sentence-transformers +httpx pydantic fakeredis diff --git a/server.py b/server.py index e34609b1..0ba5b97c 100644 --- a/server.py +++ b/server.py @@ -4,7 +4,6 @@ from pathlib import Path from granian.constants import Interfaces from granian.log import LogLevels from granian.server import Server -from sentence_transformers import SentenceTransformer from settings import PORT from utils.logger import root_logger as logger @@ -12,11 +11,7 @@ from utils.logger import root_logger as logger if __name__ == "__main__": logger.info("started") try: - # Preload the model before starting the server - logger.info("Loading sentence transformer model...") - model = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2') - logger.info("Model loaded successfully!") - + granian_instance = Server( "main:app", address="0.0.0.0", diff --git a/services/search.py b/services/search.py index b8b97b60..4328ce4c 100644 --- a/services/search.py +++ b/services/search.py @@ -2,9 +2,7 @@ import asyncio import json import logging import os -import concurrent.futures - -from txtai.embeddings import Embeddings +import httpx from services.redis import redis from utils.encoders import CustomJSONEncoder @@ -13,96 +11,53 @@ from utils.encoders import CustomJSONEncoder logger = logging.getLogger("search") logger.setLevel(logging.WARNING) -REDIS_TTL = 86400 # 1 день в секундах +REDIS_TTL = 86400 # 1 day in seconds -# Configuration for txtai search +# Configuration for search service SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"]) -# Thread executor for non-blocking initialization -thread_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) +TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "http://txtai-service:8000") class SearchService: - def __init__(self, index_name="search_index"): - logger.info("Инициализируем поиск...") - self.index_name = index_name - self.embeddings = None - self._initialization_future = None + def __init__(self): + logger.info("Initializing search service...") self.available = SEARCH_ENABLED + self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL) if not self.available: - logger.info("Поиск отключен (SEARCH_ENABLED = False)") - return + logger.info("Search disabled (SEARCH_ENABLED = False)") - # Initialize embeddings in background thread - self._initialization_future = thread_executor.submit(self._init_embeddings) - - def _init_embeddings(self): - """Initialize txtai embeddings in a background thread""" - try: - # Use the same model as in TopicClassifier - model_path = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2" - - # Configure embeddings with content storage and quantization for lower memory usage - self.embeddings = Embeddings({ - "path": model_path, - "content": True, - "quantize": True - }) - logger.info("txtai embeddings initialized successfully") - return True - except Exception as e: - logger.error(f"Failed to initialize txtai embeddings: {e}") - self.available = False - return False - async def info(self): """Return information about search service""" if not self.available: return {"status": "disabled"} try: - if not self.is_ready(): - return {"status": "initializing", "model": "paraphrase-multilingual-mpnet-base-v2"} - - return { - "status": "active", - "count": len(self.embeddings) if self.embeddings else 0, - "model": "paraphrase-multilingual-mpnet-base-v2" - } + response = await self.client.get("/info") + response.raise_for_status() + return response.json() except Exception as e: logger.error(f"Failed to get search info: {e}") return {"status": "error", "message": str(e)} def is_ready(self): - """Check if embeddings are fully initialized and ready""" - return self.embeddings is not None and self.available + """Check if service is available""" + return self.available def index(self, shout): """Index a single document""" if not self.available: return - logger.info(f"Индексируем пост {shout.id}") + logger.info(f"Indexing post {shout.id}") # Start in background to not block asyncio.create_task(self.perform_index(shout)) async def perform_index(self, shout): """Actually perform the indexing operation""" - if not self.is_ready(): - # If embeddings not ready, wait for initialization - if self._initialization_future and not self._initialization_future.done(): - try: - # Wait for initialization to complete with timeout - await asyncio.get_event_loop().run_in_executor( - None, lambda: self._initialization_future.result(timeout=30)) - except Exception as e: - logger.error(f"Embeddings initialization failed: {e}") - return - - if not self.is_ready(): - logger.error(f"Cannot index shout {shout.id}: embeddings not ready") - return + if not self.available: + return try: # Combine all text fields @@ -114,12 +69,13 @@ class SearchService: shout.media or "" ])) - # Use upsert for individual documents - await asyncio.get_event_loop().run_in_executor( - None, - lambda: self.embeddings.upsert([(str(shout.id), text, None)]) + # Send to txtai service + response = await self.client.post( + "/index", + json={"id": str(shout.id), "text": text} ) - logger.info(f"Пост {shout.id} успешно индексирован") + response.raise_for_status() + logger.info(f"Post {shout.id} successfully indexed") except Exception as e: logger.error(f"Indexing error for shout {shout.id}: {e}") @@ -127,20 +83,6 @@ class SearchService: """Index multiple documents at once""" if not self.available or not shouts: return - - if not self.is_ready(): - # Wait for initialization if needed - if self._initialization_future and not self._initialization_future.done(): - try: - await asyncio.get_event_loop().run_in_executor( - None, lambda: self._initialization_future.result(timeout=30)) - except Exception as e: - logger.error(f"Embeddings initialization failed: {e}") - return - - if not self.is_ready(): - logger.error("Cannot perform bulk indexing: embeddings not ready") - return documents = [] for shout in shouts: @@ -151,11 +93,14 @@ class SearchService: shout.body or "", shout.media or "" ])) - documents.append((str(shout.id), text, None)) + documents.append({"id": str(shout.id), "text": text}) try: - await asyncio.get_event_loop().run_in_executor( - None, lambda: self.embeddings.upsert(documents)) + response = await self.client.post( + "/bulk-index", + json={"documents": documents} + ) + response.raise_for_status() logger.info(f"Bulk indexed {len(documents)} documents") except Exception as e: logger.error(f"Bulk indexing error: {e}") @@ -171,31 +116,16 @@ class SearchService: if cached: return json.loads(cached) - logger.info(f"Ищем: {text} {offset}+{limit}") - - if not self.is_ready(): - # Wait for initialization if needed - if self._initialization_future and not self._initialization_future.done(): - try: - await asyncio.get_event_loop().run_in_executor( - None, lambda: self._initialization_future.result(timeout=30)) - except Exception as e: - logger.error(f"Embeddings initialization failed: {e}") - return [] - - if not self.is_ready(): - logger.error("Cannot search: embeddings not ready") - return [] + logger.info(f"Searching: {text} {offset}+{limit}") try: - # Search with txtai (need to request more to handle offset) - total = offset + limit - results = await asyncio.get_event_loop().run_in_executor( - None, lambda: self.embeddings.search(text, total)) - - # Apply offset and convert to the expected format - results = results[offset:offset+limit] - formatted_results = [{"id": doc_id, "score": float(score)} for score, doc_id in results] + response = await self.client.post( + "/search", + json={"text": text, "limit": limit, "offset": offset} + ) + response.raise_for_status() + result = response.json() + formatted_results = result.get("results", []) # Cache results if formatted_results: @@ -229,4 +159,4 @@ async def initialize_search_index(shouts_data): if SEARCH_ENABLED: logger.info("Initializing search index with existing data...") await search_service.bulk_index(shouts_data) - logger.info(f"Search index initialized with {len(shouts_data)} documents") + logger.info(f"Search index initialized with {len(shouts_data)} documents") \ No newline at end of file