From f1d9f4e0361653fbce779ba4f2ce3ec9037ee7fa Mon Sep 17 00:00:00 2001
From: Stepan Vladovskiy
Date: Fri, 21 Mar 2025 17:28:54 -0300
Subject: [PATCH] feat(search.py): with db reset endpoint

---
 services/search.py | 51 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 49 insertions(+), 2 deletions(-)

diff --git a/services/search.py b/services/search.py
index 3121a405..c7e9c80c 100644
--- a/services/search.py
+++ b/services/search.py
@@ -194,6 +194,9 @@ class SearchService:
 
     async def _process_document_batches(self, documents, batch_size, size_category):
         """Process document batches with retry logic"""
+        # Check for possible database corruption before starting
+        db_error_count = 0
+
         for i in range(0, len(documents), batch_size):
             batch = documents[i:i+batch_size]
             batch_id = f"{size_category}-{i//batch_size + 1}"
@@ -222,12 +225,25 @@ class SearchService:
                         error_detail = response.json()
                         truncated_error = self._truncate_error_detail(error_detail)
                         logger.error(f"Validation error from search service for batch {batch_id}: {truncated_error}")
-
-                        # Individual document validation often won't benefit from splitting
                         break
 
                     # Handle 500 server errors - these might be fixed by retrying with smaller batches
                     elif response.status_code == 500:
+                        db_error_count += 1
+
+                        # If we've seen multiple 500s, check for DB corruption
+                        if db_error_count >= 3:
+                            logger.warning("Multiple server errors detected, attempting to reset search service")
+                            reset_result = await self.reset_search_service()
+                            if reset_result["status"] == "reset":
+                                logger.info("Search service has been reset, restarting batch processing")
+                                # Wait a moment for the service to stabilize
+                                await asyncio.sleep(2)
+                                # Only retry current batch
+                                retry_count = 0
+                                continue
+
+                        # Try again with exponential backoff
                         if retry_count < max_retries - 1:
                             retry_count += 1
                             wait_time = (2 ** retry_count) + (random.random() * 0.5)  # Exponential backoff with jitter
@@ -252,8 +268,22 @@ class SearchService:
                     result = response.json()
                     logger.info(f"Batch {batch_id} indexed successfully: {result}")
                     success = True
+                    db_error_count = 0  # Reset error counter on success
 
                 except Exception as e:
+                    # Check if it looks like a database corruption error
+                    error_str = str(e).lower()
+                    if "duplicate key" in error_str or "unique constraint" in error_str or "nonetype" in error_str:
+                        db_error_count += 1
+                        if db_error_count >= 2:
+                            logger.warning(f"Database corruption detected: {error_str}")
+                            reset_result = await self.reset_search_service()
+                            if reset_result["status"] == "reset":
+                                logger.info("Search service has been reset, restarting batch processing")
+                                await asyncio.sleep(2)
+                                retry_count = 0
+                                continue
+
                     if retry_count < max_retries - 1:
                         retry_count += 1
                         wait_time = (2 ** retry_count) + (random.random() * 0.5)
@@ -318,6 +348,23 @@ class SearchService:
                     item['input']['text'] = f"{item['input']['text'][:100]}... [truncated, total {len(item['input']['text'])} chars]"
 
         return truncated_detail
+
+    async def reset_search_service(self):
+        """Reset the search service to recover from database corruption"""
+        if not self.available:
+            logger.warning("Search not available, cannot reset")
+            return {"status": "disabled"}
+
+        try:
+            logger.warning("Resetting search service due to database corruption")
+            response = await self.client.post("/initialize")
+            response.raise_for_status()
+            result = response.json()
+            logger.info(f"Search service reset: {result}")
+            return {"status": "reset", "message": "Search index has been reset"}
+        except Exception as e:
+            logger.error(f"Failed to reset search service: {e}")
+            return {"status": "error", "message": str(e)}
 
     async def search(self, text, limit, offset):
         """Search documents"""