import asyncio
import json
import time
from typing import Any, Dict, List

import muvera

from settings import SEARCH_MAX_BATCH_SIZE, SEARCH_PREFETCH_SIZE
from utils.logger import root_logger as logger

# Global collection for background tasks.  Tasks are appended here so they
# hold a strong reference (asyncio only keeps weak references to tasks) and
# are removed by a done-callback so the list does not grow without bound.
background_tasks: List[asyncio.Task] = []


class SearchService:
    """Async wrapper around the Muvera search client.

    Provides document indexing, text/author search, and index health checks.
    Every method degrades gracefully (empty results / "disabled" status) when
    the Muvera client failed to initialize.
    """

    def __init__(self) -> None:
        # True once the Muvera client has been constructed successfully.
        self.available: bool = False
        # The underlying Muvera client instance (None when unavailable).
        self.muvera_client: Any = None

        # Initialize Muvera
        try:
            # Initialize Muvera client with your configuration
            self.muvera_client = muvera.Client(
                vector_dimension=768,  # Standard embedding dimension
                cache_enabled=True,
                batch_size=SEARCH_MAX_BATCH_SIZE,
            )
            self.available = True
            logger.info("Muvera client initialized successfully - enhanced search enabled")
        except Exception as e:
            logger.error(f"Failed to initialize Muvera: {e}")
            self.available = False

    @staticmethod
    def _build_document(shout: Any) -> Dict[str, Any]:
        """Convert a shout object into the document dict Muvera expects.

        Shared by single-document and bulk indexing so the two code paths
        cannot drift apart.  Combines subtitle/lead/body plus any media
        title/body into one searchable "body" string.
        """
        doc_data: Dict[str, Any] = {
            "id": str(shout.id),
            "title": getattr(shout, "title", "") or "",
            "body": "",
            "metadata": {},
        }

        # Combine body content from the plain-text fields.
        body_parts = []
        for field_name in ["subtitle", "lead", "body"]:
            field_value = getattr(shout, field_name, None)
            if field_value and isinstance(field_value, str) and field_value.strip():
                body_parts.append(field_value.strip())

        # Process media content: may arrive as a JSON string or a parsed dict.
        media = getattr(shout, "media", None)
        if media:
            if isinstance(media, str):
                try:
                    media_json = json.loads(media)
                    if isinstance(media_json, dict):
                        if "title" in media_json:
                            body_parts.append(media_json["title"])
                        if "body" in media_json:
                            body_parts.append(media_json["body"])
                except json.JSONDecodeError:
                    # Not valid JSON - index the raw string as-is.
                    body_parts.append(media)
            elif isinstance(media, dict) and (media.get("title") or media.get("body")):
                if media.get("title"):
                    body_parts.append(media["title"])
                if media.get("body"):
                    body_parts.append(media["body"])

        # Set body content
        if body_parts:
            doc_data["body"] = " ".join(body_parts)

        # Add metadata
        doc_data["metadata"] = {
            "layout": getattr(shout, "layout", "article"),
            "lang": getattr(shout, "lang", "ru"),
            "created_at": getattr(shout, "created_at", 0),
            "created_by": getattr(shout, "created_by", 0),
        }
        return doc_data

    async def info(self) -> dict:
        """Return information about search service"""
        if not self.available:
            return {"status": "disabled"}
        try:
            # Get Muvera service info
            if self.muvera_client:
                muvera_info = await self.muvera_client.info()
                return {"status": "enabled", "provider": "muvera", "muvera_info": muvera_info}
            return {"status": "error", "message": "Muvera client not available"}
        except Exception:
            logger.exception("Failed to get search info")
            return {"status": "error", "message": "Failed to get search info"}

    def is_ready(self) -> bool:
        """Check if service is available"""
        return self.available

    async def search(self, text: str, limit: int, offset: int) -> list:
        """Search documents using Muvera"""
        if not self.available or not self.muvera_client:
            return []

        try:
            logger.info(f"Muvera search for: '{text}' (limit={limit}, offset={offset})")

            # Perform Muvera search
            results = await self.muvera_client.search(
                query=text,
                limit=limit + offset,  # Get enough results for pagination
                include_metadata=True,
            )

            # Format results to match your existing format
            formatted_results = [
                {
                    "id": str(result.get("id", "")),
                    "score": result.get("score", 0.0),
                    "metadata": result.get("metadata", {}),
                }
                for result in results
            ]

            # Apply pagination
            return formatted_results[offset : offset + limit]
        except Exception:
            # logger.exception already records the traceback and message.
            logger.exception(f"Muvera search failed for '{text}'")
            return []

    async def search_authors(self, text: str, limit: int = 10, offset: int = 0) -> list:
        """Search only for authors using Muvera"""
        if not self.available or not self.muvera_client or not text.strip():
            return []

        try:
            logger.info(f"Muvera author search for: '{text}' (limit={limit}, offset={offset})")

            # Use Muvera to search with author-specific filtering
            results = await self.muvera_client.search(
                query=text,
                limit=limit + offset,
                include_metadata=True,
                filter_type="author",  # Assuming Muvera supports content type filtering
            )

            # Format results
            author_results = [
                {
                    "id": str(result.get("id", "")),
                    "score": result.get("score", 0.0),
                    "metadata": result.get("metadata", {}),
                }
                for result in results
            ]

            # Apply pagination
            return author_results[offset : offset + limit]
        except Exception:
            logger.exception(f"Error searching authors for '{text}'")
            return []

    def index(self, shout: Any) -> None:
        """Index a single document using Muvera"""
        if not self.available or not self.muvera_client:
            return

        logger.info(f"Muvera indexing post {shout.id}")
        # Start in background to not block the caller.  The list keeps a
        # strong reference while the task runs; the done-callback removes the
        # task afterwards so completed tasks do not accumulate forever.
        task = asyncio.create_task(self.perform_muvera_index(shout))
        background_tasks.append(task)
        task.add_done_callback(background_tasks.remove)

    async def perform_muvera_index(self, shout: Any) -> None:
        """Index a single document using Muvera"""
        if not self.muvera_client:
            return

        try:
            logger.info(f"Muvera indexing document {shout.id}")

            # Prepare document data for Muvera (shared with bulk_index).
            doc_data = self._build_document(shout)

            # Index with Muvera
            await self.muvera_client.index(documents=[doc_data], batch_size=1)
            logger.info(f"Document {shout.id} indexed with Muvera successfully")
        except Exception:
            logger.exception(f"Muvera indexing error for shout {shout.id}")

    async def bulk_index(self, shouts: list) -> None:
        """Index multiple documents using Muvera"""
        if not self.available or not self.muvera_client or not shouts:
            logger.warning(
                f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}"
            )
            return

        start_time = time.time()
        logger.info(f"Starting Muvera bulk indexing of {len(shouts)} documents")

        # Prepare documents for Muvera; a faulty shout is skipped, not fatal.
        documents: List[Dict[str, Any]] = []
        total_skipped = 0

        for shout in shouts:
            try:
                documents.append(self._build_document(shout))
            except Exception:
                logger.exception(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing")
                total_skipped += 1

        if documents:
            try:
                # Index with Muvera
                await self.muvera_client.index(documents=documents, batch_size=SEARCH_MAX_BATCH_SIZE)

                elapsed = time.time() - start_time
                logger.info(
                    f"Muvera bulk indexing completed in {elapsed:.2f}s: "
                    f"{len(documents)} documents indexed, {total_skipped} shouts skipped"
                )
            except Exception:
                logger.exception("Muvera bulk indexing failed")
        else:
            logger.warning("No documents to index")

    async def verify_docs(self, doc_ids: list) -> dict:
        """Verify which documents exist in the search index using Muvera"""
        if not self.available or not self.muvera_client:
            return {"status": "disabled"}

        try:
            logger.info(f"Verifying {len(doc_ids)} documents in Muvera search index")

            # Use Muvera to verify documents
            verification_result = await self.muvera_client.verify_documents(doc_ids)

            # Format result to match expected structure
            missing_ids = verification_result.get("missing", [])
            logger.info(
                f"Document verification complete: {len(missing_ids)} documents missing out of {len(doc_ids)} total"
            )
            return {"missing": missing_ids, "details": {"missing_count": len(missing_ids), "total_count": len(doc_ids)}}
        except Exception:
            logger.exception("Document verification error")
            return {"status": "error", "message": "Document verification error"}

    async def check_index_status(self) -> dict:
        """Get detailed statistics about the search index health using Muvera"""
        if not self.available or not self.muvera_client:
            return {"status": "disabled"}

        try:
            # Get Muvera index status
            index_status = await self.muvera_client.get_index_status()

            # Check for consistency issues
            if index_status.get("consistency", {}).get("status") != "ok":
                null_count = index_status.get("consistency", {}).get("null_embeddings_count", 0)
                if null_count > 0:
                    logger.warning(f"Found {null_count} documents with NULL embeddings")

            return index_status
        except Exception:
            logger.exception("Failed to check index status")
            return {"status": "error", "message": "Failed to check index status"}

    async def close(self) -> None:
        """Close connections and release resources"""
        if hasattr(self, "muvera_client") and self.muvera_client:
            try:
                await self.muvera_client.close()
            except Exception as e:
                logger.warning(f"Error closing Muvera client: {e}")
        logger.info("Search service closed")


# Create the search service singleton
search_service = SearchService()


# API-compatible functions for backward compatibility
async def search_text(text: str, limit: int = 200, offset: int = 0) -> list:
    """Search text using Muvera - backward compatibility function"""
    if search_service.available:
        return await search_service.search(text, limit, offset)
    return []


async def search_author_text(text: str, limit: int = 10, offset: int = 0) -> list:
    """Search authors using Muvera - backward compatibility function"""
    if search_service.available:
        return await search_service.search_authors(text, limit, offset)
    return []


async def get_search_count(text: str) -> int:
    """Get count of search results - backward compatibility function"""
    if not search_service.available:
        return 0
    # Get results and count them
    results = await search_text(text, SEARCH_PREFETCH_SIZE, 0)
    return len(results)


async def get_author_search_count(text: str) -> int:
    """Get count of author search results - backward compatibility function"""
    if not search_service.available:
        return 0
    # Get results and count them
    results = await search_author_text(text, SEARCH_PREFETCH_SIZE, 0)
    return len(results)


async def initialize_search_index(shouts_data: list) -> None:
    """Initialize search index with existing data - backward compatibility function"""
    if not search_service.available:
        logger.warning("Search service not available for initialization")
        return

    try:
        # Check if we need to reindex
        if len(shouts_data) > 0:
            await search_service.bulk_index(shouts_data)
            logger.info(f"Initialized search index with {len(shouts_data)} documents")
    except Exception:
        logger.exception("Failed to initialize search index")


async def check_search_service() -> None:
    """Check if search service is available - backward compatibility function"""
    if search_service.available:
        logger.info("Search service is available and ready")
    else:
        logger.warning("Search service is not available")


async def initialize_search_index_background() -> None:
    """Initialize search index in background - backward compatibility function"""
    try:
        logger.info("Background search index initialization started")
        # This function is kept for compatibility but doesn't do much
        # since Muvera handles indexing automatically
        logger.info("Background search index initialization completed")
    except Exception:
        logger.exception("Error in background search index initialization")