From a9c7ac49d6324569aa6fa11f7fb204421e26fb06 Mon Sep 17 00:00:00 2001
From: Stepan Vladovskiy
Date: Wed, 12 Mar 2025 13:07:27 -0300
Subject: [PATCH] feat: add retries, health checks and logging to search
 service

Retry failed txtai requests with linear backoff, run periodic health
checks, send bulk-index requests in chunks, and make timeout and retry
behaviour configurable through environment variables.
---
 services/search.py | 145 ++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 125 insertions(+), 20 deletions(-)

diff --git a/services/search.py b/services/search.py
index 4328ce4c..ca5f8fff 100644
--- a/services/search.py
+++ b/services/search.py
@@ -3,6 +3,8 @@
 import json
 import logging
 import os
+import asyncio
 import httpx
+from typing import List, Dict, Any, Optional
 
 from services.redis import redis
 from utils.encoders import CustomJSONEncoder
@@ -15,18 +17,84 @@
 REDIS_TTL = 86400  # 1 day in seconds
 
 # Configuration for search service
 SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"])
 TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "http://txtai-service:8000")
+# Add retry configuration
+MAX_RETRIES = int(os.environ.get("TXTAI_MAX_RETRIES", "3"))
+RETRY_DELAY = float(os.environ.get("TXTAI_RETRY_DELAY", "1.0"))
+# Add request timeout configuration
+REQUEST_TIMEOUT = float(os.environ.get("TXTAI_REQUEST_TIMEOUT", "30.0"))
+# Add health check configuration
+HEALTH_CHECK_INTERVAL = int(os.environ.get("SEARCH_HEALTH_CHECK_INTERVAL", "300"))  # 5 minutes
 
 class SearchService:
     def __init__(self):
-        logger.info("Initializing search service...")
+        logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}")
         self.available = SEARCH_ENABLED
-        self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL)
+        self.client = httpx.AsyncClient(timeout=REQUEST_TIMEOUT, base_url=TXTAI_SERVICE_URL)
+        self.last_health_check = 0
+        self.health_status = False
 
         if not self.available:
             logger.info("Search disabled (SEARCH_ENABLED = False)")
+
+        # Schedule health check if enabled
+        if self.available:
+            asyncio.create_task(self._schedule_health_checks())
+
+    async def _schedule_health_checks(self):
+        """Schedule periodic health checks"""
+        while True:
+            try:
+                await self._check_health()
+                await asyncio.sleep(HEALTH_CHECK_INTERVAL)
+            except Exception as e:
+                logger.error(f"Error in health check scheduler: {e}")
+                await asyncio.sleep(HEALTH_CHECK_INTERVAL)
+
+    async def _check_health(self):
+        """Check if search service is healthy"""
+        try:
+            info = await self.info()
+            self.health_status = info.get("status") != "error"
+            if self.health_status:
+                logger.info("Search service is healthy")
+            else:
+                logger.warning("Search service is unhealthy")
+            return self.health_status
+        except Exception as e:
+            self.health_status = False
+            logger.warning(f"Search health check failed: {e}")
+            return False
+
+    async def _retry_operation(self, operation_func, *args, **kwargs):
+        """Execute an operation with retries"""
+        if not self.available:
+            return None
+
+        for attempt in range(MAX_RETRIES):
+            try:
+                return await operation_func(*args, **kwargs)
+            except httpx.ReadTimeout:
+                if attempt == MAX_RETRIES - 1:
+                    raise
+                logger.warning(f"Request timed out, retrying ({attempt+1}/{MAX_RETRIES})")
+                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
+            except httpx.ConnectError as e:
+                if attempt == MAX_RETRIES - 1:
+                    self.available = False
+                    logger.error(f"Connection error after {MAX_RETRIES} attempts: {e}")
+                    raise
+                logger.warning(f"Connection error, retrying ({attempt+1}/{MAX_RETRIES}): {e}")
+                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
+            except Exception as e:
+                if attempt == MAX_RETRIES - 1:
+                    raise
+                logger.warning(f"Error, retrying ({attempt+1}/{MAX_RETRIES}): {e}")
+                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
+
+        return None  # Should not reach here
 
     async def info(self):
         """Return information about search service"""
         if not self.available:
@@ -42,7 +110,7 @@ class SearchService:
 
     def is_ready(self):
         """Check if service is available"""
-        return self.available
+        return self.available and self.health_status
 
     def index(self, shout):
         """Index a single document"""
@@ -69,15 +137,24 @@ class SearchService:
                 shout.media or ""
             ]))
 
-            # Send to txtai service
-            response = await self.client.post(
-                "/index",
-                json={"id": str(shout.id), "text": text}
-            )
-            response.raise_for_status()
+            # Send to txtai service with retry
+            await self._retry_operation(
+                self._perform_index_request,
+                str(shout.id),
+                text
+            )
             logger.info(f"Post {shout.id} successfully indexed")
         except Exception as e:
             logger.error(f"Indexing error for shout {shout.id}: {e}")
+
+    async def _perform_index_request(self, id, text):
+        """Execute the actual index request"""
+        response = await self.client.post(
+            "/index",
+            json={"id": id, "text": text}
+        )
+        response.raise_for_status()
+        return response.json()
 
     async def bulk_index(self, shouts):
         """Index multiple documents at once"""
@@ -96,14 +173,29 @@ class SearchService:
             documents.append({"id": str(shout.id), "text": text})
 
         try:
-            response = await self.client.post(
-                "/bulk-index",
-                json={"documents": documents}
-            )
-            response.raise_for_status()
-            logger.info(f"Bulk indexed {len(documents)} documents")
+            # Using chunking to avoid large requests
+            chunk_size = 100  # Adjust based on your needs
+            for i in range(0, len(documents), chunk_size):
+                chunk = documents[i:i+chunk_size]
+                logger.info(f"Bulk indexing chunk {i//chunk_size + 1}/{(len(documents)-1)//chunk_size + 1} ({len(chunk)} documents)")
+
+                await self._retry_operation(
+                    self._perform_bulk_index_request,
+                    chunk
+                )
+
+            logger.info(f"Bulk indexed {len(documents)} documents in total")
         except Exception as e:
             logger.error(f"Bulk indexing error: {e}")
+
+    async def _perform_bulk_index_request(self, documents):
+        """Execute the actual bulk index request"""
+        response = await self.client.post(
+            "/bulk-index",
+            json={"documents": documents}
+        )
+        response.raise_for_status()
+        return response.json()
 
     async def search(self, text, limit, offset):
         """Search documents"""
@@ -119,12 +211,16 @@ class SearchService:
         logger.info(f"Searching: {text} {offset}+{limit}")
 
         try:
-            response = await self.client.post(
-                "/search",
-                json={"text": text, "limit": limit, "offset": offset}
-            )
-            response.raise_for_status()
-            result = response.json()
+            result = await self._retry_operation(
+                self._perform_search_request,
+                text,
+                limit,
+                offset
+            )
+
+            if not result:
+                return []
+
             formatted_results = result.get("results", [])
 
             # Cache results
@@ -139,6 +235,15 @@ class SearchService:
         except Exception as e:
             logger.error(f"Search error: {e}")
             return []
+
+    async def _perform_search_request(self, text, limit, offset):
+        """Execute the actual search request"""
+        response = await self.client.post(
+            "/search",
+            json={"text": text, "limit": limit, "offset": offset}
+        )
+        response.raise_for_status()
+        return response.json()
 
 
 # Create the search service singleton
@@ -153,7 +258,7 @@ async def search_text(text: str, limit: int = 50, offset: int = 0):
     return payload
 
 
-# Function to initialize search with existing data
+# Function to initialize search index with existing data
 async def initialize_search_index(shouts_data):
     """Initialize search index with existing data during application startup"""
     if SEARCH_ENABLED:
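
Usage sketch (not part of the patch; a minimal example under assumptions): the retry and timeout settings are read from the environment when services.search is imported, and the module-level singleton schedules its health-check task with asyncio.create_task, so the import should happen while an event loop is running. The localhost URL stands in for a reachable txtai instance, and a Redis instance is assumed for result caching.

    import asyncio
    import os

    # Assumed local txtai endpoint; adjust to your deployment.
    os.environ["TXTAI_SERVICE_URL"] = "http://localhost:8000"
    os.environ["TXTAI_MAX_RETRIES"] = "5"    # up to 5 attempts per request
    os.environ["TXTAI_RETRY_DELAY"] = "0.5"  # linear backoff: 0.5s, 1.0s, ...

    async def main():
        # Import inside a running loop so asyncio.create_task() in
        # SearchService.__init__ can schedule the health-check task.
        from services.search import search_text

        results = await search_text("example query", limit=10)
        print(results)

    asyncio.run(main())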
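Two pieces of arithmetic in the patch are worth spelling out (a quick check, assuming the defaults above). First, _retry_operation sleeps RETRY_DELAY * (attempt + 1) between attempts, so with MAX_RETRIES = 3 and RETRY_DELAY = 1.0 the delays are 1.0s and 2.0s; the final attempt raises instead of sleeping. Second, bulk_index's progress log computes the total chunk count as (len(documents) - 1) // chunk_size + 1, which is ceiling division:

    # Ceil-division identity used by the bulk_index progress log,
    # valid for any document count n >= 1:
    #   (n - 1) // size + 1 == ceil(n / size)
    for n in (1, 99, 100, 101, 250):
        assert (n - 1) // 100 + 1 == -(-n // 100)  # ceil via negated floor division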