import logging
from functools import wraps

from aiohttp import ClientSession
from starlette.exceptions import HTTPException

from services.core import get_author_by_user
from settings import AUTH_URL

logger = logging.getLogger('[services.auth] ')
logger.setLevel(logging.DEBUG)


async def check_auth(req) -> str:
    """Validate the request's ``Authorization`` token against the auth server.

    Posts a ``validate_jwt_token`` GraphQL query to ``AUTH_URL`` and reads the
    ``sub`` claim from the response.

    Args:
        req: Request object exposing a ``headers`` mapping.

    Returns:
        The ``sub`` claim reported by the auth server for the token.

    Raises:
        HTTPException: 401 when the header is missing, the auth server
            answers with a non-200 status or with ``errors``, the claims
            carry no ``sub``, or any exception occurs during the check.
    """
    logger.debug('checking auth...')
    user_id = ''
    try:
        token = req.headers.get('Authorization')
        if token:
            # Build the GraphQL payload that asks the auth server to
            # validate the token.
            query_name = 'validate_jwt_token'
            operation = 'ValidateToken'
            headers = {
                'Content-Type': 'application/json',
            }
            variables = {
                'params': {
                    'token_type': 'access_token',
                    'token': token,
                }
            }
            gql = {
                'query': f'query {operation}($params: ValidateJWTTokenInput!) {{ {query_name}(params: $params) {{ is_valid claims }} }}',
                'variables': variables,
                'operationName': operation,
            }
            # Asynchronous HTTP request to the authentication server
            async with ClientSession() as session:
                async with session.post(
                    AUTH_URL, json=gql, headers=headers
                ) as response:
                    # Any non-200 status leaves user_id empty and ends in
                    # the 401 below.
                    if response.status == 200:
                        data = await response.json()
                        errors = data.get('errors')
                        if errors:
                            logger.error('%s', errors)
                        else:
                            user_id = (
                                data.get('data', {})
                                .get(query_name, {})
                                .get('claims', {})
                                .get('sub')
                            )
                            logger.info('got user_id: %s', user_id)
    except Exception as e:
        # Any failure while talking to the auth server is logged and then
        # treated as "not authenticated" by the check below.
        logger.error(e)

    # Single exit point: a missing/empty ``sub`` must be rejected as well,
    # instead of being returned to the caller as None.
    if not user_id:
        raise HTTPException(status_code=401, detail='Unauthorized')
    return user_id


def login_required(f):
    """Decorate an async callable so it only runs for authenticated authors.

    The wrapper takes the second positional argument as ``info``, reads the
    request from ``info.context['request']`` and validates it with
    ``check_auth``. On success it stores ``user_id`` (stripped) and
    ``author_id`` in ``info.context`` before awaiting ``f``.

    Args:
        f: Async callable to protect; called with the original arguments.

    Returns:
        The wrapped coroutine function.

    Raises:
        HTTPException: 401 (from the wrapper) when authentication fails or
            no author with an ``id`` is found for the user.
    """

    @wraps(f)
    async def decorated_function(*args, **kwargs):
        info = args[1]
        context = info.context
        req = context.get('request')
        user_id = await check_auth(req)
        if not user_id:
            # Defensive: never reach the wrapped callable unauthenticated.
            raise HTTPException(status_code=401, detail='Unauthorized')

        context['user_id'] = user_id.strip()
        author = get_author_by_user(user_id)
        if not (author and 'id' in author):
            logger.debug(author)
            # The exception was previously constructed but never raised,
            # which let users without an author profile through.
            raise HTTPException(status_code=401, detail='Unauthorized')

        context['author_id'] = author['id']
        return await f(*args, **kwargs)

    return decorated_function