Files
core/docs/auth/security.md
Untone 14ff155789
All checks were successful
Deploy on push / deploy (push) Successful in 3m19s
config-fix
2025-09-30 21:48:29 +03:00

17 KiB
Raw Permalink Blame History

🔒 Безопасность системы аутентификации

🎯 Обзор

Комплексная система безопасности с многоуровневой защитой от различных типов атак.

🛡️ Основные принципы безопасности

1. Defense in Depth

  • Многоуровневая защита: JWT + Redis + RBAC + Rate Limiting
  • Fail Secure: При ошибках система блокирует доступ
  • Principle of Least Privilege: Минимальные необходимые права

2. Zero Trust Architecture

  • Verify Everything: Каждый запрос проверяется
  • Never Trust, Always Verify: Нет доверенных зон
  • Continuous Validation: Постоянная проверка токенов

🔐 JWT Security

Алгоритм и ключи

# settings.py
JWT_ALGORITHM = "HS256"  # HMAC with SHA-256
JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY")  # Минимум 256 бит
JWT_EXPIRATION_DELTA = 30 * 24 * 60 * 60  # 30 дней

Структура токена

# JWT Payload
{
    "user_id": "123",
    "username": "john_doe",
    "iat": 1640995200,  # Issued At
    "exp": 1643587200   # Expiration
}

Лучшие практики JWT

  • Короткое время жизни: Максимум 30 дней
  • Secure Secret: Криптографически стойкий ключ
  • No Sensitive Data: Только необходимые данные в payload
  • Revocation Support: Redis для отзыва токенов

httpOnly Cookies

# Настройки cookie
SESSION_COOKIE_NAME = "session_token"
SESSION_COOKIE_HTTPONLY = True      # Защита от XSS
SESSION_COOKIE_SECURE = True        # Только HTTPS
SESSION_COOKIE_SAMESITE = "lax"     # CSRF защита
SESSION_COOKIE_MAX_AGE = 30 * 24 * 60 * 60

Защита от атак

  • XSS Protection: httpOnly cookies недоступны JavaScript
  • CSRF Protection: SameSite=lax предотвращает CSRF
  • Secure Flag: Передача только по HTTPS
  • Path Restriction: Ограничение области действия

🔑 Password Security

Хеширование паролей

from passlib.context import CryptContext

pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
    bcrypt__rounds=12  # Увеличенная сложность
)

