inbox/services/auth.py
2023-10-03 18:29:56 +03:00

97 lines
3.0 KiB
Python

from typing import Optional
from pydantic import BaseModel
from functools import wraps
from starlette.authentication import AuthenticationBackend
from starlette.requests import HTTPConnection
from graphql.error import GraphQLError
from httpx import AsyncClient
from services.db import local_session
from settings import AUTH_URL
from orm.author import Author
class AuthUser(BaseModel):
    """Minimal description of the user attached to a request.

    Both fields default to ``None`` explicitly, so the model can be built
    without them regardless of how the installed pydantic version treats an
    ``Optional`` annotation that has no default.
    """

    # Identifier of the user, when one is known.
    user_id: Optional[int] = None
    # Display name of the user, when one is known.
    username: Optional[str] = None
class AuthCredentials(BaseModel):
    """Outcome of an authentication attempt for a single request."""

    # Identifier of the authenticated user; None when nobody is logged in.
    user_id: Optional[int] = None
    # Permission scopes attached to the credentials (empty by default).
    scopes: Optional[dict] = {}
    # Whether the authentication attempt succeeded.
    logged_in: bool = False
    # Human-readable explanation to carry along when something went wrong.
    error_message: str = ""
class JWTAuthenticate(AuthenticationBackend):
    """Authentication backend that delegates the check to ``check_auth``."""

    async def authenticate(self, request: HTTPConnection):
        """Authenticate *request* and describe the result.

        Returns:
            A ``(AuthCredentials, AuthUser)`` pair. The credentials carry the
            login flag and user id reported by ``check_auth``; the user is
            built with the same id and an empty username.
        """
        # TODO: integrate await user.get_permission
        granted_scopes = {}
        is_logged_in, uid = await check_auth(request)
        credentials = AuthCredentials(
            user_id=uid,
            scopes=granted_scopes,
            logged_in=is_logged_in,
        )
        user = AuthUser(user_id=uid, username="")
        return credentials, user
class Unauthorized(GraphQLError):
    """GraphQL error signalling that the caller is not authenticated."""

    # Class-level defaults describing the failure as an HTTP 401.
    code = 401
    message = "401 Unauthorized"
async def check_auth(req):
    """Ask the auth service who owns the request's ``Authorization`` token.

    Args:
        req: Object exposing a ``headers`` mapping (for example a Starlette
            request); only ``headers.get("Authorization")`` is read.

    Returns:
        A ``(is_authenticated, user_id)`` tuple. ``is_authenticated`` is True
        only when the auth service answered with a non-null user id;
        otherwise the result is ``(False, None)``.
    """
    token = req.headers.get("Authorization")
    if not token:
        # Without a token nobody can be authenticated, so skip the round trip.
        return False, None

    # The name of the requested field is also the key under which the
    # service returns its result, so keep both in one place.
    if "v2" in AUTH_URL:
        field = "getSession"
        document = "mutation { getSession { user { id } } }"
    else:
        field = "session"
        document = "{ session { user { id } } }"
    # A GraphQL HTTP request carries its document under the "query" key,
    # for mutations as well as for queries.
    gql = {"query": document}

    headers = {"Authorization": token, "Content-Type": "application/json"}
    async with AsyncClient() as client:
        # json= serialises the payload as a JSON body, matching the declared
        # content type (data= would form-encode the dict instead).
        response = await client.post(AUTH_URL, headers=headers, json=gql)

    if response.status_code != 200:
        return False, None

    try:
        node = response.json()
    except ValueError:
        # A body that is not valid JSON cannot confirm a session.
        return False, None

    # Walk data -> <field> -> user, tolerating nulls and missing keys at any
    # level (an error response may carry "data": null or "user": null).
    for key in ("data", field, "user"):
        if not isinstance(node, dict):
            return False, None
        node = node.get(key)
    if not isinstance(node, dict):
        return False, None

    user_id = node.get("id")
    return user_id is not None, user_id
async def author_id_by_user_id(user_id):
    """Return the id of the ``Author`` whose ``user`` equals *user_id*.

    Declared ``async`` because the body uses ``async with`` (which is only
    valid inside a coroutine function) and the decorators in this module
    ``await`` the result.

    Args:
        user_id: Identifier of the authenticated user.

    Returns:
        The matching author's ``id``, or ``None`` when no author matches.
    """
    async with local_session() as session:
        # NOTE(review): this calls the session object directly; if
        # local_session() yields a plain SQLAlchemy Session this probably
        # needs to be session.query(Author) -- confirm against services.db.
        author = session(Author).where(Author.user == user_id).first()
        if author is None:
            return None
        return author.id
def login_required(f):
    """Decorate an async GraphQL resolver so it only runs for logged-in users.

    The wrapper reads ``info`` from the second positional argument, takes the
    request from ``info.context["request"]`` and checks it with
    ``check_auth``. On success the author's id is stored in
    ``info.context["author_id"]`` before the resolver is awaited.

    Raises:
        Unauthorized: When the request is not authenticated. This is the
            same error type ``auth_request`` raises, and it is still an
            ``Exception`` subclass for callers that catch broadly.
    """

    @wraps(f)
    async def decorated_function(*args, **kwargs):
        info = args[1]
        context = info.context
        req = context.get("request")
        is_authenticated, user_id = await check_auth(req)
        if not is_authenticated:
            raise Unauthorized("You are not logged in")
        # Add author_id to the context.
        context["author_id"] = await author_id_by_user_id(user_id)
        # The user is authenticated, so run the resolver.
        return await f(*args, **kwargs)

    return decorated_function
def auth_request(f):
    """Decorate an async handler so it only runs for authenticated requests.

    The request is taken from the first positional argument and checked with
    ``check_auth``. When the check passes, the author's id is stored on the
    request under the ``"author_id"`` key and the handler is awaited.

    Raises:
        Unauthorized: When the request is not authenticated.
    """

    @wraps(f)
    async def wrapper(*args, **kwargs):
        request = args[0]
        ok, uid = await check_auth(request)
        if not ok:
            raise Unauthorized("You are not logged in")
        resolved_author = await author_id_by_user_id(uid)
        # NOTE(review): relies on the request object supporting item
        # assignment -- confirm for the request type actually passed in.
        request["author_id"] = resolved_author
        return await f(*args, **kwargs)

    return wrapper