"""ASGI entry point for the GraphQL service.

Builds the executable schema from ``inbox.graphql`` and exposes ``app``, a
Starlette application with the Ariadne GraphQL handler mounted at ``/``.
"""

import asyncio
import os
from importlib import import_module
from os.path import exists

from ariadne import load_schema_from_path, make_executable_schema
from ariadne.asgi import GraphQL
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.ariadne import AriadneIntegration
from sentry_sdk.integrations.redis import RedisIntegration
from starlette.applications import Starlette

# Project-local imports are kept in their original relative order in case
# any of these modules depends on import-time side effects of another.
from services.rediscache import redis
from services.schema import resolvers
from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN
from services.core import CacheStorage

# Imported only for its side effects.
# NOTE(review): presumably this registers the resolver functions on the
# `resolvers` bindings used below -- confirm against the `resolvers` package.
import_module("resolvers")
schema = make_executable_schema(load_schema_from_path("inbox.graphql"), resolvers)  # type: ignore

# Debug output (tracebacks / exception details in responses) is enabled only
# for the development server; every other mode runs with debug disabled.
_DEBUG = MODE == "dev"


async def start_up() -> None:
    """Starlette startup hook.

    Connects to redis and awaits ``CacheStorage.init()``. In ``dev`` mode it
    then writes the current process id to ``DEV_SERVER_PID_FILE_NAME`` unless
    that file already exists; in any other mode it initialises Sentry.
    """
    await redis.connect()

    await CacheStorage.init()

    if MODE == "dev":
        # An existing PID file is left untouched, and nothing in this module
        # removes it on shutdown.
        # NOTE(review): presumably the dev-server tooling owns its cleanup.
        if not exists(DEV_SERVER_PID_FILE_NAME):
            with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f:
                f.write(str(os.getpid()))
    else:
        # startup sentry monitoring services
        # Best effort: a failure to initialise monitoring is reported on
        # stdout and must not prevent the application from starting.
        try:
            import sentry_sdk

            sentry_sdk.init(
                SENTRY_DSN,
                enable_tracing=True,
                integrations=[
                    AriadneIntegration(),
                    RedisIntegration(),
                    AioHttpIntegration(),
                ],
            )
        except Exception as e:
            print("[sentry] init error")
            print(e)


async def shutdown() -> None:
    """Starlette shutdown hook: disconnect from redis."""
    await redis.disconnect()


app = Starlette(debug=_DEBUG, on_startup=[start_up], on_shutdown=[shutdown])
app.mount("/", GraphQL(schema, debug=_DEBUG))