core/services/search.py
import asyncio
import gzip
import json
import pickle
import time
from pathlib import Path
from typing import Any, Dict, List, cast
import muvera
import numpy as np
from sentence_transformers import SentenceTransformer
from settings import MUVERA_INDEX_NAME, SEARCH_MAX_BATCH_SIZE, SEARCH_PREFETCH_SIZE
from utils.logger import root_logger as logger
# Global collection for background tasks
background_tasks: List[asyncio.Task] = []
class MuveraWrapper:
"""🔍 Real vector search with SentenceTransformers + FDE encoding"""
def __init__(self, vector_dimension: int = 768, cache_enabled: bool = True, batch_size: int = 100) -> None:
self.vector_dimension = vector_dimension
self.cache_enabled = cache_enabled
self.batch_size = batch_size
self.encoder: Any = None
self.buckets = 128 # Default number of buckets for FDE encoding
self.documents: Dict[str, Dict[str, Any]] = {} # Simple in-memory storage for demo
self.embeddings: Dict[str, np.ndarray | None] = {} # Store encoded embeddings
# 🚀 Initialize the real embedding model
try:
# Use a multilingual model that works well with Russian
self.encoder = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")
logger.info("🔍 SentenceTransformer model loaded successfully")
except Exception as e:
logger.error(f"Failed to load SentenceTransformer: {e}")
# Fallback - a simpler model
try:
self.encoder = SentenceTransformer("all-MiniLM-L6-v2")
logger.info("🔍 Fallback SentenceTransformer model loaded")
except Exception:
logger.error("Failed to load any SentenceTransformer model")
self.encoder = None
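# Note: both MiniLM models above produce 384-dimensional sentence embeddings;
# vector_dimension is informational here, since the FDE output size depends on
# the bucket count passed to muvera.encode_fde rather than on this value.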
async def async_init(self) -> None:
"""🔄 Асинхронная инициализация - восстановление индекса из файла"""
try:
logger.info("🔍 Пытаемся восстановить векторный индекс из файла...")
# Проверяем метаданные сначала
metadata = await self.get_index_metadata_from_file()
if metadata:
logger.info(
f"🔍 Found saved index from {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(metadata.get('saved_at', 0)))}"
)
# Restore the index
if await self.load_index_from_file():
logger.info("✅ Векторный индекс успешно восстановлен из файла")
else:
logger.warning("⚠️ Не удалось восстановить индекс из файла")
else:
logger.info("🔍 Сохраненный индекс не найден, будет создан новый")
except Exception as e:
logger.error(f"❌ Ошибка при восстановлении индекса: {e}")
async def info(self) -> dict:
"""Return service information"""
return {
"vector_dimension": self.vector_dimension,
"buckets": self.buckets,
"documents_count": len(self.documents),
"cache_enabled": self.cache_enabled,
}
async def search(self, query: str, limit: int) -> List[Dict[str, Any]]:
"""🔍 Real vector search using SentenceTransformers + FDE encoding"""
if not query.strip() or not self.encoder:
return []
try:
# 🚀 Generate a real embedding for the query
query_text = query.strip()
query_embedding = self.encoder.encode(query_text, convert_to_numpy=True)
# Normalize dimensionality for FDE
if query_embedding.ndim == 1:
query_embedding = query_embedding.reshape(1, -1)
# Encode query using FDE
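# encode_fde maps the (1, dim) embedding into a single fixed-length vector (its size
# depends on the bucket count), so query and document FDEs can be compared with an
# ordinary dot/cosine product.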
query_fde = muvera.encode_fde(query_embedding, self.buckets, "avg")
# 🔍 Semantic similarity search
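# Brute-force O(N) scan over all stored FDE vectors; cosine similarity is
# dot(a, b) / (|a| * |b| + eps), where the small epsilon guards against zero-norm vectors.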
results = []
for doc_id, doc_embedding in self.embeddings.items():
if doc_embedding is not None:
# Calculate cosine similarity
similarity = np.dot(query_fde, doc_embedding) / (
np.linalg.norm(query_fde) * np.linalg.norm(doc_embedding) + 1e-8
)
results.append(
{
"id": doc_id,
"score": float(similarity),
"metadata": self.documents.get(doc_id, {}).get("metadata", {}),
}
)
# Sort by score and limit results
results.sort(key=lambda x: x["score"], reverse=True)
return results[:limit]
except Exception as e:
logger.error(f"🔍 Search error: {e}")
return []
async def index(self, documents: List[Dict[str, Any]], silent: bool = False) -> None:
"""🚀 Index documents using real SentenceTransformers + FDE encoding"""
if not self.encoder:
if not silent:
logger.warning("🔍 No encoder available for indexing")
return
# 🤫 Batch mode detection
is_batch = len(documents) > 10
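# Heuristic threshold: more than 10 documents switches to a single batched encoder
# call below, which amortizes per-call model overhead.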
indexed_count = 0
skipped_count = 0
if is_batch:
# 🚀 Batch processing for better performance
valid_docs = []
doc_contents = []
for doc in documents:
doc_id = doc["id"]
self.documents[doc_id] = doc
title = doc.get("title", "").strip()
body = doc.get("body", "").strip()
doc_content = f"{title} {body}".strip()
if doc_content:
valid_docs.append(doc)
doc_contents.append(doc_content)
else:
skipped_count += 1
if doc_contents:
try:
# 🚀 Batch encode all documents at once
batch_embeddings = self.encoder.encode(
doc_contents, convert_to_numpy=True, show_progress_bar=not silent, batch_size=32
)
# Process each embedding
for doc, embedding in zip(valid_docs, batch_embeddings, strict=False):
emb = embedding
doc_id = doc["id"]
# Normalize dimensionality for FDE
if emb.ndim == 1:
emb = emb.reshape(1, -1)
# Encode using FDE
doc_fde = muvera.encode_fde(emb, self.buckets, "avg")
self.embeddings[doc_id] = doc_fde
indexed_count += 1
except Exception as e:
if not silent:
logger.error(f"🔍 Batch encoding error: {e}")
return
else:
# 🔍 Single document processing
for doc in documents:
try:
doc_id = doc["id"]
self.documents[doc_id] = doc
title = doc.get("title", "").strip()
body = doc.get("body", "").strip()
doc_content = f"{title} {body}".strip()
if not doc_content:
if not silent:
logger.warning(f"🔍 Empty content for document {doc_id}")
skipped_count += 1
continue
# 🚀 Single document encoding
doc_embedding = self.encoder.encode(doc_content, convert_to_numpy=True, show_progress_bar=False)
if doc_embedding.ndim == 1:
doc_embedding = doc_embedding.reshape(1, -1)
doc_fde = muvera.encode_fde(doc_embedding, self.buckets, "avg")
self.embeddings[doc_id] = doc_fde
indexed_count += 1
if not silent:
logger.debug(f"🔍 Indexed document {doc_id} with content length {len(doc_content)}")
except Exception as e:
if not silent:
logger.error(f"🔍 Indexing error for document {doc.get('id', 'unknown')}: {e}")
skipped_count += 1
continue
# 🔍 Final statistics
if not silent:
if is_batch:
logger.info(f"🚀 Batch indexed {indexed_count} documents, skipped {skipped_count}")
elif indexed_count > 0:
logger.debug(f"🔍 Indexed {indexed_count} documents")
# 🗃️ Auto-save the index after successful indexing
if indexed_count > 0:
try:
await self.save_index_to_file()
if not silent:
logger.debug("💾 Index automatically saved to file")
except Exception as e:
logger.warning(f"⚠️ Failed to auto-save the index: {e}")
async def verify_documents(self, doc_ids: List[str]) -> Dict[str, Any]:
"""Verify which documents exist in the index"""
missing = [doc_id for doc_id in doc_ids if doc_id not in self.documents]
return {"missing": missing}
async def get_index_status(self) -> Dict[str, Any]:
"""Get index status information"""
return {
"total_documents": len(self.documents),
"total_embeddings": len(self.embeddings),
"consistency": {"status": "ok", "null_embeddings_count": 0},
}
async def save_index_to_file(self, dump_dir: str = "/dump") -> bool:
"""🗃️ Сохраняет векторный индекс в файл с использованием gzip сжатия"""
try:
# Create the directory if it does not exist
dump_path = Path(dump_dir)
dump_path.mkdir(parents=True, exist_ok=True)
# Prepare the data for serialization
index_data = {
"documents": self.documents,
"embeddings": self.embeddings,
"vector_dimension": self.vector_dimension,
"buckets": self.buckets,
"timestamp": int(time.time()),
"version": "1.0",
"index_name": MUVERA_INDEX_NAME,
}
# Serialize the data with pickle
serialized_data = pickle.dumps(index_data)
# Prepare the file names
index_file = dump_path / f"{MUVERA_INDEX_NAME}_vector_index.pkl.gz"
metadata_file = dump_path / f"{MUVERA_INDEX_NAME}_metadata.json"
# Save the main index with gzip compression
with gzip.open(index_file, "wb") as f:
f.write(serialized_data)
# Save metadata separately for quick access
metadata = {
"documents_count": len(self.documents),
"embeddings_count": len(self.embeddings),
"vector_dimension": self.vector_dimension,
"buckets": self.buckets,
"saved_at": int(time.time()),
"version": "1.0",
"index_name": MUVERA_INDEX_NAME,
"original_size_bytes": len(serialized_data),
"compressed_size_bytes": int(index_file.stat().st_size) if index_file.exists() else 0,
"index_file": str(index_file),
"metadata_file": str(metadata_file),
}
with Path(metadata_file).open(mode="w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
original_size = cast(int, metadata["original_size_bytes"])
compressed_size = cast(int, metadata["compressed_size_bytes"])
compression_ratio = original_size / compressed_size if compressed_size > 0 else 1.0
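# Example (hypothetical numbers): 10_485_760 bytes pickled / 2_097_152 bytes gzipped
# gives a compression ratio of 5.0x.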
logger.info("🗃️ Векторный индекс сохранен в файл:")
logger.info(f" 📁 Файл: {index_file}")
logger.info(f" 📊 Документов: {len(self.documents)}, эмбедингов: {len(self.embeddings)}")
logger.info(
f" 💾 Размер: {metadata['original_size_bytes']:,}{metadata['compressed_size_bytes']:,} байт (сжатие {compression_ratio:.1f}x)"
)
return True
except Exception as e:
logger.error(f"❌ Ошибка сохранения индекса в файл: {e}")
return False
async def load_index_from_file(self, dump_dir: str = "/dump") -> bool:
"""🔄 Восстанавливает векторный индекс из файла"""
try:
dump_path = Path(dump_dir)
index_file = dump_path / f"{MUVERA_INDEX_NAME}_vector_index.pkl.gz"
metadata_file = dump_path / f"{MUVERA_INDEX_NAME}_metadata.json"
# Check that the files exist
if not index_file.exists():
logger.info(f"🔍 Сохраненный индекс не найден: {index_file}")
return False
# Load metadata if present
metadata = None
if metadata_file.exists():
try:
with Path(metadata_file).open(mode="r", encoding="utf-8") as f:
metadata = json.load(f)
logger.info(
f"🔍 Found saved index: {metadata.get('documents_count', 0)} documents, {metadata.get('embeddings_count', 0)} embeddings"
)
logger.info(
f"🔍 File size: {metadata.get('compressed_size_bytes', 0):,} bytes (compressed), saved: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(metadata.get('saved_at', 0)))}"
)
except Exception as e:
logger.warning(f"⚠️ Не удалось загрузить метаданные: {e}")
# Загружаем и распаковываем основные данные
with gzip.open(index_file, "rb") as f:
serialized_data = f.read()
# Deserialize the data (pickle is already imported at module level)
index_data = pickle.loads(serialized_data) # noqa: S301
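# pickle.loads is acceptable here only because the dump is produced locally by
# save_index_to_file(); never load untrusted pickle files this way.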
# Check version compatibility
if index_data.get("version") != "1.0":
logger.warning(f"🔍 Incompatible index version: {index_data.get('version')}")
return False
# Restore the data
self.documents = index_data["documents"]
self.embeddings = index_data["embeddings"]
self.vector_dimension = index_data["vector_dimension"]
self.buckets = index_data["buckets"]
file_size = int(index_file.stat().st_size)
decompression_ratio = len(serialized_data) / file_size if file_size > 0 else 1.0
logger.info("🔄 Векторный индекс восстановлен из файла:")
logger.info(f" 📁 Файл: {index_file}")
logger.info(f" 📊 Документов: {len(self.documents)}, эмбедингов: {len(self.embeddings)}")
logger.info(
f" 💾 Размер: {file_size:,}{len(serialized_data):,} байт (декомпрессия {decompression_ratio:.1f}x)"
)
return True
except Exception as e:
logger.error(f"❌ Ошибка восстановления индекса из файла: {e}")
return False
async def get_index_metadata_from_file(self, dump_dir: str = "/dump") -> dict[str, Any] | None:
"""📊 Получает метаданные сохраненного индекса из файла"""
try:
dump_path = Path(dump_dir)
metadata_file = dump_path / f"{MUVERA_INDEX_NAME}_metadata.json"
if metadata_file.exists():
with Path(metadata_file).open(mode="r", encoding="utf-8") as f:
return json.load(f)
return None
except Exception as e:
logger.error(f"❌ Ошибка получения метаданных индекса: {e}")
return None
async def close(self) -> None:
"""Close the wrapper (no-op for this simple implementation)"""
class SearchService:
def __init__(self) -> None:
self.available: bool = False
self.muvera_client: Any = None
self.client: Any = None
# Initialize local Muvera
try:
self.muvera_client = MuveraWrapper(
vector_dimension=768, # Standard embedding dimension
cache_enabled=True,
batch_size=SEARCH_MAX_BATCH_SIZE,
)
self.available = True
logger.info(f"Local Muvera wrapper initialized - index: {MUVERA_INDEX_NAME}")
except Exception as e:
logger.error(f"Failed to initialize Muvera: {e}")
self.available = False
async def async_init(self) -> None:
"""🔄 Асинхронная инициализация - восстановление индекса"""
if self.muvera_client:
await self.muvera_client.async_init()
async def info(self) -> dict:
"""Return information about search service"""
if not self.available:
return {"status": "disabled"}
try:
# Get Muvera service info
if self.muvera_client:
muvera_info = await self.muvera_client.info()
return {"status": "enabled", "provider": "muvera", "mode": "local", "muvera_info": muvera_info}
return {"status": "error", "message": "Muvera client not available"}
except Exception:
logger.exception("Failed to get search info")
return {"status": "error", "message": "Failed to get search info"}
def is_ready(self) -> bool:
"""Check if service is available"""
return self.available
async def search(self, text: str, limit: int, offset: int) -> list:
"""Search documents using Muvera"""
if not self.available or not self.muvera_client:
return []
try:
logger.info(f"Muvera search for: '{text}' (limit={limit}, offset={offset})")
# Perform Muvera search
results = await self.muvera_client.search(
query=text,
limit=limit + offset, # Get enough results for pagination
)
# Format results to match the existing response format
formatted_results = []
for result in results:
formatted_results.append(
{
"id": str(result.get("id", "")),
"score": result.get("score", 0.0),
"metadata": result.get("metadata", {}),
}
)
# Apply pagination
return formatted_results[offset : offset + limit]
except Exception as e:
logger.exception(f"Muvera search failed for '{text}': {e}")
return []
async def search_authors(self, text: str, limit: int = 10, offset: int = 0) -> list:
"""Search only for authors using Muvera"""
if not self.available or not self.muvera_client or not text.strip():
return []
try:
logger.info(f"Muvera author search for: '{text}' (limit={limit}, offset={offset})")
# Use Muvera search (no author-specific filtering is applied in this simple implementation)
results = await self.muvera_client.search(
query=text,
limit=limit + offset,
)
# Format results
author_results = []
for result in results:
author_results.append(
{
"id": str(result.get("id", "")),
"score": result.get("score", 0.0),
"metadata": result.get("metadata", {}),
}
)
# Apply pagination
return author_results[offset : offset + limit]
except Exception:
logger.exception(f"Error searching authors for '{text}'")
return []
def index(self, shout: Any) -> None:
"""Index a single document using Muvera"""
if not self.available or not self.muvera_client:
return
logger.info(f"Muvera indexing post {shout.id}")
# Start in background to not block
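# Keeping a reference in background_tasks prevents the task from being
# garbage-collected before it finishes.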
background_tasks.append(asyncio.create_task(self.perform_muvera_index(shout)))
async def perform_muvera_index(self, shout: Any) -> None:
"""Index a single document using Muvera"""
if not self.muvera_client:
return
try:
logger.info(f"Muvera indexing document {shout.id}")
# Prepare document data for Muvera
doc_data: Dict[str, Any] = {
"id": str(shout.id),
"title": getattr(shout, "title", "") or "",
"body": "",
"metadata": {},
}
# Combine body content
body_parts = []
for field_name in ["subtitle", "lead", "body"]:
field_value = getattr(shout, field_name, None)
if field_value and isinstance(field_value, str) and field_value.strip():
body_parts.append(field_value.strip())
# Process media content
media = getattr(shout, "media", None)
if media:
if isinstance(media, str):
try:
media_json = json.loads(media)
if isinstance(media_json, dict):
if "title" in media_json:
body_parts.append(media_json["title"])
if "body" in media_json:
body_parts.append(media_json["body"])
except json.JSONDecodeError:
body_parts.append(media)
elif isinstance(media, dict) and (media.get("title") or media.get("body")):
if media.get("title"):
body_parts.append(media["title"])
if media.get("body"):
body_parts.append(media["body"])
# Set body content
if body_parts:
doc_data["body"] = " ".join(body_parts)
# Add metadata
doc_data["metadata"] = {
"layout": getattr(shout, "layout", "article"),
"lang": getattr(shout, "lang", "ru"),
"created_at": getattr(shout, "created_at", 0),
"created_by": getattr(shout, "created_by", 0),
}
# Index with Muvera (single document = verbose mode)
await self.muvera_client.index(documents=[doc_data], silent=False)
logger.info(f"🚀 Document {shout.id} indexed successfully")
except Exception:
logger.exception(f"Muvera indexing error for shout {shout.id}")
async def bulk_index(self, shouts: list) -> None:
"""Index multiple documents using Muvera"""
if not self.available or not self.muvera_client or not shouts:
logger.warning(
f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}"
)
return
# Start timing for indexing metrics
start_time = time.time()
logger.info(f"Starting Muvera bulk indexing of {len(shouts)} documents")
# Prepare documents for Muvera
documents: List[Dict[str, Any]] = []
total_skipped = 0
for shout in shouts:
try:
# Prepare document data for Muvera
doc_data: Dict[str, Any] = {
"id": str(getattr(shout, "id", "")),
"title": getattr(shout, "title", "") or "",
"body": "",
"metadata": {},
}
# Combine body content
body_parts = []
for field_name in ["subtitle", "lead", "body"]:
field_value = getattr(shout, field_name, None)
if field_value and isinstance(field_value, str) and field_value.strip():
body_parts.append(field_value.strip())
# Process media content
media = getattr(shout, "media", None)
if media:
if isinstance(media, str):
try:
media_json = json.loads(media)
if isinstance(media_json, dict):
if "title" in media_json:
body_parts.append(media_json["title"])
if "body" in media_json:
body_parts.append(media_json["body"])
except json.JSONDecodeError:
body_parts.append(media)
elif isinstance(media, dict) and (media.get("title") or media.get("body")):
if media.get("title"):
body_parts.append(media["title"])
if media.get("body"):
body_parts.append(media["body"])
# Set body content
if body_parts:
doc_data["body"] = " ".join(body_parts)
# Add metadata
doc_data["metadata"] = {
"layout": getattr(shout, "layout", "article"),
"lang": getattr(shout, "lang", "ru"),
"created_at": getattr(shout, "created_at", 0),
"created_by": getattr(shout, "created_by", 0),
}
documents.append(doc_data)
except Exception:
logger.exception(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing")
total_skipped += 1
if documents:
try:
# 🤫 Index with Muvera in silent mode for batch operations
await self.muvera_client.index(documents=documents, silent=True)
elapsed = time.time() - start_time
logger.info(
f"🚀 Bulk indexing completed in {elapsed:.2f}s: "
f"{len(documents)} documents indexed, {total_skipped} shouts skipped"
)
except Exception as e:
logger.exception(f"Muvera bulk indexing failed: {e}")
else:
logger.warning("No documents to index")
async def verify_docs(self, doc_ids: list) -> dict:
"""Verify which documents exist in the search index using Muvera"""
if not self.available or not self.muvera_client:
return {"status": "disabled"}
try:
logger.info(f"Verifying {len(doc_ids)} documents in Muvera search index")
# Use Muvera to verify documents
verification_result = await self.muvera_client.verify_documents(doc_ids)
# Format result to match expected structure
missing_ids = verification_result.get("missing", [])
logger.info(
f"Document verification complete: {len(missing_ids)} documents missing out of {len(doc_ids)} total"
)
return {"missing": missing_ids, "details": {"missing_count": len(missing_ids), "total_count": len(doc_ids)}}
except Exception:
logger.exception("Document verification error")
return {"status": "error", "message": "Document verification error"}
async def check_index_status(self) -> dict:
"""Get detailed statistics about the search index health using Muvera"""
if not self.available or not self.muvera_client:
return {"status": "disabled"}
try:
# Get Muvera index status
index_status = await self.muvera_client.get_index_status()
# Check for consistency issues
if index_status.get("consistency", {}).get("status") != "ok":
null_count = index_status.get("consistency", {}).get("null_embeddings_count", 0)
if null_count > 0:
logger.warning(f"Found {null_count} documents with NULL embeddings")
return index_status
except Exception:
logger.exception("Failed to check index status")
return {"status": "error", "message": "Failed to check index status"}
async def close(self) -> None:
"""Close connections and release resources"""
if hasattr(self, "muvera_client") and self.muvera_client:
try:
await self.muvera_client.close()
logger.info("Local Muvera client closed")
except Exception as e:
logger.warning(f"Error closing Muvera client: {e}")
logger.info("Search service closed")
# Create the search service singleton
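# NOTE: constructing the singleton loads the SentenceTransformer model at import time,
# which can take several seconds (and downloads the model on first run).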
search_service = SearchService()
# API-compatible functions for backward compatibility
async def search_text(text: str, limit: int = 200, offset: int = 0) -> list:
"""Search text using Muvera - backward compatibility function"""
if search_service.available:
return await search_service.search(text, limit, offset)
return []
async def search_author_text(text: str, limit: int = 10, offset: int = 0) -> list:
"""Search authors using Muvera - backward compatibility function"""
if search_service.available:
return await search_service.search_authors(text, limit, offset)
return []
async def get_search_count(text: str) -> int:
"""Get count of search results - backward compatibility function"""
if not search_service.available:
return 0
# Get results and count them
results = await search_text(text, SEARCH_PREFETCH_SIZE, 0)
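# The count is capped by SEARCH_PREFETCH_SIZE, so it is a lower bound for very large result sets.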
return len(results)
async def get_author_search_count(text: str) -> int:
"""Get count of author search results - backward compatibility function"""
if not search_service.available:
return 0
# Get results and count them
results = await search_author_text(text, SEARCH_PREFETCH_SIZE, 0)
return len(results)
async def initialize_search_index(shouts_data: list) -> None:
"""Initialize search index with existing data - backward compatibility function"""
if not search_service.available:
logger.warning("Search service not available for initialization")
return
try:
# First try to restore an existing index
await search_service.async_init()
# Check whether reindexing is needed
if len(shouts_data) > 0:
await search_service.bulk_index(shouts_data)
logger.info(f"Initialized search index with {len(shouts_data)} documents")
except Exception as e:
logger.exception(f"Failed to initialize search index: {e}")
async def check_search_service() -> None:
"""Check if search service is available - backward compatibility function"""
if search_service.available:
logger.info("Search service is available and ready")
else:
logger.warning("Search service is not available")
async def initialize_search_index_background() -> None:
"""Initialize search index in background - backward compatibility function"""
try:
logger.info("Background search index initialization started")
# This function is kept for compatibility but doesn't do much
# since Muvera handles indexing automatically
logger.info("Background search index initialization completed")
except Exception:
logger.exception("Error in background search index initialization")