From 1690ed63aa1abb4b3020a80390a415fc159e529c Mon Sep 17 00:00:00 2001 From: Stepan Vladovskiy Date: Mon, 19 May 2025 16:10:35 -0300 Subject: [PATCH] debug: more logs, and analize precache.py --- cache/precache.py | 57 ++++++++++++---- main.py | 167 +++++++++++++++++++++++++++++++++++---------- services/search.py | 4 +- 3 files changed, 176 insertions(+), 52 deletions(-) diff --git a/cache/precache.py b/cache/precache.py index 23844024..9264dd43 100644 --- a/cache/precache.py +++ b/cache/precache.py @@ -111,21 +111,48 @@ async def precache_data(): logger.info(f"{len(topics)} topics and their followings precached") # authors - authors = get_with_stat(select(Author).where(Author.user.is_not(None))) - logger.info(f"{len(authors)} authors found in database") - for author in authors: - if isinstance(author, Author): - profile = author.dict() - author_id = profile.get("id") - user_id = profile.get("user", "").strip() - if author_id and user_id: - await cache_author(profile) - await asyncio.gather( - precache_authors_followers(author_id, session), precache_authors_follows(author_id, session) - ) - else: - logger.error(f"fail caching {author}") - logger.info(f"{len(authors)} authors and their followings precached") + try: + authors = get_with_stat(select(Author).where(Author.user.is_not(None))) + logger.info(f"{len(authors)} authors found in database") + + # Process authors in smaller batches to avoid long-running operations + batch_size = 50 + total_processed = 0 + + # Create batches + author_batches = [authors[i:i + batch_size] for i in range(0, len(authors), batch_size)] + logger.info(f"Processing authors in {len(author_batches)} batches of {batch_size}") + + for batch_idx, author_batch in enumerate(author_batches): + batch_tasks = [] + for author in author_batch: + if isinstance(author, Author): + profile = author.dict() + author_id = profile.get("id") + user_id = profile.get("user", "").strip() + if author_id and user_id: + # Add task to the batch + cache_task = cache_author(profile) + follower_task = precache_authors_followers(author_id, session) + follows_task = precache_authors_follows(author_id, session) + batch_tasks.extend([cache_task, follower_task, follows_task]) + else: + logger.error(f"fail caching {author}") + + # Run all tasks for this batch with timeout + if batch_tasks: + try: + await asyncio.wait_for(asyncio.gather(*batch_tasks), timeout=30) + total_processed += len(author_batch) + logger.info(f"Processed batch {batch_idx+1}/{len(author_batches)} ({total_processed}/{len(authors)} authors)") + except asyncio.TimeoutError: + logger.error(f"Timeout processing author batch {batch_idx+1}, continuing with next batch") + + logger.info(f"{total_processed} authors and their followings precached (out of {len(authors)} total)") + except Exception as author_exc: + import traceback + logger.error(f"Error processing authors: {author_exc}") + logger.error(traceback.format_exc()) except Exception as exc: import traceback diff --git a/main.py b/main.py index 576a9584..e091a3b9 100644 --- a/main.py +++ b/main.py @@ -37,11 +37,19 @@ async def start(): async def check_search_service(): """Check if search service is available and log result""" - info = await search_service.info() - if info.get("status") in ["error", "unavailable"]: - print(f"[WARNING] Search service unavailable: {info.get('message', 'unknown reason')}") - else: - print(f"[INFO] Search service is available: {info}") + try: + info_task = search_service.info() + info = await asyncio.wait_for(info_task, timeout=10.0) # 10 second 
timeout + + if info.get("status") in ["error", "unavailable"]: + print(f"[WARNING] Search service unavailable: {info.get('message', 'unknown reason')}") + else: + print(f"[INFO] Search service is available: {info}") + except asyncio.TimeoutError: + print("[WARNING] Search service check timed out after 10 seconds") + except Exception as e: + print(f"[WARNING] Error checking search service: {str(e)}") + print("[INFO] Continuing startup with search service in degraded mode") # indexing DB data @@ -53,15 +61,57 @@ async def lifespan(_app): try: print("[lifespan] Starting application initialization") create_all_tables() - await asyncio.gather( - redis.connect(), - precache_data(), - ViewedStorage.init(), - create_webhook_endpoint(), - check_search_service(), - start(), - revalidation_manager.start(), - ) + + # Run each initialization step separately to identify where it's hanging + try: + print("[lifespan] Connecting to Redis...") + await redis.connect() + print("[lifespan] Redis connected successfully") + except Exception as e: + print(f"[lifespan] Error connecting to Redis: {e}") + + try: + print("[lifespan] Starting precache operation...") + await precache_data() + print("[lifespan] Precache completed successfully") + except Exception as e: + print(f"[lifespan] Error during precache: {e}") + + try: + print("[lifespan] Initializing ViewedStorage...") + await ViewedStorage.init() + print("[lifespan] ViewedStorage initialized successfully") + except Exception as e: + print(f"[lifespan] Error initializing ViewedStorage: {e}") + + try: + print("[lifespan] Creating webhook endpoint...") + await create_webhook_endpoint() + print("[lifespan] Webhook endpoint created successfully") + except Exception as e: + print(f"[lifespan] Error creating webhook endpoint: {e}") + + try: + print("[lifespan] Checking search service...") + await check_search_service() + print("[lifespan] Search service check completed") + except Exception as e: + print(f"[lifespan] Error checking search service: {e}") + + try: + print("[lifespan] Starting app...") + await start() + print("[lifespan] App started successfully") + except Exception as e: + print(f"[lifespan] Error starting app: {e}") + + try: + print("[lifespan] Starting revalidation manager...") + await revalidation_manager.start() + print("[lifespan] Revalidation manager started successfully") + except Exception as e: + print(f"[lifespan] Error starting revalidation manager: {e}") + print("[lifespan] Basic initialization complete") # Verify the server is ready to accept connections @@ -89,9 +139,20 @@ async def lifespan(_app): await asyncio.sleep(10) # 10-second delay to let the system stabilize # Start search indexing as a background task with lower priority - asyncio.create_task(initialize_search_index_background()) + try: + print("[lifespan] Creating search indexing background task...") + asyncio.create_task(initialize_search_index_background()) + print("[lifespan] Search indexing task scheduled successfully") + except Exception as e: + print(f"[lifespan] Error scheduling search indexing task: {e}") + print("[lifespan] Full server startup completed successfully") yield + except Exception as e: + import traceback + print(f"[lifespan] Critical error in lifespan function: {e}") + print(f"[lifespan] Traceback: {traceback.format_exc()}") + yield # Still yield to allow clean shutdown finally: print("[lifespan] Shutting down application services") tasks = [redis.disconnect(), ViewedStorage.stop(), revalidation_manager.stop()] @@ -105,29 +166,52 @@ async def 
initialize_search_index_background(): print("[search] Starting background search indexing process") from services.db import fetch_all_shouts - print("[search] About to fetch all shouts for indexing") - # Get total count first (optional) - all_shouts = await fetch_all_shouts() - total_count = len(all_shouts) if all_shouts else 0 - print(f"[search] Fetched {total_count} shouts for background indexing") - - # Start the indexing process with the fetched shouts - print("[search] Beginning background search index initialization...") - await initialize_search_index(all_shouts) - print("[search] Background search index initialization complete") - - # Perform a test search to verify indexing worked try: - print("[search] Running test search to verify index...") - from services.search import search_text - test_results = await search_text("test", 3) - print(f"[search] Test search complete with {len(test_results)} results") - except Exception as test_error: - print(f"[search] Test search error: {str(test_error)}") + print("[search] About to fetch all shouts for indexing") + # Get total count first (optional) + all_shouts = await fetch_all_shouts() + total_count = len(all_shouts) if all_shouts else 0 + print(f"[search] Fetched {total_count} shouts for background indexing") + + # Skip indexing if no shouts found + if not all_shouts or total_count == 0: + print("[search] No shouts to index, skipping indexing process") + return + + # Start the indexing process with the fetched shouts + print("[search] Beginning background search index initialization...") + + # Add a timeout to the indexing operation + try: + index_task = initialize_search_index(all_shouts) + await asyncio.wait_for(index_task, timeout=300) # 5-minute timeout + print("[search] Background search index initialization complete") + except asyncio.TimeoutError: + print("[search] Background indexing timed out after 5 minutes") + return + + # Perform a test search to verify indexing worked + try: + print("[search] Running test search to verify index...") + from services.search import search_text + search_task = search_text("test", 3) + test_results = await asyncio.wait_for(search_task, timeout=30) # 30-second timeout + print(f"[search] Test search complete with {len(test_results)} results") + except asyncio.TimeoutError: + print("[search] Test search timed out after 30 seconds") + except Exception as test_error: + print(f"[search] Test search error: {str(test_error)}") + except Exception as inner_error: + print(f"[search] Error in search indexing process: {str(inner_error)}") + import traceback + print(f"[search] Inner traceback: {traceback.format_exc()}") + + print("[search] Search initialization process completed (with or without errors)") + except Exception as e: - print(f"[search] Error in background search indexing: {str(e)}") + print(f"[search] Outer error in background search indexing: {str(e)}") import traceback - print(f"[search] Search indexing traceback: {traceback.format_exc()}") + print(f"[search] Outer traceback: {traceback.format_exc()}") # Создаем экземпляр GraphQL graphql_app = GraphQL(schema, debug=True) @@ -155,11 +239,24 @@ app = Starlette( routes=[ Route("/", graphql_handler, methods=["GET", "POST"]), Route("/new-author", WebhookEndpoint), + # Health check endpoint + Route("/health", lambda request: JSONResponse({"status": "healthy"}), methods=["GET"]), ], lifespan=lifespan, debug=True, ) +# Register an error handler for uncaught exceptions +@app.exception_handler(Exception) +async def handle_exception(request, exc): + 
print(f"Global exception handler caught: {str(exc)}") + import traceback + traceback.print_exc() + return JSONResponse( + status_code=500, + content={"detail": "Internal Server Error", "message": str(exc)} + ) + app.add_middleware(ExceptionHandlerMiddleware) if "dev" in sys.argv: app.add_middleware( diff --git a/services/search.py b/services/search.py index eb861fed..2e1c9412 100644 --- a/services/search.py +++ b/services/search.py @@ -208,8 +208,8 @@ class SearchService: logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}") self.available = SEARCH_ENABLED # Use different timeout settings for indexing and search requests - self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL) - self.index_client = httpx.AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL) + self.client = httpx.AsyncClient(timeout=10.0, base_url=TXTAI_SERVICE_URL) # Reduced timeout for regular ops + self.index_client = httpx.AsyncClient(timeout=60.0, base_url=TXTAI_SERVICE_URL) # Reduced timeout for indexing # Initialize search cache self.cache = SearchCache() if SEARCH_CACHE_ENABLED else None