import logging
from functools import wraps

from aiohttp import ClientSession
from starlette.exceptions import HTTPException

from services.core import get_author_by_user
from services.logger import root_logger as logger
from settings import AUTH_URL

logger.setLevel(logging.DEBUG)


async def check_auth(req):
    """Validate the request's access token and return the user id.

    The ``Authorization`` header value is sent to the service at ``AUTH_URL``
    in a ``validate_jwt_token`` GraphQL query. The user id is taken from the
    ``sub`` entry of the returned claims.

    Args:
        req: Request object exposing a ``headers`` mapping.

    Returns:
        The non-empty user id found in the token claims.

    Raises:
        HTTPException: 401 when the header is missing, the auth service
            answers with a non-200 status or with errors, the token is not
            reported as valid, the claims carry no ``sub``, or the check
            itself fails with an exception.
    """
    logger.debug("checking auth...")
    user_id = ""
    try:
        token = req.headers.get("Authorization")
        if token:
            # Build the GraphQL request that asks the auth server to validate
            # the token.
            query_name = "validate_jwt_token"
            operation = "ValidateToken"
            headers = {
                "Content-Type": "application/json",
            }
            variables = {
                "params": {
                    "token_type": "access_token",
                    "token": token,
                }
            }
            gql = {
                "query": f"query {operation}($params: ValidateJWTTokenInput!) {{ {query_name}(params: $params) {{ is_valid claims }} }}",
                "variables": variables,
                "operationName": operation,
            }
            # Asynchronous HTTP request to the authentication server
            async with ClientSession() as session:
                async with session.post(
                    AUTH_URL, json=gql, headers=headers
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        errors = data.get("errors")
                        if errors:
                            logger.error("%s", errors)
                        else:
                            # `or {}` also covers explicit JSON nulls, which
                            # a plain `.get(key, {})` default does not.
                            result = (data.get("data") or {}).get(query_name) or {}
                            # The query asks for `is_valid`; only trust the
                            # claims when the server confirms the token.
                            if result.get("is_valid"):
                                claims = result.get("claims") or {}
                                user_id = claims.get("sub") or ""
                            logger.info("got user_id: %s", user_id)
    except Exception as e:
        # Best effort: any failure during the check is logged and then
        # reported to the caller as a 401 below.
        logger.error(e)

    # Single exit: either a non-empty user id is returned or 401 is raised.
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")
    return user_id


def login_required(f):
    """Decorate an async resolver so it only runs for authenticated authors.

    The wrapper reads ``info`` from the second positional argument, takes the
    request from ``info.context["request"]``, authenticates it with
    ``check_auth`` and stores ``user_id`` and ``author_id`` in the context
    before calling the wrapped function.

    Args:
        f: Async callable invoked as ``f(*args, **kwargs)`` where ``args[1]``
            has a dict-like ``context`` attribute.

    Returns:
        The wrapped async callable.

    Raises:
        HTTPException: 401 (from the wrapper) when authentication fails or
            no author with an ``id`` is found for the user.
    """

    @wraps(f)
    async def decorated_function(*args, **kwargs):
        info = args[1]
        context = info.context
        req = context.get("request")

        user_id = await check_auth(req)
        if not user_id:
            # Defensive: never reach the resolver without an identity.
            raise HTTPException(status_code=401, detail="Unauthorized")
        context["user_id"] = user_id.strip()

        author = get_author_by_user(user_id)
        if not (author and "id" in author):
            logger.debug(author)
            # The original built this exception without raising it, which
            # let the request through to the resolver.
            raise HTTPException(status_code=401, detail="Unauthorized")
        context["author_id"] = author["id"]

        return await f(*args, **kwargs)

    return decorated_function