debug: server.py -> threads 1, search.py -> add 3 reconnect retries
All checks were successful
Deploy on push / deploy (push) Successful in 49s
parent 60a13a9097
commit 077cb46482
server.py
@@ -17,7 +17,7 @@ if __name__ == "__main__":
         address="0.0.0.0",
         port=PORT,
         interface=Interfaces.ASGI,
-        workers=4,
+        workers=1,
         websockets=False,
         log_level=LogLevels.debug,
         backlog=2048,
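For context, the keyword arguments in the hunk above belong to the Granian server startup in server.py; the commit drops the worker count from 4 to 1, so all requests are served by a single process. A minimal sketch of such a startup block follows; the import paths and the "main:app" application target are assumptions, not part of this commit:

    # Minimal Granian startup sketch; only the keyword arguments visible in the
    # diff above come from the commit, everything else is assumed.
    from granian import Granian
    from granian.constants import Interfaces
    from granian.log import LogLevels  # import path assumed; adjust to the installed granian version

    PORT = 8000  # placeholder; the project defines PORT elsewhere

    if __name__ == "__main__":
        Granian(
            "main:app",                  # ASGI application target (assumed)
            address="0.0.0.0",
            port=PORT,
            interface=Interfaces.ASGI,
            workers=1,                   # reduced from 4 in this commit
            websockets=False,
            log_level=LogLevels.debug,
            backlog=2048,
        ).serve()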
search.py
@@ -290,32 +290,50 @@ class SearchService:
     async def _process_single_batch(self, documents, batch_id):
         """Process a single batch with maximum reliability"""
-        try:
-            if not documents:
-                return
-
-            logger.info(f"Processing sub-batch {batch_id} with {len(documents)} documents")
-            response = await self.index_client.post(
-                "/bulk-index",
-                json=documents,
-                timeout=90.0
-            )
-            response.raise_for_status()
-            result = response.json()
-            logger.info(f"Sub-batch {batch_id} indexed successfully: {result}")
-        except Exception as e:
-            logger.error(f"Error indexing sub-batch {batch_id}: {str(e)[:200]}")
-
-            # For tiny batches, try one-by-one as last resort
-            if len(documents) > 1:
-                logger.info(f"Processing documents in sub-batch {batch_id} individually")
-                for i, doc in enumerate(documents):
-                    try:
-                        resp = await self.index_client.post("/index", json=doc, timeout=30.0)
-                        resp.raise_for_status()
-                        logger.info(f"Indexed document {doc['id']} individually")
-                    except Exception as e2:
-                        logger.error(f"Failed to index document {doc['id']} individually: {str(e2)[:100]}")
-
+        if not documents:
+            return
+        max_retries = 3
+        retry_count = 0
+
+        while retry_count < max_retries:
+            try:
+                if not documents:
+                    return
+
+                logger.info(f"Processing sub-batch {batch_id} with {len(documents)} documents")
+                response = await self.index_client.post(
+                    "/bulk-index",
+                    json=documents,
+                    timeout=90.0
+                )
+                response.raise_for_status()
+                result = response.json()
+                logger.info(f"Sub-batch {batch_id} indexed successfully: {result}")
+                return  # Success, exit the retry loop
+            except Exception as e:
+                error_str = str(e).lower()
+                retry_count += 1
+
+                # Check if it's a transient error that txtai might recover from internally
+                if "dictionary changed size" in error_str or "transaction error" in error_str:
+                    wait_time = (2 ** retry_count) + (random.random() * 0.5)
+                    logger.warning(f"Transient txtai error in sub-batch {batch_id}, waiting {wait_time:.1f}s for recovery: {str(e)[:200]}")
+                    await asyncio.sleep(wait_time)  # Wait for txtai to recover
+                    continue  # Try again
+
+                # For other errors or final retry failure
+                logger.error(f"Error indexing sub-batch {batch_id} (attempt {retry_count}/{max_retries}): {str(e)[:200]}")
+
+                # Only try one-by-one on the final retry
+                if retry_count >= max_retries and len(documents) > 1:
+                    logger.info(f"Processing documents in sub-batch {batch_id} individually")
+                    for i, doc in enumerate(documents):
+                        try:
+                            resp = await self.index_client.post("/index", json=doc, timeout=30.0)
+                            resp.raise_for_status()
+                            logger.info(f"Indexed document {doc['id']} individually")
+                        except Exception as e2:
+                            logger.error(f"Failed to index document {doc['id']} individually: {str(e2)[:100]}")
+                    return  # Exit after individual processing attempt
 
     def _truncate_error_detail(self, error_detail):
         """Truncate error details for logging"""
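The rewritten _process_single_batch above amounts to a bounded retry loop: up to three attempts against /bulk-index, exponential backoff with jitter when the error looks like a transient txtai failure, and a per-document fallback on the last attempt. As a rough illustration of that backoff policy in isolation, here is a small helper; the function name, the httpx client, and the URL are placeholders rather than part of the commit, and unlike the committed code it backs off on every failure instead of only on the recognized transient errors:

    import asyncio
    import random

    import httpx

    async def post_with_retries(client: httpx.AsyncClient, url: str, payload,
                                max_retries: int = 3) -> httpx.Response:
        """Retry a POST with exponential backoff plus jitter."""
        retry_count = 0
        while True:
            try:
                response = await client.post(url, json=payload, timeout=90.0)
                response.raise_for_status()
                return response
            except Exception:
                retry_count += 1
                if retry_count >= max_retries:
                    raise  # let the caller fall back, e.g. to per-document /index posts
                # Same backoff shape as the diff: 2, 4, 8... seconds plus up to 0.5 s of jitter.
                wait_time = (2 ** retry_count) + (random.random() * 0.5)
                await asyncio.sleep(wait_time)

With such a helper the bulk call would collapse to a single awaited post_with_retries(self.index_client, "/bulk-index", documents), keeping the individual-document fallback in the caller's except branch.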