feat: moved txtai and search procedure to a separate service instance
All checks were successful
Deploy on push / deploy (push) Successful in 2m18s

This commit is contained in:
Stepan Vladovskiy 2025-03-12 12:06:09 -03:00
parent c0b2116da2
commit f249752db5
4 changed files with 48 additions and 116 deletions

10
main.py
View File

@@ -35,6 +35,14 @@ async def start():
f.write(str(os.getpid())) f.write(str(os.getpid()))
print(f"[main] process started in {MODE} mode") print(f"[main] process started in {MODE} mode")
async def check_search_service():
"""Check if search service is available and log result"""
info = await search_service.info()
if info.get("status") in ["error", "unavailable"]:
print(f"[WARNING] Search service unavailable: {info.get('message', 'unknown reason')}")
else:
print(f"[INFO] Search service is available: {info}")
async def lifespan(_app): async def lifespan(_app):
try: try:
@@ -44,7 +52,7 @@ async def lifespan(_app):
precache_data(), precache_data(),
ViewedStorage.init(), ViewedStorage.init(),
create_webhook_endpoint(), create_webhook_endpoint(),
search_service.info(), check_search_service(),
start(), start(),
revalidation_manager.start(), revalidation_manager.start(),
) )

View File

@@ -18,8 +18,7 @@ ariadne
granian granian
# NLP and search # NLP and search
txtai[embeddings] httpx
sentence-transformers
pydantic pydantic
fakeredis fakeredis

View File