def hash_password(password: str) -> str:
    """Хеширует пароль с использованием bcrypt"""
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Проверяет пароль"""
    return pwd_context.verify(plain_password, hashed_password)

Требования к паролям

import re

def validate_password_strength(password: str) -> bool:
    """Проверка силы пароля"""
    if len(password) < 8:
        return False
    
    # Проверки
    has_upper = re.search(r'[A-Z]', password)
    has_lower = re.search(r'[a-z]', password)
    has_digit = re.search(r'\d', password)
    has_special = re.search(r'[!@#$%^&*(),.?":{}|<>]', password)
    
    return all([has_upper, has_lower, has_digit, has_special])

🚫 Защита от брутфорса

Account Lockout

async def handle_login_attempt(author: Author, success: bool) -> None:
    """Обрабатывает попытку входа"""
    if not success:
        # Увеличиваем счетчик неудачных попыток
        author.failed_login_attempts += 1
        
        if author.failed_login_attempts >= 5:
            # Блокируем аккаунт на 30 минут
            author.account_locked_until = int(time.time()) + 1800
            logger.warning(f"Аккаунт {author.email} заблокирован")
    else:
        # Сбрасываем счетчик при успешном входе
        author.failed_login_attempts = 0
        author.account_locked_until = None

Rate Limiting

from collections import defaultdict
import time

# Rate limiter
request_counts = defaultdict(list)

async def rate_limit_check(
    identifier: str, 
    max_requests: int = 10, 
    window_seconds: int = 60
) -> bool:
    """Проверка rate limiting"""
    current_time = time.time()
    user_requests = request_counts[identifier]
    
    # Удаляем старые запросы
    user_requests[:] = [
        req_time for req_time in user_requests 
        if current_time - req_time < window_seconds
    ]
    
    # Проверяем лимит
    if len(user_requests) >= max_requests:
        return False
    
    # Добавляем текущий запрос
    user_requests.append(current_time)
    return True

🔒 Redis Security

Secure Configuration

# Redis настройки безопасности
REDIS_CONFIG = {
    "socket_keepalive": True,
    "socket_keepalive_options": {},
    "health_check_interval": 30,
    "retry_on_timeout": True,
    "socket_timeout": 5,
    "socket_connect_timeout": 5
}

TTL для всех ключей

async def secure_redis_set(key: str, value: str, ttl: int = 3600):
    """Безопасная установка значения с обязательным TTL"""
    await redis.setex(key, ttl, value)
    
    # Проверяем, что TTL установлен
    actual_ttl = await redis.ttl(key)
    if actual_ttl <= 0:
        logger.error(f"TTL не установлен для ключа: {key}")
        await redis.delete(key)

Атомарные операции

async def atomic_session_update(user_id: str, token: str, data: dict):
    """Атомарное обновление сессии"""
    async with redis.pipeline(transaction=True) as pipe:
        try:
            # Начинаем транзакцию
            await pipe.multi()
            
            # Обновляем данные сессии
            session_key = f"session:{user_id}:{token}"
            await pipe.hset(session_key, mapping=data)
            await pipe.expire(session_key, 30 * 24 * 60 * 60)
            
            # Обновляем список активных сессий
            sessions_key = f"user_sessions:{user_id}"
            await pipe.sadd(sessions_key, token)
            await pipe.expire(sessions_key, 30 * 24 * 60 * 60)
            
            # Выполняем транзакцию
            await pipe.execute()
            
        except Exception as e:
            logger.error(f"Ошибка атомарной операции: {e}")
            raise

🛡️ OAuth Security

State Parameter Protection

import secrets

def generate_oauth_state() -> str:
    """Генерация криптографически стойкого state"""
    return secrets.token_urlsafe(32)

async def validate_oauth_state(received_state: str, stored_state: str) -> bool:
    """Безопасная проверка state"""
    if not received_state or not stored_state:
        return False
    
    # Используем constant-time comparison
    return secrets.compare_digest(received_state, stored_state)

PKCE Support

import base64
import hashlib

def generate_code_verifier() -> str:
    """Генерация code verifier для PKCE"""
    return base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').rstrip('=')

def generate_code_challenge(verifier: str) -> str:
    """Генерация code challenge"""
    digest = hashlib.sha256(verifier.encode('utf-8')).digest()
    return base64.urlsafe_b64encode(digest).decode('utf-8').rstrip('=')

Redirect URI Validation

from urllib.parse import urlparse

def validate_redirect_uri(uri: str) -> bool:
    """Валидация redirect URI"""
    allowed_domains = [
        "localhost:3000",
        "discours.io",
        "new.discours.io"
    ]
    
    try:
        parsed = urlparse(uri)
        
        # Проверяем схему
        if parsed.scheme not in ['http', 'https']:
            return False
        
        # Проверяем домен
        if not any(domain in parsed.netloc for domain in allowed_domains):
            return False
        
        # Проверяем на открытые редиректы
        if parsed.netloc != parsed.netloc.lower():
            return False
            
        return True
        
    except Exception:
        return False

🔍 Input Validation

Request Validation

from pydantic import BaseModel, EmailStr, validator

class LoginRequest(BaseModel):
    email: EmailStr
    password: str
    
    @validator('password')
    def validate_password(cls, v):
        if len(v) < 8:
            raise ValueError('Password too short')
        return v

class RegisterRequest(BaseModel):
    email: EmailStr
    password: str
    name: str
    
    @validator('name')
    def validate_name(cls, v):
        if len(v.strip()) < 2:
            raise ValueError('Name too short')
        # Защита от XSS
        if '<' in v or '>' in v:
            raise ValueError('Invalid characters in name')
        return v.strip()

SQL Injection Prevention

# Используем ORM и параметризованные запросы
from sqlalchemy import text

# ✅ Безопасно
async def get_user_by_email(email: str):
    query = text("SELECT * FROM authors WHERE email = :email")
    result = await db.execute(query, {"email": email})
    return result.fetchone()

# ❌ Небезопасно
async def unsafe_query(email: str):
    query = f"SELECT * FROM authors WHERE email = '{email}'"  # SQL Injection!
    return await db.execute(query)

🚨 Security Headers

HTTP Security Headers

def add_security_headers(response):
    """Добавляет заголовки безопасности"""
    response.headers.update({
        # XSS Protection
        "X-XSS-Protection": "1; mode=block",
        "X-Content-Type-Options": "nosniff",
        "X-Frame-Options": "DENY",
        
        # HTTPS Enforcement
        "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
        
        # Content Security Policy
        "Content-Security-Policy": (
            "default-src 'self'; "
            "script-src 'self' 'unsafe-inline'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https:; "
            "connect-src 'self' https://api.discours.io"
        ),
        
        # Referrer Policy
        "Referrer-Policy": "strict-origin-when-cross-origin",
        
        # Permissions Policy
        "Permissions-Policy": "geolocation=(), microphone=(), camera=()"
    })

📊 Security Monitoring

Audit Logging

import json
from datetime import datetime

async def log_security_event(
    event_type: str,
    user_id: str = None,
    ip_address: str = None,
    user_agent: str = None,
    success: bool = True,
    details: dict = None
):
    """Логирование событий безопасности"""
    
    event = {
        "timestamp": datetime.utcnow().isoformat(),
        "event_type": event_type,
        "user_id": user_id,
        "ip_address": ip_address,
        "user_agent": user_agent,
        "success": success,
        "details": details or {}
    }
    
    # Логируем в файл аудита
    logger.info("security_event", extra=event)
    
    # Отправляем критические события в SIEM
    if event_type in ["login_failed", "account_locked", "token_stolen"]:
        await send_to_siem(event)

Anomaly Detection

from collections import defaultdict
import asyncio

# Детектор аномалий
anomaly_tracker = defaultdict(list)

async def detect_anomalies(user_id: str, event_type: str, ip_address: str):
    """Детекция аномальной активности"""
    
    current_time = time.time()
    user_events = anomaly_tracker[user_id]
    
    # Добавляем событие
    user_events.append({
        "type": event_type,
        "ip": ip_address,
        "time": current_time
    })
    
    # Очищаем старые события (последний час)
    user_events[:] = [
        event for event in user_events 
        if current_time - event["time"] < 3600
    ]
    
    # Проверяем аномалии
    if len(user_events) > 50:  # Слишком много событий
        await log_security_event(
            "anomaly_detected",
            user_id=user_id,
            details={"reason": "too_many_events", "count": len(user_events)}
        )
    
    # Проверяем множественные IP
    unique_ips = set(event["ip"] for event in user_events)
    if len(unique_ips) > 5:  # Слишком много IP адресов
        await log_security_event(
            "anomaly_detected",
            user_id=user_id,
            details={"reason": "multiple_ips", "ips": list(unique_ips)}
        )

🔧 Security Configuration

Environment Variables

# JWT Security
JWT_SECRET_KEY=your_super_secret_key_minimum_256_bits
JWT_ALGORITHM=HS256
JWT_EXPIRATION_HOURS=720

# Cookie Security
SESSION_COOKIE_SECURE=true
SESSION_COOKIE_HTTPONLY=true
SESSION_COOKIE_SAMESITE=lax

# Rate Limiting
RATE_LIMIT_ENABLED=true
RATE_LIMIT_REQUESTS=100
RATE_LIMIT_WINDOW=3600

# Security Features
ACCOUNT_LOCKOUT_ENABLED=true
MAX_LOGIN_ATTEMPTS=5
LOCKOUT_DURATION=1800

# HTTPS Enforcement
FORCE_HTTPS=true
HSTS_MAX_AGE=31536000

Production Checklist

Authentication Security

  • JWT secret минимум 256 бит
  • Короткое время жизни токенов (≤ 30 дней)
  • httpOnly cookies включены
  • Secure cookies для HTTPS
  • SameSite cookies настроены

Password Security

  • bcrypt с rounds ≥ 12
  • Требования к сложности паролей
  • Защита от брутфорса
  • Account lockout настроен

OAuth Security

  • State parameter валидация
  • PKCE поддержка включена
  • Redirect URI валидация
  • Secure client secrets

Infrastructure Security

  • HTTPS принудительно
  • Security headers настроены
  • Rate limiting включен
  • Audit logging работает

Redis Security

  • TTL для всех ключей
  • Атомарные операции
  • Connection pooling
  • Health checks

🚨 Incident Response

Security Incident Types

  1. Token Compromise: Подозрение на кражу токенов
  2. Brute Force Attack: Массовые попытки входа
  3. Account Takeover: Несанкционированный доступ
  4. Data Breach: Утечка данных
  5. System Compromise: Компрометация системы

Response Procedures

Token Compromise

async def handle_token_compromise(user_id: str, reason: str):
    """Обработка компрометации токена"""
    
    # 1. Отзываем все токены пользователя
    sessions = SessionTokenManager()
    revoked_count = await sessions.revoke_user_sessions(user_id)
    
    # 2. Блокируем аккаунт
    author = await Author.get(user_id)
    author.account_locked_until = int(time.time()) + 3600  # 1 час
    await author.save()
    
    # 3. Логируем инцидент
    await log_security_event(
        "token_compromise",
        user_id=user_id,
        details={
            "reason": reason,
            "revoked_tokens": revoked_count,
            "account_locked": True
        }
    )
    
    # 4. Уведомляем пользователя
    await send_security_notification(user_id, "token_compromise")

Brute Force Response

async def handle_brute_force(ip_address: str, attempts: int):
    """Обработка брутфорс атаки"""
    
    # 1. Блокируем IP
    await block_ip_address(ip_address, duration=3600)
    
    # 2. Логируем атаку
    await log_security_event(
        "brute_force_attack",
        ip_address=ip_address,
        details={"attempts": attempts}
    )
    
    # 3. Уведомляем администраторов
    await notify_admins("brute_force_detected", {
        "ip": ip_address,
        "attempts": attempts
    })

📚 Security Best Practices

Development

  • Secure by Default: Безопасные настройки по умолчанию
  • Fail Securely: При ошибках блокируем доступ
  • Defense in Depth: Многоуровневая защита
  • Principle of Least Privilege: Минимальные права

Operations

  • Regular Updates: Обновление зависимостей
  • Security Monitoring: Постоянный мониторинг
  • Incident Response: Готовность к инцидентам
  • Regular Audits: Регулярные аудиты безопасности

Compliance

  • GDPR: Защита персональных данных
  • OWASP: Следование рекомендациям OWASP
  • Security Standards: Соответствие стандартам
  • Documentation: Документирование процедур