Some checks failed
Deploy on push / deploy (push) Failing after 3m6s
### 🚨 Исправлено - **Удалено поле username из модели Author**: Поле `username` больше не является частью модели `Author` - Убрано свойство `@property def username` из `orm/author.py` - Обновлены все сервисы для использования `email` или `slug` вместо `username` - Исправлены резолверы для исключения `username` при обработке данных автора - Поле `username` теперь используется только в JWT токенах для совместимости ### 🧪 Исправлено - **E2E тесты админ-панели**: Полностью переработаны E2E тесты для работы с реальным API - Тесты теперь делают реальные HTTP запросы к GraphQL API - Бэкенд для тестов использует выделенную тестовую БД (`test_e2e.db`) - Создан фикстура `backend_server` для запуска тестового сервера - Добавлен фикстура `create_test_users_in_backend_db` для регистрации пользователей через API - Убраны несуществующие GraphQL запросы (`get_community_stats`) - Тесты корректно работают с системой ролей и правами администратора ### �� Техническое - **Рефакторинг аутентификации**: Упрощена логика работы с пользователями - Убраны зависимости от несуществующих полей в ORM моделях - Обновлены сервисы аутентификации для корректной работы без `username` - Исправлены все места использования `username` в коде - **Улучшена тестовая инфраструктура**: - Тесты теперь используют реальный HTTP API вместо прямых DB проверок - Правильная изоляция тестовых данных через отдельную БД - Корректная работа с системой ролей и правами
295 lines
12 KiB
Python
295 lines
12 KiB
Python
"""
|
||
Вспомогательные функции для аутентификации
|
||
Содержит функции для работы с токенами, заголовками и запросами
|
||
"""
|
||
|
||
from typing import Any, Tuple
|
||
|
||
from settings import SESSION_COOKIE_NAME, SESSION_TOKEN_HEADER
|
||
from utils.logger import root_logger as logger
|
||
|
||
|
||
def get_safe_headers(request: Any) -> dict[str, str]:
|
||
"""
|
||
Безопасно получает заголовки запроса.
|
||
|
||
Args:
|
||
request: Объект запроса
|
||
|
||
Returns:
|
||
Dict[str, str]: Словарь заголовков
|
||
"""
|
||
headers = {}
|
||
try:
|
||
# Первый приоритет: scope из ASGI (самый надежный источник)
|
||
if hasattr(request, "scope") and isinstance(request.scope, dict):
|
||
scope_headers = request.scope.get("headers", [])
|
||
if scope_headers:
|
||
headers.update({k.decode("utf-8").lower(): v.decode("utf-8") for k, v in scope_headers})
|
||
logger.debug(f"[decorators] Получены заголовки из request.scope: {len(headers)}")
|
||
logger.debug(f"[decorators] Заголовки из request.scope: {list(headers.keys())}")
|
||
|
||
# Второй приоритет: метод headers() или атрибут headers
|
||
if hasattr(request, "headers"):
|
||
if callable(request.headers):
|
||
h = request.headers()
|
||
if h:
|
||
headers.update({k.lower(): v for k, v in h.items()})
|
||
logger.debug(f"[decorators] Получены заголовки из request.headers() метода: {len(headers)}")
|
||
else:
|
||
h = request.headers
|
||
if hasattr(h, "items") and callable(h.items):
|
||
headers.update({k.lower(): v for k, v in h.items()})
|
||
logger.debug(f"[decorators] Получены заголовки из request.headers атрибута: {len(headers)}")
|
||
elif isinstance(h, dict):
|
||
headers.update({k.lower(): v for k, v in h.items()})
|
||
logger.debug(f"[decorators] Получены заголовки из request.headers словаря: {len(headers)}")
|
||
|
||
# Третий приоритет: атрибут _headers
|
||
if hasattr(request, "_headers") and request._headers:
|
||
headers.update({k.lower(): v for k, v in request._headers.items()})
|
||
logger.debug(f"[decorators] Получены заголовки из request._headers: {len(headers)}")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"[decorators] Ошибка при доступе к заголовкам: {e}")
|
||
|
||
return headers
|
||
|
||
|
||
async def extract_token_from_request(request) -> str | None:
|
||
"""
|
||
DRY функция для извлечения токена из request.
|
||
Проверяет cookies и заголовок Authorization.
|
||
|
||
Args:
|
||
request: Request объект
|
||
|
||
Returns:
|
||
Optional[str]: Токен или None
|
||
"""
|
||
if not request:
|
||
return None
|
||
|
||
# 1. Проверяем cookies
|
||
if hasattr(request, "cookies") and request.cookies:
|
||
token = request.cookies.get(SESSION_COOKIE_NAME)
|
||
if token:
|
||
logger.debug(f"[utils] Токен получен из cookie {SESSION_COOKIE_NAME}")
|
||
return token
|
||
|
||
# 2. Проверяем заголовок Authorization
|
||
headers = get_safe_headers(request)
|
||
auth_header = headers.get("authorization", "")
|
||
if auth_header and auth_header.startswith("Bearer "):
|
||
token = auth_header[7:].strip()
|
||
logger.debug("[utils] Токен получен из заголовка Authorization")
|
||
return token
|
||
|
||
logger.debug("[utils] Токен не найден ни в cookies, ни в заголовке")
|
||
return None
|
||
|
||
|
||
async def get_user_data_by_token(token: str) -> Tuple[bool, dict | None, str | None]:
|
||
"""
|
||
Получает данные пользователя по токену.
|
||
|
||
Args:
|
||
token: Токен авторизации
|
||
|
||
Returns:
|
||
Tuple[bool, Optional[dict], Optional[str]]: (success, user_data, error_message)
|
||
"""
|
||
try:
|
||
from auth.tokens.storage import TokenStorage as TokenManager
|
||
from orm.author import Author
|
||
from storage.db import local_session
|
||
|
||
# Проверяем сессию через TokenManager
|
||
payload = await TokenManager.verify_session(token)
|
||
|
||
if not payload:
|
||
return False, None, "Сессия не найдена"
|
||
|
||
# Получаем user_id из payload
|
||
user_id = payload.user_id if hasattr(payload, "user_id") else payload.get("user_id")
|
||
|
||
if not user_id:
|
||
return False, None, "Токен не содержит user_id"
|
||
|
||
# Получаем данные пользователя
|
||
with local_session() as session:
|
||
author_obj = session.query(Author).where(Author.id == int(user_id)).first()
|
||
if not author_obj:
|
||
return False, None, f"Пользователь с ID {user_id} не найден в БД"
|
||
|
||
try:
|
||
user_data = author_obj.dict()
|
||
except Exception:
|
||
user_data = {
|
||
"id": author_obj.id,
|
||
"email": author_obj.email,
|
||
"name": getattr(author_obj, "name", ""),
|
||
"slug": getattr(author_obj, "slug", ""),
|
||
}
|
||
|
||
logger.debug(f"[utils] Данные пользователя получены для ID {user_id}")
|
||
return True, user_data, None
|
||
|
||
except Exception as e:
|
||
logger.error(f"[utils] Ошибка при получении данных пользователя: {e}")
|
||
return False, None, f"Ошибка получения данных: {e!s}"
|
||
|
||
|
||
async def get_auth_token_from_context(info: Any) -> str | None:
|
||
"""
|
||
Извлекает токен авторизации из GraphQL контекста.
|
||
Порядок проверки:
|
||
1. Проверяет заголовок Authorization
|
||
2. Проверяет cookie session_token
|
||
3. Переиспользует логику get_auth_token для request
|
||
|
||
Args:
|
||
info: GraphQLResolveInfo объект
|
||
|
||
Returns:
|
||
Optional[str]: Токен авторизации или None
|
||
"""
|
||
try:
|
||
context = getattr(info, "context", {})
|
||
request = context.get("request")
|
||
|
||
if request:
|
||
# Переиспользуем существующую логику для request
|
||
return await get_auth_token(request)
|
||
|
||
# Если request отсутствует, возвращаем None
|
||
logger.debug("[utils] Request отсутствует в GraphQL контексте")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"[utils] Ошибка при извлечении токена из GraphQL контекста: {e}")
|
||
return None
|
||
|
||
|
||
async def get_auth_token(request: Any) -> str | None:
|
||
"""
|
||
Извлекает токен авторизации из запроса.
|
||
Порядок проверки:
|
||
1. Проверяет auth из middleware
|
||
2. Проверяет auth из scope
|
||
3. Проверяет заголовок Authorization
|
||
4. Проверяет cookie с именем auth_token
|
||
|
||
Args:
|
||
request: Объект запроса
|
||
|
||
Returns:
|
||
Optional[str]: Токен авторизации или None
|
||
"""
|
||
try:
|
||
# 1. Проверяем auth из middleware (если middleware уже обработал токен)
|
||
if hasattr(request, "auth") and request.auth:
|
||
token = getattr(request.auth, "token", None)
|
||
if token:
|
||
token_len = len(token) if hasattr(token, "__len__") else "unknown"
|
||
logger.debug(f"[decorators] Токен получен из request.auth: {token_len}")
|
||
return token
|
||
logger.debug("[decorators] request.auth есть, но token НЕ найден")
|
||
else:
|
||
logger.debug("[decorators] request.auth НЕ найден")
|
||
|
||
# 2. Проверяем наличие auth_token в scope (приоритет)
|
||
if hasattr(request, "scope") and isinstance(request.scope, dict) and "auth_token" in request.scope:
|
||
token = request.scope.get("auth_token")
|
||
if token is not None:
|
||
token_len = len(token) if hasattr(token, "__len__") else "unknown"
|
||
logger.debug(f"[decorators] Токен получен из scope.auth_token: {token_len}")
|
||
return token
|
||
|
||
# 3. Получаем заголовки запроса безопасным способом
|
||
headers = get_safe_headers(request)
|
||
logger.debug(f"[decorators] Получены заголовки: {list(headers.keys())}")
|
||
|
||
# 4. Проверяем кастомный заголовок авторизации
|
||
auth_header_key = SESSION_TOKEN_HEADER.lower()
|
||
if auth_header_key in headers:
|
||
token = headers[auth_header_key]
|
||
logger.debug(f"[decorators] Токен найден в заголовке {SESSION_TOKEN_HEADER}")
|
||
# Убираем префикс Bearer если есть
|
||
if token.startswith("Bearer "):
|
||
token = token.replace("Bearer ", "", 1).strip()
|
||
logger.debug(f"[decorators] Обработанный токен: {len(token)}")
|
||
return token
|
||
|
||
# 5. Проверяем стандартный заголовок Authorization
|
||
if "authorization" in headers:
|
||
auth_header = headers["authorization"]
|
||
logger.debug(f"[decorators] Найден заголовок Authorization: {auth_header[:20]}...")
|
||
if auth_header.startswith("Bearer "):
|
||
token = auth_header.replace("Bearer ", "", 1).strip()
|
||
logger.debug(f"[decorators] Извлечен Bearer токен: {len(token)}")
|
||
return token
|
||
logger.debug("[decorators] Authorization заголовок не содержит Bearer токен")
|
||
|
||
# 6. Проверяем cookies
|
||
if hasattr(request, "cookies") and request.cookies:
|
||
if isinstance(request.cookies, dict):
|
||
cookies = request.cookies
|
||
elif hasattr(request.cookies, "get"):
|
||
cookies = {k: request.cookies.get(k) for k in getattr(request.cookies, "keys", list)()}
|
||
else:
|
||
cookies = {}
|
||
|
||
logger.debug(f"[decorators] Доступные cookies: {list(cookies.keys())}")
|
||
|
||
# Проверяем кастомную cookie
|
||
if SESSION_COOKIE_NAME in cookies:
|
||
token = cookies[SESSION_COOKIE_NAME]
|
||
logger.debug(f"[decorators] Токен найден в cookie {SESSION_COOKIE_NAME}: {len(token)}")
|
||
return token
|
||
|
||
# Проверяем стандартную cookie
|
||
if "auth_token" in cookies:
|
||
token = cookies["auth_token"]
|
||
logger.debug(f"[decorators] Токен найден в cookie auth_token: {len(token)}")
|
||
return token
|
||
|
||
logger.debug("[decorators] Токен НЕ найден ни в одном источнике")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"[decorators] Критическая ошибка при извлечении токена: {e}")
|
||
return None
|
||
|
||
|
||
def extract_bearer_token(auth_header: str) -> str | None:
|
||
"""
|
||
Извлекает токен из заголовка Authorization с Bearer схемой.
|
||
|
||
Args:
|
||
auth_header: Заголовок Authorization
|
||
|
||
Returns:
|
||
Optional[str]: Извлеченный токен или None
|
||
"""
|
||
if not auth_header:
|
||
return None
|
||
|
||
if auth_header.startswith("Bearer "):
|
||
return auth_header[7:].strip()
|
||
|
||
return None
|
||
|
||
|
||
def format_auth_header(token: str) -> str:
|
||
"""
|
||
Форматирует токен в заголовок Authorization.
|
||
|
||
Args:
|
||
token: Токен авторизации
|
||
|
||
Returns:
|
||
str: Отформатированный заголовок
|
||
"""
|
||
return f"Bearer {token}"
|