Files
core/docs/auth/sessions.md
Untone 14ff155789
All checks were successful
Deploy on push / deploy (push) Successful in 3m19s
config-fix
2025-09-30 21:48:29 +03:00

15 KiB
Raw Blame History

🔑 Управление сессиями

🎯 Обзор

Система управления сессиями на основе JWT токенов с Redis хранением для отзыва и мониторинга активности.

🏗️ Архитектура

Принцип работы

  1. JWT токены с payload {user_id, username, iat, exp}
  2. Redis хранение для отзыва и управления жизненным циклом
  3. Множественные сессии на пользователя
  4. Автоматическое обновление last_activity при активности

Redis структура

session:{user_id}:{token}     # Hash: {user_id, username, device_info, last_activity}
user_sessions:{user_id}       # Set: {token1, token2, ...}

Извлечение токена (приоритет)

  1. Cookie session_token (httpOnly)
  2. Заголовок Authorization: Bearer <token>
  3. Заголовок X-Session-Token
  4. scope["auth_token"] (внутренний)

🔧 SessionTokenManager

Основные методы

from auth.tokens.sessions import SessionTokenManager

sessions = SessionTokenManager()

# Создание сессии
token = await sessions.create_session(
    user_id="123", 
    auth_data={"provider": "local"},
    username="john_doe",
    device_info={"ip": "192.168.1.1", "user_agent": "Mozilla/5.0"}
)

# Создание JWT токена сессии
token = await sessions.create_session_token(
    user_id="123",
    token_data={"username": "john_doe", "device_info": "..."}
)

# Проверка сессии
payload = await sessions.verify_session(token)
# Возвращает: {"user_id": "123", "username": "john_doe", "iat": 1640995200, "exp": 1643587200}

# Валидация токена сессии
valid, data = await sessions.validate_session_token(token)

# Получение данных сессии
session_data = await sessions.get_session_data(token, user_id)

# Обновление сессии
new_token = await sessions.refresh_session(user_id, old_token, device_info)

# Отзыв сессии
await sessions.revoke_session_token(token)

# Отзыв всех сессий пользователя
revoked_count = await sessions.revoke_user_sessions(user_id)

# Получение всех сессий пользователя
user_sessions = await sessions.get_user_sessions(user_id)

🍪 httpOnly Cookies

Принципы работы

  1. Безопасное хранение: Токены сессий хранятся в httpOnly cookies, недоступных для JavaScript
  2. Автоматическая отправка: Cookies автоматически отправляются с каждым запросом
  3. Защита от XSS: httpOnly cookies защищены от кражи через JavaScript
  4. Двойная поддержка: Система поддерживает как cookies, так и заголовок Authorization

Конфигурация cookies

# settings.py
SESSION_COOKIE_NAME = "session_token"
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SECURE = True  # для HTTPS
SESSION_COOKIE_SAMESITE = "lax"
SESSION_COOKIE_MAX_AGE = 30 * 24 * 60 * 60  # 30 дней

Установка cookies

# В AuthMiddleware
def set_session_cookie(self, response: Response, token: str) -> None:
    """Устанавливает httpOnly cookie с токеном сессии"""
    response.set_cookie(
        key=SESSION_COOKIE_NAME,
        value=token,
        httponly=SESSION_COOKIE_HTTPONLY,
        secure=SESSION_COOKIE_SECURE,
        samesite=SESSION_COOKIE_SAMESITE,
        max_age=SESSION_COOKIE_MAX_AGE
    )

🔍 Извлечение токенов

Автоматическое извлечение

from auth.utils import extract_token_from_request, get_auth_token, get_safe_headers

# Простое извлечение из cookies/headers
token = await extract_token_from_request(request)

# Расширенное извлечение с логированием
token = await get_auth_token(request)

# Ручная проверка источников
headers = get_safe_headers(request)
token = headers.get("authorization", "").replace("Bearer ", "")

# Извлечение из GraphQL контекста
from auth.utils import get_auth_token_from_context
token = await get_auth_token_from_context(info)

Приоритет источников

Система проверяет токены в следующем порядке приоритета:

  1. httpOnly cookies - основной источник для веб-приложений
  2. Заголовок Authorization - для API клиентов и мобильных приложений
