Files
core/auth/middleware.py
Untone af0f3e3dea
All checks were successful
Deploy on push / deploy (push) Successful in 2m55s
lesslogs
2025-09-28 17:26:23 +03:00

382 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Единый middleware для обработки авторизации в GraphQL запросах
"""
import time
from collections.abc import Awaitable, MutableMapping
from typing import Any, Callable
from graphql import GraphQLResolveInfo
from sqlalchemy.orm import exc
from starlette.authentication import UnauthenticatedUser
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.types import ASGIApp
from auth.credentials import AuthCredentials
from auth.tokens.storage import TokenStorage as TokenManager
from orm.author import Author
from settings import (
ADMIN_EMAILS as ADMIN_EMAILS_LIST,
)
from settings import (
SESSION_COOKIE_DOMAIN,
SESSION_COOKIE_HTTPONLY,
SESSION_COOKIE_NAME,
SESSION_COOKIE_SAMESITE,
SESSION_COOKIE_SECURE,
SESSION_TOKEN_HEADER,
)
from storage.db import local_session
from utils.logger import root_logger as logger
ADMIN_EMAILS = ADMIN_EMAILS_LIST.split(",")
class AuthenticatedUser:
"""Аутентифицированный пользователь"""
def __init__(
self,
user_id: str,
username: str = "",
roles: list | None = None,
permissions: dict | None = None,
token: str | None = None,
) -> None:
self.user_id = user_id
self.username = username
self.roles = roles or []
self.permissions = permissions or {}
self.token = token
@property
def is_authenticated(self) -> bool:
return True
@property
def display_name(self) -> str:
return self.username
@property
def identity(self) -> str:
return self.user_id
class AuthMiddleware:
"""
Единый middleware для обработки авторизации и аутентификации.
Основные функции:
1. Извлечение Bearer токена из заголовка Authorization или cookie
2. Проверка сессии через TokenStorage
3. Создание request.user и request.auth
4. Предоставление методов для установки/удаления cookies
"""
def __init__(self, app: ASGIApp) -> None:
self.app = app
self._context = None
async def authenticate_user(self, token: str) -> tuple[AuthCredentials, AuthenticatedUser | UnauthenticatedUser]:
"""Аутентифицирует пользователя по токену"""
if not token:
return AuthCredentials(
author_id=None, scopes={}, logged_in=False, error_message="no token", email=None, token=None
), UnauthenticatedUser()
# Проверяем сессию в Redis
try:
payload = await TokenManager.verify_session(token)
if not payload:
logger.debug("[auth.authenticate] Недействительный токен или сессия не найдена")
return AuthCredentials(
author_id=None,
scopes={},
logged_in=False,
error_message="Invalid token or session",
email=None,
token=None,
), UnauthenticatedUser()
with local_session() as session:
try:
# payload может быть словарем или объектом, обрабатываем оба случая
user_id = payload.user_id if hasattr(payload, "user_id") else payload.get("user_id")
if not user_id:
logger.debug("[auth.authenticate] user_id не найден в payload")
return AuthCredentials(
author_id=None,
scopes={},
logged_in=False,
error_message="Invalid token payload",
email=None,
token=None,
), UnauthenticatedUser()
author = session.query(Author).where(Author.id == user_id).one()
if author.is_locked():
logger.debug(f"[auth.authenticate] Аккаунт заблокирован: {author.id}")
return AuthCredentials(
author_id=None,
scopes={},
logged_in=False,
error_message="Account is locked",
email=None,
token=None,
), UnauthenticatedUser()
# Создаем пустой словарь разрешений
# Разрешения будут проверяться через RBAC систему по требованию
scopes: dict[str, Any] = {}
# Роли пользователя будут определяться в контексте конкретной операции
# через RBAC систему, а не здесь
roles: list[str] = []
# Обновляем last_seen
author.last_seen = int(time.time())
session.commit()
# Создаем объекты авторизации с сохранением токена
credentials = AuthCredentials(
author_id=author.id,
scopes=scopes,
logged_in=True,
error_message="",
email=author.email,
token=token,
)
user = AuthenticatedUser(
user_id=str(author.id),
username=author.slug or author.email or "",
roles=roles,
permissions=scopes,
token=token,
)
logger.debug(f"[auth.authenticate] Успешная аутентификация: {author.email}")
return credentials, user
except exc.NoResultFound:
logger.debug("[auth.authenticate] Пользователь не найден в базе данных")
return AuthCredentials(
author_id=None,
scopes={},
logged_in=False,
error_message="User not found",
email=None,
token=None,
), UnauthenticatedUser()
except Exception as e:
logger.warning(f"[auth.authenticate] Ошибка при работе с базой данных: {e}")
return AuthCredentials(
author_id=None, scopes={}, logged_in=False, error_message=str(e), email=None, token=None
), UnauthenticatedUser()
except Exception as e:
logger.warning(f"[auth.authenticate] Ошибка при проверке сессии: {e}")
return AuthCredentials(
author_id=None, scopes={}, logged_in=False, error_message=str(e), email=None, token=None
), UnauthenticatedUser()
async def __call__(
self,
scope: MutableMapping[str, Any],
receive: Callable[[], Awaitable[MutableMapping[str, Any]]],
send: Callable[[MutableMapping[str, Any]], Awaitable[None]],
) -> None:
"""Обработка ASGI запроса"""
if scope["type"] != "http":
await self.app(scope, receive, send)
return
# Извлекаем заголовки используя тот же механизм, что и get_safe_headers
headers = {}
# Первый приоритет: scope из ASGI (самый надежный источник)
if "headers" in scope:
scope_headers = scope.get("headers", [])
if scope_headers:
headers.update({k.decode("utf-8").lower(): v.decode("utf-8") for k, v in scope_headers})
# Используем тот же механизм получения токена, что и в декораторе
token = None
# 0. Проверяем сохраненный токен в scope (приоритет)
if "auth_token" in scope:
token = scope["auth_token"]
# 1. Проверяем заголовок Authorization
if not token:
auth_header = headers.get("authorization", "")
if auth_header:
token = auth_header[7:].strip() if auth_header.startswith("Bearer ") else auth_header.strip()
# 2. Проверяем основной заголовок авторизации, если Authorization не найден
if not token:
auth_header = headers.get(SESSION_TOKEN_HEADER.lower(), "")
if auth_header:
token = auth_header[7:].strip() if auth_header.startswith("Bearer ") else auth_header.strip()
# 3. Проверяем cookie
if not token:
cookies = headers.get("cookie", "")
if cookies:
cookie_items = cookies.split(";")
for item in cookie_items:
if "=" in item:
name, value = item.split("=", 1)
cookie_name = name.strip()
if cookie_name == SESSION_COOKIE_NAME:
token = value.strip()
break
# Аутентифицируем пользователя
auth, user = await self.authenticate_user(token or "")
# Добавляем в scope данные авторизации и пользователя
scope["auth"] = auth
scope["user"] = user
# Сохраняем токен в scope для использования в последующих запросах
if token:
scope["auth_token"] = token
await self.app(scope, receive, send)
def set_context(self, context) -> None:
"""Сохраняет ссылку на контекст GraphQL запроса"""
self._context = context
def set_cookie(self, key: str, value: str, **options: Any) -> None:
"""
Устанавливает cookie в ответе
Args:
key: Имя cookie
value: Значение cookie
**options: Дополнительные параметры (httponly, secure, max_age, etc.)
"""
success = False
# Способ 1: Через response
if self._context and "response" in self._context and hasattr(self._context["response"], "set_cookie"):
try:
self._context["response"].set_cookie(key, value, **options)
success = True
except Exception as e:
logger.error(f"[middleware] Ошибка при установке cookie {key} через response: {e!s}")
# Способ 2: Через собственный response в контексте
if not success and hasattr(self, "_response") and self._response and hasattr(self._response, "set_cookie"):
try:
self._response.set_cookie(key, value, **options)
success = True
except Exception as e:
logger.error(f"[middleware] Ошибка при установке cookie {key} через _response: {e!s}")
if not success:
logger.error(f"[middleware] Не удалось установить cookie {key}: объекты response недоступны")
def delete_cookie(self, key: str, **options: Any) -> None:
"""
Удаляет cookie из ответа
"""
success = False
# Способ 1: Через response
if self._context and "response" in self._context and hasattr(self._context["response"], "delete_cookie"):
try:
self._context["response"].delete_cookie(key, **options)
success = True
except Exception as e:
logger.error(f"[middleware] Ошибка при удалении cookie {key} через response: {e!s}")
# Способ 2: Через собственный response в контексте
if not success and hasattr(self, "_response") and self._response and hasattr(self._response, "delete_cookie"):
try:
self._response.delete_cookie(key, **options)
success = True
except Exception as e:
logger.error(f"[middleware] Ошибка при удалении cookie {key} через _response: {e!s}")
if not success:
logger.error(f"[middleware] Не удалось удалить cookie {key}: объекты response недоступны")
async def resolve(
self, next_resolver: Callable[..., Any], root: Any, info: GraphQLResolveInfo, *args: Any, **kwargs: Any
) -> Any:
"""
Middleware для обработки запросов GraphQL.
Добавляет методы для установки cookie в контекст.
"""
try:
# Получаем доступ к контексту запроса
context = info.context
# Сохраняем ссылку на контекст
self.set_context(context)
# Добавляем себя как объект, содержащий утилитные методы
context["extensions"] = self
# Проверяем наличие response в контексте
if "response" not in context or not context["response"]:
context["response"] = JSONResponse({})
return await next_resolver(root, info, *args, **kwargs)
except Exception as e:
logger.error(f"[AuthMiddleware] Ошибка в GraphQL resolve: {e!s}")
raise
async def process_result(self, request: Request, result: Any) -> Response:
"""
Обрабатывает результат GraphQL запроса, поддерживая установку cookie
Args:
request: Starlette Request объект
result: результат GraphQL запроса (dict или Response)
Returns:
Response: HTTP-ответ с результатом и cookie (если необходимо)
"""
# Проверяем, является ли result уже объектом Response
response = result if isinstance(result, Response) else JSONResponse(result)
# Проверяем, был ли токен в запросе или ответе
if request.method == "POST":
try:
data = await request.json()
op_name = data.get("operationName", "").lower()
# Если это операция logout, удаляем cookie
if op_name == "logout":
response.delete_cookie(
key=SESSION_COOKIE_NAME,
secure=SESSION_COOKIE_SECURE,
httponly=SESSION_COOKIE_HTTPONLY,
samesite=SESSION_COOKIE_SAMESITE
if SESSION_COOKIE_SAMESITE in ["strict", "lax", "none"]
else "none",
domain=SESSION_COOKIE_DOMAIN,
)
except Exception as e:
logger.error(f"[process_result] Ошибка при обработке POST запроса: {e!s}")
return response
# Создаем единый экземпляр AuthMiddleware для использования с GraphQL
async def _dummy_app(
scope: MutableMapping[str, Any],
receive: Callable[[], Awaitable[MutableMapping[str, Any]]],
send: Callable[[MutableMapping[str, Any]], Awaitable[None]],
) -> None:
"""Dummy ASGI app for middleware initialization"""
auth_middleware = AuthMiddleware(_dummy_app)