debug: something wrong, one step back with logs
All checks were successful
Deploy on push / deploy (push) Successful in 53s

Stepan Vladovskiy 2025-03-12 13:11:19 -03:00
parent a9c7ac49d6
commit 24cca7f2cb


@@ -3,7 +3,6 @@ import json
 import logging
 import os
 import httpx
-from typing import List, Dict, Any, Optional
 from services.redis import redis
 from utils.encoders import CustomJSONEncoder
@@ -17,83 +16,17 @@ REDIS_TTL = 86400 # 1 day in seconds
 # Configuration for search service
 SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"])
 TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL")
-# Add retry configuration
-MAX_RETRIES = int(os.environ.get("TXTAI_MAX_RETRIES", "3"))
-RETRY_DELAY = float(os.environ.get("TXTAI_RETRY_DELAY", "1.0"))
-# Add request timeout configuration
-REQUEST_TIMEOUT = float(os.environ.get("TXTAI_REQUEST_TIMEOUT", "30.0"))
-# Add health check configuration
-HEALTH_CHECK_INTERVAL = int(os.environ.get("SEARCH_HEALTH_CHECK_INTERVAL", "300")) # 5 minutes
 class SearchService:
     def __init__(self):
-        logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}")
+        logger.info("Initializing search service...")
         self.available = SEARCH_ENABLED
-        self.client = httpx.AsyncClient(timeout=REQUEST_TIMEOUT, base_url=TXTAI_SERVICE_URL)
-        self.last_health_check = 0
-        self.health_status = False
+        self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL)
         if not self.available:
             logger.info("Search disabled (SEARCH_ENABLED = False)")
-        # Schedule health check if enabled
-        if self.available:
-            asyncio.create_task(self._schedule_health_checks())
-    async def _schedule_health_checks(self):
-        """Schedule periodic health checks"""
-        while True:
-            try:
-                await self._check_health()
-                await asyncio.sleep(HEALTH_CHECK_INTERVAL)
-            except Exception as e:
-                logger.error(f"Error in health check scheduler: {e}")
-                await asyncio.sleep(HEALTH_CHECK_INTERVAL)
-    async def _check_health(self):
-        """Check if search service is healthy"""
-        try:
-            info = await self.info()
-            self.health_status = info.get("status") != "error"
-            if self.health_status:
-                logger.info("Search service is healthy")
-            else:
-                logger.warning("Search service is unhealthy")
-            return self.health_status
-        except Exception as e:
-            self.health_status = False
-            logger.warning(f"Search health check failed: {e}")
-            return False
-    async def _retry_operation(self, operation_func, *args, **kwargs):
-        """Execute an operation with retries"""
-        if not self.available:
-            return None
-        for attempt in range(MAX_RETRIES):
-            try:
-                return await operation_func(*args, **kwargs)
-            except httpx.ReadTimeout:
-                if attempt == MAX_RETRIES - 1:
-                    raise
-                logger.warning(f"Request timed out, retrying ({attempt+1}/{MAX_RETRIES})")
-                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
-            except httpx.ConnectError as e:
-                if attempt == MAX_RETRIES - 1:
-                    self.available = False
-                    logger.error(f"Connection error after {MAX_RETRIES} attempts: {e}")
-                    raise
-                logger.warning(f"Connection error, retrying ({attempt+1}/{MAX_RETRIES}): {e}")
-                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
-            except Exception as e:
-                if attempt == MAX_RETRIES - 1:
-                    raise
-                logger.warning(f"Error, retrying ({attempt+1}/{MAX_RETRIES}): {e}")
-                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
-        return None  # Should not reach here
     async def info(self):
         """Return information about search service"""
         if not self.available:
@@ -109,7 +42,7 @@ class SearchService:
     def is_ready(self):
         """Check if service is available"""
-        return self.available and self.health_status
+        return self.available
     def index(self, shout):
         """Index a single document"""
@@ -136,24 +69,15 @@
                 shout.media or ""
             ]))
-            # Send to txtai service with retry
-            await self._retry_operation(
-                self._perform_index_request,
-                str(shout.id),
-                text
-            )
+            # Send to txtai service
+            response = await self.client.post(
+                "/index",
+                json={"id": str(shout.id), "text": text}
+            )
+            response.raise_for_status()
             logger.info(f"Post {shout.id} successfully indexed")
         except Exception as e:
             logger.error(f"Indexing error for shout {shout.id}: {e}")
-    async def _perform_index_request(self, id, text):
-        """Execute the actual index request"""
-        response = await self.client.post(
-            "/index",
-            json={"id": id, "text": text}
-        )
-        response.raise_for_status()
-        return response.json()
     async def bulk_index(self, shouts):
         """Index multiple documents at once"""
@@ -172,29 +96,14 @@
             documents.append({"id": str(shout.id), "text": text})
         try:
-            # Using chunking to avoid large requests
-            chunk_size = 100 # Adjust based on your needs
-            for i in range(0, len(documents), chunk_size):
-                chunk = documents[i:i+chunk_size]
-                logger.info(f"Bulk indexing chunk {i//chunk_size + 1}/{(len(documents)-1)//chunk_size + 1} ({len(chunk)} documents)")
-                await self._retry_operation(
-                    self._perform_bulk_index_request,
-                    chunk
-                )
-            logger.info(f"Bulk indexed {len(documents)} documents in total")
+            response = await self.client.post(
+                "/bulk-index",
+                json={"documents": documents}
+            )
+            response.raise_for_status()
+            logger.info(f"Bulk indexed {len(documents)} documents")
         except Exception as e:
             logger.error(f"Bulk indexing error: {e}")
-    async def _perform_bulk_index_request(self, documents):
-        """Execute the actual bulk index request"""
-        response = await self.client.post(
-            "/bulk-index",
-            json={"documents": documents}
-        )
-        response.raise_for_status()
-        return response.json()
     async def search(self, text, limit, offset):
         """Search documents"""
@@ -210,16 +119,12 @@
         logger.info(f"Searching: {text} {offset}+{limit}")
         try:
-            result = await self._retry_operation(
-                self._perform_search_request,
-                text,
-                limit,
-                offset
-            )
-            if not result:
-                return []
+            response = await self.client.post(
+                "/search",
+                json={"text": text, "limit": limit, "offset": offset}
+            )
+            response.raise_for_status()
+            result = response.json()
             formatted_results = result.get("results", [])
             # Cache results
@@ -234,15 +139,6 @@
         except Exception as e:
             logger.error(f"Search error: {e}")
             return []
-    async def _perform_search_request(self, text, limit, offset):
-        """Execute the actual search request"""
-        response = await self.client.post(
-            "/search",
-            json={"text": text, "limit": limit, "offset": offset}
-        )
-        response.raise_for_status()
-        return response.json()
 # Create the search service singleton
@@ -257,7 +153,7 @@ async def search_text(text: str, limit: int = 50, offset: int = 0):
     return payload
-# Function to initialize search index with existing data
+# Function to initialize search with existing data
async def initialize_search_index(shouts_data):
    """Initialize search index with existing data during application startup"""
    if SEARCH_ENABLED:
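For reference, a minimal standalone sketch of the simplified, retry-free flow this commit steps back to: a direct httpx call to the /index and /search endpoints used in the diff above. The default base URL and the sample payload values are illustrative assumptions; in the service itself the URL comes from the TXTAI_SERVICE_URL environment variable.

# Sketch only: drives the same /index and /search endpoints the simplified
# SearchService now calls directly, without the removed retry/health-check layer.
import asyncio
import os

import httpx

# Assumed default for local testing; the real service reads TXTAI_SERVICE_URL with no fallback.
TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "http://localhost:8000")

async def main():
    async with httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL) as client:
        # Index a single document, mirroring SearchService.index after this change
        response = await client.post("/index", json={"id": "1", "text": "test document"})
        response.raise_for_status()

        # Search for it, mirroring SearchService.search after this change
        response = await client.post("/search", json={"text": "test", "limit": 10, "offset": 0})
        response.raise_for_status()
        print(response.json().get("results", []))

asyncio.run(main())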