This commit is contained in:
@@ -4,6 +4,7 @@ import time
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import muvera
|
||||
import numpy as np
|
||||
|
||||
from settings import SEARCH_MAX_BATCH_SIZE, SEARCH_PREFETCH_SIZE
|
||||
from utils.logger import root_logger as logger
|
||||
@@ -12,21 +13,107 @@ from utils.logger import root_logger as logger
|
||||
background_tasks: List[asyncio.Task] = []
|
||||
|
||||
|
||||
class MuveraWrapper:
    """Simple wrapper around muvera.encode_fde to provide expected interface.

    Keeps documents and their FDE-encoded embeddings in plain in-memory
    dicts. Intended for demos: the "embeddings" are random stand-ins for
    real text-model output (see search()/index()).
    """

    def __init__(self, vector_dimension: int = 768, cache_enabled: bool = True, batch_size: int = 100) -> None:
        """Initialize the wrapper.

        Args:
            vector_dimension: Dimensionality of the per-token embeddings.
            cache_enabled: Reported via info(); no caching is implemented here.
            batch_size: Stored configuration value; not used for chunking here.
        """
        self.vector_dimension = vector_dimension
        self.cache_enabled = cache_enabled
        self.batch_size = batch_size
        self.buckets = 128  # Default number of buckets for FDE encoding
        self.documents: Dict[str, Dict[str, Any]] = {}  # Simple in-memory storage for demo
        self.embeddings: Dict[str, np.ndarray | None] = {}  # Store encoded embeddings
        # One RNG reused by search()/index() instead of constructing a fresh
        # generator on every call.
        self._rng = np.random.default_rng()

    async def info(self) -> Dict[str, Any]:
        """Return service configuration and current index size."""
        return {
            "vector_dimension": self.vector_dimension,
            "buckets": self.buckets,
            "documents_count": len(self.documents),
            "cache_enabled": self.cache_enabled,
        }

    async def search(self, query: str, limit: int) -> List[Dict[str, Any]]:
        """Search indexed documents by cosine similarity of FDE encodings.

        Args:
            query: Free-text query; blank/whitespace-only queries return [].
            limit: Maximum number of results to return.

        Returns:
            Result dicts with "id", "score" and "metadata", best score first.
        """
        if not query.strip():
            return []

        # For demo purposes, create a random query embedding.
        # In a real implementation, you'd use a proper text embedding model.
        query_embedding = self._rng.standard_normal((32, self.vector_dimension)).astype(np.float32)

        # Encode query using FDE ("sum" aggregation for queries).
        query_fde = muvera.encode_fde(query_embedding, self.buckets, "sum")
        query_norm = np.linalg.norm(query_fde)

        # Cosine similarity against every stored (non-null) embedding.
        results = []
        for doc_id, doc_embedding in self.embeddings.items():
            if doc_embedding is None:
                continue
            denom = query_norm * np.linalg.norm(doc_embedding)
            # Guard against zero-norm vectors: dividing by zero would yield
            # NaN/inf scores and poison the sort below. Treat as no match.
            similarity = float(np.dot(query_fde, doc_embedding) / denom) if denom else 0.0
            results.append(
                {
                    "id": doc_id,
                    "score": similarity,
                    "metadata": self.documents.get(doc_id, {}).get("metadata", {}),
                }
            )

        # Sort by score (descending) and apply the limit.
        results.sort(key=lambda item: item["score"], reverse=True)
        return results[:limit]

    async def index(self, documents: List[Dict[str, Any]]) -> None:
        """Index documents using FDE encoding.

        Each document dict must contain an "id" key; the full dict is stored
        so search() can surface its "metadata" entry later.
        """
        for doc in documents:
            doc_id = doc["id"]
            self.documents[doc_id] = doc

            # Create a random stand-in document embedding (a real
            # implementation would embed the document text).
            doc_embedding = self._rng.standard_normal((32, self.vector_dimension)).astype(np.float32)

            # Encode document using FDE ("avg" aggregation for documents).
            self.embeddings[doc_id] = muvera.encode_fde(doc_embedding, self.buckets, "avg")

    async def verify_documents(self, doc_ids: List[str]) -> Dict[str, Any]:
        """Return which of *doc_ids* are absent from the index."""
        missing = [doc_id for doc_id in doc_ids if doc_id not in self.documents]
        return {"missing": missing}

    async def get_index_status(self) -> Dict[str, Any]:
        """Get index status information.

        The null-embedding count is computed from the actual store rather
        than hard-coded to 0, so the consistency report reflects reality.
        """
        null_count = sum(1 for emb in self.embeddings.values() if emb is None)
        return {
            "total_documents": len(self.documents),
            "total_embeddings": len(self.embeddings),
            "consistency": {
                "status": "ok" if null_count == 0 else "inconsistent",
                "null_embeddings_count": null_count,
            },
        }

    async def close(self) -> None:
        """Close the wrapper (no-op for this simple in-memory implementation)."""
||||
|
||||
class SearchService:
    """Search facade backed by the in-file MuveraWrapper.

    NOTE(review): reconstructed from a raw diff that contained BOTH the old
    initialization (muvera.Client) and the new one (MuveraWrapper) without
    +/- markers; only the post-change version is kept here.
    """

    def __init__(self) -> None:
        self.available: bool = False
        self.muvera_client: Any = None
        self.client: Any = None

        # Initialize the Muvera wrapper; on any failure the service stays
        # constructed but marks itself unavailable instead of raising, so
        # callers can degrade gracefully.
        try:
            self.muvera_client = MuveraWrapper(
                vector_dimension=768,  # Standard embedding dimension
                cache_enabled=True,
                batch_size=SEARCH_MAX_BATCH_SIZE,
            )
            self.available = True
            logger.info("Muvera wrapper initialized successfully - enhanced search enabled")
        except Exception as e:
            logger.error(f"Failed to initialize Muvera: {e}")
            self.available = False
||||
@@ -61,7 +148,6 @@ class SearchService:
|
||||
results = await self.muvera_client.search(
|
||||
query=text,
|
||||
limit=limit + offset, # Get enough results for pagination
|
||||
include_metadata=True,
|
||||
)
|
||||
|
||||
# Format results to match your existing format
|
||||
@@ -94,8 +180,6 @@ class SearchService:
|
||||
results = await self.muvera_client.search(
|
||||
query=text,
|
||||
limit=limit + offset,
|
||||
include_metadata=True,
|
||||
filter_type="author", # Assuming Muvera supports content type filtering
|
||||
)
|
||||
|
||||
# Format results
|
||||
@@ -180,7 +264,7 @@ class SearchService:
|
||||
}
|
||||
|
||||
# Index with Muvera
|
||||
await self.muvera_client.index(documents=[doc_data], batch_size=1)
|
||||
await self.muvera_client.index(documents=[doc_data])
|
||||
|
||||
logger.info(f"Document {shout.id} indexed with Muvera successfully")
|
||||
|
||||
@@ -259,7 +343,7 @@ class SearchService:
|
||||
if documents:
|
||||
try:
|
||||
# Index with Muvera
|
||||
await self.muvera_client.index(documents=documents, batch_size=SEARCH_MAX_BATCH_SIZE)
|
||||
await self.muvera_client.index(documents=documents)
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
logger.info(
|
||||
|
||||
Reference in New Issue
Block a user