core/main.py
Stepan Vladovskiy 1690ed63aa
All checks were successful
Deploy on push / deploy (push) Successful in 44s
debug: more logs, and analize precache.py
2025-05-19 16:10:35 -03:00

269 lines
11 KiB
Python

import asyncio
import os
import sys
from importlib import import_module
from os.path import exists
from ariadne import load_schema_from_path, make_executable_schema
from ariadne.asgi import GraphQL
from starlette.applications import Starlette
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Route
from cache.precache import precache_data
from cache.revalidator import revalidation_manager
from services.exception import ExceptionHandlerMiddleware
from services.redis import redis
from services.schema import create_all_tables, resolvers
#from services.search import search_service
from services.search import search_service, initialize_search_index
from services.viewed import ViewedStorage
from services.webhook import WebhookEndpoint, create_webhook_endpoint
from settings import DEV_SERVER_PID_FILE_NAME, MODE
# Import side effect: loading the resolvers package registers every resolver
# with services.schema so `resolvers` below is fully populated.
import_module("resolvers")
# Build the executable GraphQL schema from the SDL files under schema/.
schema = make_executable_schema(load_schema_from_path("schema/"), resolvers)
async def start():
    """Record the dev-server pid (development mode only) and announce startup."""
    if MODE == "development" and not exists(DEV_SERVER_PID_FILE_NAME):
        # pid file management: lets dev tooling locate/kill this process
        with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as pid_file:
            pid_file.write(str(os.getpid()))
    print(f"[main] process started in {MODE} mode")
async def check_search_service():
    """Check if search service is available and log result."""
    try:
        # Give the probe at most ten seconds before declaring it stuck.
        status = await asyncio.wait_for(search_service.info(), timeout=10.0)
    except asyncio.TimeoutError:
        print("[WARNING] Search service check timed out after 10 seconds")
    except Exception as e:
        print(f"[WARNING] Error checking search service: {str(e)}")
        print("[INFO] Continuing startup with search service in degraded mode")
    else:
        if status.get("status") in ["error", "unavailable"]:
            print(f"[WARNING] Search service unavailable: {status.get('message', 'unknown reason')}")
        else:
            print(f"[INFO] Search service is available: {status}")
# indexing DB data
# async def indexing():
# from services.db import fetch_all_shouts
# all_shouts = await fetch_all_shouts()
# await initialize_search_index(all_shouts)
async def lifespan(_app):
    """Starlette lifespan hook: initialize services on startup, tear down on exit.

    Each startup step is wrapped in its own try/except so that a single failing
    dependency (Redis, precache, search, ...) degrades the app instead of
    aborting the whole boot sequence. Yields once startup is done; the finally
    branch runs the shutdown of all long-lived services.
    """
    try:
        print("[lifespan] Starting application initialization")
        create_all_tables()

        # Run each initialization step separately to identify where it's hanging
        try:
            print("[lifespan] Connecting to Redis...")
            await redis.connect()
            print("[lifespan] Redis connected successfully")
        except Exception as e:
            print(f"[lifespan] Error connecting to Redis: {e}")

        try:
            print("[lifespan] Starting precache operation...")
            await precache_data()
            print("[lifespan] Precache completed successfully")
        except Exception as e:
            print(f"[lifespan] Error during precache: {e}")

        try:
            print("[lifespan] Initializing ViewedStorage...")
            await ViewedStorage.init()
            print("[lifespan] ViewedStorage initialized successfully")
        except Exception as e:
            print(f"[lifespan] Error initializing ViewedStorage: {e}")

        try:
            print("[lifespan] Creating webhook endpoint...")
            await create_webhook_endpoint()
            print("[lifespan] Webhook endpoint created successfully")
        except Exception as e:
            print(f"[lifespan] Error creating webhook endpoint: {e}")

        try:
            print("[lifespan] Checking search service...")
            await check_search_service()
            print("[lifespan] Search service check completed")
        except Exception as e:
            print(f"[lifespan] Error checking search service: {e}")

        try:
            print("[lifespan] Starting app...")
            await start()
            print("[lifespan] App started successfully")
        except Exception as e:
            print(f"[lifespan] Error starting app: {e}")

        try:
            print("[lifespan] Starting revalidation manager...")
            await revalidation_manager.start()
            print("[lifespan] Revalidation manager started successfully")
        except Exception as e:
            print(f"[lifespan] Error starting revalidation manager: {e}")

        print("[lifespan] Basic initialization complete")

        # Verify the server is ready to accept connections
        import socket

        def _port_in_use(port):
            """Return True when *port* is already bound, i.e. a listener holds it."""
            probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                # If bind succeeds, nothing is listening on the port yet.
                probe.bind(("0.0.0.0", port))
                return False
            except OSError:
                # bind failures are OSError (was a bare except in the original)
                return True
            finally:
                # fix: the original leaked the socket on the error path
                probe.close()

        # Check if the port is in use
        port = int(os.environ.get("PORT", 8000))
        if _port_in_use(port):
            print(f"[lifespan] ✅ Server port {port} is active, ready to accept connections")
        else:
            print(f"[lifespan] ⚠️ Warning: Server port {port} is not bound yet!")

        # Add a delay before starting the intensive search indexing
        print("[lifespan] Waiting for system stabilization before search indexing...")
        await asyncio.sleep(10)  # 10-second delay to let the system stabilize

        # Start search indexing as a background task with lower priority
        try:
            print("[lifespan] Creating search indexing background task...")
            asyncio.create_task(initialize_search_index_background())
            print("[lifespan] Search indexing task scheduled successfully")
        except Exception as e:
            print(f"[lifespan] Error scheduling search indexing task: {e}")

        print("[lifespan] Full server startup completed successfully")
        yield
    except Exception as e:
        import traceback

        print(f"[lifespan] Critical error in lifespan function: {e}")
        print(f"[lifespan] Traceback: {traceback.format_exc()}")
        yield  # Still yield to allow clean shutdown
    finally:
        print("[lifespan] Shutting down application services")
        # Stop everything concurrently; exceptions are collected, not raised.
        tasks = [redis.disconnect(), ViewedStorage.stop(), revalidation_manager.stop()]
        await asyncio.gather(*tasks, return_exceptions=True)
        print("[lifespan] Shutdown complete")
