import asyncio
import json
import time
import zlib  # stable content hashing (deterministic across processes)
from typing import Any, Dict, List

import muvera
import numpy as np

from settings import MUVERA_INDEX_NAME, SEARCH_MAX_BATCH_SIZE, SEARCH_PREFETCH_SIZE
from utils.logger import root_logger as logger


# Simple search performance metrics
class SearchMetrics:
    def __init__(self):
        self.indexing_start_time: float | None = None
        self.documents_indexed: int = 0

    def start_indexing(self, doc_count: int):
        self.indexing_start_time = time.time()
        self.documents_indexed = doc_count
        print(f"🔍 Indexing {doc_count} documents...")

    def end_indexing(self):
        if self.indexing_start_time:
            duration = time.time() - self.indexing_start_time
            rate = self.documents_indexed / duration if duration > 0 else 0
            print(f"✅ Indexing finished in {duration:.2f}s ({rate:.1f} doc/s)")


# Global metrics instance
search_metrics = SearchMetrics()

# Global collection for background tasks
background_tasks: List[asyncio.Task] = []


class MuveraWrapper:
    """Simple wrapper around muvera.encode_fde to provide the expected interface.

    FDE (Fixed Dimensional Encoding) flattens a multi-vector representation
    into a single fixed-size vector, so plain single-vector similarity can be
    used for retrieval.
    """

    def __init__(self, vector_dimension: int = 768, cache_enabled: bool = True, batch_size: int = 100) -> None:
        self.vector_dimension = vector_dimension
        self.cache_enabled = cache_enabled
        self.batch_size = batch_size
        self.buckets = 128  # Default number of buckets for FDE encoding
        self.documents: Dict[str, Dict[str, Any]] = {}  # Simple in-memory storage for demo
        self.embeddings: Dict[str, np.ndarray | None] = {}  # Store encoded embeddings

    async def info(self) -> dict:
        """Return service information"""
        return {
            "vector_dimension": self.vector_dimension,
            "buckets": self.buckets,
            "documents_count": len(self.documents),
            "cache_enabled": self.cache_enabled,
        }

    async def search(self, query: str, limit: int) -> List[Dict[str, Any]]:
        """Simple search implementation using FDE encoding with deterministic results"""
        if not query.strip():
            return []

        # Create a deterministic query embedding from a content hash.
        # crc32 is stable across processes; the built-in hash() is salted
        # per run (PYTHONHASHSEED), which would break the determinism
        # promised above.
        query_hash = zlib.crc32(query.strip().lower().encode("utf-8"))
        rng = np.random.default_rng(seed=query_hash)  # crc32 is already non-negative
        query_embedding = rng.standard_normal((32, self.vector_dimension)).astype(np.float32)

        # Encode query using FDE (sum aggregation for queries)
        query_fde = muvera.encode_fde(query_embedding, self.buckets, "sum")

        # Simple similarity search (cosine similarity with encoded vectors)
        results = []
        for doc_id, doc_embedding in self.embeddings.items():
            if doc_embedding is not None:
                # Calculate similarity (dot product of normalized vectors)
                similarity = np.dot(query_fde, doc_embedding) / (
                    np.linalg.norm(query_fde) * np.linalg.norm(doc_embedding)
                )
                results.append(
                    {
                        "id": doc_id,
                        "score": float(similarity),
                        "metadata": self.documents.get(doc_id, {}).get("metadata", {}),
                    }
                )

        # Sort by score, breaking ties by ID so equal scores keep a stable order
        results.sort(key=lambda x: (x["score"], x["id"]), reverse=True)
        return results[:limit]

    async def index(self, documents: List[Dict[str, Any]]) -> None:
        """Index documents using FDE encoding with deterministic embeddings"""
        for doc in documents:
            doc_id = doc["id"]
            self.documents[doc_id] = doc

            # Create a deterministic document embedding from a content hash
            # (crc32, for the same cross-process stability as in search())
            doc_content = f"{doc.get('title', '')} {doc.get('body', '')}"
            content_hash = zlib.crc32((doc_content.strip().lower() + str(doc_id)).encode("utf-8"))
            rng = np.random.default_rng(seed=content_hash)
            doc_embedding = rng.standard_normal((32, self.vector_dimension)).astype(np.float32)

            # Encode document using FDE (average aggregation for documents)
            doc_fde = muvera.encode_fde(doc_embedding, self.buckets, "avg")
            self.embeddings[doc_id] = doc_fde

    async def verify_documents(self, doc_ids: List[str]) -> Dict[str, Any]:
        """Verify which documents exist in the index"""
        missing = [doc_id for doc_id in doc_ids if doc_id not in self.documents]
        return {"missing": missing}

    async def get_index_status(self) -> Dict[str, Any]:
        """Get index status information"""
        return {
            "total_documents": len(self.documents),
            "total_embeddings": len(self.embeddings),
            "consistency": {"status": "ok", "null_embeddings_count": 0},
        }

    async def close(self) -> None:
        """Close the wrapper (no-op for this simple implementation)"""

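# Illustrative use of the wrapper (a sketch, not executed at import time; it
# assumes a running event loop and the document shape used above):
#
#     wrapper = MuveraWrapper(vector_dimension=768)
#     await wrapper.index([{"id": "1", "title": "Hello", "body": "World", "metadata": {}}])
#     hits = await wrapper.search("hello world", limit=5)

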
class SearchService:
    def __init__(self) -> None:
        self.available: bool = False
        self.muvera_client: Any = None
        self.client: Any = None

        # Initialize local Muvera
        try:
            self.muvera_client = MuveraWrapper(
                vector_dimension=768,  # Standard embedding dimension
                cache_enabled=True,
                batch_size=SEARCH_MAX_BATCH_SIZE,
            )
            self.available = True
            logger.info(f"Local Muvera wrapper initialized - index: {MUVERA_INDEX_NAME}")

        except Exception as e:
            logger.error(f"Failed to initialize Muvera: {e}")
            self.available = False

    async def info(self) -> dict:
        """Return information about the search service"""
        if not self.available:
            return {"status": "disabled"}
        try:
            # Get Muvera service info
            if self.muvera_client:
                muvera_info = await self.muvera_client.info()
                return {
                    "status": "enabled",
                    "provider": "muvera",
                    "mode": "local",
                    "muvera_info": muvera_info,
                }
            return {"status": "error", "message": "Muvera client not available"}
        except Exception:
            logger.exception("Failed to get search info")
            return {"status": "error", "message": "Failed to get search info"}

    def is_ready(self) -> bool:
        """Check if the service is available"""
        return self.available

    async def search(self, text: str, limit: int, offset: int) -> list:
        """Search documents using Muvera"""
        if not self.available or not self.muvera_client:
            return []

        try:
            logger.info(f"Muvera search for: '{text}' (limit={limit}, offset={offset})")

            # Perform Muvera search
            results = await self.muvera_client.search(
                query=text,
                limit=limit + offset,  # Get enough results for pagination
            )

            # Format results to match the existing result shape
            formatted_results = []
            for result in results:
                formatted_results.append(
                    {
                        "id": str(result.get("id", "")),
                        "score": result.get("score", 0.0),
                        "metadata": result.get("metadata", {}),
                    }
                )

            # Apply pagination
            return formatted_results[offset : offset + limit]

        except Exception:
            logger.exception(f"Muvera search failed for '{text}'")
            return []

    async def search_authors(self, text: str, limit: int = 10, offset: int = 0) -> list:
        """Search only for authors using Muvera"""
        if not self.available or not self.muvera_client or not text.strip():
            return []

        try:
            logger.info(f"Muvera author search for: '{text}' (limit={limit}, offset={offset})")

            # The wrapper has no author-specific filter yet, so this falls
            # back to the generic Muvera search
            results = await self.muvera_client.search(
                query=text,
                limit=limit + offset,
            )

            # Format results
            author_results = []
            for result in results:
                author_results.append(
                    {
                        "id": str(result.get("id", "")),
                        "score": result.get("score", 0.0),
                        "metadata": result.get("metadata", {}),
                    }
                )

            # Apply pagination
            return author_results[offset : offset + limit]

        except Exception:
            logger.exception(f"Error searching authors for '{text}'")
            return []

    def index(self, shout: Any) -> None:
        """Index a single document using Muvera"""
        if not self.available or not self.muvera_client:
            return

        logger.info(f"Muvera indexing post {shout.id}")
        # Run in the background so the caller is not blocked
        background_tasks.append(asyncio.create_task(self.perform_muvera_index(shout)))

    async def perform_muvera_index(self, shout: Any) -> None:
        """Index a single document using Muvera"""
        if not self.muvera_client:
            return

        try:
            logger.info(f"Muvera indexing document {shout.id}")

            # Prepare document data for Muvera
            doc_data: Dict[str, Any] = {
                "id": str(shout.id),
                "title": getattr(shout, "title", "") or "",
                "body": "",
                "metadata": {},
            }

            # Combine body content
            body_parts = []
            for field_name in ["subtitle", "lead", "body"]:
                field_value = getattr(shout, field_name, None)
                if field_value and isinstance(field_value, str) and field_value.strip():
                    body_parts.append(field_value.strip())

            # Process media content
            media = getattr(shout, "media", None)
            if media:
                if isinstance(media, str):
                    try:
                        media_json = json.loads(media)
                        if isinstance(media_json, dict):
                            if "title" in media_json:
                                body_parts.append(media_json["title"])
                            if "body" in media_json:
                                body_parts.append(media_json["body"])
                    except json.JSONDecodeError:
                        body_parts.append(media)
                elif isinstance(media, dict) and (media.get("title") or media.get("body")):
                    if media.get("title"):
                        body_parts.append(media["title"])
                    if media.get("body"):
                        body_parts.append(media["body"])

            # Set body content
            if body_parts:
                doc_data["body"] = " ".join(body_parts)

            # Add metadata
            doc_data["metadata"] = {
                "layout": getattr(shout, "layout", "article"),
                "lang": getattr(shout, "lang", "ru"),
                "created_at": getattr(shout, "created_at", 0),
                "created_by": getattr(shout, "created_by", 0),
            }

            # Index with Muvera
            await self.muvera_client.index(documents=[doc_data])

            logger.info(f"Document {shout.id} indexed with Muvera successfully")

        except Exception:
            logger.exception(f"Muvera indexing error for shout {shout.id}")

    async def bulk_index(self, shouts: list) -> None:
        """Index multiple documents using Muvera"""
        if not self.available or not self.muvera_client or not shouts:
            logger.warning(
                f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}"
            )
            return

        # Start indexing metrics
        search_metrics.start_indexing(len(shouts))
        start_time = time.time()
        logger.info(f"Starting Muvera bulk indexing of {len(shouts)} documents")

        # Prepare documents for Muvera
        documents: List[Dict[str, Any]] = []
        total_skipped = 0

        for shout in shouts:
            try:
                # Prepare document data for Muvera
                doc_data: Dict[str, Any] = {
                    "id": str(getattr(shout, "id", "")),
                    "title": getattr(shout, "title", "") or "",
                    "body": "",
                    "metadata": {},
                }

                # Combine body content
                body_parts = []
                for field_name in ["subtitle", "lead", "body"]:
                    field_value = getattr(shout, field_name, None)
                    if field_value and isinstance(field_value, str) and field_value.strip():
                        body_parts.append(field_value.strip())

                # Process media content
                media = getattr(shout, "media", None)
                if media:
                    if isinstance(media, str):
                        try:
                            media_json = json.loads(media)
                            if isinstance(media_json, dict):
                                if "title" in media_json:
                                    body_parts.append(media_json["title"])
                                if "body" in media_json:
                                    body_parts.append(media_json["body"])
                        except json.JSONDecodeError:
                            body_parts.append(media)
                    elif isinstance(media, dict) and (media.get("title") or media.get("body")):
                        if media.get("title"):
                            body_parts.append(media["title"])
                        if media.get("body"):
                            body_parts.append(media["body"])

                # Set body content
                if body_parts:
                    doc_data["body"] = " ".join(body_parts)

                # Add metadata
                doc_data["metadata"] = {
                    "layout": getattr(shout, "layout", "article"),
                    "lang": getattr(shout, "lang", "ru"),
                    "created_at": getattr(shout, "created_at", 0),
                    "created_by": getattr(shout, "created_by", 0),
                }

                documents.append(doc_data)

            except Exception:
                logger.exception(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing")
                total_skipped += 1

        if documents:
            try:
                # Index with Muvera
                await self.muvera_client.index(documents=documents)

                elapsed = time.time() - start_time
                logger.info(
                    f"Muvera bulk indexing completed in {elapsed:.2f}s: "
                    f"{len(documents)} documents indexed, {total_skipped} shouts skipped"
                )
                # Finish indexing metrics
                search_metrics.end_indexing()
            except Exception:
                logger.exception("Muvera bulk indexing failed")
        else:
            logger.warning("No documents to index")

    async def verify_docs(self, doc_ids: list) -> dict:
        """Verify which documents exist in the search index using Muvera"""
        if not self.available or not self.muvera_client:
            return {"status": "disabled"}

        try:
            logger.info(f"Verifying {len(doc_ids)} documents in Muvera search index")

            # Use Muvera to verify documents
            verification_result = await self.muvera_client.verify_documents(doc_ids)

            # Format result to match expected structure
            missing_ids = verification_result.get("missing", [])

            logger.info(
                f"Document verification complete: {len(missing_ids)} documents missing out of {len(doc_ids)} total"
            )

            return {"missing": missing_ids, "details": {"missing_count": len(missing_ids), "total_count": len(doc_ids)}}
        except Exception:
            logger.exception("Document verification error")
            return {"status": "error", "message": "Document verification error"}

    async def check_index_status(self) -> dict:
        """Get detailed statistics about the search index health using Muvera"""
        if not self.available or not self.muvera_client:
            return {"status": "disabled"}

        try:
            # Get Muvera index status
            index_status = await self.muvera_client.get_index_status()

            # Check for consistency issues
            if index_status.get("consistency", {}).get("status") != "ok":
                null_count = index_status.get("consistency", {}).get("null_embeddings_count", 0)
                if null_count > 0:
                    logger.warning(f"Found {null_count} documents with NULL embeddings")

            return index_status
        except Exception:
            logger.exception("Failed to check index status")
            return {"status": "error", "message": "Failed to check index status"}

    async def close(self) -> None:
        """Close connections and release resources"""
        if hasattr(self, "muvera_client") and self.muvera_client:
            try:
                await self.muvera_client.close()
                logger.info("Local Muvera client closed")
            except Exception as e:
                logger.warning(f"Error closing Muvera client: {e}")
        logger.info("Search service closed")


# Create the search service singleton
search_service = SearchService()


# API-compatible functions for backward compatibility
async def search_text(text: str, limit: int = 200, offset: int = 0) -> list:
    """Search text using Muvera - backward compatibility function"""
    if search_service.available:
        return await search_service.search(text, limit, offset)
    return []


async def search_author_text(text: str, limit: int = 10, offset: int = 0) -> list:
    """Search authors using Muvera - backward compatibility function"""
    if search_service.available:
        return await search_service.search_authors(text, limit, offset)
    return []


async def get_search_count(text: str) -> int:
    """Get count of search results - backward compatibility function"""
    if not search_service.available:
        return 0
    # Get results and count them
    results = await search_text(text, SEARCH_PREFETCH_SIZE, 0)
    return len(results)


async def get_author_search_count(text: str) -> int:
    """Get count of author search results - backward compatibility function"""
    if not search_service.available:
        return 0
    # Get results and count them
    results = await search_author_text(text, SEARCH_PREFETCH_SIZE, 0)
    return len(results)


async def initialize_search_index(shouts_data: list) -> None:
    """Initialize search index with existing data - backward compatibility function"""
    if not search_service.available:
        logger.warning("Search service not available for initialization")
        return

    try:
        # Check if we need to reindex
        if len(shouts_data) > 0:
            await search_service.bulk_index(shouts_data)
            logger.info(f"Initialized search index with {len(shouts_data)} documents")
    except Exception:
        logger.exception("Failed to initialize search index")


async def check_search_service() -> None:
    """Check if search service is available - backward compatibility function"""
    if search_service.available:
        logger.info("Search service is available and ready")
    else:
        logger.warning("Search service is not available")


async def initialize_search_index_background() -> None:
    """Initialize search index in background - backward compatibility function"""
    try:
        logger.info("Background search index initialization started")
        # This function is kept for compatibility but doesn't do much
        # since Muvera handles indexing automatically
        logger.info("Background search index initialization completed")
    except Exception:
        logger.exception("Error in background search index initialization")