Files
core/auth/utils.py
Untone 4d42e01bd0
Some checks failed
Deploy on push / deploy (push) Failing after 3m6s
[0.9.13] - 2025-08-27
### 🚨 Исправлено
- **Удалено поле username из модели Author**: Поле `username` больше не является частью модели `Author`
  - Убрано свойство `@property def username` из `orm/author.py`
  - Обновлены все сервисы для использования `email` или `slug` вместо `username`
  - Исправлены резолверы для исключения `username` при обработке данных автора
  - Поле `username` теперь используется только в JWT токенах для совместимости

### 🧪 Исправлено
- **E2E тесты админ-панели**: Полностью переработаны E2E тесты для работы с реальным API
  - Тесты теперь делают реальные HTTP запросы к GraphQL API
  - Бэкенд для тестов использует выделенную тестовую БД (`test_e2e.db`)
  - Создан фикстура `backend_server` для запуска тестового сервера
  - Добавлен фикстура `create_test_users_in_backend_db` для регистрации пользователей через API
  - Убраны несуществующие GraphQL запросы (`get_community_stats`)
  - Тесты корректно работают с системой ролей и правами администратора

### �� Техническое
- **Рефакторинг аутентификации**: Упрощена логика работы с пользователями
  - Убраны зависимости от несуществующих полей в ORM моделях
  - Обновлены сервисы аутентификации для корректной работы без `username`
  - Исправлены все места использования `username` в коде
- **Улучшена тестовая инфраструктура**:
  - Тесты теперь используют реальный HTTP API вместо прямых DB проверок
  - Правильная изоляция тестовых данных через отдельную БД
  - Корректная работа с системой ролей и правами
2025-08-27 12:15:01 +03:00