# Initialize search index in the background
async def initialize_search_index_background():
    """Run search indexing as a background task with low priority."""
    try:
        print("[search] Starting background search indexing process")
        from services.db import fetch_all_shouts

        try:
            print("[search] About to fetch all shouts for indexing")
            shouts = await fetch_all_shouts()
            count = len(shouts) if shouts else 0
            print(f"[search] Fetched {count} shouts for background indexing")

            # Nothing to index for an empty database.
            if not shouts or count == 0:
                print("[search] No shouts to index, skipping indexing process")
                return

            print("[search] Beginning background search index initialization...")
            try:
                # Cap the whole indexing run at five minutes.
                await asyncio.wait_for(initialize_search_index(shouts), timeout=300)
                print("[search] Background search index initialization complete")
            except asyncio.TimeoutError:
                print("[search] Background indexing timed out after 5 minutes")
                return

            # Smoke-test the freshly built index with a tiny query.
            try:
                print("[search] Running test search to verify index...")
                from services.search import search_text

                found = await asyncio.wait_for(search_text("test", 3), timeout=30)
                print(f"[search] Test search complete with {len(found)} results")
            except asyncio.TimeoutError:
                print("[search] Test search timed out after 30 seconds")
            except Exception as probe_error:
                print(f"[search] Test search error: {str(probe_error)}")
        except Exception as indexing_error:
            import traceback

            print(f"[search] Error in search indexing process: {str(indexing_error)}")
            print(f"[search] Inner traceback: {traceback.format_exc()}")
        print("[search] Search initialization process completed (with or without errors)")
    except Exception as e:
        import traceback

        print(f"[search] Outer error in background search indexing: {str(e)}")
        print(f"[search] Outer traceback: {traceback.format_exc()}")
# Create the GraphQL application instance (debug=True includes error details in responses)
graphql_app = GraphQL(schema, debug=True)
# Wrap the GraphQL handler for better error handling
async def graphql_handler(request: Request):
    """Dispatch a GraphQL request, converting failures into JSON error responses."""
    if request.method not in ("GET", "POST"):
        return JSONResponse({"error": "Method Not Allowed"}, status_code=405)
    try:
        outcome = await graphql_app.handle_request(request)
    except asyncio.CancelledError:
        # Client went away; 499 mirrors nginx's "client closed request".
        return JSONResponse({"error": "Request cancelled"}, status_code=499)
    except Exception as e:
        print(f"GraphQL error: {str(e)}")
        return JSONResponse({"error": str(e)}, status_code=500)
    return outcome if isinstance(outcome, Response) else JSONResponse(outcome)
# Register the routes with Starlette
app = Starlette(
    routes=[
        Route("/", graphql_handler, methods=["GET", "POST"]),
        Route("/new-author", WebhookEndpoint),
        # Health check endpoint
        Route("/health", lambda request: JSONResponse({"status": "healthy"}), methods=["GET"]),
    ],
    lifespan=lifespan,
    debug=True,
)
# Register an error handler for uncaught exceptions
@app.exception_handler(Exception)
async def handle_exception(request, exc):
    """Last-resort handler: log the traceback and answer with a generic 500."""
    import traceback

    print(f"Global exception handler caught: {str(exc)}")
    traceback.print_exc()
    payload = {"detail": "Internal Server Error", "message": str(exc)}
    return JSONResponse(status_code=500, content=payload)
# Project middleware that converts uncaught exceptions into structured responses.
app.add_middleware(ExceptionHandlerMiddleware)
# Allow the local frontend origin only when launched via the "dev" entry point.
if "dev" in sys.argv:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["https://localhost:3000"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )