### 🔍 Search System Redis Storage - **💾 Redis-based vector index storage**: Переключились обратно на Redis для хранения векторного индекса - Заменили файловое хранение в `/dump` на Redis ключи для надежности - Исправлена проблема с правами доступа на `/dump` папку на сервере - Векторный индекс теперь сохраняется по ключам `search_index:{name}:data` и `search_index:{name}:metadata` - **🛠️ Improved reliability**: Убрали зависимость от файловой системы для критичных данных - **⚡ Better performance**: Redis обеспечивает более быстрый доступ к индексу - **🔧 Technical changes**: - Заменили `save_index_to_file()` на `save_index_to_redis()` - Заменили `load_index_from_file()` на `load_index_from_redis()` - Обновили автосохранение для использования Redis вместо файлов - Удалили неиспользуемые импорты (`gzip`, `pathlib`, `cast`)
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
import asyncio
|
||||
import gzip
|
||||
import json
|
||||
import os
|
||||
import pickle
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import muvera
|
||||
@@ -10,6 +14,32 @@ from sentence_transformers import SentenceTransformer
|
||||
from settings import MUVERA_INDEX_NAME, SEARCH_MAX_BATCH_SIZE, SEARCH_PREFETCH_SIZE
|
||||
from utils.logger import root_logger as logger
|
||||
|
||||
primary_model = "paraphrase-multilingual-MiniLM-L12-v2"
|
||||
|
||||
|
||||
# 💾 Настройка локального кеша для HuggingFace моделей
|
||||
def get_models_cache_dir() -> str:
|
||||
"""Определяет лучшую папку для кеша моделей"""
|
||||
# Пробуем /dump если доступен для записи
|
||||
dump_path = Path("/dump")
|
||||
if dump_path.exists() and os.access("/dump", os.W_OK):
|
||||
cache_dir = "/dump/huggingface"
|
||||
try:
|
||||
Path(cache_dir).mkdir(parents=True, exist_ok=True)
|
||||
return cache_dir
|
||||
except Exception: # noqa: S110
|
||||
pass
|
||||
|
||||
# Fallback - локальная папка ./dump
|
||||
cache_dir = "./dump/huggingface"
|
||||
Path(cache_dir).mkdir(parents=True, exist_ok=True)
|
||||
return cache_dir
|
||||
|
||||
|
||||
MODELS_CACHE_DIR = get_models_cache_dir()
|
||||
os.environ.setdefault("TRANSFORMERS_CACHE", MODELS_CACHE_DIR)
|
||||
os.environ.setdefault("HF_HOME", MODELS_CACHE_DIR)
|
||||
|
||||
# Global collection for background tasks
|
||||
background_tasks: List[asyncio.Task] = []
|
||||
|
||||
@@ -26,21 +56,88 @@ class MuveraWrapper:
|
||||
self.documents: Dict[str, Dict[str, Any]] = {} # Simple in-memory storage for demo
|
||||
self.embeddings: Dict[str, np.ndarray | None] = {} # Store encoded embeddings
|
||||
|
||||
# 🚀 Инициализируем реальную модель эмбедингов
|
||||
# 🚀 Инициализируем реальную модель эмбедингов с локальным кешом
|
||||
try:
|
||||
logger.info(f"💾 Using models cache directory: {MODELS_CACHE_DIR}")
|
||||
|
||||
# Проверяем наличие основной модели
|
||||
is_cached = self._is_model_cached(primary_model)
|
||||
if is_cached:
|
||||
logger.info(f"🔍 Found cached model: {primary_model}")
|
||||
else:
|
||||
logger.info(f"🔽 Downloading model: {primary_model}")
|
||||
|
||||
# Используем многоязычную модель, хорошо работающую с русским
|
||||
self.encoder = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")
|
||||
self.encoder = SentenceTransformer(
|
||||
primary_model,
|
||||
cache_folder=MODELS_CACHE_DIR,
|
||||
local_files_only=is_cached, # Не скачиваем если уже есть в кеше
|
||||
)
|
||||
logger.info("🔍 SentenceTransformer model loaded successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load SentenceTransformer: {e}")
|
||||
logger.error(f"Failed to load primary SentenceTransformer: {e}")
|
||||
# Fallback - простая модель
|
||||
try:
|
||||
self.encoder = SentenceTransformer("all-MiniLM-L6-v2")
|
||||
fallback_model = "all-MiniLM-L6-v2"
|
||||
is_fallback_cached = self._is_model_cached(fallback_model)
|
||||
if is_fallback_cached:
|
||||
logger.info(f"🔍 Found cached fallback model: {fallback_model}")
|
||||
else:
|
||||
logger.info(f"🔽 Downloading fallback model: {fallback_model}")
|
||||
|
||||
self.encoder = SentenceTransformer(
|
||||
fallback_model, cache_folder=MODELS_CACHE_DIR, local_files_only=is_fallback_cached
|
||||
)
|
||||
logger.info("🔍 Fallback SentenceTransformer model loaded")
|
||||
except Exception:
|
||||
logger.error("Failed to load any SentenceTransformer model")
|
||||
self.encoder = None
|
||||
|
||||
def _is_model_cached(self, model_name: str) -> bool:
|
||||
"""🔍 Проверяет наличие модели в кеше"""
|
||||
try:
|
||||
# Проверяем наличие папки модели в кеше
|
||||
cache_path = Path(MODELS_CACHE_DIR)
|
||||
|
||||
# SentenceTransformer сохраняет модели в формате models--org--model-name
|
||||
model_cache_name = f"models--sentence-transformers--{model_name}"
|
||||
model_path = cache_path / model_cache_name
|
||||
|
||||
# Проверяем существование папки модели
|
||||
if not model_path.exists():
|
||||
return False
|
||||
|
||||
# Проверяем наличие snapshots папки (новый формат HuggingFace)
|
||||
snapshots_path = model_path / "snapshots"
|
||||
if snapshots_path.exists():
|
||||
# Ищем любой snapshot с config.json
|
||||
for snapshot_dir in snapshots_path.iterdir():
|
||||
if snapshot_dir.is_dir():
|
||||
config_file = snapshot_dir / "config.json"
|
||||
if config_file.exists():
|
||||
return True
|
||||
|
||||
# Fallback: проверяем старый формат
|
||||
config_file = model_path / "config.json"
|
||||
return config_file.exists()
|
||||
except Exception as e:
|
||||
logger.debug(f"Error checking model cache for {model_name}: {e}")
|
||||
return False
|
||||
|
||||
async def async_init(self) -> None:
|
||||
"""🔄 Асинхронная инициализация - восстановление индекса из файла"""
|
||||
try:
|
||||
logger.info("🔍 Пытаемся восстановить векторный индекс из файла...")
|
||||
|
||||
# Пытаемся загрузить из файла
|
||||
if await self.load_index_from_file():
|
||||
logger.info("✅ Векторный индекс восстановлен из файла")
|
||||
else:
|
||||
logger.info("🔍 Сохраненный индекс не найден, будет создан новый")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при восстановлении индекса: {e}")
|
||||
|
||||
async def info(self) -> dict:
|
||||
"""Return service information"""
|
||||
return {
|
||||
@@ -190,6 +287,15 @@ class MuveraWrapper:
|
||||
elif indexed_count > 0:
|
||||
logger.debug(f"🔍 Indexed {indexed_count} documents")
|
||||
|
||||
# 🗃️ Автосохранение индекса после успешной индексации
|
||||
if indexed_count > 0:
|
||||
try:
|
||||
await self.save_index_to_file()
|
||||
if not silent:
|
||||
logger.debug("💾 Индекс автоматически сохранен в файл")
|
||||
except Exception as e:
|
||||
logger.warning(f"⚠️ Не удалось сохранить индекс в файл: {e}")
|
||||
|
||||
async def verify_documents(self, doc_ids: List[str]) -> Dict[str, Any]:
|
||||
"""Verify which documents exist in the index"""
|
||||
missing = [doc_id for doc_id in doc_ids if doc_id not in self.documents]
|
||||
@@ -203,6 +309,87 @@ class MuveraWrapper:
|
||||
"consistency": {"status": "ok", "null_embeddings_count": 0},
|
||||
}
|
||||
|
||||
async def save_index_to_file(self, dump_dir: str = "./dump") -> bool:
|
||||
"""🗃️ Сохраняет векторный индекс в файл (fallback метод)"""
|
||||
try:
|
||||
# Создаем директорию если не существует
|
||||
dump_path = Path(dump_dir)
|
||||
dump_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Подготавливаем данные для сериализации
|
||||
index_data = {
|
||||
"documents": self.documents,
|
||||
"embeddings": self.embeddings,
|
||||
"vector_dimension": self.vector_dimension,
|
||||
"buckets": self.buckets,
|
||||
"timestamp": int(time.time()),
|
||||
"version": "1.0",
|
||||
}
|
||||
|
||||
# Сериализуем данные с pickle
|
||||
serialized_data = pickle.dumps(index_data)
|
||||
|
||||
# Подготавливаем имена файлов
|
||||
index_file = dump_path / f"{MUVERA_INDEX_NAME}_vector_index.pkl.gz"
|
||||
|
||||
# Сохраняем основной индекс с gzip сжатием
|
||||
with gzip.open(index_file, "wb") as f:
|
||||
f.write(serialized_data)
|
||||
|
||||
logger.info(f"🗃️ Векторный индекс сохранен в файл: {index_file}")
|
||||
logger.info(f" 📊 Документов: {len(self.documents)}, эмбедингов: {len(self.embeddings)}")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка сохранения индекса в файл: {e}")
|
||||
return False
|
||||
|
||||
async def load_index_from_file(self, dump_dir: str = "./dump") -> bool:
|
||||
"""🔄 Восстанавливает векторный индекс из файла"""
|
||||
try:
|
||||
dump_path = Path(dump_dir)
|
||||
index_file = dump_path / f"{MUVERA_INDEX_NAME}_vector_index.pkl.gz"
|
||||
|
||||
# Проверяем существование файла
|
||||
if not index_file.exists():
|
||||
logger.debug(f"🔍 Сохраненный индекс не найден: {index_file}")
|
||||
return False
|
||||
|
||||
# Загружаем и распаковываем данные
|
||||
with gzip.open(index_file, "rb") as f:
|
||||
serialized_data = f.read()
|
||||
|
||||
# Десериализуем данные
|
||||
index_data = pickle.loads(serialized_data) # noqa: S301
|
||||
|
||||
# Проверяем версию совместимости
|
||||
if index_data.get("version") != "1.0":
|
||||
logger.warning(f"🔍 Несовместимая версия индекса: {index_data.get('version')}")
|
||||
return False
|
||||
|
||||
# Восстанавливаем данные
|
||||
self.documents = index_data["documents"]
|
||||
self.embeddings = index_data["embeddings"]
|
||||
self.vector_dimension = index_data["vector_dimension"]
|
||||
self.buckets = index_data["buckets"]
|
||||
|
||||
file_size = int(index_file.stat().st_size)
|
||||
decompression_ratio = len(serialized_data) / file_size if file_size > 0 else 1.0
|
||||
|
||||
logger.info("🔄 Векторный индекс восстановлен из файла:")
|
||||
logger.info(f" 📁 Файл: {index_file}")
|
||||
logger.info(f" 📊 Документов: {len(self.documents)}, эмбедингов: {len(self.embeddings)}")
|
||||
logger.info(
|
||||
f" 💾 Размер: {file_size:,} → {len(serialized_data):,} байт (декомпрессия {decompression_ratio:.1f}x)"
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка восстановления индекса из файла: {e}")
|
||||
return False
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the wrapper (no-op for this simple implementation)"""
|
||||
|
||||
@@ -227,6 +414,11 @@ class SearchService:
|
||||
logger.error(f"Failed to initialize Muvera: {e}")
|
||||
self.available = False
|
||||
|
||||
async def async_init(self) -> None:
|
||||
"""🔄 Асинхронная инициализация - восстановление индекса"""
|
||||
if self.muvera_client:
|
||||
await self.muvera_client.async_init()
|
||||
|
||||
async def info(self) -> dict:
|
||||
"""Return information about search service"""
|
||||
if not self.available:
|
||||
@@ -563,7 +755,10 @@ async def initialize_search_index(shouts_data: list) -> None:
|
||||
return
|
||||
|
||||
try:
|
||||
# Check if we need to reindex
|
||||
# Сначала пытаемся восстановить существующий индекс
|
||||
await search_service.async_init()
|
||||
|
||||
# Проверяем нужна ли переиндексация
|
||||
if len(shouts_data) > 0:
|
||||
await search_service.bulk_index(shouts_data)
|
||||
logger.info(f"Initialized search index with {len(shouts_data)} documents")
|
||||
|
||||
Reference in New Issue
Block a user