# auth/utils.py
async def extract_token_from_request(request) -> str | None:
    """DRY функция для извлечения токена из request"""
    
    # 1. Проверяем cookies
    if hasattr(request, "cookies") and request.cookies:
        token = request.cookies.get(SESSION_COOKIE_NAME)
        if token:
            return token

    # 2. Проверяем заголовок Authorization
    headers = get_safe_headers(request)
    auth_header = headers.get("authorization", "")
    if auth_header and auth_header.startswith("Bearer "):
        token = auth_header[7:].strip()
        return token

    return None

Безопасное получение заголовков

# auth/utils.py
def get_safe_headers(request: Any) -> dict[str, str]:
    """Безопасно получает заголовки запроса"""
    headers = {}
    try:
        # Первый приоритет: scope из ASGI
        if hasattr(request, "scope") and isinstance(request.scope, dict):
            scope_headers = request.scope.get("headers", [])
            if scope_headers:
                headers.update({k.decode("utf-8").lower(): v.decode("utf-8") 
                              for k, v in scope_headers})

        # Второй приоритет: метод headers() или атрибут headers
        if hasattr(request, "headers"):
            if callable(request.headers):
                h = request.headers()
                if h:
                    headers.update({k.lower(): v for k, v in h.items()})
            else:
                h = request.headers
                if hasattr(h, "items") and callable(h.items):
                    headers.update({k.lower(): v for k, v in h.items()})

    except Exception as e:
        logger.warning(f"Ошибка при доступе к заголовкам: {e}")

    return headers

🔄 Жизненный цикл сессии

Создание сессии

# auth/tokens/sessions.py
async def create_session(author_id: int, email: str, **kwargs) -> str:
    """Создает новую сессию для пользователя"""
    session_data = {
        "author_id": author_id,
        "email": email,
        "created_at": int(time.time()),
        **kwargs
    }
    
    # Генерируем уникальный токен
    token = generate_session_token()
    
    # Сохраняем в Redis
    await redis.execute(
        "SETEX",
        f"session:{token}",
        SESSION_TOKEN_LIFE_SPAN,
        json.dumps(session_data)
    )
    
    return token

Верификация сессии

# auth/tokens/storage.py
async def verify_session(token: str) -> dict | None:
    """Верифицирует токен сессии"""
    if not token:
        return None
        
    try:
        # Получаем данные сессии из Redis
        session_data = await redis.execute("GET", f"session:{token}")
        if not session_data:
            return None
            
        return json.loads(session_data)
        
    except Exception as e:
        logger.error(f"Ошибка верификации сессии: {e}")
        return None

Обновление сессии

async def refresh_session(user_id: str, old_token: str, device_info: dict = None) -> str:
    """Обновляет сессию пользователя"""
    
    # Проверяем старую сессию
    old_payload = await verify_session(old_token)
    if not old_payload:
        raise InvalidTokenError("Invalid session token")
    
    # Отзываем старый токен
    await revoke_session_token(old_token)
    
    # Создаем новый токен
    new_token = await create_session(
        user_id=user_id,
        username=old_payload.get("username"),
        device_info=device_info or old_payload.get("device_info", {})
    )
    
    return new_token

Удаление сессии

# auth/tokens/storage.py
async def delete_session(token: str) -> bool:
    """Удаляет сессию пользователя"""
    try:
        result = await redis.execute("DEL", f"session:{token}")
        return bool(result)
    except Exception as e:
        logger.error(f"Ошибка удаления сессии: {e}")
        return False

🔒 Безопасность

JWT токены

  • Алгоритм: HS256
  • Secret: Из переменной окружения JWT_SECRET_KEY
  • Payload: {user_id, username, iat, exp}
  • Expiration: 30 дней (настраивается)

Redis security

  • TTL для всех токенов
  • Атомарные операции через pipelines
  • SCAN вместо KEYS для производительности
  • Транзакции для критических операций

Защита от атак

  • XSS: httpOnly cookies недоступны для JavaScript
  • CSRF: SameSite cookies и CSRF токены
  • Session Hijacking: Secure cookies и регулярная ротация токенов
  • Brute Force: Ограничение попыток входа и блокировка аккаунтов

📊 Мониторинг сессий

Статистика

