from typing import Optional from pydantic import BaseModel from functools import wraps from starlette.authentication import AuthenticationBackend from starlette.requests import HTTPConnection from graphql.error import GraphQLError from httpx import AsyncClient from services.db import local_session from settings import AUTH_URL from orm.author import Author class AuthUser(BaseModel): user_id: Optional[int] username: Optional[str] class AuthCredentials(BaseModel): user_id: Optional[int] = None scopes: Optional[dict] = {} logged_in: bool = False error_message: str = "" class JWTAuthenticate(AuthenticationBackend): async def authenticate(self, request: HTTPConnection): scopes = {} # TODO: integrate await user.get_permission logged_in, user_id = await check_auth(request) return ( AuthCredentials(user_id=user_id, scopes=scopes, logged_in=logged_in), AuthUser(user_id=user_id, username=""), ) class Unauthorized(GraphQLError): code = 401 message = "401 Unauthorized" async def check_auth(req): token = req.headers.get("Authorization") gql = ( {"mutation": "{ getSession { user { id } } }"} if "v2" in AUTH_URL else {"query": "{ session { user { id } } }"} ) headers = {"Authorization": token, "Content-Type": "application/json"} async with AsyncClient() as client: response = await client.post(AUTH_URL, headers=headers, data=gql) if response.status_code != 200: return False, None r = response.json() user_id = r.get("data", {}).get("session", {}).get("user", {}).get("id", None) is_authenticated = user_id is not None return is_authenticated, user_id def author_id_by_user_id(user_id): async with local_session() as session: author = session(Author).where(Author.user == user_id).first() return author.id def login_required(f): @wraps(f) async def decorated_function(*args, **kwargs): info = args[1] context = info.context req = context.get("request") is_authenticated, user_id = await check_auth(req) if not is_authenticated: raise Exception("You are not logged in") else: # Добавляем author_id в контекст author_id = await author_id_by_user_id(user_id) context["author_id"] = author_id # Если пользователь аутентифицирован, выполняем резолвер return await f(*args, **kwargs) return decorated_function def auth_request(f): @wraps(f) async def decorated_function(*args, **kwargs): req = args[0] is_authenticated, user_id = await check_auth(req) if not is_authenticated: raise Unauthorized("You are not logged in") else: author_id = await author_id_by_user_id(user_id) req["author_id"] = author_id return await f(*args, **kwargs) return decorated_function