# 🔒 Безопасность системы аутентификации ## 🎯 Обзор Комплексная система безопасности с многоуровневой защитой от различных типов атак. ## 🛡️ Основные принципы безопасности ### 1. Defense in Depth - **Многоуровневая защита**: JWT + Redis + RBAC + Rate Limiting - **Fail Secure**: При ошибках система блокирует доступ - **Principle of Least Privilege**: Минимальные необходимые права ### 2. Zero Trust Architecture - **Verify Everything**: Каждый запрос проверяется - **Never Trust, Always Verify**: Нет доверенных зон - **Continuous Validation**: Постоянная проверка токенов ## 🔐 JWT Security ### Алгоритм и ключи ```python # settings.py JWT_ALGORITHM = "HS256" # HMAC with SHA-256 JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY") # Минимум 256 бит JWT_EXPIRATION_DELTA = 30 * 24 * 60 * 60 # 30 дней ``` ### Структура токена ```python # JWT Payload { "user_id": "123", "username": "john_doe", "iat": 1640995200, # Issued At "exp": 1643587200 # Expiration } ``` ### Лучшие практики JWT - **Короткое время жизни**: Максимум 30 дней - **Secure Secret**: Криптографически стойкий ключ - **No Sensitive Data**: Только необходимые данные в payload - **Revocation Support**: Redis для отзыва токенов ## 🍪 Cookie Security ### httpOnly Cookies ```python # Настройки cookie SESSION_COOKIE_NAME = "session_token" SESSION_COOKIE_HTTPONLY = True # Защита от XSS SESSION_COOKIE_SECURE = True # Только HTTPS SESSION_COOKIE_SAMESITE = "lax" # CSRF защита SESSION_COOKIE_MAX_AGE = 30 * 24 * 60 * 60 ``` ### Защита от атак - **XSS Protection**: httpOnly cookies недоступны JavaScript - **CSRF Protection**: SameSite=lax предотвращает CSRF - **Secure Flag**: Передача только по HTTPS - **Path Restriction**: Ограничение области действия ## 🔑 Password Security ### Хеширование паролей ```python from passlib.context import CryptContext pwd_context = CryptContext( schemes=["bcrypt"], deprecated="auto", bcrypt__rounds=12 # Увеличенная сложность ) def hash_password(password: str) -> str: """Хеширует пароль с использованием bcrypt""" return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """Проверяет пароль""" return pwd_context.verify(plain_password, hashed_password) ``` ### Требования к паролям ```python import re def validate_password_strength(password: str) -> bool: """Проверка силы пароля""" if len(password) < 8: return False # Проверки has_upper = re.search(r'[A-Z]', password) has_lower = re.search(r'[a-z]', password) has_digit = re.search(r'\d', password) has_special = re.search(r'[!@#$%^&*(),.?":{}|<>]', password) return all([has_upper, has_lower, has_digit, has_special]) ``` ## 🚫 Защита от брутфорса ### Account Lockout ```python async def handle_login_attempt(author: Author, success: bool) -> None: """Обрабатывает попытку входа""" if not success: # Увеличиваем счетчик неудачных попыток author.failed_login_attempts += 1 if author.failed_login_attempts >= 5: # Блокируем аккаунт на 30 минут author.account_locked_until = int(time.time()) + 1800 logger.warning(f"Аккаунт {author.email} заблокирован") else: # Сбрасываем счетчик при успешном входе author.failed_login_attempts = 0 author.account_locked_until = None ``` ### Rate Limiting ```python from collections import defaultdict import time # Rate limiter request_counts = defaultdict(list) async def rate_limit_check( identifier: str, max_requests: int = 10, window_seconds: int = 60 ) -> bool: """Проверка rate limiting""" current_time = time.time() user_requests = request_counts[identifier] # Удаляем старые запросы user_requests[:] = [ req_time for req_time in user_requests if current_time - req_time < window_seconds ] # Проверяем лимит if len(user_requests) >= max_requests: return False # Добавляем текущий запрос user_requests.append(current_time) return True ``` ## 🔒 Redis Security ### Secure Configuration ```python # Redis настройки безопасности REDIS_CONFIG = { "socket_keepalive": True, "socket_keepalive_options": {}, "health_check_interval": 30, "retry_on_timeout": True, "socket_timeout": 5, "socket_connect_timeout": 5 } ``` ### TTL для всех ключей ```python async def secure_redis_set(key: str, value: str, ttl: int = 3600): """Безопасная установка значения с обязательным TTL""" await redis.setex(key, ttl, value) # Проверяем, что TTL установлен actual_ttl = await redis.ttl(key) if actual_ttl <= 0: logger.error(f"TTL не установлен для ключа: {key}") await redis.delete(key) ``` ### Атомарные операции ```python async def atomic_session_update(user_id: str, token: str, data: dict): """Атомарное обновление сессии""" async with redis.pipeline(transaction=True) as pipe: try: # Начинаем транзакцию await pipe.multi() # Обновляем данные сессии session_key = f"session:{user_id}:{token}" await pipe.hset(session_key, mapping=data) await pipe.expire(session_key, 30 * 24 * 60 * 60) # Обновляем список активных сессий sessions_key = f"user_sessions:{user_id}" await pipe.sadd(sessions_key, token) await pipe.expire(sessions_key, 30 * 24 * 60 * 60) # Выполняем транзакцию await pipe.execute() except Exception as e: logger.error(f"Ошибка атомарной операции: {e}") raise ``` ## 🛡️ OAuth Security ### State Parameter Protection ```python import secrets def generate_oauth_state() -> str: """Генерация криптографически стойкого state""" return secrets.token_urlsafe(32) async def validate_oauth_state(received_state: str, stored_state: str) -> bool: """Безопасная проверка state""" if not received_state or not stored_state: return False # Используем constant-time comparison return secrets.compare_digest(received_state, stored_state) ``` ### PKCE Support ```python import base64 import hashlib def generate_code_verifier() -> str: """Генерация code verifier для PKCE""" return base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').rstrip('=') def generate_code_challenge(verifier: str) -> str: """Генерация code challenge""" digest = hashlib.sha256(verifier.encode('utf-8')).digest() return base64.urlsafe_b64encode(digest).decode('utf-8').rstrip('=') ``` ### Redirect URI Validation ```python from urllib.parse import urlparse def validate_redirect_uri(uri: str) -> bool: """Валидация redirect URI""" allowed_domains = [ "localhost:3000", "discours.io", "new.discours.io" ] try: parsed = urlparse(uri) # Проверяем схему if parsed.scheme not in ['http', 'https']: return False # Проверяем домен if not any(domain in parsed.netloc for domain in allowed_domains): return False # Проверяем на открытые редиректы if parsed.netloc != parsed.netloc.lower(): return False return True except Exception: return False ``` ## 🔍 Input Validation ### Request Validation ```python from pydantic import BaseModel, EmailStr, validator class LoginRequest(BaseModel): email: EmailStr password: str @validator('password') def validate_password(cls, v): if len(v) < 8: raise ValueError('Password too short') return v class RegisterRequest(BaseModel): email: EmailStr password: str name: str @validator('name') def validate_name(cls, v): if len(v.strip()) < 2: raise ValueError('Name too short') # Защита от XSS if '<' in v or '>' in v: raise ValueError('Invalid characters in name') return v.strip() ``` ### SQL Injection Prevention ```python # Используем ORM и параметризованные запросы from sqlalchemy import text # ✅ Безопасно async def get_user_by_email(email: str): query = text("SELECT * FROM authors WHERE email = :email") result = await db.execute(query, {"email": email}) return result.fetchone() # ❌ Небезопасно async def unsafe_query(email: str): query = f"SELECT * FROM authors WHERE email = '{email}'" # SQL Injection! return await db.execute(query) ``` ## 🚨 Security Headers ### HTTP Security Headers ```python def add_security_headers(response): """Добавляет заголовки безопасности""" response.headers.update({ # XSS Protection "X-XSS-Protection": "1; mode=block", "X-Content-Type-Options": "nosniff", "X-Frame-Options": "DENY", # HTTPS Enforcement "Strict-Transport-Security": "max-age=31536000; includeSubDomains", # Content Security Policy "Content-Security-Policy": ( "default-src 'self'; " "script-src 'self' 'unsafe-inline'; " "style-src 'self' 'unsafe-inline'; " "img-src 'self' data: https:; " "connect-src 'self' https://api.discours.io" ), # Referrer Policy "Referrer-Policy": "strict-origin-when-cross-origin", # Permissions Policy "Permissions-Policy": "geolocation=(), microphone=(), camera=()" }) ``` ## 📊 Security Monitoring ### Audit Logging ```python import json from datetime import datetime async def log_security_event( event_type: str, user_id: str = None, ip_address: str = None, user_agent: str = None, success: bool = True, details: dict = None ): """Логирование событий безопасности""" event = { "timestamp": datetime.utcnow().isoformat(), "event_type": event_type, "user_id": user_id, "ip_address": ip_address, "user_agent": user_agent, "success": success, "details": details or {} } # Логируем в файл аудита logger.info("security_event", extra=event) # Отправляем критические события в SIEM if event_type in ["login_failed", "account_locked", "token_stolen"]: await send_to_siem(event) ``` ### Anomaly Detection ```python from collections import defaultdict import asyncio # Детектор аномалий anomaly_tracker = defaultdict(list) async def detect_anomalies(user_id: str, event_type: str, ip_address: str): """Детекция аномальной активности""" current_time = time.time() user_events = anomaly_tracker[user_id] # Добавляем событие user_events.append({ "type": event_type, "ip": ip_address, "time": current_time }) # Очищаем старые события (последний час) user_events[:] = [ event for event in user_events if current_time - event["time"] < 3600 ] # Проверяем аномалии if len(user_events) > 50: # Слишком много событий await log_security_event( "anomaly_detected", user_id=user_id, details={"reason": "too_many_events", "count": len(user_events)} ) # Проверяем множественные IP unique_ips = set(event["ip"] for event in user_events) if len(unique_ips) > 5: # Слишком много IP адресов await log_security_event( "anomaly_detected", user_id=user_id, details={"reason": "multiple_ips", "ips": list(unique_ips)} ) ``` ## 🔧 Security Configuration ### Environment Variables ```bash # JWT Security JWT_SECRET_KEY=your_super_secret_key_minimum_256_bits JWT_ALGORITHM=HS256 JWT_EXPIRATION_HOURS=720 # Cookie Security SESSION_COOKIE_SECURE=true SESSION_COOKIE_HTTPONLY=true SESSION_COOKIE_SAMESITE=lax # Rate Limiting RATE_LIMIT_ENABLED=true RATE_LIMIT_REQUESTS=100 RATE_LIMIT_WINDOW=3600 # Security Features ACCOUNT_LOCKOUT_ENABLED=true MAX_LOGIN_ATTEMPTS=5 LOCKOUT_DURATION=1800 # HTTPS Enforcement FORCE_HTTPS=true HSTS_MAX_AGE=31536000 ``` ### Production Checklist #### Authentication Security - [ ] JWT secret минимум 256 бит - [ ] Короткое время жизни токенов (≤ 30 дней) - [ ] httpOnly cookies включены - [ ] Secure cookies для HTTPS - [ ] SameSite cookies настроены #### Password Security - [ ] bcrypt с rounds ≥ 12 - [ ] Требования к сложности паролей - [ ] Защита от брутфорса - [ ] Account lockout настроен #### OAuth Security - [ ] State parameter валидация - [ ] PKCE поддержка включена - [ ] Redirect URI валидация - [ ] Secure client secrets #### Infrastructure Security - [ ] HTTPS принудительно - [ ] Security headers настроены - [ ] Rate limiting включен - [ ] Audit logging работает #### Redis Security - [ ] TTL для всех ключей - [ ] Атомарные операции - [ ] Connection pooling - [ ] Health checks ## 🚨 Incident Response ### Security Incident Types 1. **Token Compromise**: Подозрение на кражу токенов 2. **Brute Force Attack**: Массовые попытки входа 3. **Account Takeover**: Несанкционированный доступ 4. **Data Breach**: Утечка данных 5. **System Compromise**: Компрометация системы ### Response Procedures #### Token Compromise ```python async def handle_token_compromise(user_id: str, reason: str): """Обработка компрометации токена""" # 1. Отзываем все токены пользователя sessions = SessionTokenManager() revoked_count = await sessions.revoke_user_sessions(user_id) # 2. Блокируем аккаунт author = await Author.get(user_id) author.account_locked_until = int(time.time()) + 3600 # 1 час await author.save() # 3. Логируем инцидент await log_security_event( "token_compromise", user_id=user_id, details={ "reason": reason, "revoked_tokens": revoked_count, "account_locked": True } ) # 4. Уведомляем пользователя await send_security_notification(user_id, "token_compromise") ``` #### Brute Force Response ```python async def handle_brute_force(ip_address: str, attempts: int): """Обработка брутфорс атаки""" # 1. Блокируем IP await block_ip_address(ip_address, duration=3600) # 2. Логируем атаку await log_security_event( "brute_force_attack", ip_address=ip_address, details={"attempts": attempts} ) # 3. Уведомляем администраторов await notify_admins("brute_force_detected", { "ip": ip_address, "attempts": attempts }) ``` ## 📚 Security Best Practices ### Development - **Secure by Default**: Безопасные настройки по умолчанию - **Fail Securely**: При ошибках блокируем доступ - **Defense in Depth**: Многоуровневая защита - **Principle of Least Privilege**: Минимальные права ### Operations - **Regular Updates**: Обновление зависимостей - **Security Monitoring**: Постоянный мониторинг - **Incident Response**: Готовность к инцидентам - **Regular Audits**: Регулярные аудиты безопасности ### Compliance - **GDPR**: Защита персональных данных - **OWASP**: Следование рекомендациям OWASP - **Security Standards**: Соответствие стандартам - **Documentation**: Документирование процедур