embedding-search
Some checks failed
Deploy on push / deploy (push) Failing after 22m28s

This commit is contained in:
2025-08-31 19:20:43 +03:00
parent 7325cdc5f5
commit db3dafa569
8 changed files with 1197 additions and 49 deletions

View File

@@ -5,6 +5,7 @@ from typing import Any, Dict, List
import muvera
import numpy as np
from sentence_transformers import SentenceTransformer
from settings import MUVERA_INDEX_NAME, SEARCH_MAX_BATCH_SIZE, SEARCH_PREFETCH_SIZE
from utils.logger import root_logger as logger
@@ -14,16 +15,32 @@ background_tasks: List[asyncio.Task] = []
class MuveraWrapper:
"""Simple wrapper around muvera.encode_fde to provide expected interface"""
"""🔍 Real vector search with SentenceTransformers + FDE encoding"""
def __init__(self, vector_dimension: int = 768, cache_enabled: bool = True, batch_size: int = 100) -> None:
self.vector_dimension = vector_dimension
self.cache_enabled = cache_enabled
self.batch_size = batch_size
self.encoder: Any = None
self.buckets = 128 # Default number of buckets for FDE encoding
self.documents: Dict[str, Dict[str, Any]] = {} # Simple in-memory storage for demo
self.embeddings: Dict[str, np.ndarray | None] = {} # Store encoded embeddings
# 🚀 Инициализируем реальную модель эмбедингов
try:
# Используем многоязычную модель, хорошо работающую с русским
self.encoder = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")
logger.info("🔍 SentenceTransformer model loaded successfully")
except Exception as e:
logger.error(f"Failed to load SentenceTransformer: {e}")
# Fallback - простая модель
try:
self.encoder = SentenceTransformer("all-MiniLM-L6-v2")
logger.info("🔍 Fallback SentenceTransformer model loaded")
except Exception:
logger.error("Failed to load any SentenceTransformer model")
self.encoder = None
async def info(self) -> dict:
"""Return service information"""
return {
@@ -34,53 +51,144 @@ class MuveraWrapper:
}
async def search(self, query: str, limit: int) -> List[Dict[str, Any]]:
"""Simple search implementation using FDE encoding with deterministic results"""
if not query.strip():
"""🔍 Real vector search using SentenceTransformers + FDE encoding"""
if not query.strip() or not self.encoder:
return []
# Create deterministic query embedding based on query hash
query_hash = hash(query.strip().lower())
rng = np.random.default_rng(seed=query_hash & 0x7FFFFFFF) # Use positive seed
query_embedding = rng.standard_normal((32, self.vector_dimension)).astype(np.float32)
try:
# 🚀 Генерируем настоящий эмбединг запроса
query_text = query.strip()
query_embedding = self.encoder.encode(query_text, convert_to_numpy=True)
# Encode query using FDE
query_fde = muvera.encode_fde(query_embedding, self.buckets, "sum")
# Нормализуем размерность для FDE
if query_embedding.ndim == 1:
query_embedding = query_embedding.reshape(1, -1)
# Simple similarity search (cosine similarity with encoded vectors)
results = []
for doc_id, doc_embedding in self.embeddings.items():
if doc_embedding is not None:
# Calculate similarity (dot product of normalized vectors)
similarity = np.dot(query_fde, doc_embedding) / (
np.linalg.norm(query_fde) * np.linalg.norm(doc_embedding)
)
results.append(
{
"id": doc_id,
"score": float(similarity),
"metadata": self.documents.get(doc_id, {}).get("metadata", {}),
}
)
# Encode query using FDE
query_fde = muvera.encode_fde(query_embedding, self.buckets, "avg")
# Sort by score and limit results - добавляем сортировку по ID для стабильности
results.sort(key=lambda x: (x["score"], x["id"]), reverse=True)
return results[:limit]
# 🔍 Semantic similarity search
results = []
for doc_id, doc_embedding in self.embeddings.items():
if doc_embedding is not None:
# Calculate cosine similarity
similarity = np.dot(query_fde, doc_embedding) / (
np.linalg.norm(query_fde) * np.linalg.norm(doc_embedding) + 1e-8
)
results.append(
{
"id": doc_id,
"score": float(similarity),
"metadata": self.documents.get(doc_id, {}).get("metadata", {}),
}
)
async def index(self, documents: List[Dict[str, Any]]) -> None:
"""Index documents using FDE encoding with deterministic embeddings"""
for doc in documents:
doc_id = doc["id"]
self.documents[doc_id] = doc
# Sort by score and limit results
results.sort(key=lambda x: x["score"], reverse=True)
return results[:limit]
# Create deterministic document embedding based on document content hash
doc_content = f"{doc.get('title', '')} {doc.get('body', '')}"
content_hash = hash(doc_content.strip().lower() + str(doc_id))
rng = np.random.default_rng(seed=content_hash & 0x7FFFFFFF) # Use positive seed
doc_embedding = rng.standard_normal((32, self.vector_dimension)).astype(np.float32)
except Exception as e:
logger.error(f"🔍 Search error: {e}")
return []
# Encode document using FDE (average aggregation for documents)
doc_fde = muvera.encode_fde(doc_embedding, self.buckets, "avg")
self.embeddings[doc_id] = doc_fde
async def index(self, documents: List[Dict[str, Any]], silent: bool = False) -> None:
"""🚀 Index documents using real SentenceTransformers + FDE encoding"""
if not self.encoder:
if not silent:
logger.warning("🔍 No encoder available for indexing")
return
# 🤫 Batch mode detection
is_batch = len(documents) > 10
indexed_count = 0
skipped_count = 0
if is_batch:
# 🚀 Batch processing for better performance
valid_docs = []
doc_contents = []
for doc in documents:
doc_id = doc["id"]
self.documents[doc_id] = doc
title = doc.get("title", "").strip()
body = doc.get("body", "").strip()
doc_content = f"{title} {body}".strip()
if doc_content:
valid_docs.append(doc)
doc_contents.append(doc_content)
else:
skipped_count += 1
if doc_contents:
try:
# 🚀 Batch encode all documents at once
batch_embeddings = self.encoder.encode(
doc_contents, convert_to_numpy=True, show_progress_bar=not silent, batch_size=32
)
# Process each embedding
for doc, embedding in zip(valid_docs, batch_embeddings, strict=False):
emb = embedding
doc_id = doc["id"]
# Нормализуем размерность для FDE
if emb.ndim == 1:
emb = emb.reshape(1, -1)
# Encode using FDE
doc_fde = muvera.encode_fde(emb, self.buckets, "avg")
self.embeddings[doc_id] = doc_fde
indexed_count += 1
except Exception as e:
if not silent:
logger.error(f"🔍 Batch encoding error: {e}")
return
else:
# 🔍 Single document processing
for doc in documents:
try:
doc_id = doc["id"]
self.documents[doc_id] = doc
title = doc.get("title", "").strip()
body = doc.get("body", "").strip()
doc_content = f"{title} {body}".strip()
if not doc_content:
if not silent:
logger.warning(f"🔍 Empty content for document {doc_id}")
skipped_count += 1
continue
# 🚀 Single document encoding
doc_embedding = self.encoder.encode(doc_content, convert_to_numpy=True, show_progress_bar=False)
if doc_embedding.ndim == 1:
doc_embedding = doc_embedding.reshape(1, -1)
doc_fde = muvera.encode_fde(doc_embedding, self.buckets, "avg")
self.embeddings[doc_id] = doc_fde
indexed_count += 1
if not silent:
logger.debug(f"🔍 Indexed document {doc_id} with content length {len(doc_content)}")
except Exception as e:
if not silent:
logger.error(f"🔍 Indexing error for document {doc.get('id', 'unknown')}: {e}")
skipped_count += 1
continue
# 🔍 Final statistics
if not silent:
if is_batch:
logger.info(f"🚀 Batch indexed {indexed_count} documents, skipped {skipped_count}")
elif indexed_count > 0:
logger.debug(f"🔍 Indexed {indexed_count} documents")
async def verify_documents(self, doc_ids: List[str]) -> Dict[str, Any]:
"""Verify which documents exist in the index"""
@@ -264,10 +372,10 @@ class SearchService:
"created_by": getattr(shout, "created_by", 0),
}
# Index with Muvera
await self.muvera_client.index(documents=[doc_data])
# Index with Muvera (single document = verbose mode)
await self.muvera_client.index(documents=[doc_data], silent=False)
logger.info(f"Document {shout.id} indexed with Muvera successfully")
logger.info(f"🚀 Document {shout.id} indexed successfully")
except Exception:
logger.exception(f"Muvera indexing error for shout {shout.id}")
@@ -344,12 +452,12 @@ class SearchService:
if documents:
try:
# Index with Muvera
await self.muvera_client.index(documents=documents)
# 🤫 Index with Muvera in silent mode for batch operations
await self.muvera_client.index(documents=documents, silent=True)
elapsed = time.time() - start_time
logger.info(
f"Muvera bulk indexing completed in {elapsed:.2f}s: "
f"🚀 Bulk indexing completed in {elapsed:.2f}s: "
f"{len(documents)} documents indexed, {total_skipped} shouts skipped"
)
except Exception as e: