From 60a13a90972088e9f36707870ec44fd3b76fb246 Mon Sep 17 00:00:00 2001
From: Stepan Vladovskiy
Date: Mon, 24 Mar 2025 19:47:02 -0300
Subject: [PATCH] refactor(search.py): move initialization logic into the
 search-txtai instance

---
 services/search.py | 199 +++++++++++++++++++--------------------------
 1 file changed, 85 insertions(+), 114 deletions(-)

diff --git a/services/search.py b/services/search.py
index c7e9c80c..d434b195 100644
--- a/services/search.py
+++ b/services/search.py
@@ -193,112 +193,100 @@ class SearchService:
         logger.info(f"Bulk indexing completed in {elapsed:.2f}s: {total_indexed} indexed, {total_skipped} skipped, {total_truncated} truncated, {total_retries} retries")
 
     async def _process_document_batches(self, documents, batch_size, size_category):
-        """Process document batches with retry logic"""
-        # Check for possible database corruption before starting
-        db_error_count = 0
-
-        for i in range(0, len(documents), batch_size):
-            batch = documents[i:i+batch_size]
-            batch_id = f"{size_category}-{i//batch_size + 1}"
-            logger.info(f"Processing {size_category} batch {batch_id} of {len(batch)} documents")
-
-            retry_count = 0
-            max_retries = 3
-            success = False
-
-            # Process with retries
-            while not success and retry_count < max_retries:
-                try:
-                    if batch:
-                        sample = batch[0]
-                        logger.info(f"Sample document in batch {batch_id}: id={sample['id']}, text_length={len(sample['text'])}")
-
-                    logger.info(f"Sending batch {batch_id} of {len(batch)} documents to search service (attempt {retry_count+1})")
-                    response = await self.index_client.post(
-                        "/bulk-index",
-                        json=batch,
-                        timeout=120.0  # Explicit longer timeout for large batches
-                    )
-
-                    # Handle 422 validation errors - these won't be fixed by retrying
-                    if response.status_code == 422:
-                        error_detail = response.json()
-                        truncated_error = self._truncate_error_detail(error_detail)
-                        logger.error(f"Validation error from search service for batch {batch_id}: {truncated_error}")
-                        break
-
-                    # Handle 500 server errors - these might be fixed by retrying with smaller batches
-                    elif response.status_code == 500:
-                        db_error_count += 1
-
-                        # If we've seen multiple 500s, check for DB corruption
-                        if db_error_count >= 3:
-                            logger.warning("Multiple server errors detected, attempting to reset search service")
-                            reset_result = await self.reset_search_service()
-                            if reset_result["status"] == "reset":
-                                logger.info("Search service has been reset, restarting batch processing")
-                                # Wait a moment for the service to stabilize
-                                await asyncio.sleep(2)
-                                # Only retry current batch
-                                retry_count = 0
-                                continue
-
-                        # Try again with exponential backoff
-                        if retry_count < max_retries - 1:
-                            retry_count += 1
-                            wait_time = (2 ** retry_count) + (random.random() * 0.5)  # Exponential backoff with jitter
-                            logger.warning(f"Server error for batch {batch_id}, retrying in {wait_time:.1f}s (attempt {retry_count+1}/{max_retries})")
-                            await asyncio.sleep(wait_time)
-                            continue
-
-                        # Final retry, split the batch
-                        elif len(batch) > 1:
-                            logger.warning(f"Splitting batch {batch_id} after repeated failures")
-                            mid = len(batch) // 2
-                            await self._process_single_batch(batch[:mid], f"{batch_id}-A")
-                            await self._process_single_batch(batch[mid:], f"{batch_id}-B")
-                            break
-                        else:
-                            # Can't split a single document
-                            logger.error(f"Failed to index document {batch[0]['id']} after {max_retries} attempts")
-                            break
-
-                    # Normal success case
-                    response.raise_for_status()
-                    result = response.json()
-                    logger.info(f"Batch {batch_id} indexed successfully: {result}")
-                    success = True
-                    db_error_count = 0  # Reset error counter on success
-
-                except Exception as e:
-                    # Check if it looks like a database corruption error
-                    error_str = str(e).lower()
-                    if "duplicate key" in error_str or "unique constraint" in error_str or "nonetype" in error_str:
-                        db_error_count += 1
-                        if db_error_count >= 2:
-                            logger.warning(f"Database corruption detected: {error_str}")
-                            reset_result = await self.reset_search_service()
-                            if reset_result["status"] == "reset":
-                                logger.info("Search service has been reset, restarting batch processing")
-                                await asyncio.sleep(2)
-                                retry_count = 0
-                                continue
-
-                    if retry_count < max_retries - 1:
-                        retry_count += 1
-                        wait_time = (2 ** retry_count) + (random.random() * 0.5)
-                        logger.warning(f"Error for batch {batch_id}, retrying in {wait_time:.1f}s: {str(e)[:200]}")
-                        await asyncio.sleep(wait_time)
-                    else:
-                        # Last resort - try to split the batch
-                        if len(batch) > 1:
-                            logger.warning(f"Splitting batch {batch_id} after exception: {str(e)[:200]}")
-                            mid = len(batch) // 2
-                            await self._process_single_batch(batch[:mid], f"{batch_id}-A")
-                            await self._process_single_batch(batch[mid:], f"{batch_id}-B")
-                        else:
-                            logger.error(f"Failed to index document {batch[0]['id']} after {max_retries} attempts: {e}")
-                        break
+        """Process document batches with retry logic"""
+        # Check for possible database corruption before starting
+        db_error_count = 0
+
+        for i in range(0, len(documents), batch_size):
+            batch = documents[i:i+batch_size]
+            batch_id = f"{size_category}-{i//batch_size + 1}"
+            logger.info(f"Processing {size_category} batch {batch_id} of {len(batch)} documents")
+
+            retry_count = 0
+            max_retries = 3
+            success = False
+
+            # Process with retries
+            while not success and retry_count < max_retries:
+                try:
+                    if batch:
+                        sample = batch[0]
+                        logger.info(f"Sample document in batch {batch_id}: id={sample['id']}, text_length={len(sample['text'])}")
+
+                    logger.info(f"Sending batch {batch_id} of {len(batch)} documents to search service (attempt {retry_count+1})")
+                    response = await self.index_client.post(
+                        "/bulk-index",
+                        json=batch,
+                        timeout=120.0  # Explicit longer timeout for large batches
+                    )
+
+                    # Handle 422 validation errors - these won't be fixed by retrying
+                    if response.status_code == 422:
+                        error_detail = response.json()
+                        truncated_error = self._truncate_error_detail(error_detail)
+                        logger.error(f"Validation error from search service for batch {batch_id}: {truncated_error}")
+                        break
+
+                    # Handle 500 server errors - these might be fixed by retrying with smaller batches
+                    elif response.status_code == 500:
+                        db_error_count += 1
+
+                        # If we've seen multiple 500s, log a critical error
+                        if db_error_count >= 3:
+                            logger.critical(f"Multiple server errors detected (500). The search service may need manual intervention. Stopping batch {batch_id} processing.")
+                            break
+
+                        # Try again with exponential backoff
+                        if retry_count < max_retries - 1:
+                            retry_count += 1
+                            wait_time = (2 ** retry_count) + (random.random() * 0.5)  # Exponential backoff with jitter
+                            logger.warning(f"Server error for batch {batch_id}, retrying in {wait_time:.1f}s (attempt {retry_count+1}/{max_retries})")
+                            await asyncio.sleep(wait_time)
+                            continue
+
+                        # Final retry, split the batch
+                        elif len(batch) > 1:
+                            logger.warning(f"Splitting batch {batch_id} after repeated failures")
+                            mid = len(batch) // 2
+                            await self._process_single_batch(batch[:mid], f"{batch_id}-A")
+                            await self._process_single_batch(batch[mid:], f"{batch_id}-B")
+                            break
+                        else:
+                            # Can't split a single document
+                            logger.error(f"Failed to index document {batch[0]['id']} after {max_retries} attempts")
+                            break
+
+                    # Normal success case
+                    response.raise_for_status()
+                    result = response.json()
+                    logger.info(f"Batch {batch_id} indexed successfully: {result}")
+                    success = True
+                    db_error_count = 0  # Reset error counter on success
+
+                except Exception as e:
+                    # Check if it looks like a database corruption error
+                    error_str = str(e).lower()
+                    if "duplicate key" in error_str or "unique constraint" in error_str or "nonetype" in error_str:
+                        db_error_count += 1
+                        if db_error_count >= 2:
+                            logger.critical(f"Potential database corruption detected: {error_str}. The search service may need manual intervention. Stopping batch {batch_id} processing.")
+                            break
+
+                    if retry_count < max_retries - 1:
+                        retry_count += 1
+                        wait_time = (2 ** retry_count) + (random.random() * 0.5)
+                        logger.warning(f"Error for batch {batch_id}, retrying in {wait_time:.1f}s: {str(e)[:200]}")
+                        await asyncio.sleep(wait_time)
+                    else:
+                        # Last resort - try to split the batch
+                        if len(batch) > 1:
+                            logger.warning(f"Splitting batch {batch_id} after exception: {str(e)[:200]}")
+                            mid = len(batch) // 2
+                            await self._process_single_batch(batch[:mid], f"{batch_id}-A")
+                            await self._process_single_batch(batch[mid:], f"{batch_id}-B")
+                        else:
+                            logger.error(f"Failed to index document {batch[0]['id']} after {max_retries} attempts: {e}")
+                        break
 
     async def _process_single_batch(self, documents, batch_id):
         """Process a single batch with maximum reliability"""
@@ -348,23 +336,6 @@ class SearchService:
                     item['input']['text'] = f"{item['input']['text'][:100]}... [truncated, total {len(item['input']['text'])} chars]"
 
         return truncated_detail
-
-    async def reset_search_service(self):
-        """Reset the search service to recover from database corruption"""
-        if not self.available:
-            logger.warning("Search not available, cannot reset")
-            return {"status": "disabled"}
-
-        try:
-            logger.warning("Resetting search service due to database corruption")
-            response = await self.client.post("/initialize")
-            response.raise_for_status()
-            result = response.json()
-            logger.info(f"Search service reset: {result}")
-            return {"status": "reset", "message": "Search index has been reset"}
-        except Exception as e:
-            logger.error(f"Failed to reset search service: {e}")
-            return {"status": "error", "message": str(e)}
 
     async def search(self, text, limit, offset):
         """Search documents"""
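Note on the retry strategy above: transient failures are retried with exponential backoff plus jitter, and a batch that still fails on its last attempt is split in half so one bad document cannot block the rest. Below is a minimal, self-contained sketch of that pattern, assuming a post_batch coroutine that raises on failure; post_batch, index_with_retry, and MAX_RETRIES are illustrative names, not part of services/search.py.

    import asyncio
    import random

    MAX_RETRIES = 3  # mirrors max_retries in _process_document_batches

    async def index_with_retry(batch, batch_id, post_batch):
        """Retry with exponential backoff and jitter; halve the batch as a last resort."""
        for attempt in range(MAX_RETRIES):
            try:
                await post_batch(batch)  # e.g. POST the documents to /bulk-index
                return
            except Exception:
                if attempt < MAX_RETRIES - 1:
                    # 2s, 4s, ... plus up to 0.5s of jitter, as in the patch
                    wait_time = (2 ** (attempt + 1)) + random.random() * 0.5
                    await asyncio.sleep(wait_time)
                elif len(batch) > 1:
                    # Last attempt failed: split to isolate the offending document
                    mid = len(batch) // 2
                    await index_with_retry(batch[:mid], f"{batch_id}-A", post_batch)
                    await index_with_retry(batch[mid:], f"{batch_id}-B", post_batch)
                else:
                    raise  # a single document that cannot be indexed

The recursion bottoms out at single-document batches, which matches the patch's behavior of logging the failure and giving up once a lone document has exhausted its retries.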