@@ -4,7 +4,6 @@ from pathlib import Path
from granian.constants import Interfaces from granian.constants import Interfaces
from granian.log import LogLevels from granian.log import LogLevels
from granian.server import Server from granian.server import Server
from sentence_transformers import SentenceTransformer
from settings import PORT from settings import PORT
from utils.logger import root_logger as logger from utils.logger import root_logger as logger
@@ -12,11 +11,7 @@ from utils.logger import root_logger as logger
if __name__ == "__main__": if __name__ == "__main__":
logger.info("started") logger.info("started")
try: try:
# Preload the model before starting the server
logger.info("Loading sentence transformer model...")
model = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2')
logger.info("Model loaded successfully!")
granian_instance = Server( granian_instance = Server(
"main:app", "main:app",
address="0.0.0.0", address="0.0.0.0",

View File

@@ -2,9 +2,7 @@ import asyncio
import json import json
import logging import logging
import os import os
import concurrent.futures import httpx
from txtai.embeddings import Embeddings
from services.redis import redis from services.redis import redis
from utils.encoders import CustomJSONEncoder from utils.encoders import CustomJSONEncoder
@@ -13,96 +11,53 @@ from utils.encoders import CustomJSONEncoder
logger = logging.getLogger("search") logger = logging.getLogger("search")
logger.setLevel(logging.WARNING) logger.setLevel(logging.WARNING)
REDIS_TTL = 86400 # 1 день в секундах REDIS_TTL = 86400 # 1 day in seconds
# Configuration for txtai search # Configuration for search service
SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"]) SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"])
# Thread executor for non-blocking initialization TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "http://txtai-service:8000")
thread_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
class SearchService: class SearchService:
def __init__(self, index_name="search_index"): def __init__(self):
logger.info("Инициализируем поиск...") logger.info("Initializing search service...")
self.index_name = index_name
self.embeddings = None
self._initialization_future = None
self.available = SEARCH_ENABLED self.available = SEARCH_ENABLED
self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL)
if not self.available: if not self.available:
logger.info("Поиск отключен (SEARCH_ENABLED = False)") logger.info("Search disabled (SEARCH_ENABLED = False)")
return
# Initialize embeddings in background thread
self._initialization_future = thread_executor.submit(self._init_embeddings)
def _init_embeddings(self):
"""Initialize txtai embeddings in a background thread"""
try:
# Use the same model as in TopicClassifier
model_path = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
# Configure embeddings with content storage and quantization for lower memory usage
self.embeddings = Embeddings({
"path": model_path,
"content": True,
"quantize": True
})
logger.info("txtai embeddings initialized successfully")
return True
except Exception as e:
logger.error(f"Failed to initialize txtai embeddings: {e}")
self.available = False
return False
async def info(self): async def info(self):
"""Return information about search service""" """Return information about search service"""
if not self.available: if not self.available:
return {"status": "disabled"} return {"status": "disabled"}
try: try:
if not self.is_ready(): response = await self.client.get("/info")
return {"status": "initializing", "model": "paraphrase-multilingual-mpnet-base-v2"} response.raise_for_status()
return response.json()
return {
"status": "active",
"count": len(self.embeddings) if self.embeddings else 0,
"model": "paraphrase-multilingual-mpnet-base-v2"
}
except Exception as e: except Exception as e:
logger.error(f"Failed to get search info: {e}") logger.error(f"Failed to get search info: {e}")
return {"status": "error", "message": str(e)} return {"status": "error", "message": str(e)}
def is_ready(self): def is_ready(self):
"""Check if embeddings are fully initialized and ready""" """Check if service is available"""
return self.embeddings is not None and self.available return self.available
def index(self, shout): def index(self, shout):
"""Index a single document""" """Index a single document"""
if not self.available: if not self.available:
return return
logger.info(f"Индексируем пост {shout.id}") logger.info(f"Indexing post {shout.id}")
# Start in background to not block # Start in background to not block
asyncio.create_task(self.perform_index(shout)) asyncio.create_task(self.perform_index(shout))
async def perform_index(self, shout): async def perform_index(self, shout):
"""Actually perform the indexing operation""" """Actually perform the indexing operation"""
if not self.is_ready(): if not self.available:
# If embeddings not ready, wait for initialization return
if self._initialization_future and not self._initialization_future.done():
try:
# Wait for initialization to complete with timeout
await asyncio.get_event_loop().run_in_executor(
None, lambda: self._initialization_future.result(timeout=30))
except Exception as e:
logger.error(f"Embeddings initialization failed: {e}")
return
if not self.is_ready():
logger.error(f"Cannot index shout {shout.id}: embeddings not ready")
return
try: try:
# Combine all text fields # Combine all text fields
@@ -114,12 +69,13 @@ class SearchService:
shout.media or "" shout.media or ""
])) ]))
# Use upsert for individual documents # Send to txtai service
await asyncio.get_event_loop().run_in_executor( response = await self.client.post(
None, "/index",
lambda: self.embeddings.upsert([(str(shout.id), text, None)]) json={"id": str(shout.id), "text": text}
) )
logger.info(f"Пост {shout.id} успешно индексирован") response.raise_for_status()
logger.info(f"Post {shout.id} successfully indexed")
except Exception as e: except Exception as e:
logger.error(f"Indexing error for shout {shout.id}: {e}") logger.error(f"Indexing error for shout {shout.id}: {e}")
@@ -127,20 +83,6 @@ class SearchService:
"""Index multiple documents at once""" """Index multiple documents at once"""
if not self.available or not shouts: if not self.available or not shouts:
return return
if not self.is_ready():
# Wait for initialization if needed
if self._initialization_future and not self._initialization_future.done():
try:
await asyncio.get_event_loop().run_in_executor(
None, lambda: self._initialization_future.result(timeout=30))
except Exception as e:
logger.error(f"Embeddings initialization failed: {e}")
return
if not self.is_ready():
logger.error("Cannot perform bulk indexing: embeddings not ready")
return
documents = [] documents = []
for shout in shouts: for shout in shouts:
@@ -151,11 +93,14 @@ class SearchService:
shout.body or "", shout.body or "",
shout.media or "" shout.media or ""
])) ]))
documents.append((str(shout.id), text, None)) documents.append({"id": str(shout.id), "text": text})
try: try:
await asyncio.get_event_loop().run_in_executor( response = await self.client.post(
None, lambda: self.embeddings.upsert(documents)) "/bulk-index",
json={"documents": documents}
)
response.raise_for_status()
logger.info(f"Bulk indexed {len(documents)} documents") logger.info(f"Bulk indexed {len(documents)} documents")
except Exception as e: except Exception as e:
logger.error(f"Bulk indexing error: {e}") logger.error(f"Bulk indexing error: {e}")
@@ -171,31 +116,16 @@ class SearchService:
if cached: if cached:
return json.loads(cached) return json.loads(cached)
logger.info(f"Ищем: {text} {offset}+{limit}") logger.info(f"Searching: {text} {offset}+{limit}")
if not self.is_ready():
# Wait for initialization if needed
if self._initialization_future and not self._initialization_future.done():
try:
await asyncio.get_event_loop().run_in_executor(
None, lambda: self._initialization_future.result(timeout=30))
except Exception as e:
logger.error(f"Embeddings initialization failed: {e}")
return []
if not self.is_ready():
logger.error("Cannot search: embeddings not ready")
return []
try: try:
# Search with txtai (need to request more to handle offset) response = await self.client.post(
total = offset + limit "/search",
results = await asyncio.get_event_loop().run_in_executor( json={"text": text, "limit": limit, "offset": offset}
None, lambda: self.embeddings.search(text, total)) )
response.raise_for_status()
# Apply offset and convert to the expected format result = response.json()
results = results[offset:offset+limit] formatted_results = result.get("results", [])
formatted_results = [{"id": doc_id, "score": float(score)} for score, doc_id in results]
# Cache results # Cache results
if formatted_results: if formatted_results:
@@ -229,4 +159,4 @@ async def initialize_search_index(shouts_data):
if SEARCH_ENABLED: if SEARCH_ENABLED:
logger.info("Initializing search index with existing data...") logger.info("Initializing search index with existing data...")
await search_service.bulk_index(shouts_data) await search_service.bulk_index(shouts_data)
logger.info(f"Search index initialized with {len(shouts_data)} documents") logger.info(f"Search index initialized with {len(shouts_data)} documents")