import asyncio import gzip import json import os import pickle import time from pathlib import Path from typing import Any, Dict, List import muvera import numpy as np from settings import MUVERA_INDEX_NAME, SEARCH_MAX_BATCH_SIZE, SEARCH_PREFETCH_SIZE from utils.logger import root_logger as logger # Отложенный импорт SentenceTransformer для избежания блокировки запуска SentenceTransformer = None primary_model = "paraphrase-multilingual-MiniLM-L12-v2" # 💾 Настройка локального кеша для HuggingFace моделей def get_models_cache_dir() -> str: """Определяет лучшую папку для кеша моделей""" # Пробуем /dump если доступен для записи dump_path = Path("/dump") logger.info( f"🔍 Checking /dump - exists: {dump_path.exists()}, writable: {os.access('/dump', os.W_OK) if dump_path.exists() else 'N/A'}" ) if dump_path.exists() and os.access("/dump", os.W_OK): cache_dir = "/dump/huggingface" try: Path(cache_dir).mkdir(parents=True, exist_ok=True) logger.info(f"✅ Using mounted storage: {cache_dir}") return cache_dir except Exception as e: logger.warning(f"Failed to create {cache_dir}: {e}") # Fallback - локальная папка ./dump cache_dir = "./dump/huggingface" Path(cache_dir).mkdir(parents=True, exist_ok=True) logger.info(f"📁 Using local fallback: {cache_dir}") return cache_dir MODELS_CACHE_DIR = get_models_cache_dir() # Используем HF_HOME вместо устаревшего TRANSFORMERS_CACHE os.environ.setdefault("HF_HOME", MODELS_CACHE_DIR) # Global collection for background tasks background_tasks: List[asyncio.Task] = [] def _lazy_import_sentence_transformers(): """🔄 Lazy import SentenceTransformer для избежания блокировки старта приложения""" global SentenceTransformer # noqa: PLW0603 if SentenceTransformer is None: try: from sentence_transformers import SentenceTransformer as SentenceTransformerClass SentenceTransformer = SentenceTransformerClass logger.info("✅ SentenceTransformer импортирован успешно") except ImportError as e: logger.error(f"❌ Не удалось импортировать SentenceTransformer: {e}") SentenceTransformer = None return SentenceTransformer class MuveraWrapper: """🔍 Real vector search with SentenceTransformers + FDE encoding""" def __init__(self, vector_dimension: int = 768, cache_enabled: bool = True, batch_size: int = 100) -> None: self.vector_dimension = vector_dimension self.cache_enabled = cache_enabled self.batch_size = batch_size self.encoder: Any = None self.buckets = 128 # Default number of buckets for FDE encoding self.documents: Dict[str, Dict[str, Any]] = {} # Simple in-memory storage for demo self.embeddings: Dict[str, np.ndarray | None] = {} # Store encoded embeddings # 🚀 Откладываем инициализацию модели до первого использования logger.info("🔄 MuveraWrapper инициализирован - модель будет загружена при первом использовании") self.encoder = None self._model_loaded = False def _is_model_cached(self, model_name: str) -> bool: """🔍 Проверяет наличие модели в кеше""" try: # Проверяем наличие папки модели в кеше cache_path = Path(MODELS_CACHE_DIR) # SentenceTransformer сохраняет модели в формате models--org--model-name model_cache_name = f"models--sentence-transformers--{model_name}" model_path = cache_path / model_cache_name # Проверяем существование папки модели if not model_path.exists(): return False # Проверяем наличие snapshots папки (новый формат HuggingFace) snapshots_path = model_path / "snapshots" if snapshots_path.exists(): # Ищем любой snapshot с config.json for snapshot_dir in snapshots_path.iterdir(): if snapshot_dir.is_dir(): config_file = snapshot_dir / "config.json" if config_file.exists(): return True # Fallback: проверяем старый формат config_file = model_path / "config.json" return config_file.exists() except Exception as e: logger.debug(f"Error checking model cache for {model_name}: {e}") return False def _ensure_model_loaded(self) -> bool: """🔄 Убеждаемся что модель загружена (lazy loading)""" if self._model_loaded: return self.encoder is not None # Импортируем SentenceTransformer при первой необходимости sentence_transformer_class = _lazy_import_sentence_transformers() if sentence_transformer_class is None: logger.error("❌ SentenceTransformer недоступен") return False try: logger.info(f"💾 Using models cache directory: {MODELS_CACHE_DIR}") # Проверяем наличие основной модели is_cached = self._is_model_cached(primary_model) if is_cached: logger.info(f"🔍 Found cached model: {primary_model}") else: logger.info(f"🔽 Downloading model: {primary_model}") # Используем многоязычную модель, хорошо работающую с русским self.encoder = sentence_transformer_class( primary_model, cache_folder=MODELS_CACHE_DIR, local_files_only=is_cached, # Не скачиваем если уже есть в кеше ) logger.info("🔍 SentenceTransformer model loaded successfully") self._model_loaded = True return True except Exception as e: logger.error(f"Failed to load primary SentenceTransformer: {e}") # Fallback - простая модель try: fallback_model = "all-MiniLM-L6-v2" is_fallback_cached = self._is_model_cached(fallback_model) if is_fallback_cached: logger.info(f"🔍 Found cached fallback model: {fallback_model}") else: logger.info(f"🔽 Downloading fallback model: {fallback_model}") self.encoder = sentence_transformer_class( fallback_model, cache_folder=MODELS_CACHE_DIR, local_files_only=is_fallback_cached ) logger.info("🔍 Fallback SentenceTransformer model loaded") self._model_loaded = True return True except Exception: logger.error("Failed to load any SentenceTransformer model") self.encoder = None self._model_loaded = True # Помечаем как попытка завершена return False async def async_init(self) -> None: """🔄 Асинхронная инициализация - восстановление индекса из файла""" try: logger.info("🔍 Пытаемся восстановить векторный индекс из файла...") # Пытаемся загрузить из файла if await self.load_index_from_file(): logger.info("✅ Векторный индекс восстановлен из файла") else: logger.info("🔍 Сохраненный индекс не найден, будет создан новый") except Exception as e: logger.error(f"❌ Ошибка при восстановлении индекса: {e}") async def info(self) -> dict: """Return service information""" return { "vector_dimension": self.vector_dimension, "buckets": self.buckets, "documents_count": len(self.documents), "cache_enabled": self.cache_enabled, } async def search(self, query: str, limit: int) -> List[Dict[str, Any]]: """🔍 Real vector search using SentenceTransformers + FDE encoding""" if not query.strip(): return [] # Загружаем модель при первом использовании if not self._ensure_model_loaded(): logger.warning("🔍 Search unavailable - model not loaded") return [] try: # 🚀 Генерируем настоящий эмбединг запроса query_text = query.strip() query_embedding = self.encoder.encode(query_text, convert_to_numpy=True) # Нормализуем размерность для FDE if query_embedding.ndim == 1: query_embedding = query_embedding.reshape(1, -1) # Encode query using FDE query_fde = muvera.encode_fde(query_embedding, self.buckets, "avg") # 🔍 Semantic similarity search results = [] for doc_id, doc_embedding in self.embeddings.items(): if doc_embedding is not None: # Calculate cosine similarity similarity = np.dot(query_fde, doc_embedding) / ( np.linalg.norm(query_fde) * np.linalg.norm(doc_embedding) + 1e-8 ) results.append( { "id": doc_id, "score": float(similarity), "metadata": self.documents.get(doc_id, {}).get("metadata", {}), } ) # Sort by score and limit results results.sort(key=lambda x: x["score"], reverse=True) return results[:limit] except Exception as e: logger.error(f"🔍 Search error: {e}") return [] async def index(self, documents: List[Dict[str, Any]], silent: bool = False) -> None: """🚀 Index documents using real SentenceTransformers + FDE encoding""" # Загружаем модель при первом использовании if not self._ensure_model_loaded(): if not silent: logger.warning("🔍 No encoder available for indexing") return # 🤫 Batch mode detection is_batch = len(documents) > 10 indexed_count = 0 skipped_count = 0 if is_batch: # 🚀 Batch processing for better performance valid_docs = [] doc_contents = [] for doc in documents: doc_id = doc["id"] self.documents[doc_id] = doc title = doc.get("title", "").strip() body = doc.get("body", "").strip() doc_content = f"{title} {body}".strip() if doc_content: valid_docs.append(doc) doc_contents.append(doc_content) else: skipped_count += 1 if doc_contents: try: # 🚀 Batch encode all documents at once batch_embeddings = self.encoder.encode( doc_contents, convert_to_numpy=True, show_progress_bar=not silent, batch_size=32 ) # Process each embedding for doc, embedding in zip(valid_docs, batch_embeddings, strict=False): emb = embedding doc_id = doc["id"] # Нормализуем размерность для FDE if emb.ndim == 1: emb = emb.reshape(1, -1) # Encode using FDE doc_fde = muvera.encode_fde(emb, self.buckets, "avg") self.embeddings[doc_id] = doc_fde indexed_count += 1 except Exception as e: if not silent: logger.error(f"🔍 Batch encoding error: {e}") return else: # 🔍 Single document processing for doc in documents: try: doc_id = doc["id"] self.documents[doc_id] = doc title = doc.get("title", "").strip() body = doc.get("body", "").strip() doc_content = f"{title} {body}".strip() if not doc_content: if not silent: logger.warning(f"🔍 Empty content for document {doc_id}") skipped_count += 1 continue # 🚀 Single document encoding doc_embedding = self.encoder.encode(doc_content, convert_to_numpy=True, show_progress_bar=False) if doc_embedding.ndim == 1: doc_embedding = doc_embedding.reshape(1, -1) doc_fde = muvera.encode_fde(doc_embedding, self.buckets, "avg") self.embeddings[doc_id] = doc_fde indexed_count += 1 if not silent: logger.debug(f"🔍 Indexed document {doc_id} with content length {len(doc_content)}") except Exception as e: if not silent: logger.error(f"🔍 Indexing error for document {doc.get('id', 'unknown')}: {e}") skipped_count += 1 continue # 🔍 Final statistics if not silent: if is_batch: logger.info(f"🚀 Batch indexed {indexed_count} documents, skipped {skipped_count}") elif indexed_count > 0: logger.debug(f"🔍 Indexed {indexed_count} documents") # 🗃️ Автосохранение индекса после успешной индексации if indexed_count > 0: try: await self.save_index_to_file() if not silent: logger.debug("💾 Индекс автоматически сохранен в файл") except Exception as e: logger.warning(f"⚠️ Не удалось сохранить индекс в файл: {e}") async def verify_documents(self, doc_ids: List[str]) -> Dict[str, Any]: """Verify which documents exist in the index""" missing = [doc_id for doc_id in doc_ids if doc_id not in self.documents] return {"missing": missing} async def get_index_status(self) -> Dict[str, Any]: """Get index status information""" return { "total_documents": len(self.documents), "total_embeddings": len(self.embeddings), "consistency": {"status": "ok", "null_embeddings_count": 0}, } async def save_index_to_file(self, dump_dir: str = "./dump") -> bool: """🗃️ Сохраняет векторный индекс в файл (fallback метод)""" try: # Создаем директорию если не существует dump_path = Path(dump_dir) dump_path.mkdir(parents=True, exist_ok=True) # Подготавливаем данные для сериализации index_data = { "documents": self.documents, "embeddings": self.embeddings, "vector_dimension": self.vector_dimension, "buckets": self.buckets, "timestamp": int(time.time()), "version": "1.0", } # Сериализуем данные с pickle serialized_data = pickle.dumps(index_data) # Подготавливаем имена файлов index_file = dump_path / f"{MUVERA_INDEX_NAME}_vector_index.pkl.gz" # Сохраняем основной индекс с gzip сжатием with gzip.open(index_file, "wb") as f: f.write(serialized_data) logger.info(f"🗃️ Векторный индекс сохранен в файл: {index_file}") logger.info(f" 📊 Документов: {len(self.documents)}, эмбедингов: {len(self.embeddings)}") return True except Exception as e: logger.error(f"❌ Ошибка сохранения индекса в файл: {e}") return False async def load_index_from_file(self, dump_dir: str = "./dump") -> bool: """🔄 Восстанавливает векторный индекс из файла""" try: dump_path = Path(dump_dir) index_file = dump_path / f"{MUVERA_INDEX_NAME}_vector_index.pkl.gz" # Проверяем существование файла if not index_file.exists(): logger.debug(f"🔍 Сохраненный индекс не найден: {index_file}") return False # Загружаем и распаковываем данные with gzip.open(index_file, "rb") as f: serialized_data = f.read() # Десериализуем данные index_data = pickle.loads(serialized_data) # noqa: S301 # Проверяем версию совместимости if index_data.get("version") != "1.0": logger.warning(f"🔍 Несовместимая версия индекса: {index_data.get('version')}") return False # Восстанавливаем данные self.documents = index_data["documents"] self.embeddings = index_data["embeddings"] self.vector_dimension = index_data["vector_dimension"] self.buckets = index_data["buckets"] file_size = int(index_file.stat().st_size) decompression_ratio = len(serialized_data) / file_size if file_size > 0 else 1.0 logger.info("🔄 Векторный индекс восстановлен из файла:") logger.info(f" 📁 Файл: {index_file}") logger.info(f" 📊 Документов: {len(self.documents)}, эмбедингов: {len(self.embeddings)}") logger.info( f" 💾 Размер: {file_size:,} → {len(serialized_data):,} байт (декомпрессия {decompression_ratio:.1f}x)" ) return True except Exception as e: logger.error(f"❌ Ошибка восстановления индекса из файла: {e}") return False async def close(self) -> None: """Close the wrapper (no-op for this simple implementation)""" class SearchService: def __init__(self) -> None: self.available: bool = False self.muvera_client: Any = None self.client: Any = None # Initialize local Muvera try: self.muvera_client = MuveraWrapper( vector_dimension=768, # Standard embedding dimension cache_enabled=True, batch_size=SEARCH_MAX_BATCH_SIZE, ) self.available = True logger.info(f"Local Muvera wrapper initialized - index: {MUVERA_INDEX_NAME}") except Exception as e: logger.error(f"Failed to initialize Muvera: {e}") self.available = False async def async_init(self) -> None: """🔄 Асинхронная инициализация - восстановление индекса""" if self.muvera_client: await self.muvera_client.async_init() async def info(self) -> dict: """Return information about search service""" if not self.available: return {"status": "disabled"} try: # Get Muvera service info if self.muvera_client: muvera_info = await self.muvera_client.info() return {"status": "enabled", "provider": "muvera", "mode": "local", "muvera_info": muvera_info} return {"status": "error", "message": "Muvera client not available"} except Exception: logger.exception("Failed to get search info") return {"status": "error", "message": "Failed to get search info"} def is_ready(self) -> bool: """Check if service is available""" return self.available async def search(self, text: str, limit: int, offset: int) -> list: """Search documents using Muvera""" if not self.available or not self.muvera_client: return [] try: logger.info(f"Muvera search for: '{text}' (limit={limit}, offset={offset})") # Perform Muvera search results = await self.muvera_client.search( query=text, limit=limit + offset, # Get enough results for pagination ) # Format results to match your existing format formatted_results = [] for result in results: formatted_results.append( { "id": str(result.get("id", "")), "score": result.get("score", 0.0), "metadata": result.get("metadata", {}), } ) # Apply pagination return formatted_results[offset : offset + limit] except Exception as e: logger.exception(f"Muvera search failed for '{text}': {e}") return [] async def search_authors(self, text: str, limit: int = 10, offset: int = 0) -> list: """Search only for authors using Muvera""" if not self.available or not self.muvera_client or not text.strip(): return [] try: logger.info(f"Muvera author search for: '{text}' (limit={limit}, offset={offset})") # Use Muvera to search with author-specific filtering results = await self.muvera_client.search( query=text, limit=limit + offset, ) # Format results author_results = [] for result in results: author_results.append( { "id": str(result.get("id", "")), "score": result.get("score", 0.0), "metadata": result.get("metadata", {}), } ) # Apply pagination return author_results[offset : offset + limit] except Exception: logger.exception(f"Error searching authors for '{text}'") return [] def index(self, shout: Any) -> None: """Index a single document using Muvera""" if not self.available or not self.muvera_client: return logger.info(f"Muvera indexing post {shout.id}") # Start in background to not block background_tasks.append(asyncio.create_task(self.perform_muvera_index(shout))) async def perform_muvera_index(self, shout: Any) -> None: """Index a single document using Muvera""" if not self.muvera_client: return try: logger.info(f"Muvera indexing document {shout.id}") # Prepare document data for Muvera doc_data: Dict[str, Any] = { "id": str(shout.id), "title": getattr(shout, "title", "") or "", "body": "", "metadata": {}, } # Combine body content body_parts = [] for field_name in ["subtitle", "lead", "body"]: field_value = getattr(shout, field_name, None) if field_value and isinstance(field_value, str) and field_value.strip(): body_parts.append(field_value.strip()) # Process media content media = getattr(shout, "media", None) if media: if isinstance(media, str): try: media_json = json.loads(media) if isinstance(media_json, dict): if "title" in media_json: body_parts.append(media_json["title"]) if "body" in media_json: body_parts.append(media_json["body"]) except json.JSONDecodeError: body_parts.append(media) elif isinstance(media, dict) and (media.get("title") or media.get("body")): if media.get("title"): body_parts.append(media["title"]) if media.get("body"): body_parts.append(media["body"]) # Set body content if body_parts: doc_data["body"] = " ".join(body_parts) # Add metadata doc_data["metadata"] = { "layout": getattr(shout, "layout", "article"), "lang": getattr(shout, "lang", "ru"), "created_at": getattr(shout, "created_at", 0), "created_by": getattr(shout, "created_by", 0), } # Index with Muvera (single document = verbose mode) await self.muvera_client.index(documents=[doc_data], silent=False) logger.info(f"🚀 Document {shout.id} indexed successfully") except Exception: logger.exception(f"Muvera indexing error for shout {shout.id}") async def bulk_index(self, shouts: list) -> None: """Index multiple documents using Muvera""" if not self.available or not self.muvera_client or not shouts: logger.warning( f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}" ) return # Запускаем метрики индексации start_time = time.time() logger.info(f"Starting Muvera bulk indexing of {len(shouts)} documents") # Prepare documents for Muvera documents: List[Dict[str, Any]] = [] total_skipped = 0 for shout in shouts: try: # Prepare document data for Muvera doc_data: Dict[str, Any] = { "id": str(getattr(shout, "id", "")), "title": getattr(shout, "title", "") or "", "body": "", "metadata": {}, } # Combine body content body_parts = [] for field_name in ["subtitle", "lead", "body"]: field_value = getattr(shout, field_name, None) if field_value and isinstance(field_value, str) and field_value.strip(): body_parts.append(field_value.strip()) # Process media content media = getattr(shout, "media", None) if media: if isinstance(media, str): try: media_json = json.loads(media) if isinstance(media_json, dict): if "title" in media_json: body_parts.append(media_json["title"]) if "body" in media_json: body_parts.append(media_json["body"]) except json.JSONDecodeError: body_parts.append(media) elif isinstance(media, dict) and (media.get("title") or media.get("body")): if media.get("title"): body_parts.append(media["title"]) if media.get("body"): body_parts.append(media["body"]) # Set body content if body_parts: doc_data["body"] = " ".join(body_parts) # Add metadata doc_data["metadata"] = { "layout": getattr(shout, "layout", "article"), "lang": getattr(shout, "lang", "ru"), "created_at": getattr(shout, "created_at", 0), "created_by": getattr(shout, "created_by", 0), } documents.append(doc_data) except Exception: logger.exception(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing") total_skipped += 1 if documents: try: # 🤫 Index with Muvera in silent mode for batch operations await self.muvera_client.index(documents=documents, silent=True) elapsed = time.time() - start_time logger.info( f"🚀 Bulk indexing completed in {elapsed:.2f}s: " f"{len(documents)} documents indexed, {total_skipped} shouts skipped" ) except Exception as e: logger.exception(f"Muvera bulk indexing failed: {e}") else: logger.warning("No documents to index") async def verify_docs(self, doc_ids: list) -> dict: """Verify which documents exist in the search index using Muvera""" if not self.available or not self.muvera_client: return {"status": "disabled"} try: logger.info(f"Verifying {len(doc_ids)} documents in Muvera search index") # Use Muvera to verify documents verification_result = await self.muvera_client.verify_documents(doc_ids) # Format result to match expected structure missing_ids = verification_result.get("missing", []) logger.info( f"Document verification complete: {len(missing_ids)} documents missing out of {len(doc_ids)} total" ) return {"missing": missing_ids, "details": {"missing_count": len(missing_ids), "total_count": len(doc_ids)}} except Exception: logger.exception("Document verification error") return {"status": "error", "message": "Document verification error"} async def check_index_status(self) -> dict: """Get detailed statistics about the search index health using Muvera""" if not self.available or not self.muvera_client: return {"status": "disabled"} try: # Get Muvera index status index_status = await self.muvera_client.get_index_status() # Check for consistency issues if index_status.get("consistency", {}).get("status") != "ok": null_count = index_status.get("consistency", {}).get("null_embeddings_count", 0) if null_count > 0: logger.warning(f"Found {null_count} documents with NULL embeddings") return index_status except Exception: logger.exception("Failed to check index status") return {"status": "error", "message": "Failed to check index status"} async def close(self) -> None: """Close connections and release resources""" if hasattr(self, "muvera_client") and self.muvera_client: try: await self.muvera_client.close() logger.info("Local Muvera client closed") except Exception as e: logger.warning(f"Error closing Muvera client: {e}") logger.info("Search service closed") # Create the search service singleton search_service = SearchService() # API-compatible functions for backward compatibility async def search_text(text: str, limit: int = 200, offset: int = 0) -> list: """Search text using Muvera - backward compatibility function""" if search_service.available: return await search_service.search(text, limit, offset) return [] async def search_author_text(text: str, limit: int = 10, offset: int = 0) -> list: """Search authors using Muvera - backward compatibility function""" if search_service.available: return await search_service.search_authors(text, limit, offset) return [] async def get_search_count(text: str) -> int: """Get count of search results - backward compatibility function""" if not search_service.available: return 0 # Get results and count them results = await search_text(text, SEARCH_PREFETCH_SIZE, 0) return len(results) async def get_author_search_count(text: str) -> int: """Get count of author search results - backward compatibility function""" if not search_service.available: return 0 # Get results and count them results = await search_author_text(text, SEARCH_PREFETCH_SIZE, 0) return len(results) async def initialize_search_index(shouts_data: list) -> None: """Initialize search index with existing data - backward compatibility function""" if not search_service.available: logger.warning("Search service not available for initialization") return try: # Сначала пытаемся восстановить существующий индекс await search_service.async_init() # Проверяем нужна ли переиндексация if len(shouts_data) > 0: await search_service.bulk_index(shouts_data) logger.info(f"Initialized search index with {len(shouts_data)} documents") except Exception as e: logger.exception(f"Failed to initialize search index: {e}") async def check_search_service() -> None: """Check if search service is available - backward compatibility function""" if search_service.available: logger.info("Search service is available and ready") else: logger.warning("Search service is not available") async def initialize_search_index_background() -> None: """Initialize search index in background - backward compatibility function""" try: logger.info("Background search index initialization started") # This function is kept for compatibility but doesn't do much # since Muvera handles indexing automatically logger.info("Background search index initialization completed") except Exception: logger.exception("Error in background search index initialization")