diff --git a/server.py b/server.py
index 0ba5b97c..281f50ff 100644
--- a/server.py
+++ b/server.py
@@ -17,7 +17,7 @@ if __name__ == "__main__":
         address="0.0.0.0",
         port=PORT,
         interface=Interfaces.ASGI,
-        workers=4,
+        workers=1,
         websockets=False,
         log_level=LogLevels.debug,
         backlog=2048,
diff --git a/services/search.py b/services/search.py
index d434b195..5401d0f6 100644
--- a/services/search.py
+++ b/services/search.py
@@ -290,32 +290,50 @@ class SearchService:
 
     async def _process_single_batch(self, documents, batch_id):
         """Process a single batch with maximum reliability"""
-        try:
-            if not documents:
-                return
+        max_retries = 3
+        retry_count = 0
+
+        while retry_count < max_retries:
+            try:
+                if not documents:
+                    return
+
+                logger.info(f"Processing sub-batch {batch_id} with {len(documents)} documents")
+                response = await self.index_client.post(
+                    "/bulk-index",
+                    json=documents,
+                    timeout=90.0
+                )
+                response.raise_for_status()
+                result = response.json()
+                logger.info(f"Sub-batch {batch_id} indexed successfully: {result}")
+                return  # Success, exit the retry loop
 
-            logger.info(f"Processing sub-batch {batch_id} with {len(documents)} documents")
-            response = await self.index_client.post(
-                "/bulk-index",
-                json=documents,
-                timeout=90.0
-            )
-            response.raise_for_status()
-            result = response.json()
-            logger.info(f"Sub-batch {batch_id} indexed successfully: {result}")
-        except Exception as e:
-            logger.error(f"Error indexing sub-batch {batch_id}: {str(e)[:200]}")
-
-            # For tiny batches, try one-by-one as last resort
-            if len(documents) > 1:
-                logger.info(f"Processing documents in sub-batch {batch_id} individually")
-                for i, doc in enumerate(documents):
-                    try:
-                        resp = await self.index_client.post("/index", json=doc, timeout=30.0)
-                        resp.raise_for_status()
-                        logger.info(f"Indexed document {doc['id']} individually")
-                    except Exception as e2:
-                        logger.error(f"Failed to index document {doc['id']} individually: {str(e2)[:100]}")
+            except Exception as e:
+                error_str = str(e).lower()
+                retry_count += 1
+
+                # Check if it's a transient error that txtai might recover from internally
+                if "dictionary changed size" in error_str or "transaction error" in error_str:
+                    wait_time = (2 ** retry_count) + (random.random() * 0.5)
+                    logger.warning(f"Transient txtai error in sub-batch {batch_id}, waiting {wait_time:.1f}s for recovery: {str(e)[:200]}")
+                    await asyncio.sleep(wait_time)  # Wait for txtai to recover
+                    continue  # Try again
+
+                # For other errors or final retry failure
+                logger.error(f"Error indexing sub-batch {batch_id} (attempt {retry_count}/{max_retries}): {str(e)[:200]}")
+
+                # Only try one-by-one on the final retry
+                if retry_count >= max_retries and len(documents) > 1:
+                    logger.info(f"Processing documents in sub-batch {batch_id} individually")
+                    for i, doc in enumerate(documents):
+                        try:
+                            resp = await self.index_client.post("/index", json=doc, timeout=30.0)
+                            resp.raise_for_status()
+                            logger.info(f"Indexed document {doc['id']} individually")
+                        except Exception as e2:
+                            logger.error(f"Failed to index document {doc['id']} individually: {str(e2)[:100]}")
+                    return  # Exit after individual processing attempt
 
     def _truncate_error_detail(self, error_detail):
         """Truncate error details for logging"""