295 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Вспомогательные функции для аутентификации
Содержит функции для работы с токенами, заголовками и запросами
"""
from typing import Any, Tuple
from settings import SESSION_COOKIE_NAME, SESSION_TOKEN_HEADER
from utils.logger import root_logger as logger
def get_safe_headers(request: Any) -> dict[str, str]:
"""
Безопасно получает заголовки запроса.
Args:
request: Объект запроса
Returns:
Dict[str, str]: Словарь заголовков
"""
headers = {}
try:
# Первый приоритет: scope из ASGI (самый надежный источник)
if hasattr(request, "scope") and isinstance(request.scope, dict):
scope_headers = request.scope.get("headers", [])
if scope_headers:
headers.update({k.decode("utf-8").lower(): v.decode("utf-8") for k, v in scope_headers})
logger.debug(f"[decorators] Получены заголовки из request.scope: {len(headers)}")
logger.debug(f"[decorators] Заголовки из request.scope: {list(headers.keys())}")
# Второй приоритет: метод headers() или атрибут headers
if hasattr(request, "headers"):
if callable(request.headers):
h = request.headers()
if h:
headers.update({k.lower(): v for k, v in h.items()})
logger.debug(f"[decorators] Получены заголовки из request.headers() метода: {len(headers)}")
else:
h = request.headers
if hasattr(h, "items") and callable(h.items):
headers.update({k.lower(): v for k, v in h.items()})
logger.debug(f"[decorators] Получены заголовки из request.headers атрибута: {len(headers)}")
elif isinstance(h, dict):
headers.update({k.lower(): v for k, v in h.items()})
logger.debug(f"[decorators] Получены заголовки из request.headers словаря: {len(headers)}")
# Третий приоритет: атрибут _headers
if hasattr(request, "_headers") and request._headers:
headers.update({k.lower(): v for k, v in request._headers.items()})
logger.debug(f"[decorators] Получены заголовки из request._headers: {len(headers)}")
except Exception as e:
logger.warning(f"[decorators] Ошибка при доступе к заголовкам: {e}")
return headers
async def extract_token_from_request(request) -> str | None:
"""
DRY функция для извлечения токена из request.
Проверяет cookies и заголовок Authorization.
Args:
request: Request объект
Returns:
Optional[str]: Токен или None
"""
if not request:
return None
# 1. Проверяем cookies
if hasattr(request, "cookies") and request.cookies:
token = request.cookies.get(SESSION_COOKIE_NAME)
if token:
logger.debug(f"[utils] Токен получен из cookie {SESSION_COOKIE_NAME}")
return token
# 2. Проверяем заголовок Authorization
headers = get_safe_headers(request)
auth_header = headers.get("authorization", "")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header[7:].strip()
logger.debug("[utils] Токен получен из заголовка Authorization")
return token
logger.debug("[utils] Токен не найден ни в cookies, ни в заголовке")
return None
async def get_user_data_by_token(token: str) -> Tuple[bool, dict | None, str | None]:
"""
Получает данные пользователя по токену.
Args:
token: Токен авторизации
Returns:
Tuple[bool, Optional[dict], Optional[str]]: (success, user_data, error_message)
"""
try:
from auth.tokens.storage import TokenStorage as TokenManager
from orm.author import Author
from storage.db import local_session
# Проверяем сессию через TokenManager
payload = await TokenManager.verify_session(token)
if not payload:
return False, None, "Сессия не найдена"
# Получаем user_id из payload
user_id = payload.user_id if hasattr(payload, "user_id") else payload.get("user_id")
if not user_id:
return False, None, "Токен не содержит user_id"
# Получаем данные пользователя
with local_session() as session:
author_obj = session.query(Author).where(Author.id == int(user_id)).first()
if not author_obj:
return False, None, f"Пользователь с ID {user_id} не найден в БД"
try:
user_data = author_obj.dict()
except Exception:
user_data = {
"id": author_obj.id,
"email": author_obj.email,
"name": getattr(author_obj, "name", ""),
"slug": getattr(author_obj, "slug", ""),
}
logger.debug(f"[utils] Данные пользователя получены для ID {user_id}")
return True, user_data, None
except Exception as e:
logger.error(f"[utils] Ошибка при получении данных пользователя: {e}")
return False, None, f"Ошибка получения данных: {e!s}"
async def get_auth_token_from_context(info: Any) -> str | None:
"""
Извлекает токен авторизации из GraphQL контекста.
Порядок проверки:
1. Проверяет заголовок Authorization
2. Проверяет cookie session_token
3. Переиспользует логику get_auth_token для request
Args:
info: GraphQLResolveInfo объект
Returns:
Optional[str]: Токен авторизации или None
"""
try:
context = getattr(info, "context", {})
request = context.get("request")
if request:
# Переиспользуем существующую логику для request
return await get_auth_token(request)
# Если request отсутствует, возвращаем None
logger.debug("[utils] Request отсутствует в GraphQL контексте")
return None
except Exception as e:
logger.error(f"[utils] Ошибка при извлечении токена из GraphQL контекста: {e}")
return None
async def get_auth_token(request: Any) -> str | None:
"""
Извлекает токен авторизации из запроса.
Порядок проверки:
1. Проверяет auth из middleware
2. Проверяет auth из scope
3. Проверяет заголовок Authorization
4. Проверяет cookie с именем auth_token
Args:
request: Объект запроса
Returns:
Optional[str]: Токен авторизации или None
"""
try:
# 1. Проверяем auth из middleware (если middleware уже обработал токен)
if hasattr(request, "auth") and request.auth:
token = getattr(request.auth, "token", None)
if token:
token_len = len(token) if hasattr(token, "__len__") else "unknown"
logger.debug(f"[decorators] Токен получен из request.auth: {token_len}")
return token
logger.debug("[decorators] request.auth есть, но token НЕ найден")
else:
logger.debug("[decorators] request.auth НЕ найден")
# 2. Проверяем наличие auth_token в scope (приоритет)
if hasattr(request, "scope") and isinstance(request.scope, dict) and "auth_token" in request.scope:
token = request.scope.get("auth_token")
if token is not None:
token_len = len(token) if hasattr(token, "__len__") else "unknown"
logger.debug(f"[decorators] Токен получен из scope.auth_token: {token_len}")
return token
# 3. Получаем заголовки запроса безопасным способом
headers = get_safe_headers(request)
logger.debug(f"[decorators] Получены заголовки: {list(headers.keys())}")
# 4. Проверяем кастомный заголовок авторизации
auth_header_key = SESSION_TOKEN_HEADER.lower()
if auth_header_key in headers:
token = headers[auth_header_key]
logger.debug(f"[decorators] Токен найден в заголовке {SESSION_TOKEN_HEADER}")
# Убираем префикс Bearer если есть
if token.startswith("Bearer "):
token = token.replace("Bearer ", "", 1).strip()
logger.debug(f"[decorators] Обработанный токен: {len(token)}")
return token
# 5. Проверяем стандартный заголовок Authorization
if "authorization" in headers:
auth_header = headers["authorization"]
logger.debug(f"[decorators] Найден заголовок Authorization: {auth_header[:20]}...")
if auth_header.startswith("Bearer "):
token = auth_header.replace("Bearer ", "", 1).strip()
logger.debug(f"[decorators] Извлечен Bearer токен: {len(token)}")
return token
logger.debug("[decorators] Authorization заголовок не содержит Bearer токен")
# 6. Проверяем cookies
if hasattr(request, "cookies") and request.cookies:
if isinstance(request.cookies, dict):
cookies = request.cookies
elif hasattr(request.cookies, "get"):
cookies = {k: request.cookies.get(k) for k in getattr(request.cookies, "keys", list)()}
else:
cookies = {}
logger.debug(f"[decorators] Доступные cookies: {list(cookies.keys())}")
# Проверяем кастомную cookie
if SESSION_COOKIE_NAME in cookies:
token = cookies[SESSION_COOKIE_NAME]
logger.debug(f"[decorators] Токен найден в cookie {SESSION_COOKIE_NAME}: {len(token)}")
return token
# Проверяем стандартную cookie
if "auth_token" in cookies:
token = cookies["auth_token"]
logger.debug(f"[decorators] Токен найден в cookie auth_token: {len(token)}")
return token
logger.debug("[decorators] Токен НЕ найден ни в одном источнике")
return None
except Exception as e:
logger.error(f"[decorators] Критическая ошибка при извлечении токена: {e}")
return None
def extract_bearer_token(auth_header: str) -> str | None:
"""
Извлекает токен из заголовка Authorization с Bearer схемой.
Args:
auth_header: Заголовок Authorization
Returns:
Optional[str]: Извлеченный токен или None
"""
if not auth_header:
return None
if auth_header.startswith("Bearer "):
return auth_header[7:].strip()
return None
def format_auth_header(token: str) -> str:
"""
Форматирует токен в заголовок Authorization.
Args:
token: Токен авторизации
Returns:
str: Отформатированный заголовок
"""
return f"Bearer {token}"