feat(search.py): switch search to txtai with an AI embedding model; fix granian workers

Stepan Vladovskiy 2025-03-05 20:08:21 +00:00
parent 20eba36c65
commit d55448398d
6 changed files with 211 additions and 182 deletions


@@ -29,7 +29,17 @@ jobs:
         if: github.ref == 'refs/heads/dev'
         uses: dokku/github-action@master
         with:
-          branch: 'dev'
+          branch: 'main'
           force: true
           git_remote_url: 'ssh://dokku@v2.discours.io:22/core'
           ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}
+
+      - name: Push to dokku for staging branch
+        if: github.ref == 'refs/heads/staging'
+        uses: dokku/github-action@master
+        with:
+          branch: 'main'
+          force: true
+          git_remote_url: 'ssh://dokku@staging.discours.io:22/core'
+          ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}
+          git_push_flags: '--force'

3 .gitignore vendored

@@ -161,4 +161,5 @@ views.json
 *.key
 *.crt
 *cache.json
 .cursor
+.devcontainer/


@@ -17,7 +17,8 @@ from cache.revalidator import revalidation_manager
 from services.exception import ExceptionHandlerMiddleware
 from services.redis import redis
 from services.schema import create_all_tables, resolvers
-from services.search import search_service
+#from services.search import search_service
+from services.search import search_service, initialize_search_index
 from services.viewed import ViewedStorage
 from services.webhook import WebhookEndpoint, create_webhook_endpoint
 from settings import DEV_SERVER_PID_FILE_NAME, MODE
@@ -47,6 +48,12 @@ async def lifespan(_app):
             start(),
             revalidation_manager.start(),
         )
+
+        # After basic initialization is complete, fetch shouts and initialize search
+        from services.db import fetch_all_shouts  # Import your database access function
+        all_shouts = await fetch_all_shouts()  # Replace with your actual function
+        await initialize_search_index(all_shouts)
+
         yield
     finally:
         tasks = [redis.disconnect(), ViewedStorage.stop(), revalidation_manager.stop()]
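
The lifespan hook above calls a fetch_all_shouts helper from services.db that the diff only references ("Import your database access function"). A minimal sketch of what such a helper could look like, assuming an async SQLAlchemy engine and a Shout ORM model; every name below is illustrative, not part of this commit:

# Hypothetical services/db.py helper -- illustrative only, not part of this commit
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from orm.shout import Shout  # assumed model location

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/discours")  # assumed DSN
session_factory = async_sessionmaker(engine, expire_on_commit=False)


async def fetch_all_shouts():
    """Return all shouts so the search index can be populated at startup."""
    async with session_factory() as session:
        result = await session.execute(select(Shout))
        return result.scalars().all()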


@@ -17,6 +17,10 @@ gql
 ariadne
 granian
 
+# NLP and search
+txtai[embeddings]
+sentence-transformers
+
 pydantic
 fakeredis
 pytest


@@ -3,7 +3,8 @@ from pathlib import Path
 from granian.constants import Interfaces
 from granian.log import LogLevels
-from granian.server import Granian
+from granian.server import Server
+from sentence_transformers import SentenceTransformer
 
 from settings import PORT
 from utils.logger import root_logger as logger
@@ -11,12 +12,17 @@ from utils.logger import root_logger as logger
 if __name__ == "__main__":
     logger.info("started")
     try:
-        granian_instance = Granian(
+        # Preload the model before starting the server
+        logger.info("Loading sentence transformer model...")
+        model = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2')
+        logger.info("Model loaded successfully!")
+
+        granian_instance = Server(
             "main:app",
             address="0.0.0.0",
             port=PORT,
             interface=Interfaces.ASGI,
-            threads=4,
+            workers=4,
             websockets=False,
             log_level=LogLevels.debug,
             backlog=2048,
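
The launcher now loads the sentence-transformers model once before Granian spawns its four workers, which at minimum warms the local HuggingFace cache so the workers do not each have to download the weights. A standalone illustration of what that preloaded model provides (sample text and the printed shape are illustrative, not part of the commit):

# Quick check of the preloaded embedding model -- illustrative only
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("paraphrase-multilingual-mpnet-base-v2")
vectors = model.encode(["Пример текста для поиска", "An example sentence for search"])
print(vectors.shape)  # (2, 768): multilingual MPNet yields 768-dimensional embeddings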


@@ -2,8 +2,9 @@ import asyncio
 import json
 import logging
 import os
+import concurrent.futures
 
-from opensearchpy import OpenSearch
+from txtai.embeddings import Embeddings
 
 from services.redis import redis
 from utils.encoders import CustomJSONEncoder
@@ -12,220 +13,220 @@ from utils.encoders import CustomJSONEncoder
 logger = logging.getLogger("search")
 logger.setLevel(logging.WARNING)
 
-ELASTIC_HOST = os.environ.get("ELASTIC_HOST", "").replace("https://", "")
-ELASTIC_USER = os.environ.get("ELASTIC_USER", "")
-ELASTIC_PASSWORD = os.environ.get("ELASTIC_PASSWORD", "")
-ELASTIC_PORT = os.environ.get("ELASTIC_PORT", 9200)
-ELASTIC_URL = os.environ.get(
-    "ELASTIC_URL",
-    f"https://{ELASTIC_USER}:{ELASTIC_PASSWORD}@{ELASTIC_HOST}:{ELASTIC_PORT}",
-)
 REDIS_TTL = 86400  # 1 день в секундах
 
-index_settings = {
-    "settings": {
-        "index": {"number_of_shards": 1, "auto_expand_replicas": "0-all"},
-        "analysis": {
-            "analyzer": {
-                "ru": {
-                    "tokenizer": "standard",
-                    "filter": ["lowercase", "ru_stop", "ru_stemmer"],
-                }
-            },
-            "filter": {
-                "ru_stemmer": {"type": "stemmer", "language": "russian"},
-                "ru_stop": {"type": "stop", "stopwords": "_russian_"},
-            },
-        },
-    },
-    "mappings": {
-        "properties": {
-            "body": {"type": "text", "analyzer": "ru"},
-            "title": {"type": "text", "analyzer": "ru"},
-            "subtitle": {"type": "text", "analyzer": "ru"},
-            "lead": {"type": "text", "analyzer": "ru"},
-            "media": {"type": "text", "analyzer": "ru"},
-        }
-    },
-}
-
-expected_mapping = index_settings["mappings"]
-
-# Создание цикла событий
-search_loop = asyncio.get_event_loop()
-
-# В начале файла добавим флаг
-SEARCH_ENABLED = bool(os.environ.get("ELASTIC_HOST", ""))
-
-
-def get_indices_stats():
-    indices_stats = search_service.client.cat.indices(format="json")
-    for index_info in indices_stats:
-        index_name = index_info["index"]
-        if not index_name.startswith("."):
-            index_health = index_info["health"]
-            index_status = index_info["status"]
-            pri_shards = index_info["pri"]
-            rep_shards = index_info["rep"]
-            docs_count = index_info["docs.count"]
-            docs_deleted = index_info["docs.deleted"]
-            store_size = index_info["store.size"]
-            pri_store_size = index_info["pri.store.size"]
-
-            logger.info(f"Index: {index_name}")
-            logger.info(f"Health: {index_health}")
-            logger.info(f"Status: {index_status}")
-            logger.info(f"Primary Shards: {pri_shards}")
-            logger.info(f"Replica Shards: {rep_shards}")
-            logger.info(f"Documents Count: {docs_count}")
-            logger.info(f"Deleted Documents: {docs_deleted}")
-            logger.info(f"Store Size: {store_size}")
-            logger.info(f"Primary Store Size: {pri_store_size}")
+# Configuration for txtai search
+SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"])
+
+# Thread executor for non-blocking initialization
+thread_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
 
 
 class SearchService:
     def __init__(self, index_name="search_index"):
         logger.info("Инициализируем поиск...")
         self.index_name = index_name
-        self.client = None
-        self.lock = asyncio.Lock()
-
-        # Инициализация клиента OpenSearch только если поиск включен
-        if SEARCH_ENABLED:
-            try:
-                self.client = OpenSearch(
-                    hosts=[{"host": ELASTIC_HOST, "port": ELASTIC_PORT}],
-                    http_compress=True,
-                    http_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
-                    use_ssl=True,
-                    verify_certs=False,
-                    ssl_assert_hostname=False,
-                    ssl_show_warn=False,
-                )
-                logger.info("Клиент OpenSearch.org подключен")
-                search_loop.create_task(self.check_index())
-            except Exception as exc:
-                logger.warning(f"Поиск отключен из-за ошибки подключения: {exc}")
-                self.client = None
-        else:
-            logger.info("Поиск отключен (ELASTIC_HOST не установлен)")
+        self.embeddings = None
+        self._initialization_future = None
+        self.available = SEARCH_ENABLED
+
+        if not self.available:
+            logger.info("Поиск отключен (SEARCH_ENABLED = False)")
+            return
+
+        # Initialize embeddings in background thread
+        self._initialization_future = thread_executor.submit(self._init_embeddings)
+
+    def _init_embeddings(self):
+        """Initialize txtai embeddings in a background thread"""
+        try:
+            # Use the same model as in TopicClassifier
+            model_path = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
+
+            # Configure embeddings with content storage and quantization for lower memory usage
+            self.embeddings = Embeddings({
+                "path": model_path,
+                "content": True,
+                "quantize": True
+            })
+            logger.info("txtai embeddings initialized successfully")
+            return True
+        except Exception as e:
+            logger.error(f"Failed to initialize txtai embeddings: {e}")
+            self.available = False
+            return False
 
     async def info(self):
-        if not SEARCH_ENABLED:
+        """Return information about search service"""
+        if not self.available:
             return {"status": "disabled"}
         try:
-            return get_indices_stats()
+            if not self.is_ready():
+                return {"status": "initializing", "model": "paraphrase-multilingual-mpnet-base-v2"}
+
+            return {
+                "status": "active",
+                "count": len(self.embeddings) if self.embeddings else 0,
+                "model": "paraphrase-multilingual-mpnet-base-v2"
+            }
         except Exception as e:
             logger.error(f"Failed to get search info: {e}")
             return {"status": "error", "message": str(e)}
 
-    def delete_index(self):
-        if self.client:
-            logger.warning(f"[!!!] Удаляем индекс {self.index_name}")
-            self.client.indices.delete(index=self.index_name, ignore_unavailable=True)
-
-    def create_index(self):
-        if self.client:
-            logger.info(f"Создается индекс: {self.index_name}")
-            self.client.indices.create(index=self.index_name, body=index_settings)
-            logger.info(f"Индекс {self.index_name} создан")
-
-    async def check_index(self):
-        if self.client:
-            logger.info(f"Проверяем индекс {self.index_name}...")
-            if not self.client.indices.exists(index=self.index_name):
-                self.create_index()
-                self.client.indices.put_mapping(index=self.index_name, body=expected_mapping)
-            else:
-                logger.info(f"Найден существующий индекс {self.index_name}")
-                # Проверка и обновление структуры индекса, если необходимо
-                result = self.client.indices.get_mapping(index=self.index_name)
-                if isinstance(result, str):
-                    result = json.loads(result)
-                if isinstance(result, dict):
-                    mapping = result.get(self.index_name, {}).get("mappings")
-                    logger.info(f"Найдена структура индексации: {mapping['properties'].keys()}")
-                    expected_keys = expected_mapping["properties"].keys()
-                    if mapping and mapping["properties"].keys() != expected_keys:
-                        logger.info(f"Ожидаемая структура индексации: {expected_mapping}")
-                        logger.warning("[!!!] Требуется переиндексация всех данных")
-                        self.delete_index()
-                        self.client = None
-        else:
-            logger.error("клиент не инициализован, невозможно проверить индекс")
+    def is_ready(self):
+        """Check if embeddings are fully initialized and ready"""
+        return self.embeddings is not None and self.available
 
     def index(self, shout):
-        if not SEARCH_ENABLED:
+        """Index a single document"""
+        if not self.available:
             return
-        if self.client:
-            logger.info(f"Индексируем пост {shout.id}")
-            index_body = {
-                "body": shout.body,
-                "title": shout.title,
-                "subtitle": shout.subtitle,
-                "lead": shout.lead,
-                "media": shout.media,
-            }
-            asyncio.create_task(self.perform_index(shout, index_body))
-
-    async def perform_index(self, shout, index_body):
-        if self.client:
-            try:
-                await asyncio.wait_for(
-                    self.client.index(index=self.index_name, id=str(shout.id), body=index_body), timeout=40.0
-                )
-            except asyncio.TimeoutError:
-                logger.error(f"Indexing timeout for shout {shout.id}")
-            except Exception as e:
-                logger.error(f"Indexing error for shout {shout.id}: {e}")
+
+        logger.info(f"Индексируем пост {shout.id}")
+
+        # Start in background to not block
+        asyncio.create_task(self.perform_index(shout))
+
+    async def perform_index(self, shout):
+        """Actually perform the indexing operation"""
+        if not self.is_ready():
+            # If embeddings not ready, wait for initialization
+            if self._initialization_future and not self._initialization_future.done():
+                try:
+                    # Wait for initialization to complete with timeout
+                    await asyncio.get_event_loop().run_in_executor(
+                        None, lambda: self._initialization_future.result(timeout=30))
+                except Exception as e:
+                    logger.error(f"Embeddings initialization failed: {e}")
+                    return
+
+            if not self.is_ready():
+                logger.error(f"Cannot index shout {shout.id}: embeddings not ready")
+                return
+
+        try:
+            # Combine all text fields
+            text = " ".join(filter(None, [
+                shout.title or "",
+                shout.subtitle or "",
+                shout.lead or "",
+                shout.body or "",
+                shout.media or ""
+            ]))
+
+            # Use upsert for individual documents
+            await asyncio.get_event_loop().run_in_executor(
+                None,
+                lambda: self.embeddings.upsert([(str(shout.id), text, None)])
+            )
+            logger.info(f"Пост {shout.id} успешно индексирован")
+        except Exception as e:
+            logger.error(f"Indexing error for shout {shout.id}: {e}")
+
+    async def bulk_index(self, shouts):
+        """Index multiple documents at once"""
+        if not self.available or not shouts:
+            return
+
+        if not self.is_ready():
+            # Wait for initialization if needed
+            if self._initialization_future and not self._initialization_future.done():
+                try:
+                    await asyncio.get_event_loop().run_in_executor(
+                        None, lambda: self._initialization_future.result(timeout=30))
+                except Exception as e:
+                    logger.error(f"Embeddings initialization failed: {e}")
+                    return
+
+            if not self.is_ready():
+                logger.error("Cannot perform bulk indexing: embeddings not ready")
+                return
+
+        documents = []
+        for shout in shouts:
+            text = " ".join(filter(None, [
+                shout.title or "",
+                shout.subtitle or "",
+                shout.lead or "",
+                shout.body or "",
+                shout.media or ""
+            ]))
+            documents.append((str(shout.id), text, None))
+
+        try:
+            await asyncio.get_event_loop().run_in_executor(
+                None, lambda: self.embeddings.upsert(documents))
+            logger.info(f"Bulk indexed {len(documents)} documents")
+        except Exception as e:
+            logger.error(f"Bulk indexing error: {e}")
 
     async def search(self, text, limit, offset):
-        if not SEARCH_ENABLED:
+        """Search documents"""
+        if not self.available:
             return []
+
+        # Check Redis cache first
+        redis_key = f"search:{text}:{offset}+{limit}"
+        cached = await redis.get(redis_key)
+        if cached:
+            return json.loads(cached)
+
         logger.info(f"Ищем: {text} {offset}+{limit}")
-        search_body = {
-            "query": {"multi_match": {"query": text, "fields": ["title", "lead", "subtitle", "body", "media"]}}
-        }
-
-        if self.client:
-            search_response = self.client.search(
-                index=self.index_name,
-                body=search_body,
-                size=limit,
-                from_=offset,
-                _source=False,
-                _source_excludes=["title", "body", "subtitle", "media", "lead", "_index"],
-            )
-            hits = search_response["hits"]["hits"]
-            results = [{"id": hit["_id"], "score": hit["_score"]} for hit in hits]
-
-            # если результаты не пустые
-            if results:
-                # Кэширование в Redis с TTL
-                redis_key = f"search:{text}:{offset}+{limit}"
-                await redis.execute(
-                    "SETEX",
-                    redis_key,
-                    REDIS_TTL,
-                    json.dumps(results, cls=CustomJSONEncoder),
-                )
-                return results
-        return []
+
+        if not self.is_ready():
+            # Wait for initialization if needed
+            if self._initialization_future and not self._initialization_future.done():
+                try:
+                    await asyncio.get_event_loop().run_in_executor(
+                        None, lambda: self._initialization_future.result(timeout=30))
+                except Exception as e:
+                    logger.error(f"Embeddings initialization failed: {e}")
+                    return []
+
+            if not self.is_ready():
+                logger.error("Cannot search: embeddings not ready")
+                return []
+
+        try:
+            # Search with txtai (need to request more to handle offset)
+            total = offset + limit
+            results = await asyncio.get_event_loop().run_in_executor(
+                None, lambda: self.embeddings.search(text, total))
+
+            # Apply offset and convert to the expected format
+            results = results[offset:offset+limit]
+            formatted_results = [{"id": doc_id, "score": float(score)} for score, doc_id in results]
+
+            # Cache results
+            if formatted_results:
+                await redis.execute(
+                    "SETEX",
+                    redis_key,
+                    REDIS_TTL,
+                    json.dumps(formatted_results, cls=CustomJSONEncoder),
+                )
+            return formatted_results
+        except Exception as e:
+            logger.error(f"Search error: {e}")
+            return []
 
 
+# Create the search service singleton
 search_service = SearchService()
 
 
+# Keep the API exactly the same to maintain compatibility
 async def search_text(text: str, limit: int = 50, offset: int = 0):
     payload = []
-    if search_service.client:
-        # Использование метода search_post из OpenSearchService
+    if search_service.available:
         payload = await search_service.search(text, limit, offset)
     return payload
 
-
-# Проверить что URL корректный
-OPENSEARCH_URL = os.getenv("OPENSEARCH_URL", "rc1a-3n5pi3bhuj9gieel.mdb.yandexcloud.net")
+
+# Function to initialize search with existing data
+async def initialize_search_index(shouts_data):
+    """Initialize search index with existing data during application startup"""
+    if SEARCH_ENABLED:
+        logger.info("Initializing search index with existing data...")
+        await search_service.bulk_index(shouts_data)
+        logger.info(f"Search index initialized with {len(shouts_data)} documents")