import asyncio
import gzip
import json
import os
import pickle
import time
from pathlib import Path
from typing import Any, Dict, List

import muvera
import numpy as np
from sentence_transformers import SentenceTransformer

from settings import MUVERA_INDEX_NAME, SEARCH_MAX_BATCH_SIZE, SEARCH_PREFETCH_SIZE
from utils.logger import root_logger as logger

primary_model = "paraphrase-multilingual-MiniLM-L12-v2"


# 💾 Local cache setup for HuggingFace models
def get_models_cache_dir() -> str:
    """Pick the best directory for the models cache"""
    # Try /dump if it exists and is writable
    dump_path = Path("/dump")
    logger.info(
        f"🔍 Checking /dump - exists: {dump_path.exists()}, "
        f"writable: {os.access('/dump', os.W_OK) if dump_path.exists() else 'N/A'}"
    )
    if dump_path.exists() and os.access("/dump", os.W_OK):
        cache_dir = "/dump/huggingface"
        try:
            Path(cache_dir).mkdir(parents=True, exist_ok=True)
            logger.info(f"✅ Using mounted storage: {cache_dir}")
            return cache_dir
        except Exception as e:
            logger.warning(f"Failed to create {cache_dir}: {e}")

    # Fallback: local ./dump directory
    cache_dir = "./dump/huggingface"
    Path(cache_dir).mkdir(parents=True, exist_ok=True)
    logger.info(f"📁 Using local fallback: {cache_dir}")
    return cache_dir


MODELS_CACHE_DIR = get_models_cache_dir()
os.environ.setdefault("TRANSFORMERS_CACHE", MODELS_CACHE_DIR)
os.environ.setdefault("HF_HOME", MODELS_CACHE_DIR)

# Global collection for background tasks
background_tasks: List[asyncio.Task] = []


class MuveraWrapper:
    """🔍 Real vector search with SentenceTransformers + FDE encoding"""

    def __init__(self, vector_dimension: int = 768, cache_enabled: bool = True, batch_size: int = 100) -> None:
        self.vector_dimension = vector_dimension
        self.cache_enabled = cache_enabled
        self.batch_size = batch_size
        self.encoder: Any = None
        self.buckets = 128  # Default number of buckets for FDE encoding
        self.documents: Dict[str, Dict[str, Any]] = {}  # Simple in-memory storage for demo
        self.embeddings: Dict[str, np.ndarray | None] = {}  # Store encoded embeddings

        # 🚀 Initialize a real embedding model backed by the local cache
        try:
            logger.info(f"💾 Using models cache directory: {MODELS_CACHE_DIR}")
            # Check whether the primary model is already cached
            is_cached = self._is_model_cached(primary_model)
            if is_cached:
                logger.info(f"🔍 Found cached model: {primary_model}")
            else:
                logger.info(f"🔽 Downloading model: {primary_model}")
            # Use a multilingual model that handles Russian well
            self.encoder = SentenceTransformer(
                primary_model,
                cache_folder=MODELS_CACHE_DIR,
                local_files_only=is_cached,  # do not download if already cached
            )
            logger.info("🔍 SentenceTransformer model loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load primary SentenceTransformer: {e}")
            # Fallback: a simpler model
            try:
                fallback_model = "all-MiniLM-L6-v2"
                is_fallback_cached = self._is_model_cached(fallback_model)
                if is_fallback_cached:
                    logger.info(f"🔍 Found cached fallback model: {fallback_model}")
                else:
                    logger.info(f"🔽 Downloading fallback model: {fallback_model}")
                self.encoder = SentenceTransformer(
                    fallback_model, cache_folder=MODELS_CACHE_DIR, local_files_only=is_fallback_cached
                )
                logger.info("🔍 Fallback SentenceTransformer model loaded")
            except Exception:
                logger.error("Failed to load any SentenceTransformer model")
                self.encoder = None
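    # Illustrative note (an assumption about the HuggingFace hub cache layout, not
    # something produced by this module): _is_model_cached() below looks for a tree like
    #
    #   <MODELS_CACHE_DIR>/
    #     models--sentence-transformers--paraphrase-multilingual-MiniLM-L12-v2/
    #       snapshots/<revision-hash>/config.json   # presence marks the model as cached
    #       config.json                             # older flat layout, checked as a fallback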
    def _is_model_cached(self, model_name: str) -> bool:
        """🔍 Check whether a model is present in the local cache"""
        try:
            # Look for the model directory in the cache
            cache_path = Path(MODELS_CACHE_DIR)
            # SentenceTransformer stores models as models--org--model-name
            model_cache_name = f"models--sentence-transformers--{model_name}"
            model_path = cache_path / model_cache_name

            # The model directory must exist
            if not model_path.exists():
                return False

            # Check the snapshots directory (current HuggingFace layout)
            snapshots_path = model_path / "snapshots"
            if snapshots_path.exists():
                # Look for any snapshot containing config.json
                for snapshot_dir in snapshots_path.iterdir():
                    if snapshot_dir.is_dir():
                        config_file = snapshot_dir / "config.json"
                        if config_file.exists():
                            return True

            # Fallback: check the old flat layout
            config_file = model_path / "config.json"
            return config_file.exists()
        except Exception as e:
            logger.debug(f"Error checking model cache for {model_name}: {e}")
            return False

    async def async_init(self) -> None:
        """🔄 Asynchronous initialization: restore the index from a file"""
        try:
            logger.info("🔍 Trying to restore the vector index from a file...")
            # Try to load from a file
            if await self.load_index_from_file():
                logger.info("✅ Vector index restored from file")
            else:
                logger.info("🔍 No saved index found, a new one will be created")
        except Exception as e:
            logger.error(f"❌ Failed to restore the index: {e}")

    async def info(self) -> dict:
        """Return service information"""
        return {
            "vector_dimension": self.vector_dimension,
            "buckets": self.buckets,
            "documents_count": len(self.documents),
            "cache_enabled": self.cache_enabled,
        }

    async def search(self, query: str, limit: int) -> List[Dict[str, Any]]:
        """🔍 Real vector search using SentenceTransformers + FDE encoding"""
        if not query.strip() or not self.encoder:
            return []

        try:
            # 🚀 Generate a real query embedding
            query_text = query.strip()
            query_embedding = self.encoder.encode(query_text, convert_to_numpy=True)

            # Normalize dimensionality for FDE
            if query_embedding.ndim == 1:
                query_embedding = query_embedding.reshape(1, -1)

            # Encode query using FDE
            query_fde = muvera.encode_fde(query_embedding, self.buckets, "avg")

            # 🔍 Semantic similarity search
            results = []
            for doc_id, doc_embedding in self.embeddings.items():
                if doc_embedding is not None:
                    # Calculate cosine similarity
                    similarity = np.dot(query_fde, doc_embedding) / (
                        np.linalg.norm(query_fde) * np.linalg.norm(doc_embedding) + 1e-8
                    )
                    results.append(
                        {
                            "id": doc_id,
                            "score": float(similarity),
                            "metadata": self.documents.get(doc_id, {}).get("metadata", {}),
                        }
                    )

            # Sort by score and limit results
            results.sort(key=lambda x: x["score"], reverse=True)
            return results[:limit]
        except Exception as e:
            logger.error(f"🔍 Search error: {e}")
            return []
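    # Minimal scoring sketch (not part of the service API; assumes muvera.encode_fde
    # returns 1-D numpy vectors, as the search() method above relies on):
    #
    #   wrapper = MuveraWrapper()
    #   q = wrapper.encoder.encode("query text", convert_to_numpy=True).reshape(1, -1)
    #   d = wrapper.encoder.encode("document text", convert_to_numpy=True).reshape(1, -1)
    #   q_fde = muvera.encode_fde(q, wrapper.buckets, "avg")
    #   d_fde = muvera.encode_fde(d, wrapper.buckets, "avg")
    #   score = float(np.dot(q_fde, d_fde) / (np.linalg.norm(q_fde) * np.linalg.norm(d_fde) + 1e-8))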
    async def index(self, documents: List[Dict[str, Any]], silent: bool = False) -> None:
        """🚀 Index documents using real SentenceTransformers + FDE encoding"""
        if not self.encoder:
            if not silent:
                logger.warning("🔍 No encoder available for indexing")
            return

        # 🤫 Batch mode detection
        is_batch = len(documents) > 10
        indexed_count = 0
        skipped_count = 0

        if is_batch:
            # 🚀 Batch processing for better performance
            valid_docs = []
            doc_contents = []
            for doc in documents:
                doc_id = doc["id"]
                self.documents[doc_id] = doc
                title = doc.get("title", "").strip()
                body = doc.get("body", "").strip()
                doc_content = f"{title} {body}".strip()
                if doc_content:
                    valid_docs.append(doc)
                    doc_contents.append(doc_content)
                else:
                    skipped_count += 1

            if doc_contents:
                try:
                    # 🚀 Batch encode all documents at once
                    batch_embeddings = self.encoder.encode(
                        doc_contents, convert_to_numpy=True, show_progress_bar=not silent, batch_size=32
                    )
                    # Process each embedding
                    for doc, embedding in zip(valid_docs, batch_embeddings, strict=False):
                        emb = embedding
                        doc_id = doc["id"]
                        # Normalize dimensionality for FDE
                        if emb.ndim == 1:
                            emb = emb.reshape(1, -1)
                        # Encode using FDE
                        doc_fde = muvera.encode_fde(emb, self.buckets, "avg")
                        self.embeddings[doc_id] = doc_fde
                        indexed_count += 1
                except Exception as e:
                    if not silent:
                        logger.error(f"🔍 Batch encoding error: {e}")
                    return
        else:
            # 🔍 Single document processing
            for doc in documents:
                try:
                    doc_id = doc["id"]
                    self.documents[doc_id] = doc
                    title = doc.get("title", "").strip()
                    body = doc.get("body", "").strip()
                    doc_content = f"{title} {body}".strip()

                    if not doc_content:
                        if not silent:
                            logger.warning(f"🔍 Empty content for document {doc_id}")
                        skipped_count += 1
                        continue

                    # 🚀 Single document encoding
                    doc_embedding = self.encoder.encode(doc_content, convert_to_numpy=True, show_progress_bar=False)
                    if doc_embedding.ndim == 1:
                        doc_embedding = doc_embedding.reshape(1, -1)
                    doc_fde = muvera.encode_fde(doc_embedding, self.buckets, "avg")
                    self.embeddings[doc_id] = doc_fde
                    indexed_count += 1

                    if not silent:
                        logger.debug(f"🔍 Indexed document {doc_id} with content length {len(doc_content)}")
                except Exception as e:
                    if not silent:
                        logger.error(f"🔍 Indexing error for document {doc.get('id', 'unknown')}: {e}")
                    skipped_count += 1
                    continue

        # 🔍 Final statistics
        if not silent:
            if is_batch:
                logger.info(f"🚀 Batch indexed {indexed_count} documents, skipped {skipped_count}")
            elif indexed_count > 0:
                logger.debug(f"🔍 Indexed {indexed_count} documents")

        # 🗃️ Auto-save the index after successful indexing
        if indexed_count > 0:
            try:
                await self.save_index_to_file()
                if not silent:
                    logger.debug("💾 Index automatically saved to file")
            except Exception as e:
                logger.warning(f"⚠️ Failed to save index to file: {e}")

    async def verify_documents(self, doc_ids: List[str]) -> Dict[str, Any]:
        """Verify which documents exist in the index"""
        missing = [doc_id for doc_id in doc_ids if doc_id not in self.documents]
        return {"missing": missing}

    async def get_index_status(self) -> Dict[str, Any]:
        """Get index status information"""
        return {
            "total_documents": len(self.documents),
            "total_embeddings": len(self.embeddings),
            "consistency": {"status": "ok", "null_embeddings_count": 0},
        }

    async def save_index_to_file(self, dump_dir: str = "./dump") -> bool:
        """🗃️ Save the vector index to a file (fallback persistence)"""
        try:
            # Create the directory if it does not exist
            dump_path = Path(dump_dir)
            dump_path.mkdir(parents=True, exist_ok=True)

            # Prepare data for serialization
            index_data = {
                "documents": self.documents,
                "embeddings": self.embeddings,
                "vector_dimension": self.vector_dimension,
                "buckets": self.buckets,
                "timestamp": int(time.time()),
                "version": "1.0",
            }

            # Serialize with pickle
            serialized_data = pickle.dumps(index_data)

            # Build the index file name
            index_file = dump_path / f"{MUVERA_INDEX_NAME}_vector_index.pkl.gz"

            # Save the index with gzip compression
            with gzip.open(index_file, "wb") as f:
                f.write(serialized_data)

            logger.info(f"🗃️ Vector index saved to file: {index_file}")
            logger.info(f"   📊 Documents: {len(self.documents)}, embeddings: {len(self.embeddings)}")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to save index to file: {e}")
            return False
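    # Offline inspection sketch (assumes the file written by save_index_to_file above;
    # <index> stands for MUVERA_INDEX_NAME):
    #
    #   import gzip, pickle
    #   with gzip.open("./dump/<index>_vector_index.pkl.gz", "rb") as f:
    #       data = pickle.loads(f.read())  # trusted local file only
    #   print(data["version"], len(data["documents"]), len(data["embeddings"]))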
    async def load_index_from_file(self, dump_dir: str = "./dump") -> bool:
        """🔄 Restore the vector index from a file"""
        try:
            dump_path = Path(dump_dir)
            index_file = dump_path / f"{MUVERA_INDEX_NAME}_vector_index.pkl.gz"

            # Check that the file exists
            if not index_file.exists():
                logger.debug(f"🔍 No saved index found: {index_file}")
                return False

            # Load and decompress the data
            with gzip.open(index_file, "rb") as f:
                serialized_data = f.read()

            # Deserialize the data
            index_data = pickle.loads(serialized_data)  # noqa: S301

            # Check version compatibility
            if index_data.get("version") != "1.0":
                logger.warning(f"🔍 Incompatible index version: {index_data.get('version')}")
                return False

            # Restore the data
            self.documents = index_data["documents"]
            self.embeddings = index_data["embeddings"]
            self.vector_dimension = index_data["vector_dimension"]
            self.buckets = index_data["buckets"]

            file_size = int(index_file.stat().st_size)
            decompression_ratio = len(serialized_data) / file_size if file_size > 0 else 1.0

            logger.info("🔄 Vector index restored from file:")
            logger.info(f"   📁 File: {index_file}")
            logger.info(f"   📊 Documents: {len(self.documents)}, embeddings: {len(self.embeddings)}")
            logger.info(
                f"   💾 Size: {file_size:,} → {len(serialized_data):,} bytes ({decompression_ratio:.1f}x decompression)"
            )
            return True
        except Exception as e:
            logger.error(f"❌ Failed to restore index from file: {e}")
            return False

    async def close(self) -> None:
        """Close the wrapper (no-op for this simple implementation)"""


class SearchService:
    def __init__(self) -> None:
        self.available: bool = False
        self.muvera_client: Any = None
        self.client: Any = None

        # Initialize local Muvera
        try:
            self.muvera_client = MuveraWrapper(
                vector_dimension=768,  # Reported dimension; not enforced by the wrapper
                cache_enabled=True,
                batch_size=SEARCH_MAX_BATCH_SIZE,
            )
            self.available = True
            logger.info(f"Local Muvera wrapper initialized - index: {MUVERA_INDEX_NAME}")
        except Exception as e:
            logger.error(f"Failed to initialize Muvera: {e}")
            self.available = False

    async def async_init(self) -> None:
        """🔄 Asynchronous initialization: restore the index"""
        if self.muvera_client:
            await self.muvera_client.async_init()

    async def info(self) -> dict:
        """Return information about the search service"""
        if not self.available:
            return {"status": "disabled"}
        try:
            # Get Muvera service info
            if self.muvera_client:
                muvera_info = await self.muvera_client.info()
                return {"status": "enabled", "provider": "muvera", "mode": "local", "muvera_info": muvera_info}
            return {"status": "error", "message": "Muvera client not available"}
        except Exception:
            logger.exception("Failed to get search info")
            return {"status": "error", "message": "Failed to get search info"}

    def is_ready(self) -> bool:
        """Check if the service is available"""
        return self.available

    async def search(self, text: str, limit: int, offset: int) -> list:
        """Search documents using Muvera"""
        if not self.available or not self.muvera_client:
            return []

        try:
            logger.info(f"Muvera search for: '{text}' (limit={limit}, offset={offset})")

            # Perform Muvera search
            results = await self.muvera_client.search(
                query=text,
                limit=limit + offset,  # Get enough results for pagination
            )

            # Format results to match the existing API format
            formatted_results = []
            for result in results:
                formatted_results.append(
                    {
                        "id": str(result.get("id", "")),
                        "score": result.get("score", 0.0),
                        "metadata": result.get("metadata", {}),
                    }
                )

            # Apply pagination
            return formatted_results[offset : offset + limit]
        except Exception as e:
            logger.exception(f"Muvera search failed for '{text}': {e}")
            return []
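    # Pagination sketch for search() above (illustrative numbers): with limit=10 and
    # offset=20, the wrapper is asked for 30 results and the caller gets items 21-30:
    #
    #   page = await search_service.search("text", limit=10, offset=20)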
    async def search_authors(self, text: str, limit: int = 10, offset: int = 0) -> list:
        """Search only for authors using Muvera"""
        if not self.available or not self.muvera_client or not text.strip():
            return []

        try:
            logger.info(f"Muvera author search for: '{text}' (limit={limit}, offset={offset})")

            # NOTE: no author-specific filter is applied here yet; this mirrors the general search
            results = await self.muvera_client.search(
                query=text,
                limit=limit + offset,
            )

            # Format results
            author_results = []
            for result in results:
                author_results.append(
                    {
                        "id": str(result.get("id", "")),
                        "score": result.get("score", 0.0),
                        "metadata": result.get("metadata", {}),
                    }
                )

            # Apply pagination
            return author_results[offset : offset + limit]
        except Exception:
            logger.exception(f"Error searching authors for '{text}'")
            return []

    def index(self, shout: Any) -> None:
        """Index a single document using Muvera"""
        if not self.available or not self.muvera_client:
            return

        logger.info(f"Muvera indexing post {shout.id}")
        # Start in background so the caller is not blocked
        background_tasks.append(asyncio.create_task(self.perform_muvera_index(shout)))

    async def perform_muvera_index(self, shout: Any) -> None:
        """Index a single document using Muvera"""
        if not self.muvera_client:
            return

        try:
            logger.info(f"Muvera indexing document {shout.id}")

            # Prepare document data for Muvera
            doc_data: Dict[str, Any] = {
                "id": str(shout.id),
                "title": getattr(shout, "title", "") or "",
                "body": "",
                "metadata": {},
            }

            # Combine body content
            body_parts = []
            for field_name in ["subtitle", "lead", "body"]:
                field_value = getattr(shout, field_name, None)
                if field_value and isinstance(field_value, str) and field_value.strip():
                    body_parts.append(field_value.strip())

            # Process media content
            media = getattr(shout, "media", None)
            if media:
                if isinstance(media, str):
                    try:
                        media_json = json.loads(media)
                        if isinstance(media_json, dict):
                            if "title" in media_json:
                                body_parts.append(media_json["title"])
                            if "body" in media_json:
                                body_parts.append(media_json["body"])
                    except json.JSONDecodeError:
                        body_parts.append(media)
                elif isinstance(media, dict) and (media.get("title") or media.get("body")):
                    if media.get("title"):
                        body_parts.append(media["title"])
                    if media.get("body"):
                        body_parts.append(media["body"])

            # Set body content
            if body_parts:
                doc_data["body"] = " ".join(body_parts)

            # Add metadata
            doc_data["metadata"] = {
                "layout": getattr(shout, "layout", "article"),
                "lang": getattr(shout, "lang", "ru"),
                "created_at": getattr(shout, "created_at", 0),
                "created_by": getattr(shout, "created_by", 0),
            }

            # Index with Muvera (single document = verbose mode)
            await self.muvera_client.index(documents=[doc_data], silent=False)
            logger.info(f"🚀 Document {shout.id} indexed successfully")
        except Exception:
            logger.exception(f"Muvera indexing error for shout {shout.id}")
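    # Media handling example for perform_muvera_index() above (illustrative values only):
    #
    #   media = '{"title": "Episode 1", "body": "Show notes"}'
    #       -> json.loads() succeeds; "Episode 1" and "Show notes" are added to body_parts
    #   media = "plain caption"
    #       -> json.JSONDecodeError; the raw string is appended as-is
    #   media = {"title": "Episode 1"}
    #       -> dict branch; only the title is appended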
    async def bulk_index(self, shouts: list) -> None:
        """Index multiple documents using Muvera"""
        if not self.available or not self.muvera_client or not shouts:
            logger.warning(
                f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}"
            )
            return

        # Start the indexing timer
        start_time = time.time()
        logger.info(f"Starting Muvera bulk indexing of {len(shouts)} documents")

        # Prepare documents for Muvera
        documents: List[Dict[str, Any]] = []
        total_skipped = 0

        for shout in shouts:
            try:
                # Prepare document data for Muvera
                doc_data: Dict[str, Any] = {
                    "id": str(getattr(shout, "id", "")),
                    "title": getattr(shout, "title", "") or "",
                    "body": "",
                    "metadata": {},
                }

                # Combine body content
                body_parts = []
                for field_name in ["subtitle", "lead", "body"]:
                    field_value = getattr(shout, field_name, None)
                    if field_value and isinstance(field_value, str) and field_value.strip():
                        body_parts.append(field_value.strip())

                # Process media content
                media = getattr(shout, "media", None)
                if media:
                    if isinstance(media, str):
                        try:
                            media_json = json.loads(media)
                            if isinstance(media_json, dict):
                                if "title" in media_json:
                                    body_parts.append(media_json["title"])
                                if "body" in media_json:
                                    body_parts.append(media_json["body"])
                        except json.JSONDecodeError:
                            body_parts.append(media)
                    elif isinstance(media, dict) and (media.get("title") or media.get("body")):
                        if media.get("title"):
                            body_parts.append(media["title"])
                        if media.get("body"):
                            body_parts.append(media["body"])

                # Set body content
                if body_parts:
                    doc_data["body"] = " ".join(body_parts)

                # Add metadata
                doc_data["metadata"] = {
                    "layout": getattr(shout, "layout", "article"),
                    "lang": getattr(shout, "lang", "ru"),
                    "created_at": getattr(shout, "created_at", 0),
                    "created_by": getattr(shout, "created_by", 0),
                }

                documents.append(doc_data)
            except Exception:
                logger.exception(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing")
                total_skipped += 1

        if documents:
            try:
                # 🤫 Index with Muvera in silent mode for batch operations
                await self.muvera_client.index(documents=documents, silent=True)
                elapsed = time.time() - start_time
                logger.info(
                    f"🚀 Bulk indexing completed in {elapsed:.2f}s: "
                    f"{len(documents)} documents indexed, {total_skipped} shouts skipped"
                )
            except Exception as e:
                logger.exception(f"Muvera bulk indexing failed: {e}")
        else:
            logger.warning("No documents to index")

    async def verify_docs(self, doc_ids: list) -> dict:
        """Verify which documents exist in the search index using Muvera"""
        if not self.available or not self.muvera_client:
            return {"status": "disabled"}

        try:
            logger.info(f"Verifying {len(doc_ids)} documents in Muvera search index")

            # Use Muvera to verify documents
            verification_result = await self.muvera_client.verify_documents(doc_ids)

            # Format the result to match the expected structure
            missing_ids = verification_result.get("missing", [])
            logger.info(
                f"Document verification complete: {len(missing_ids)} documents missing out of {len(doc_ids)} total"
            )
            return {"missing": missing_ids, "details": {"missing_count": len(missing_ids), "total_count": len(doc_ids)}}
        except Exception:
            logger.exception("Document verification error")
            return {"status": "error", "message": "Document verification error"}

    async def check_index_status(self) -> dict:
        """Get detailed statistics about the search index health using Muvera"""
        if not self.available or not self.muvera_client:
            return {"status": "disabled"}

        try:
            # Get Muvera index status
            index_status = await self.muvera_client.get_index_status()

            # Check for consistency issues
            if index_status.get("consistency", {}).get("status") != "ok":
                null_count = index_status.get("consistency", {}).get("null_embeddings_count", 0)
                if null_count > 0:
                    logger.warning(f"Found {null_count} documents with NULL embeddings")

            return index_status
        except Exception:
            logger.exception("Failed to check index status")
            return {"status": "error", "message": "Failed to check index status"}

    async def close(self) -> None:
        """Close connections and release resources"""
        if hasattr(self, "muvera_client") and self.muvera_client:
            try:
                await self.muvera_client.close()
                logger.info("Local Muvera client closed")
            except Exception as e:
                logger.warning(f"Error closing Muvera client: {e}")
        logger.info("Search service closed")


# Create the search service singleton
search_service = SearchService()


# API-compatible functions for backward compatibility
async def search_text(text: str, limit: int = 200, offset: int = 0) -> list:
    """Search text using Muvera - backward compatibility function"""
    if search_service.available:
        return await search_service.search(text, limit, offset)
    return []
async def search_author_text(text: str, limit: int = 10, offset: int = 0) -> list:
    """Search authors using Muvera - backward compatibility function"""
    if search_service.available:
        return await search_service.search_authors(text, limit, offset)
    return []


async def get_search_count(text: str) -> int:
    """Get count of search results - backward compatibility function"""
    if not search_service.available:
        return 0
    # Get results and count them
    results = await search_text(text, SEARCH_PREFETCH_SIZE, 0)
    return len(results)


async def get_author_search_count(text: str) -> int:
    """Get count of author search results - backward compatibility function"""
    if not search_service.available:
        return 0
    # Get results and count them
    results = await search_author_text(text, SEARCH_PREFETCH_SIZE, 0)
    return len(results)


async def initialize_search_index(shouts_data: list) -> None:
    """Initialize the search index with existing data - backward compatibility function"""
    if not search_service.available:
        logger.warning("Search service not available for initialization")
        return

    try:
        # First try to restore an existing index
        await search_service.async_init()
        # Check whether re-indexing is needed
        if len(shouts_data) > 0:
            await search_service.bulk_index(shouts_data)
            logger.info(f"Initialized search index with {len(shouts_data)} documents")
    except Exception as e:
        logger.exception(f"Failed to initialize search index: {e}")


async def check_search_service() -> None:
    """Check if the search service is available - backward compatibility function"""
    if search_service.available:
        logger.info("Search service is available and ready")
    else:
        logger.warning("Search service is not available")


async def initialize_search_index_background() -> None:
    """Initialize the search index in background - backward compatibility function"""
    try:
        logger.info("Background search index initialization started")
        # This function is kept for compatibility but doesn't do much
        # since Muvera handles indexing automatically
        logger.info("Background search index initialization completed")
    except Exception:
        logger.exception("Error in background search index initialization")
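# A minimal, illustrative smoke test (not part of the service API). It only exercises
# functions defined above and assumes at least the fallback SentenceTransformer model
# can be downloaded or found in the local cache.
if __name__ == "__main__":

    async def _demo() -> None:
        await check_search_service()
        await initialize_search_index([])  # no documents: only restores a saved index, if any
        print(await search_text("example query", limit=5))

    asyncio.run(_demo())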