import asyncio
import gzip
import json
import os
import pickle
import time
from pathlib import Path
from typing import Any, Dict, List
import muvera
import numpy as np
from settings import MUVERA_INDEX_NAME, SEARCH_MAX_BATCH_SIZE, SEARCH_PREFETCH_SIZE
from utils.logger import root_logger as logger
# Deferred import of SentenceTransformer to avoid blocking application startup
SentenceTransformer = None
primary_model = "paraphrase-multilingual-MiniLM-L12-v2"
# 💾 Local cache configuration for HuggingFace models
def get_models_cache_dir() -> str:
"""Определяет лучшую папку для кеша моделей"""
# Пробуем /dump если доступен для записи
dump_path = Path("/dump")
logger.info(
f"🔍 Checking /dump - exists: {dump_path.exists()}, writable: {os.access('/dump', os.W_OK) if dump_path.exists() else 'N/A'}"
)
if dump_path.exists() and os.access("/dump", os.W_OK):
cache_dir = "/dump/huggingface"
try:
Path(cache_dir).mkdir(parents=True, exist_ok=True)
logger.info(f"✅ Using mounted storage: {cache_dir}")
return cache_dir
except Exception as e:
logger.warning(f"Failed to create {cache_dir}: {e}")
# Fallback - local ./dump directory
cache_dir = "./dump/huggingface"
Path(cache_dir).mkdir(parents=True, exist_ok=True)
logger.info(f"📁 Using local fallback: {cache_dir}")
return cache_dir
MODELS_CACHE_DIR = get_models_cache_dir()
def get_index_dump_dir() -> str:
"""Определяет лучшую папку для индекса векторного поиска"""
# Приоритет /dump если доступна, иначе ./dump как fallback
return "/dump" if Path("/dump").exists() else "./dump"
# Используем HF_HOME вместо устаревшего TRANSFORMERS_CACHE
os.environ.setdefault("HF_HOME", MODELS_CACHE_DIR)
# Global collection for background tasks
background_tasks: List[asyncio.Task] = []
async def preload_models() -> None:
"""🚀 Асинхронная предзагрузка моделей для кеширования"""
logger.info("🔄 Начинаем предзагрузку моделей...")
# Ждем импорта SentenceTransformer
_lazy_import_sentence_transformers()
if SentenceTransformer is None:
logger.error("❌ SentenceTransformer недоступен для предзагрузки")
return
# Create the cache directory
Path(MODELS_CACHE_DIR).mkdir(parents=True, exist_ok=True)
logger.info(f"📁 Using cache directory: {MODELS_CACHE_DIR}")
# Models to preload
models = [
"paraphrase-multilingual-MiniLM-L12-v2", # Primary multilingual model
"all-MiniLM-L6-v2", # Fallback model
]
for model_name in models:
try:
# Skip models that are already in the cache
if _is_model_cached(model_name):
logger.info(f"🔍 Model already cached: {model_name}")
continue
logger.info(f"🔽 Downloading model: {model_name}")
# Run the download in an executor so the event loop is not blocked
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None, lambda name=model_name: SentenceTransformer(name, cache_folder=MODELS_CACHE_DIR)
)
logger.info(f"✅ Model loaded: {model_name}")
except Exception as e:
logger.warning(f"Failed to load {model_name}: {e}")
logger.info("🚀 Model preloading finished!")
def _is_model_cached(model_name: str) -> bool:
"""🔍 Проверяет наличие модели в кеше"""
try:
cache_path = Path(MODELS_CACHE_DIR)
model_cache_name = f"models--sentence-transformers--{model_name}"
model_path = cache_path / model_cache_name
if not model_path.exists():
return False
# Check for a snapshots directory (the newer HuggingFace cache layout)
snapshots_path = model_path / "snapshots"
if snapshots_path.exists():
# Look for any snapshot that contains config.json
for snapshot_dir in snapshots_path.iterdir():
if snapshot_dir.is_dir():
config_file = snapshot_dir / "config.json"
if config_file.exists():
return True
# Fallback: check the old cache layout
config_file = model_path / "config.json"
return config_file.exists()
except Exception:
return False
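# Note: the paths checked above follow the HuggingFace hub cache layout, e.g.
# <MODELS_CACHE_DIR>/models--sentence-transformers--all-MiniLM-L6-v2/snapshots/<hash>/config.json
# (the concrete <hash> directory name is illustrative; only its presence with a
# config.json inside is what the check relies on).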
def _lazy_import_sentence_transformers():
"""🔄 Lazy import SentenceTransformer для избежания блокировки старта приложения"""
global SentenceTransformer # noqa: PLW0603
if SentenceTransformer is None:
try:
from sentence_transformers import SentenceTransformer as SentenceTransformerClass
SentenceTransformer = SentenceTransformerClass
logger.info("✅ SentenceTransformer импортирован успешно")
except ImportError as e:
logger.error(f"Не удалось импортировать SentenceTransformer: {e}")
SentenceTransformer = None
return SentenceTransformer
class MuveraWrapper:
"""🔍 Real vector search with SentenceTransformers + FDE encoding"""
def __init__(self, vector_dimension: int = 768, cache_enabled: bool = True, batch_size: int = 100) -> None:
self.vector_dimension = vector_dimension
self.cache_enabled = cache_enabled
self.batch_size = batch_size
self.encoder: Any = None
self.buckets = 128 # Default number of buckets for FDE encoding
self.documents: Dict[str, Dict[str, Any]] = {} # Simple in-memory storage for demo
self.embeddings: Dict[str, np.ndarray | None] = {} # Store encoded embeddings
# 🚀 Defer model initialization until first use
logger.info("🔄 MuveraWrapper initialized - the model will be loaded on first use")
self.encoder = None
self._model_loaded = False
def _ensure_model_loaded(self) -> bool:
"""🔄 Убеждаемся что модель загружена (lazy loading)"""
if self._model_loaded:
return self.encoder is not None
# Import SentenceTransformer on first use
sentence_transformer_class = _lazy_import_sentence_transformers()
if sentence_transformer_class is None:
logger.error("❌ SentenceTransformer недоступен")
return False
try:
logger.info(f"💾 Using models cache directory: {MODELS_CACHE_DIR}")
# Check whether the primary model is already cached
is_cached = _is_model_cached(primary_model)
if is_cached:
logger.info(f"🔍 Found cached model: {primary_model}")
else:
logger.info(f"🔽 Downloading model: {primary_model}")
# Use a multilingual model that works well with Russian
self.encoder = sentence_transformer_class(
primary_model,
cache_folder=MODELS_CACHE_DIR,
local_files_only=is_cached, # Do not download if already cached
)
logger.info("🔍 SentenceTransformer model loaded successfully")
self._model_loaded = True
return True
except Exception as e:
logger.error(f"Failed to load primary SentenceTransformer: {e}")
# Fallback - a simpler model
try:
fallback_model = "all-MiniLM-L6-v2"
is_fallback_cached = _is_model_cached(fallback_model)
if is_fallback_cached:
logger.info(f"🔍 Found cached fallback model: {fallback_model}")
else:
logger.info(f"🔽 Downloading fallback model: {fallback_model}")
self.encoder = sentence_transformer_class(
fallback_model, cache_folder=MODELS_CACHE_DIR, local_files_only=is_fallback_cached
)
logger.info("🔍 Fallback SentenceTransformer model loaded")
self._model_loaded = True
return True
except Exception:
logger.error("Failed to load any SentenceTransformer model")
self.encoder = None
self._model_loaded = True # Mark the load attempt as finished
return False
async def async_init(self) -> None:
"""🔄 Асинхронная инициализация - восстановление индекса из файла"""
try:
logger.info("🔍 Пытаемся восстановить векторный индекс из файла...")
# Определяем лучшую папку для индекса (приоритет /dump)
dump_dir = get_index_dump_dir()
# Пытаемся загрузить из файла
if await self.load_index_from_file(dump_dir):
logger.info("✅ Векторный индекс восстановлен из файла")
else:
logger.info("🔍 Сохраненный индекс не найден, будет создан новый")
except Exception as e:
logger.error(f"❌ Ошибка при восстановлении индекса: {e}")
async def info(self) -> dict:
"""Return service information"""
return {
"vector_dimension": self.vector_dimension,
"buckets": self.buckets,
"documents_count": len(self.documents),
"cache_enabled": self.cache_enabled,
}
async def search(self, query: str, limit: int) -> List[Dict[str, Any]]:
"""🔍 Real vector search using SentenceTransformers + FDE encoding"""
if not query.strip():
return []
# Load the model on first use
if not self._ensure_model_loaded():
logger.warning("🔍 Search unavailable - model not loaded")
return []
try:
# 🚀 Generate a real query embedding
query_text = query.strip()
query_embedding = self.encoder.encode(query_text, convert_to_numpy=True)
# Normalize dimensionality for FDE
if query_embedding.ndim == 1:
query_embedding = query_embedding.reshape(1, -1)
# Encode query using FDE
query_fde = muvera.encode_fde(query_embedding, self.buckets, "avg")
# 🔍 Semantic similarity search
results = []
for doc_id, doc_embedding in self.embeddings.items():
if doc_embedding is not None:
# Calculate cosine similarity
similarity = np.dot(query_fde, doc_embedding) / (
np.linalg.norm(query_fde) * np.linalg.norm(doc_embedding) + 1e-8
)
results.append(
{
"id": doc_id,
"score": float(similarity),
"metadata": self.documents.get(doc_id, {}).get("metadata", {}),
}
)
# Sort by score and limit results
results.sort(key=lambda x: x["score"], reverse=True)
return results[:limit]
except Exception as e:
logger.error(f"🔍 Search error: {e}")
return []
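# Usage sketch (assumptions: the model loads successfully and documents have
# already been indexed; the query string, ids and scores are illustrative only):
#
#     results = await wrapper.search("векторный поиск", limit=5)
#     # -> [{"id": "42", "score": 0.83, "metadata": {...}}, ...]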
async def index(self, documents: List[Dict[str, Any]], silent: bool = False) -> None:
"""🚀 Index documents using real SentenceTransformers + FDE encoding"""
# Load the model on first use
if not self._ensure_model_loaded():
if not silent:
logger.warning("🔍 No encoder available for indexing")
return
# 🤫 Batch mode detection
is_batch = len(documents) > 10
indexed_count = 0
skipped_count = 0
if is_batch:
# 🚀 Batch processing for better performance
valid_docs = []
doc_contents = []
for doc in documents:
doc_id = doc["id"]
self.documents[doc_id] = doc
title = doc.get("title", "").strip()
body = doc.get("body", "").strip()
doc_content = f"{title} {body}".strip()
if doc_content:
valid_docs.append(doc)
doc_contents.append(doc_content)
else:
skipped_count += 1
if doc_contents:
try:
# 🚀 Batch encode all documents at once
batch_embeddings = self.encoder.encode(
doc_contents, convert_to_numpy=True, show_progress_bar=not silent, batch_size=32
)
# Process each embedding
for doc, embedding in zip(valid_docs, batch_embeddings, strict=False):
emb = embedding
doc_id = doc["id"]
# Normalize dimensionality for FDE
if emb.ndim == 1:
emb = emb.reshape(1, -1)
# Encode using FDE
doc_fde = muvera.encode_fde(emb, self.buckets, "avg")
self.embeddings[doc_id] = doc_fde
indexed_count += 1
except Exception as e:
if not silent:
logger.error(f"🔍 Batch encoding error: {e}")
return
else:
# 🔍 Single document processing
for doc in documents:
try:
doc_id = doc["id"]
self.documents[doc_id] = doc
title = doc.get("title", "").strip()
body = doc.get("body", "").strip()
doc_content = f"{title} {body}".strip()
if not doc_content:
if not silent:
logger.warning(f"🔍 Empty content for document {doc_id}")
skipped_count += 1
continue
# 🚀 Single document encoding
doc_embedding = self.encoder.encode(doc_content, convert_to_numpy=True, show_progress_bar=False)
if doc_embedding.ndim == 1:
doc_embedding = doc_embedding.reshape(1, -1)
doc_fde = muvera.encode_fde(doc_embedding, self.buckets, "avg")
self.embeddings[doc_id] = doc_fde
indexed_count += 1
if not silent:
logger.debug(f"🔍 Indexed document {doc_id} with content length {len(doc_content)}")
except Exception as e:
if not silent:
logger.error(f"🔍 Indexing error for document {doc.get('id', 'unknown')}: {e}")
skipped_count += 1
continue
# 🔍 Final statistics
if not silent:
if is_batch:
logger.info(f"🚀 Batch indexed {indexed_count} documents, skipped {skipped_count}")
elif indexed_count > 0:
logger.debug(f"🔍 Indexed {indexed_count} documents")
# 🗃️ Auto-save the index after successful indexing
if indexed_count > 0:
try:
# Use the same path as for loading
dump_dir = get_index_dump_dir()
await self.save_index_to_file(dump_dir)
if not silent:
logger.debug("💾 Index automatically saved to file")
except Exception as e:
logger.warning(f"⚠️ Failed to save the index to a file: {e}")
async def verify_documents(self, doc_ids: List[str]) -> Dict[str, Any]:
"""Verify which documents exist in the index"""
missing = [doc_id for doc_id in doc_ids if doc_id not in self.documents]
return {"missing": missing}
async def get_index_status(self) -> Dict[str, Any]:
"""Get index status information"""
return {
"total_documents": len(self.documents),
"total_embeddings": len(self.embeddings),
"consistency": {"status": "ok", "null_embeddings_count": 0},
}
async def save_index_to_file(self, dump_dir: str = "./dump") -> bool:
"""🗃️ Сохраняет векторный индекс в файл (fallback метод)"""
try:
# Создаем директорию если не существует
dump_path = Path(dump_dir)
dump_path.mkdir(parents=True, exist_ok=True)
# Подготавливаем данные для сериализации
index_data = {
"documents": self.documents,
"embeddings": self.embeddings,
"vector_dimension": self.vector_dimension,
"buckets": self.buckets,
"timestamp": int(time.time()),
"version": "1.0",
}
# Сериализуем данные с pickle
serialized_data = pickle.dumps(index_data)
# Подготавливаем имена файлов
index_file = dump_path / f"{MUVERA_INDEX_NAME}_vector_index.pkl.gz"
# Сохраняем основной индекс с gzip сжатием
with gzip.open(index_file, "wb") as f:
f.write(serialized_data)
logger.info(f"🗃️ Векторный индекс сохранен в файл: {index_file}")
logger.info(f" 📊 Документов: {len(self.documents)}, эмбедингов: {len(self.embeddings)}")
return True
except Exception as e:
logger.error(f"❌ Ошибка сохранения индекса в файл: {e}")
return False
async def load_index_from_file(self, dump_dir: str = "./dump") -> bool:
"""🔄 Восстанавливает векторный индекс из файла"""
try:
dump_path = Path(dump_dir)
index_file = dump_path / f"{MUVERA_INDEX_NAME}_vector_index.pkl.gz"
# Проверяем существование файла
if not index_file.exists():
logger.debug(f"🔍 Сохраненный индекс не найден: {index_file}")
return False
# Загружаем и распаковываем данные
with gzip.open(index_file, "rb") as f:
serialized_data = f.read()
# Десериализуем данные
index_data = pickle.loads(serialized_data) # noqa: S301
# Проверяем версию совместимости
if index_data.get("version") != "1.0":
logger.warning(f"🔍 Несовместимая версия индекса: {index_data.get('version')}")
return False
# Восстанавливаем данные
self.documents = index_data["documents"]
self.embeddings = index_data["embeddings"]
self.vector_dimension = index_data["vector_dimension"]
self.buckets = index_data["buckets"]
file_size = int(index_file.stat().st_size)
decompression_ratio = len(serialized_data) / file_size if file_size > 0 else 1.0
logger.info("🔄 Векторный индекс восстановлен из файла:")
logger.info(f" 📁 Файл: {index_file}")
logger.info(f" 📊 Документов: {len(self.documents)}, эмбедингов: {len(self.embeddings)}")
logger.info(
f" 💾 Размер: {file_size:,}{len(serialized_data):,} байт (декомпрессия {decompression_ratio:.1f}x)"
)
return True
except Exception as e:
logger.error(f"❌ Ошибка восстановления индекса из файла: {e}")
return False
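# Note: the on-disk format is a gzip-compressed pickle written to
# <dump_dir>/<MUVERA_INDEX_NAME>_vector_index.pkl.gz; an index whose "version"
# differs from "1.0" is rejected above rather than migrated.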
async def close(self) -> None:
"""Close the wrapper (no-op for this simple implementation)"""
class SearchService:
def __init__(self) -> None:
self.available: bool = False
self.muvera_client: Any = None
self.client: Any = None
# Initialize local Muvera
try:
self.muvera_client = MuveraWrapper(
vector_dimension=768, # Standard embedding dimension
cache_enabled=True,
batch_size=SEARCH_MAX_BATCH_SIZE,
)
self.available = True
logger.info(f"Local Muvera wrapper initialized - index: {MUVERA_INDEX_NAME}")
except Exception as e:
logger.error(f"Failed to initialize Muvera: {e}")
self.available = False
async def async_init(self) -> None:
"""🔄 Асинхронная инициализация - восстановление индекса"""
if self.muvera_client:
await self.muvera_client.async_init()
async def info(self) -> dict:
"""Return information about search service"""
if not self.available:
return {"status": "disabled"}
try:
# Get Muvera service info
if self.muvera_client:
muvera_info = await self.muvera_client.info()
return {"status": "enabled", "provider": "muvera", "mode": "local", "muvera_info": muvera_info}
return {"status": "error", "message": "Muvera client not available"}
except Exception:
logger.exception("Failed to get search info")
return {"status": "error", "message": "Failed to get search info"}
def is_ready(self) -> bool:
"""Check if service is available"""
return self.available
async def search(self, text: str, limit: int, offset: int) -> list:
"""Search documents using Muvera"""
if not self.available or not self.muvera_client:
return []
try:
logger.info(f"Muvera search for: '{text}' (limit={limit}, offset={offset})")
# Perform Muvera search
results = await self.muvera_client.search(
query=text,
limit=limit + offset, # Get enough results for pagination
)
# Format results to match the existing response format
formatted_results = []
for result in results:
formatted_results.append(
{
"id": str(result.get("id", "")),
"score": result.get("score", 0.0),
"metadata": result.get("metadata", {}),
}
)
# Apply pagination
return formatted_results[offset : offset + limit]
except Exception as e:
logger.exception(f"Muvera search failed for '{text}': {e}")
return []
async def search_authors(self, text: str, limit: int = 10, offset: int = 0) -> list:
"""Search only for authors using Muvera"""
if not self.available or not self.muvera_client or not text.strip():
return []
try:
logger.info(f"Muvera author search for: '{text}' (limit={limit}, offset={offset})")
# Use Muvera to search with author-specific filtering
results = await self.muvera_client.search(
query=text,
limit=limit + offset,
)
# Format results
author_results = []
for result in results:
author_results.append(
{
"id": str(result.get("id", "")),
"score": result.get("score", 0.0),
"metadata": result.get("metadata", {}),
}
)
# Apply pagination
return author_results[offset : offset + limit]
except Exception:
logger.exception(f"Error searching authors for '{text}'")
return []
def index(self, shout: Any) -> None:
"""Index a single document using Muvera"""
if not self.available or not self.muvera_client:
return
logger.info(f"Muvera indexing post {shout.id}")
# Start in background to not block
background_tasks.append(asyncio.create_task(self.perform_muvera_index(shout)))
async def perform_muvera_index(self, shout: Any) -> None:
"""Index a single document using Muvera"""
if not self.muvera_client:
return
try:
logger.info(f"Muvera indexing document {shout.id}")
# Prepare document data for Muvera
doc_data: Dict[str, Any] = {
"id": str(shout.id),
"title": getattr(shout, "title", "") or "",
"body": "",
"metadata": {},
}
# Combine body content
body_parts = []
for field_name in ["subtitle", "lead", "body"]:
field_value = getattr(shout, field_name, None)
if field_value and isinstance(field_value, str) and field_value.strip():
body_parts.append(field_value.strip())
# Process media content
media = getattr(shout, "media", None)
if media:
if isinstance(media, str):
try:
media_json = json.loads(media)
if isinstance(media_json, dict):
if "title" in media_json:
body_parts.append(media_json["title"])
if "body" in media_json:
body_parts.append(media_json["body"])
except json.JSONDecodeError:
body_parts.append(media)
elif isinstance(media, dict) and (media.get("title") or media.get("body")):
if media.get("title"):
body_parts.append(media["title"])
if media.get("body"):
body_parts.append(media["body"])
# Set body content
if body_parts:
doc_data["body"] = " ".join(body_parts)
# Add metadata
doc_data["metadata"] = {
"layout": getattr(shout, "layout", "article"),
"lang": getattr(shout, "lang", "ru"),
"created_at": getattr(shout, "created_at", 0),
"created_by": getattr(shout, "created_by", 0),
}
# Index with Muvera (single document = verbose mode)
await self.muvera_client.index(documents=[doc_data], silent=False)
logger.info(f"🚀 Document {shout.id} indexed successfully")
except Exception:
logger.exception(f"Muvera indexing error for shout {shout.id}")
async def bulk_index(self, shouts: list) -> None:
"""Index multiple documents using Muvera"""
if not self.available or not self.muvera_client or not shouts:
logger.warning(
f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}"
)
return
# Start timing the bulk indexing run
start_time = time.time()
logger.info(f"Starting Muvera bulk indexing of {len(shouts)} documents")
# Prepare documents for Muvera
documents: List[Dict[str, Any]] = []
total_skipped = 0
for shout in shouts:
try:
# Prepare document data for Muvera
doc_data: Dict[str, Any] = {
"id": str(getattr(shout, "id", "")),
"title": getattr(shout, "title", "") or "",
"body": "",
"metadata": {},
}
# Combine body content
body_parts = []
for field_name in ["subtitle", "lead", "body"]:
field_value = getattr(shout, field_name, None)
if field_value and isinstance(field_value, str) and field_value.strip():
body_parts.append(field_value.strip())
# Process media content
media = getattr(shout, "media", None)
if media:
if isinstance(media, str):
try:
media_json = json.loads(media)
if isinstance(media_json, dict):
if "title" in media_json:
body_parts.append(media_json["title"])
if "body" in media_json:
body_parts.append(media_json["body"])
except json.JSONDecodeError:
body_parts.append(media)
elif isinstance(media, dict) and (media.get("title") or media.get("body")):
if media.get("title"):
body_parts.append(media["title"])
if media.get("body"):
body_parts.append(media["body"])
# Set body content
if body_parts:
doc_data["body"] = " ".join(body_parts)
# Add metadata
doc_data["metadata"] = {
"layout": getattr(shout, "layout", "article"),
"lang": getattr(shout, "lang", "ru"),
"created_at": getattr(shout, "created_at", 0),
"created_by": getattr(shout, "created_by", 0),
}
documents.append(doc_data)
except Exception:
logger.exception(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing")
total_skipped += 1
if documents:
try:
# 🤫 Index with Muvera in silent mode for batch operations
await self.muvera_client.index(documents=documents, silent=True)
elapsed = time.time() - start_time
logger.info(
f"🚀 Bulk indexing completed in {elapsed:.2f}s: "
f"{len(documents)} documents indexed, {total_skipped} shouts skipped"
)
except Exception as e:
logger.exception(f"Muvera bulk indexing failed: {e}")
else:
logger.warning("No documents to index")
async def verify_docs(self, doc_ids: list) -> dict:
"""Verify which documents exist in the search index using Muvera"""
if not self.available or not self.muvera_client:
return {"status": "disabled"}
try:
logger.info(f"Verifying {len(doc_ids)} documents in Muvera search index")
# Use Muvera to verify documents
verification_result = await self.muvera_client.verify_documents(doc_ids)
# Format result to match expected structure
missing_ids = verification_result.get("missing", [])
logger.info(
f"Document verification complete: {len(missing_ids)} documents missing out of {len(doc_ids)} total"
)
return {"missing": missing_ids, "details": {"missing_count": len(missing_ids), "total_count": len(doc_ids)}}
except Exception:
logger.exception("Document verification error")
return {"status": "error", "message": "Document verification error"}
async def check_index_status(self) -> dict:
"""Get detailed statistics about the search index health using Muvera"""
if not self.available or not self.muvera_client:
return {"status": "disabled"}
try:
# Get Muvera index status
index_status = await self.muvera_client.get_index_status()
# Check for consistency issues
if index_status.get("consistency", {}).get("status") != "ok":
null_count = index_status.get("consistency", {}).get("null_embeddings_count", 0)
if null_count > 0:
logger.warning(f"Found {null_count} documents with NULL embeddings")
return index_status
except Exception:
logger.exception("Failed to check index status")
return {"status": "error", "message": "Failed to check index status"}
async def close(self) -> None:
"""Close connections and release resources"""
if hasattr(self, "muvera_client") and self.muvera_client:
try:
await self.muvera_client.close()
logger.info("Local Muvera client closed")
except Exception as e:
logger.warning(f"Error closing Muvera client: {e}")
logger.info("Search service closed")
# Create the search service singleton
search_service = SearchService()
# API-compatible functions for backward compatibility
async def search_text(text: str, limit: int = 200, offset: int = 0) -> list:
"""Search text using Muvera - backward compatibility function"""
if search_service.available:
return await search_service.search(text, limit, offset)
return []
async def search_author_text(text: str, limit: int = 10, offset: int = 0) -> list:
"""Search authors using Muvera - backward compatibility function"""
if search_service.available:
return await search_service.search_authors(text, limit, offset)
return []
async def get_search_count(text: str) -> int:
"""Get count of search results - backward compatibility function"""
if not search_service.available:
return 0
# Get results and count them
results = await search_text(text, SEARCH_PREFETCH_SIZE, 0)
return len(results)
async def get_author_search_count(text: str) -> int:
"""Get count of author search results - backward compatibility function"""
if not search_service.available:
return 0
# Get results and count them
results = await search_author_text(text, SEARCH_PREFETCH_SIZE, 0)
return len(results)
async def initialize_search_index(shouts_data: list) -> None:
"""Initialize search index with existing data - backward compatibility function"""
if not search_service.available:
logger.warning("Search service not available for initialization")
return
try:
# First, try to restore an existing index
await search_service.async_init()
# Reindexing is needed only when the index is empty
if search_service.muvera_client and len(search_service.muvera_client.documents) == 0:
if len(shouts_data) > 0:
logger.info(f"Index is empty, starting bulk indexing of {len(shouts_data)} documents")
await search_service.bulk_index(shouts_data)
logger.info(f"Initialized search index with {len(shouts_data)} documents")
else:
logger.info("No documents to index")
else:
existing_count = len(search_service.muvera_client.documents) if search_service.muvera_client else 0
logger.info(f"Search index already contains {existing_count} documents, skipping reindexing")
except Exception as e:
logger.exception(f"Failed to initialize search index: {e}")
async def check_search_service() -> None:
"""Check if search service is available - backward compatibility function"""
if search_service.available:
logger.info("Search service is available and ready")
else:
logger.warning("Search service is not available")
async def initialize_search_index_background() -> None:
"""Initialize search index in background - backward compatibility function"""
try:
logger.info("Background search index initialization started")
# This function is kept for compatibility but doesn't do much
# since Muvera handles indexing automatically
logger.info("Background search index initialization completed")
except Exception:
logger.exception("Error in background search index initialization")