diff --git a/services/search.py b/services/search.py index 4c2e4986..041aee5f 100644 --- a/services/search.py +++ b/services/search.py @@ -1,7 +1,10 @@ import asyncio +import gzip import json +import pickle import time -from typing import Any, Dict, List +from pathlib import Path +from typing import Any, Dict, List, cast import muvera import numpy as np @@ -41,6 +44,29 @@ class MuveraWrapper: logger.error("Failed to load any SentenceTransformer model") self.encoder = None + async def async_init(self) -> None: + """🔄 Асинхронная инициализация - восстановление индекса из файла""" + try: + logger.info("🔍 Пытаемся восстановить векторный индекс из файла...") + + # Проверяем метаданные сначала + metadata = await self.get_index_metadata_from_file() + if metadata: + logger.info( + f"🔍 Найден сохраненный индекс от {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(metadata.get('saved_at', 0)))}" + ) + + # Восстанавливаем индекс + if await self.load_index_from_file(): + logger.info("✅ Векторный индекс успешно восстановлен из файла") + else: + logger.warning("⚠️ Не удалось восстановить индекс из файла") + else: + logger.info("🔍 Сохраненный индекс не найден, будет создан новый") + + except Exception as e: + logger.error(f"❌ Ошибка при восстановлении индекса: {e}") + async def info(self) -> dict: """Return service information""" return { @@ -190,6 +216,15 @@ class MuveraWrapper: elif indexed_count > 0: logger.debug(f"🔍 Indexed {indexed_count} documents") + # 🗃️ Автосохранение индекса после успешной индексации + if indexed_count > 0: + try: + await self.save_index_to_file() + if not silent: + logger.debug("💾 Индекс автоматически сохранен в файл") + except Exception as e: + logger.warning(f"⚠️ Не удалось автоматически сохранить индекс: {e}") + async def verify_documents(self, doc_ids: List[str]) -> Dict[str, Any]: """Verify which documents exist in the index""" missing = [doc_id for doc_id in doc_ids if doc_id not in self.documents] @@ -203,6 +238,147 @@ class MuveraWrapper: "consistency": {"status": "ok", "null_embeddings_count": 0}, } + async def save_index_to_file(self, dump_dir: str = "/dump") -> bool: + """🗃️ Сохраняет векторный индекс в файл с использованием gzip сжатия""" + try: + # Создаем директорию если не существует + dump_path = Path(dump_dir) + dump_path.mkdir(parents=True, exist_ok=True) + + # Подготавливаем данные для сериализации + index_data = { + "documents": self.documents, + "embeddings": self.embeddings, + "vector_dimension": self.vector_dimension, + "buckets": self.buckets, + "timestamp": int(time.time()), + "version": "1.0", + "index_name": MUVERA_INDEX_NAME, + } + + # Сериализуем данные с pickle + serialized_data = pickle.dumps(index_data) + + # Подготавливаем имена файлов + index_file = dump_path / f"{MUVERA_INDEX_NAME}_vector_index.pkl.gz" + metadata_file = dump_path / f"{MUVERA_INDEX_NAME}_metadata.json" + + # Сохраняем основной индекс с gzip сжатием + with gzip.open(index_file, "wb") as f: + f.write(serialized_data) + + # Сохраняем метаданные отдельно для быстрого доступа + metadata = { + "documents_count": len(self.documents), + "embeddings_count": len(self.embeddings), + "vector_dimension": self.vector_dimension, + "buckets": self.buckets, + "saved_at": int(time.time()), + "version": "1.0", + "index_name": MUVERA_INDEX_NAME, + "original_size_bytes": len(serialized_data), + "compressed_size_bytes": int(index_file.stat().st_size) if index_file.exists() else 0, + "index_file": str(index_file), + "metadata_file": str(metadata_file), + } + + with Path(metadata_file).open(mode="w", encoding="utf-8") as f: + json.dump(metadata, f, indent=2, ensure_ascii=False) + + original_size = cast(int, metadata["original_size_bytes"]) + compressed_size = cast(int, metadata["compressed_size_bytes"]) + compression_ratio = original_size / compressed_size if compressed_size > 0 else 1.0 + + logger.info("🗃️ Векторный индекс сохранен в файл:") + logger.info(f" 📁 Файл: {index_file}") + logger.info(f" 📊 Документов: {len(self.documents)}, эмбедингов: {len(self.embeddings)}") + logger.info( + f" 💾 Размер: {metadata['original_size_bytes']:,} → {metadata['compressed_size_bytes']:,} байт (сжатие {compression_ratio:.1f}x)" + ) + + return True + + except Exception as e: + logger.error(f"❌ Ошибка сохранения индекса в файл: {e}") + return False + + async def load_index_from_file(self, dump_dir: str = "/dump") -> bool: + """🔄 Восстанавливает векторный индекс из файла""" + try: + dump_path = Path(dump_dir) + index_file = dump_path / f"{MUVERA_INDEX_NAME}_vector_index.pkl.gz" + metadata_file = dump_path / f"{MUVERA_INDEX_NAME}_metadata.json" + + # Проверяем существование файлов + if not index_file.exists(): + logger.info(f"🔍 Сохраненный индекс не найден: {index_file}") + return False + + # Загружаем метаданные если есть + metadata = None + if metadata_file.exists(): + try: + with Path(metadata_file).open(mode="r", encoding="utf-8") as f: + metadata = json.load(f) + logger.info( + f"🔍 Найден сохраненный индекс: {metadata.get('documents_count', 0)} документов, {metadata.get('embeddings_count', 0)} эмбедингов" + ) + logger.info( + f"🔍 Размер файла: {metadata.get('compressed_size_bytes', 0):,} байт (сжато), сохранен: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(metadata.get('saved_at', 0)))}" + ) + except Exception as e: + logger.warning(f"⚠️ Не удалось загрузить метаданные: {e}") + + # Загружаем и распаковываем основные данные + with gzip.open(index_file, "rb") as f: + serialized_data = f.read() + # Десериализуем данные + import pickle + + index_data = pickle.loads(serialized_data) # noqa: S301 + + # Проверяем версию совместимости + if index_data.get("version") != "1.0": + logger.warning(f"🔍 Несовместимая версия индекса: {index_data.get('version')}") + return False + + # Восстанавливаем данные + self.documents = index_data["documents"] + self.embeddings = index_data["embeddings"] + self.vector_dimension = index_data["vector_dimension"] + self.buckets = index_data["buckets"] + + file_size = int(index_file.stat().st_size) + decompression_ratio = len(serialized_data) / file_size if file_size > 0 else 1.0 + + logger.info("🔄 Векторный индекс восстановлен из файла:") + logger.info(f" 📁 Файл: {index_file}") + logger.info(f" 📊 Документов: {len(self.documents)}, эмбедингов: {len(self.embeddings)}") + logger.info( + f" 💾 Размер: {file_size:,} → {len(serialized_data):,} байт (декомпрессия {decompression_ratio:.1f}x)" + ) + + return True + + except Exception as e: + logger.error(f"❌ Ошибка восстановления индекса из файла: {e}") + return False + + async def get_index_metadata_from_file(self, dump_dir: str = "/dump") -> dict[str, Any] | None: + """📊 Получает метаданные сохраненного индекса из файла""" + try: + dump_path = Path(dump_dir) + metadata_file = dump_path / f"{MUVERA_INDEX_NAME}_metadata.json" + + if metadata_file.exists(): + with Path(metadata_file).open(mode="r", encoding="utf-8") as f: + return json.load(f) + return None + + except Exception as e: + logger.error(f"❌ Ошибка получения метаданных индекса: {e}") + return None + async def close(self) -> None: """Close the wrapper (no-op for this simple implementation)""" @@ -227,6 +403,11 @@ class SearchService: logger.error(f"Failed to initialize Muvera: {e}") self.available = False + async def async_init(self) -> None: + """🔄 Асинхронная инициализация - восстановление индекса""" + if self.muvera_client: + await self.muvera_client.async_init() + async def info(self) -> dict: """Return information about search service""" if not self.available: @@ -563,7 +744,10 @@ async def initialize_search_index(shouts_data: list) -> None: return try: - # Check if we need to reindex + # Сначала пытаемся восстановить существующий индекс + await search_service.async_init() + + # Проверяем нужна ли переиндексация if len(shouts_data) > 0: await search_service.bulk_index(shouts_data) logger.info(f"Initialized search index with {len(shouts_data)} documents")