import os import re from sqlalchemy import select from starlette.endpoints import HTTPEndpoint from starlette.exceptions import HTTPException from starlette.requests import Request from starlette.responses import JSONResponse from orm.author import Author from resolvers.stat import get_with_stat from services.cache import cache_author from services.db import local_session class WebhookEndpoint(HTTPEndpoint): async def post(self, request: Request) -> JSONResponse: try: data = await request.json() if not data: raise HTTPException(status_code=400, detail="Request body is empty") auth = request.headers.get("Authorization") if not auth or auth != os.environ.get("WEBHOOK_SECRET"): raise HTTPException( status_code=401, detail="Invalid Authorization header" ) # logger.debug(data) user = data.get("user") if not isinstance(user, dict): raise HTTPException( status_code=400, detail="User data is not a dictionary" ) # name: str = ( f"{user.get('given_name', user.get('slug'))} {user.get('middle_name', '')}" + f"{user.get('family_name', '')}".strip() ) or "Аноним" user_id: str = user.get("id", "") email: str = user.get("email", "") pic: str = user.get("picture", "") if user_id: with local_session() as session: author = ( session.query(Author).filter(Author.user == user_id).first() ) if not author: # If the author does not exist, create a new one slug: str = email.split("@")[0].replace(".", "-").lower() slug: str = re.sub("[^0-9a-z]+", "-", slug) while True: author = ( session.query(Author) .filter(Author.slug == slug) .first() ) if not author: break slug = f"{slug}-{len(session.query(Author).filter(Author.email == email).all()) + 1}" author = Author(user=user_id, slug=slug, name=name, pic=pic) session.add(author) session.commit() author_query = select(Author).filter(Author.user == user_id) [author_with_stat] = get_with_stat(author_query) if author_with_stat: await cache_author(author_with_stat) return JSONResponse({"status": "success"}) except HTTPException as e: return JSONResponse( {"status": "error", "message": str(e.detail)}, status_code=e.status_code ) except Exception as e: import traceback traceback.print_exc() return JSONResponse({"status": "error", "message": str(e)}, status_code=500)