"""Authentication helpers: remote session check and resolver/handler decorators."""

from functools import wraps
from typing import Optional

from pydantic import BaseModel
from starlette.authentication import AuthenticationBackend
from starlette.requests import HTTPConnection
from httpx import AsyncClient
from httpx._exceptions import HTTPError

from services.db import local_session
from settings import AUTH_URL
from orm.author import Author

# True when AUTH_URL contains "v2.discours". Selects the request payload used
# by check_auth and the author-id strategy used by auth_request.
INTERNAL_AUTH_SERVER = "v2.discours" in AUTH_URL


class AuthCredentials(BaseModel):
    """Credentials object returned by JWTAuthenticate.authenticate."""

    # Id reported by the auth server, or None when not authenticated.
    user_id: Optional[int] = None
    # Not populated anywhere in this module.
    scopes: Optional[dict] = {}
    # Result of the remote session check.
    logged_in: bool = False
    # Not populated anywhere in this module.
    error_message: str = ""


class JWTAuthenticate(AuthenticationBackend):
    """Authentication backend that delegates the check to check_auth."""

    async def authenticate(self, request: HTTPConnection):
        """Return ``(AuthCredentials, user_id)`` for the given connection."""
        logged_in, user_id = await check_auth(request)
        return AuthCredentials(user_id=user_id, logged_in=logged_in), user_id


async def check_auth(req):
    """Ask the auth server at AUTH_URL who owns the request's token.

    Args:
        req: object exposing ``headers.get`` (the ``Authorization`` header
            value is forwarded to the auth server unchanged).

    Returns:
        ``(is_authenticated, user_id)``. ``(False, None)`` when the header is
        missing, the server does not answer 200, or the response carries no
        user id.
    """
    token = req.headers.get("Authorization")
    if not token:
        # A None header value cannot be sent; without a token there is
        # nothing to verify, so skip the round-trip.
        return False, None

    gql = (
        {"mutation": "{ getSession { user { id } } }"}
        if INTERNAL_AUTH_SERVER
        else {"query": "{ session { user { id } } }"}
    )
    headers = {"Authorization": token, "Content-Type": "application/json"}

    async with AsyncClient() as client:
        # json= serialises the payload as JSON, matching the declared
        # Content-Type; data= would form-encode the dict instead.
        response = await client.post(AUTH_URL, headers=headers, json=gql)

    if response.status_code != 200:
        return False, None

    payload = response.json()
    if not isinstance(payload, dict):
        return False, None

    # Any level may be null (e.g. "data": null alongside "errors"), so fall
    # back to an empty dict at each step instead of relying on .get defaults.
    data = payload.get("data") or {}
    # The internal-server request asks for `getSession`, the other one for
    # `session`; accept whichever key the response carries.
    session_data = data.get("session") or data.get("getSession") or {}
    user = session_data.get("user") or {}
    user_id = user.get("id")
    return user_id is not None, user_id


async def author_id_by_user_id(user_id):
    """Return the id of the Author whose ``user`` equals *user_id*.

    Returns None when no matching Author row exists.
    """
    # NOTE(review): assumes local_session() returns a synchronous SQLAlchemy
    # Session (the query below is not awaited) - confirm against services.db.
    with local_session() as session:
        author = session.query(Author).filter(Author.user == user_id).first()
        return author.id if author is not None else None


def login_required(f):
    """Decorate an async resolver so it only runs for authenticated callers.

    The wrapped callable receives ``info`` as its second positional argument;
    ``info.context`` must provide the request under the ``"request"`` key.
    On success ``context["author_id"]`` is set before the resolver is awaited.

    Raises:
        Exception: when the caller is not authenticated.
    """

    @wraps(f)
    async def decorated_function(*args, **kwargs):
        info = args[1]
        context = info.context
        req = context.get("request")

        is_authenticated, user_id = await check_auth(req)
        if not is_authenticated:
            raise Exception("You are not logged in")

        # Add author_id to the context.
        context["author_id"] = await author_id_by_user_id(user_id)

        # The user is authenticated: run the resolver.
        return await f(*args, **kwargs)

    return decorated_function


def auth_request(f):
    """Decorate an async handler whose first positional argument is the request.

    On success ``req["author_id"]`` is set before the handler is awaited:
    the user id itself when INTERNAL_AUTH_SERVER is true, otherwise the id
    looked up through author_id_by_user_id.

    Raises:
        HTTPError: when the caller is not authenticated.
    """

    @wraps(f)
    async def decorated_function(*args, **kwargs):
        req = args[0]

        is_authenticated, user_id = await check_auth(req)
        if not is_authenticated:
            raise HTTPError("please, login first")

        if INTERNAL_AUTH_SERVER:
            author_id = user_id
        else:
            author_id = await author_id_by_user_id(user_id)

        # NOTE(review): requires req to support item assignment - confirm
        # for the request type actually passed to decorated handlers.
        req["author_id"] = author_id
        return await f(*args, **kwargs)

    return decorated_function