# core/services/search.py
import asyncio
import json
import time
from typing import Any, Dict, List

import muvera

from settings import SEARCH_MAX_BATCH_SIZE, SEARCH_PREFETCH_SIZE
from utils.logger import root_logger as logger

# Global collection of background tasks; keeping strong references prevents
# asyncio from garbage-collecting fire-and-forget indexing tasks mid-flight
background_tasks: List[asyncio.Task] = []


class SearchService:
    def __init__(self) -> None:
        self.available: bool = False
        self.muvera_client: Any = None
        # Initialize the Muvera client; search stays disabled if this fails
        try:
            self.muvera_client = muvera.Client(
                vector_dimension=768,  # standard embedding dimension
                cache_enabled=True,
                batch_size=SEARCH_MAX_BATCH_SIZE,
            )
            self.available = True
            logger.info("Muvera client initialized successfully - enhanced search enabled")
        except Exception as e:
            logger.error(f"Failed to initialize Muvera: {e}")
            self.available = False

    async def info(self) -> dict:
        """Return information about the search service"""
        if not self.available:
            return {"status": "disabled"}
        try:
            # Get Muvera service info
            if self.muvera_client:
                muvera_info = await self.muvera_client.info()
                return {"status": "enabled", "provider": "muvera", "muvera_info": muvera_info}
            return {"status": "error", "message": "Muvera client not available"}
        except Exception:
            logger.exception("Failed to get search info")
            return {"status": "error", "message": "Failed to get search info"}

    def is_ready(self) -> bool:
        """Check if the service is available"""
        return self.available

    async def search(self, text: str, limit: int, offset: int) -> list:
        """Search documents using Muvera"""
        if not self.available or not self.muvera_client:
            return []
        try:
            logger.info(f"Muvera search for: '{text}' (limit={limit}, offset={offset})")
            # Perform Muvera search
            results = await self.muvera_client.search(
                query=text,
                limit=limit + offset,  # get enough results for pagination
                include_metadata=True,
            )
            # Format results to match the existing result shape
            formatted_results = [
                {
                    "id": str(result.get("id", "")),
                    "score": result.get("score", 0.0),
                    "metadata": result.get("metadata", {}),
                }
                for result in results
            ]
            # Apply pagination
            return formatted_results[offset : offset + limit]
        except Exception:
            logger.exception(f"Muvera search failed for '{text}'")
            return []
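
    # Note: offset is applied client-side because the search call above only
    # takes a `limit`, so deep offsets over-fetch `limit + offset` rows before
    # slicing. This matches the code as written; a server-side offset would be
    # preferable if the client supports one.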

    async def search_authors(self, text: str, limit: int = 10, offset: int = 0) -> list:
        """Search only for authors using Muvera"""
        if not self.available or not self.muvera_client or not text.strip():
            return []
        try:
            logger.info(f"Muvera author search for: '{text}' (limit={limit}, offset={offset})")
            # Use Muvera to search with author-specific filtering
            results = await self.muvera_client.search(
                query=text,
                limit=limit + offset,
                include_metadata=True,
                filter_type="author",  # assuming Muvera supports content-type filtering
            )
            # Format results to the shared result shape
            author_results = [
                {
                    "id": str(result.get("id", "")),
                    "score": result.get("score", 0.0),
                    "metadata": result.get("metadata", {}),
                }
                for result in results
            ]
            # Apply pagination
            return author_results[offset : offset + limit]
        except Exception:
            logger.exception(f"Error searching authors for '{text}'")
            return []

    def index(self, shout: Any) -> None:
        """Index a single document using Muvera"""
        if not self.available or not self.muvera_client:
            return
        logger.info(f"Muvera indexing post {shout.id}")
        # Run in the background so indexing does not block the caller; the
        # done-callback drops the reference so the global list cannot grow forever
        task = asyncio.create_task(self.perform_muvera_index(shout))
        background_tasks.append(task)
        task.add_done_callback(background_tasks.remove)

    def _prepare_document(self, shout: Any) -> Dict[str, Any]:
        """Convert a shout into the document payload Muvera expects"""
        doc_data: Dict[str, Any] = {
            "id": str(shout.id),
            "title": getattr(shout, "title", "") or "",
            "body": "",
            "metadata": {},
        }
        # Combine body content from the plain-text fields
        body_parts = []
        for field_name in ["subtitle", "lead", "body"]:
            field_value = getattr(shout, field_name, None)
            if field_value and isinstance(field_value, str) and field_value.strip():
                body_parts.append(field_value.strip())
        # Process media content (either a JSON string or a dict)
        media = getattr(shout, "media", None)
        if media:
            if isinstance(media, str):
                try:
                    media_json = json.loads(media)
                    if isinstance(media_json, dict):
                        if "title" in media_json:
                            body_parts.append(media_json["title"])
                        if "body" in media_json:
                            body_parts.append(media_json["body"])
                except json.JSONDecodeError:
                    body_parts.append(media)
            elif isinstance(media, dict):
                if media.get("title"):
                    body_parts.append(media["title"])
                if media.get("body"):
                    body_parts.append(media["body"])
        # Set body content
        if body_parts:
            doc_data["body"] = " ".join(body_parts)
        # Add metadata
        doc_data["metadata"] = {
            "layout": getattr(shout, "layout", "article"),
            "lang": getattr(shout, "lang", "ru"),
            "created_at": getattr(shout, "created_at", 0),
            "created_by": getattr(shout, "created_by", 0),
        }
        return doc_data

    async def perform_muvera_index(self, shout: Any) -> None:
        """Index a single document using Muvera"""
        if not self.muvera_client:
            return
        try:
            logger.info(f"Muvera indexing document {shout.id}")
            doc_data = self._prepare_document(shout)
            await self.muvera_client.index(documents=[doc_data], batch_size=1)
            logger.info(f"Document {shout.id} indexed with Muvera successfully")
        except Exception:
            logger.exception(f"Muvera indexing error for shout {shout.id}")

    async def bulk_index(self, shouts: list) -> None:
        """Index multiple documents using Muvera"""
        if not self.available or not self.muvera_client or not shouts:
            logger.warning(
                f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}"
            )
            return
        start_time = time.time()
        logger.info(f"Starting Muvera bulk indexing of {len(shouts)} documents")
        # Prepare documents for Muvera, skipping any shout that fails to convert
        documents: List[Dict[str, Any]] = []
        total_skipped = 0
        for shout in shouts:
            try:
                documents.append(self._prepare_document(shout))
            except Exception:
                logger.exception(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing")
                total_skipped += 1
        if documents:
            try:
                # Index with Muvera
                await self.muvera_client.index(documents=documents, batch_size=SEARCH_MAX_BATCH_SIZE)
                elapsed = time.time() - start_time
                logger.info(
                    f"Muvera bulk indexing completed in {elapsed:.2f}s: "
                    f"{len(documents)} documents indexed, {total_skipped} shouts skipped"
                )
            except Exception:
                logger.exception("Muvera bulk indexing failed")
        else:
            logger.warning("No documents to index")

    async def verify_docs(self, doc_ids: list) -> dict:
        """Verify which documents exist in the search index using Muvera"""
        if not self.available or not self.muvera_client:
            return {"status": "disabled"}
        try:
            logger.info(f"Verifying {len(doc_ids)} documents in Muvera search index")
            # Use Muvera to verify documents
            verification_result = await self.muvera_client.verify_documents(doc_ids)
            # Format result to match the expected structure
            missing_ids = verification_result.get("missing", [])
            logger.info(
                f"Document verification complete: {len(missing_ids)} documents missing out of {len(doc_ids)} total"
            )
            return {"missing": missing_ids, "details": {"missing_count": len(missing_ids), "total_count": len(doc_ids)}}
        except Exception:
            logger.exception("Document verification error")
            return {"status": "error", "message": "Document verification error"}

    async def check_index_status(self) -> dict:
        """Get detailed statistics about the search index health using Muvera"""
        if not self.available or not self.muvera_client:
            return {"status": "disabled"}
        try:
            # Get Muvera index status
            index_status = await self.muvera_client.get_index_status()
            # Check for consistency issues
            if index_status.get("consistency", {}).get("status") != "ok":
                null_count = index_status.get("consistency", {}).get("null_embeddings_count", 0)
                if null_count > 0:
                    logger.warning(f"Found {null_count} documents with NULL embeddings")
            return index_status
        except Exception:
            logger.exception("Failed to check index status")
            return {"status": "error", "message": "Failed to check index status"}

    async def close(self) -> None:
        """Close connections and release resources"""
        if hasattr(self, "muvera_client") and self.muvera_client:
            try:
                await self.muvera_client.close()
            except Exception as e:
                logger.warning(f"Error closing Muvera client: {e}")
        logger.info("Search service closed")


# Create the search service singleton
search_service = SearchService()
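
# Usage sketch (illustrative only; assumes an async caller and a successfully
# initialized Muvera client):
#
#     results = await search_service.search("example query", limit=10, offset=0)
#     for hit in results:
#         print(hit["id"], hit["score"])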


# API-compatible functions for backward compatibility


async def search_text(text: str, limit: int = 200, offset: int = 0) -> list:
    """Search text using Muvera - backward compatibility function"""
    if search_service.available:
        return await search_service.search(text, limit, offset)
    return []


async def search_author_text(text: str, limit: int = 10, offset: int = 0) -> list:
    """Search authors using Muvera - backward compatibility function"""
    if search_service.available:
        return await search_service.search_authors(text, limit, offset)
    return []


async def get_search_count(text: str) -> int:
    """Get count of search results - backward compatibility function"""
    if not search_service.available:
        return 0
    # Get results and count them
    results = await search_text(text, SEARCH_PREFETCH_SIZE, 0)
    return len(results)


async def get_author_search_count(text: str) -> int:
    """Get count of author search results - backward compatibility function"""
    if not search_service.available:
        return 0
    # Get results and count them
    results = await search_author_text(text, SEARCH_PREFETCH_SIZE, 0)
    return len(results)
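
# Note: both count helpers above fetch at most SEARCH_PREFETCH_SIZE results,
# so the returned counts are effectively capped at that setting.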


async def initialize_search_index(shouts_data: list) -> None:
    """Initialize search index with existing data - backward compatibility function"""
    if not search_service.available:
        logger.warning("Search service not available for initialization")
        return
    try:
        # Only reindex when there is data to index
        if len(shouts_data) > 0:
            await search_service.bulk_index(shouts_data)
            logger.info(f"Initialized search index with {len(shouts_data)} documents")
    except Exception:
        logger.exception("Failed to initialize search index")


async def check_search_service() -> None:
    """Check if search service is available - backward compatibility function"""
    if search_service.available:
        logger.info("Search service is available and ready")
    else:
        logger.warning("Search service is not available")


async def initialize_search_index_background() -> None:
    """Initialize search index in background - backward compatibility function"""
    try:
        logger.info("Background search index initialization started")
        # This function is kept for compatibility but doesn't do much,
        # since Muvera handles indexing automatically
        logger.info("Background search index initialization completed")
    except Exception:
        logger.exception("Error in background search index initialization")