from auth.tokens.monitoring import TokenMonitoring

monitoring = TokenMonitoring()

# Статистика токенов
stats = await monitoring.get_token_statistics()
print(f"Active sessions: {stats['session_tokens']}")
print(f"Memory usage: {stats['memory_usage']} bytes")

# Health check
health = await monitoring.health_check()
if health["status"] == "healthy":
    print("Session system is healthy")

Логирование событий

# auth/middleware.py
def log_auth_event(event_type: str, user_id: int | None = None, 
                   success: bool = True, **kwargs):
    """Логирует события авторизации"""
    logger.info(
        "auth_event",
        event_type=event_type,
        user_id=user_id,
        success=success,
        ip_address=kwargs.get('ip'),
        user_agent=kwargs.get('user_agent'),
        **kwargs
    )

Метрики

# auth/middleware.py
from prometheus_client import Counter, Histogram

# Счетчики
login_attempts = Counter('auth_login_attempts_total', 'Number of login attempts', ['success'])
session_creations = Counter('auth_sessions_created_total', 'Number of sessions created')
session_deletions = Counter('auth_sessions_deleted_total', 'Number of sessions deleted')

# Гистограммы
auth_duration = Histogram('auth_operation_duration_seconds', 'Time spent on auth operations', ['operation'])

🧪 Тестирование

Unit тесты

import pytest
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_login_success(client: AsyncClient):
    """Тест успешного входа"""
    response = await client.post("/auth/login", json={
        "email": "test@example.com",
        "password": "password123"
    })
    
    assert response.status_code == 200
    data = response.json()
    assert data["success"] is True
    assert "token" in data
    
    # Проверяем установку cookie
    cookies = response.cookies
    assert "session_token" in cookies

@pytest.mark.asyncio
async def test_protected_endpoint_with_cookie(client: AsyncClient):
    """Тест защищенного endpoint с cookie"""
    # Сначала входим в систему
    login_response = await client.post("/auth/login", json={
        "email": "test@example.com",
        "password": "password123"
    })
    
    # Получаем cookie
    session_cookie = login_response.cookies.get("session_token")
    
    # Делаем запрос к защищенному endpoint
    response = await client.get("/auth/session", cookies={
        "session_token": session_cookie
    })
    
    assert response.status_code == 200
    data = response.json()
    assert data["user"]["email"] == "test@example.com"

💡 Примеры использования

1. Вход в систему

// Frontend - React/SolidJS
const handleLogin = async (email: string, password: string) => {
  try {
    const response = await fetch('/auth/login', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ email, password }),
      credentials: 'include', // Важно для cookies
    });

    if (response.ok) {
      const data = await response.json();
      // Cookie автоматически установится браузером
      // Перенаправляем на главную страницу
      window.location.href = '/';
    } else {
      const error = await response.json();
      console.error('Login failed:', error.message);
    }
  } catch (error) {
    console.error('Login error:', error);
  }
};

2. Проверка авторизации

// Frontend - проверка текущей сессии
const checkAuth = async () => {
  try {
    const response = await fetch('/auth/session', {
      credentials: 'include',
    });

    if (response.ok) {
      const data = await response.json();
      if (data.user) {
        // Пользователь авторизован
        setUser(data.user);
        setIsAuthenticated(true);
      }
    }
  } catch (error) {
    console.error('Auth check failed:', error);
  }
};

3. Защищенный API endpoint

# Backend - Python
from auth.decorators import login_required, require_permission

@login_required
@require_permission("shout:create")
async def create_shout(info, input_data):
    """Создание публикации с проверкой прав"""
    user = info.context.get('user')
    
    # Создаем публикацию
    shout = Shout(
        title=input_data['title'],
        content=input_data['content'],
        author_id=user.id
    )
    
    db.add(shout)
    db.commit()
    
    return shout

4. Выход из системы

// Frontend - выход
const handleLogout = async () => {
  try {
    await fetch('/auth/logout', {
      method: 'POST',
      credentials: 'include',
    });
    
    // Очищаем локальное состояние
    setUser(null);
    setIsAuthenticated(false);
    
    // Перенаправляем на страницу входа
    window.location.href = '/login';
  } catch (error) {
    console.error('Logout failed:', error);
  }
};