Files
core/auth/utils.py
Untone e78e12eeee
Some checks failed
Deploy on push / deploy (push) Failing after 17s
circular-fix
2025-08-17 16:33:54 +03:00

180 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Вспомогательные функции для аутентификации
Содержит функции для работы с токенами, заголовками и запросами
"""
from typing import Any
from settings import SESSION_COOKIE_NAME, SESSION_TOKEN_HEADER
from utils.logger import root_logger as logger
def get_safe_headers(request: Any) -> dict[str, str]:
"""
Безопасно получает заголовки запроса.
Args:
request: Объект запроса
Returns:
Dict[str, str]: Словарь заголовков
"""
headers = {}
try:
# Первый приоритет: scope из ASGI (самый надежный источник)
if hasattr(request, "scope") and isinstance(request.scope, dict):
scope_headers = request.scope.get("headers", [])
if scope_headers:
headers.update({k.decode("utf-8").lower(): v.decode("utf-8") for k, v in scope_headers})
logger.debug(f"[decorators] Получены заголовки из request.scope: {len(headers)}")
logger.debug(f"[decorators] Заголовки из request.scope: {list(headers.keys())}")
# Второй приоритет: метод headers() или атрибут headers
if hasattr(request, "headers"):
if callable(request.headers):
h = request.headers()
if h:
headers.update({k.lower(): v for k, v in h.items()})
logger.debug(f"[decorators] Получены заголовки из request.headers() метода: {len(headers)}")
else:
h = request.headers
if hasattr(h, "items") and callable(h.items):
headers.update({k.lower(): v for k, v in h.items()})
logger.debug(f"[decorators] Получены заголовки из request.headers атрибута: {len(headers)}")
elif isinstance(h, dict):
headers.update({k.lower(): v for k, v in h.items()})
logger.debug(f"[decorators] Получены заголовки из request.headers словаря: {len(headers)}")
# Третий приоритет: атрибут _headers
if hasattr(request, "_headers") and request._headers:
headers.update({k.lower(): v for k, v in request._headers.items()})
logger.debug(f"[decorators] Получены заголовки из request._headers: {len(headers)}")
except Exception as e:
logger.warning(f"[decorators] Ошибка при доступе к заголовкам: {e}")
return headers
async def get_auth_token(request: Any) -> str | None:
"""
Извлекает токен авторизации из запроса.
Порядок проверки:
1. Проверяет auth из middleware
2. Проверяет auth из scope
3. Проверяет заголовок Authorization
4. Проверяет cookie с именем auth_token
Args:
request: Объект запроса
Returns:
Optional[str]: Токен авторизации или None
"""
try:
# 1. Проверяем auth из middleware (если middleware уже обработал токен)
if hasattr(request, "auth") and request.auth:
token = getattr(request.auth, "token", None)
if token:
token_len = len(token) if hasattr(token, "__len__") else "unknown"
logger.debug(f"[decorators] Токен получен из request.auth: {token_len}")
return token
logger.debug("[decorators] request.auth есть, но token НЕ найден")
else:
logger.debug("[decorators] request.auth НЕ найден")
# 2. Проверяем наличие auth_token в scope (приоритет)
if hasattr(request, "scope") and isinstance(request.scope, dict) and "auth_token" in request.scope:
token = request.scope.get("auth_token")
if token is not None:
token_len = len(token) if hasattr(token, "__len__") else "unknown"
logger.debug(f"[decorators] Токен получен из scope.auth_token: {token_len}")
return token
# 3. Получаем заголовки запроса безопасным способом
headers = get_safe_headers(request)
logger.debug(f"[decorators] Получены заголовки: {list(headers.keys())}")
# 4. Проверяем кастомный заголовок авторизации
auth_header_key = SESSION_TOKEN_HEADER.lower()
if auth_header_key in headers:
token = headers[auth_header_key]
logger.debug(f"[decorators] Токен найден в заголовке {SESSION_TOKEN_HEADER}")
# Убираем префикс Bearer если есть
if token.startswith("Bearer "):
token = token.replace("Bearer ", "", 1).strip()
logger.debug(f"[decorators] Обработанный токен: {len(token)}")
return token
# 5. Проверяем стандартный заголовок Authorization
if "authorization" in headers:
auth_header = headers["authorization"]
logger.debug(f"[decorators] Найден заголовок Authorization: {auth_header[:20]}...")
if auth_header.startswith("Bearer "):
token = auth_header.replace("Bearer ", "", 1).strip()
logger.debug(f"[decorators] Извлечен Bearer токен: {len(token)}")
return token
else:
logger.debug("[decorators] Authorization заголовок не содержит Bearer токен")
# 6. Проверяем cookies
if hasattr(request, "cookies") and request.cookies:
if isinstance(request.cookies, dict):
cookies = request.cookies
elif hasattr(request.cookies, "get"):
cookies = {k: request.cookies.get(k) for k in getattr(request.cookies, "keys", lambda: [])()}
else:
cookies = {}
logger.debug(f"[decorators] Доступные cookies: {list(cookies.keys())}")
# Проверяем кастомную cookie
if SESSION_COOKIE_NAME in cookies:
token = cookies[SESSION_COOKIE_NAME]
logger.debug(f"[decorators] Токен найден в cookie {SESSION_COOKIE_NAME}: {len(token)}")
return token
# Проверяем стандартную cookie
if "auth_token" in cookies:
token = cookies["auth_token"]
logger.debug(f"[decorators] Токен найден в cookie auth_token: {len(token)}")
return token
logger.debug("[decorators] Токен НЕ найден ни в одном источнике")
return None
except Exception as e:
logger.error(f"[decorators] Критическая ошибка при извлечении токена: {e}")
return None
def extract_bearer_token(auth_header: str) -> str | None:
"""
Извлекает токен из заголовка Authorization с Bearer схемой.
Args:
auth_header: Заголовок Authorization
Returns:
Optional[str]: Извлеченный токен или None
"""
if not auth_header:
return None
if auth_header.startswith("Bearer "):
return auth_header[7:].strip()
return None
def format_auth_header(token: str) -> str:
"""
Форматирует токен в заголовок Authorization.
Args:
token: Токен авторизации
Returns:
str: Отформатированный заголовок
"""
return f"Bearer {token}"