# Модуль аутентификации и авторизации ## Общее описание Модуль реализует полноценную систему аутентификации с использованием локальной БД, Redis и httpOnly cookies для безопасного хранения токенов сессий. ## Архитектура системы ### Основные компоненты #### 1. **AuthMiddleware** (`auth/middleware.py`) - Единый middleware для обработки авторизации в GraphQL запросах - Извлечение Bearer токена из заголовка Authorization или httpOnly cookie - Проверка сессии через TokenStorage - Создание `request.user` и `request.auth` - Предоставление методов для установки/удаления cookies #### 2. **EnhancedGraphQLHTTPHandler** (`auth/handler.py`) - Расширенный GraphQL HTTP обработчик с поддержкой cookie и авторизации - Создание расширенного контекста запроса с авторизационными данными - Корректная обработка ответов с cookie и headers - Интеграция с AuthMiddleware #### 3. **TokenStorage** (`auth/tokens/storage.py`) - Централизованное управление токенами сессий - Хранение в Redis с TTL - Верификация и валидация токенов - Управление жизненным циклом сессий #### 4. **AuthCredentials** (`auth/credentials.py`) - Модель данных для хранения информации об авторизации - Содержит `author_id`, `scopes`, `logged_in`, `error_message`, `email`, `token` ### Модели данных #### Author (`orm/author.py`) - Основная модель пользователя с расширенным функционалом аутентификации - Поддерживает: - Локальную аутентификацию по email/телефону - Систему ролей и разрешений (RBAC) - Блокировку аккаунта при множественных неудачных попытках входа - Верификацию email/телефона ## Система httpOnly Cookies ### Принципы работы 1. **Безопасное хранение**: Токены сессий хранятся в httpOnly cookies, недоступных для JavaScript 2. **Автоматическая отправка**: Cookies автоматически отправляются с каждым запросом 3. **Защита от XSS**: httpOnly cookies защищены от кражи через JavaScript 4. **Двойная поддержка**: Система поддерживает как cookies, так и заголовок Authorization ### Конфигурация cookies ```python # settings.py SESSION_COOKIE_NAME = "session_token" SESSION_COOKIE_HTTPONLY = True SESSION_COOKIE_SECURE = True # для HTTPS SESSION_COOKIE_SAMESITE = "lax" SESSION_COOKIE_MAX_AGE = 30 * 24 * 60 * 60 # 30 дней ``` ### Установка cookies ```python # В AuthMiddleware def set_session_cookie(self, response: Response, token: str) -> None: """Устанавливает httpOnly cookie с токеном сессии""" response.set_cookie( key=SESSION_COOKIE_NAME, value=token, httponly=SESSION_COOKIE_HTTPONLY, secure=SESSION_COOKIE_SECURE, samesite=SESSION_COOKIE_SAMESITE, max_age=SESSION_COOKIE_MAX_AGE ) ``` ## Аутентификация ### Извлечение токенов Система проверяет токены в следующем порядке приоритета: 1. **httpOnly cookies** - основной источник для веб-приложений 2. **Заголовок Authorization** - для API клиентов и мобильных приложений ```python # auth/utils.py async def extract_token_from_request(request) -> str | None: """DRY функция для извлечения токена из request""" # 1. Проверяем cookies if hasattr(request, "cookies") and request.cookies: token = request.cookies.get(SESSION_COOKIE_NAME) if token: return token # 2. Проверяем заголовок Authorization headers = get_safe_headers(request) auth_header = headers.get("authorization", "") if auth_header and auth_header.startswith("Bearer "): token = auth_header[7:].strip() return token return None ``` ### Безопасное получение заголовков ```python # auth/utils.py def get_safe_headers(request: Any) -> dict[str, str]: """Безопасно получает заголовки запроса""" headers = {} try: # Первый приоритет: scope из ASGI if hasattr(request, "scope") and isinstance(request.scope, dict): scope_headers = request.scope.get("headers", []) if scope_headers: headers.update({k.decode("utf-8").lower(): v.decode("utf-8") for k, v in scope_headers}) # Второй приоритет: метод headers() или атрибут headers if hasattr(request, "headers"): if callable(request.headers): h = request.headers() if h: headers.update({k.lower(): v for k, v in h.items()}) else: h = request.headers if hasattr(h, "items") and callable(h.items): headers.update({k.lower(): v for k, v in h.items()}) except Exception as e: logger.warning(f"Ошибка при доступе к заголовкам: {e}") return headers ``` ## Управление сессиями ### Создание сессии ```python # auth/tokens/sessions.py async def create_session(author_id: int, email: str, **kwargs) -> str: """Создает новую сессию для пользователя""" session_data = { "author_id": author_id, "email": email, "created_at": int(time.time()), **kwargs } # Генерируем уникальный токен token = generate_session_token() # Сохраняем в Redis await redis.execute( "SETEX", f"session:{token}", SESSION_TOKEN_LIFE_SPAN, json.dumps(session_data) ) return token ``` ### Верификация сессии ```python # auth/tokens/storage.py async def verify_session(token: str) -> dict | None: """Верифицирует токен сессии""" if not token: return None try: # Получаем данные сессии из Redis session_data = await redis.execute("GET", f"session:{token}") if not session_data: return None return json.loads(session_data) except Exception as e: logger.error(f"Ошибка верификации сессии: {e}") return None ``` ### Удаление сессии ```python # auth/tokens/storage.py async def delete_session(token: str) -> bool: """Удаляет сессию пользователя""" try: result = await redis.execute("DEL", f"session:{token}") return bool(result) except Exception as e: logger.error(f"Ошибка удаления сессии: {e}") return False ``` ## OAuth интеграция ### Поддерживаемые провайдеры - **Google** - OAuth 2.0 с PKCE - **Facebook** - OAuth 2.0 - **GitHub** - OAuth 2.0 ### Реализация ```python # auth/oauth.py class OAuthProvider: """Базовый класс для OAuth провайдеров""" def __init__(self, client_id: str, client_secret: str, redirect_uri: str): self.client_id = client_id self.client_secret = client_secret self.redirect_uri = redirect_uri async def get_authorization_url(self, state: str = None) -> str: """Генерирует URL для авторизации""" pass async def exchange_code_for_token(self, code: str) -> dict: """Обменивает код авторизации на токен доступа""" pass async def get_user_info(self, access_token: str) -> dict: """Получает информацию о пользователе""" pass ``` ## Валидация ### Модели валидации ```python # auth/validations.py from pydantic import BaseModel, EmailStr class LoginRequest(BaseModel): email: EmailStr password: str class RegisterRequest(BaseModel): email: EmailStr password: str name: str phone: str | None = None class PasswordResetRequest(BaseModel): email: EmailStr class EmailConfirmationRequest(BaseModel): token: str ``` ## API Endpoints ### GraphQL мутации ```graphql # Мутации аутентификации mutation Login($email: String!, $password: String!) { login(email: $email, password: $password) { success token user { id email name } error } } mutation Register($input: RegisterInput!) { registerUser(input: $input) { success user { id email name } error } } mutation Logout { logout { success message } } # Получение текущей сессии query GetSession { getSession { success token user { id email name roles } error } } ``` ### REST API endpoints ```python # Основные endpoints POST /auth/login # Вход в систему POST /auth/register # Регистрация POST /auth/logout # Выход из системы GET /auth/session # Получение текущей сессии POST /auth/refresh # Обновление токена # OAuth endpoints GET /auth/oauth/{provider} # Инициация OAuth GET /auth/oauth/{provider}/callback # OAuth callback ``` ## Безопасность ### Хеширование паролей ```python # auth/identity.py from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def hash_password(password: str) -> str: """Хеширует пароль с использованием bcrypt""" return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """Проверяет пароль""" return pwd_context.verify(plain_password, hashed_password) ``` ### Защита от брутфорса ```python # auth/core.py async def handle_login_attempt(author: Author, success: bool) -> None: """Обрабатывает попытку входа""" if not success: # Увеличиваем счетчик неудачных попыток author.failed_login_attempts += 1 if author.failed_login_attempts >= 5: # Блокируем аккаунт на 30 минут author.account_locked_until = int(time.time()) + 1800 logger.warning(f"Аккаунт {author.email} заблокирован") else: # Сбрасываем счетчик при успешном входе author.failed_login_attempts = 0 author.account_locked_until = None ``` ### CSRF защита ```python # auth/middleware.py def generate_csrf_token() -> str: """Генерирует CSRF токен""" return secrets.token_urlsafe(32) def verify_csrf_token(token: str, stored_token: str) -> bool: """Проверяет CSRF токен""" return secrets.compare_digest(token, stored_token) ``` ## Декораторы ### Основные декораторы ```python # auth/decorators.py from functools import wraps from graphql import GraphQLError def login_required(func): """Декоратор для проверки авторизации""" @wraps(func) async def wrapper(*args, **kwargs): info = args[-1] if args else None if not info or not hasattr(info, 'context'): raise GraphQLError("Context not available") user = info.context.get('user') if not user or not user.is_authenticated: raise GraphQLError("Authentication required") return await func(*args, **kwargs) return wrapper def require_permission(permission: str): """Декоратор для проверки разрешений""" def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): info = args[-1] if args else None if not info or not hasattr(info, 'context'): raise GraphQLError("Context not available") user = info.context.get('user') if not user or not user.is_authenticated: raise GraphQLError("Authentication required") # Проверяем разрешение через RBAC has_perm = await check_user_permission( user.id, permission, info.context.get('community_id', 1) ) if not has_perm: raise GraphQLError("Insufficient permissions") return await func(*args, **kwargs) return wrapper return decorator ``` ## Интеграция с RBAC ### Проверка разрешений ```python # auth/decorators.py async def check_user_permission(author_id: int, permission: str, community_id: int) -> bool: """Проверяет разрешение пользователя через RBAC систему""" try: from rbac.api import user_has_permission return await user_has_permission(author_id, permission, community_id) except Exception as e: logger.error(f"Ошибка проверки разрешений: {e}") return False ``` ### Получение ролей пользователя ```python # auth/middleware.py async def get_user_roles(author_id: int, community_id: int = 1) -> list[str]: """Получает роли пользователя в сообществе""" try: from rbac.api import get_user_roles_in_community return get_user_roles_in_community(author_id, community_id) except Exception as e: logger.error(f"Ошибка получения ролей: {e}") return [] ``` ## Мониторинг и логирование ### Логирование событий ```python # auth/middleware.py def log_auth_event(event_type: str, user_id: int | None = None, success: bool = True, **kwargs): """Логирует события авторизации""" logger.info( "auth_event", event_type=event_type, user_id=user_id, success=success, ip_address=kwargs.get('ip'), user_agent=kwargs.get('user_agent'), **kwargs ) ``` ### Метрики ```python # auth/middleware.py from prometheus_client import Counter, Histogram # Счетчики login_attempts = Counter('auth_login_attempts_total', 'Number of login attempts', ['success']) session_creations = Counter('auth_sessions_created_total', 'Number of sessions created') session_deletions = Counter('auth_sessions_deleted_total', 'Number of sessions deleted') # Гистограммы auth_duration = Histogram('auth_operation_duration_seconds', 'Time spent on auth operations', ['operation']) ``` ## Конфигурация ### Основные настройки ```python # settings.py # Настройки сессий SESSION_TOKEN_LIFE_SPAN = 30 * 24 * 60 * 60 # 30 дней SESSION_COOKIE_NAME = "session_token" SESSION_COOKIE_HTTPONLY = True SESSION_COOKIE_SECURE = True # для HTTPS SESSION_COOKIE_SAMESITE = "lax" SESSION_COOKIE_MAX_AGE = 30 * 24 * 60 * 60 # JWT настройки JWT_SECRET_KEY = "your-secret-key" JWT_ALGORITHM = "HS256" JWT_EXPIRATION_DELTA = 30 * 24 * 60 * 60 # OAuth настройки GOOGLE_CLIENT_ID = "your-google-client-id" GOOGLE_CLIENT_SECRET = "your-google-client-secret" FACEBOOK_CLIENT_ID = "your-facebook-client-id" FACEBOOK_CLIENT_SECRET = "your-facebook-client-secret" # Безопасность MAX_LOGIN_ATTEMPTS = 5 ACCOUNT_LOCKOUT_DURATION = 1800 # 30 минут PASSWORD_MIN_LENGTH = 8 ``` ## Примеры использования ### 1. Вход в систему ```typescript // Frontend - React/SolidJS const handleLogin = async (email: string, password: string) => { try { const response = await fetch('/auth/login', { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ email, password }), credentials: 'include', // Важно для cookies }); if (response.ok) { const data = await response.json(); // Cookie автоматически установится браузером // Перенаправляем на главную страницу window.location.href = '/'; } else { const error = await response.json(); console.error('Login failed:', error.message); } } catch (error) { console.error('Login error:', error); } }; ``` ### 2. Проверка авторизации ```typescript // Frontend - проверка текущей сессии const checkAuth = async () => { try { const response = await fetch('/auth/session', { credentials: 'include', }); if (response.ok) { const data = await response.json(); if (data.user) { // Пользователь авторизован setUser(data.user); setIsAuthenticated(true); } } } catch (error) { console.error('Auth check failed:', error); } }; ``` ### 3. Защищенный API endpoint ```python # Backend - Python from auth.decorators import login_required, require_permission @login_required @require_permission("shout:create") async def create_shout(info, input_data): """Создание публикации с проверкой прав""" user = info.context.get('user') # Создаем публикацию shout = Shout( title=input_data['title'], content=input_data['content'], author_id=user.id ) db.add(shout) db.commit() return shout ``` ### 4. OAuth авторизация ```typescript // Frontend - OAuth кнопка const handleGoogleLogin = () => { // Перенаправляем на OAuth endpoint window.location.href = '/auth/oauth/google'; }; // Обработка OAuth callback useEffect(() => { const urlParams = new URLSearchParams(window.location.search); const code = urlParams.get('code'); const state = urlParams.get('state'); if (code && state) { // Обмениваем код на токен exchangeOAuthCode(code, state); } }, []); ``` ### 5. Выход из системы ```typescript // Frontend - выход const handleLogout = async () => { try { await fetch('/auth/logout', { method: 'POST', credentials: 'include', }); // Очищаем локальное состояние setUser(null); setIsAuthenticated(false); // Перенаправляем на страницу входа window.location.href = '/login'; } catch (error) { console.error('Logout failed:', error); } }; ``` ## Тестирование ### Тесты аутентификации ```python # tests/test_auth.py import pytest from httpx import AsyncClient @pytest.mark.asyncio async def test_login_success(client: AsyncClient): """Тест успешного входа""" response = await client.post("/auth/login", json={ "email": "test@example.com", "password": "password123" }) assert response.status_code == 200 data = response.json() assert data["success"] is True assert "token" in data # Проверяем установку cookie cookies = response.cookies assert "session_token" in cookies @pytest.mark.asyncio async def test_protected_endpoint_with_cookie(client: AsyncClient): """Тест защищенного endpoint с cookie""" # Сначала входим в систему login_response = await client.post("/auth/login", json={ "email": "test@example.com", "password": "password123" }) # Получаем cookie session_cookie = login_response.cookies.get("session_token") # Делаем запрос к защищенному endpoint response = await client.get("/auth/session", cookies={ "session_token": session_cookie }) assert response.status_code == 200 data = response.json() assert data["user"]["email"] == "test@example.com" ``` ### Тесты OAuth ```python # tests/test_oauth.py @pytest.mark.asyncio async def test_google_oauth_flow(client: AsyncClient, mock_google): """Тест OAuth flow для Google""" # Мокаем ответ от Google mock_google.return_value = { "id": "12345", "email": "test@gmail.com", "name": "Test User" } # Инициация OAuth response = await client.get("/auth/oauth/google") assert response.status_code == 302 # Проверяем редирект assert "accounts.google.com" in response.headers["location"] ``` ## Безопасность ### Лучшие практики 1. **httpOnly Cookies**: Токены сессий хранятся только в httpOnly cookies 2. **HTTPS**: Все endpoints должны работать через HTTPS в продакшене 3. **SameSite**: Используется `SameSite=lax` для защиты от CSRF 4. **Rate Limiting**: Ограничение количества попыток входа 5. **Логирование**: Детальное логирование всех событий авторизации 6. **Валидация**: Строгая валидация всех входных данных ### Защита от атак - **XSS**: httpOnly cookies недоступны для JavaScript - **CSRF**: SameSite cookies и CSRF токены - **Session Hijacking**: Secure cookies и регулярная ротация токенов - **Brute Force**: Ограничение попыток входа и блокировка аккаунтов - **SQL Injection**: Использование ORM и параметризованных запросов ## Миграция ### Обновление существующего кода Если в вашем коде используются старые методы аутентификации: ```python # Старый код token = request.headers.get("Authorization") # Новый код from auth.utils import extract_token_from_request token = await extract_token_from_request(request) ``` ### Совместимость Новая система полностью совместима с существующим кодом: - Поддерживаются как cookies, так и заголовки Authorization - Все существующие декораторы работают без изменений - API endpoints сохранили свои сигнатуры - RBAC интеграция работает как прежде