All checks were successful
Deploy on push / deploy (push) Successful in 5m47s
- **🔍 Comprehensive authentication documentation refactoring**: Полная переработка документации аутентификации
- Обновлена таблица содержания в README.md
- Исправлены архитектурные диаграммы - токены хранятся только в Redis
- Добавлены практические примеры кода для микросервисов
- Консолидирована OAuth документация
17 KiB
17 KiB
🔒 Безопасность системы аутентификации
🎯 Обзор
Комплексная система безопасности с многоуровневой защитой от различных типов атак.
🛡️ Основные принципы безопасности
1. Defense in Depth
- Многоуровневая защита: JWT + Redis + RBAC + Rate Limiting
- Fail Secure: При ошибках система блокирует доступ
- Principle of Least Privilege: Минимальные необходимые права
2. Zero Trust Architecture
- Verify Everything: Каждый запрос проверяется
- Never Trust, Always Verify: Нет доверенных зон
- Continuous Validation: Постоянная проверка токенов
🔐 JWT Security
Алгоритм и ключи
# settings.py
JWT_ALGORITHM = "HS256" # HMAC with SHA-256
JWT_SECRET = os.getenv("JWT_SECRET") # Минимум 256 бит
JWT_EXPIRATION_DELTA = 30 * 24 * 60 * 60 # 30 дней
Структура токена
# JWT Payload
{
"user_id": "123",
"username": "john_doe",
"iat": 1640995200, # Issued At
"exp": 1643587200 # Expiration
}
Лучшие практики JWT
- Короткое время жизни: Максимум 30 дней
- Secure Secret: Криптографически стойкий ключ
- No Sensitive Data: Только необходимые данные в payload
- Revocation Support: Redis для отзыва токенов
🍪 Cookie Security
httpOnly Cookies
# Настройки cookie
SESSION_COOKIE_NAME = "session_token"
SESSION_COOKIE_HTTPONLY = True # Защита от XSS
SESSION_COOKIE_SECURE = True # Только HTTPS
SESSION_COOKIE_SAMESITE = "lax" # CSRF защита
SESSION_COOKIE_MAX_AGE = 30 * 24 * 60 * 60
Защита от атак
- XSS Protection: httpOnly cookies недоступны JavaScript
- CSRF Protection: SameSite=lax предотвращает CSRF
- Secure Flag: Передача только по HTTPS
- Path Restriction: Ограничение области действия
🔑 Password Security
Хеширование паролей
from passlib.context import CryptContext
pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=12 # Увеличенная сложность
)
def hash_password(password: str) -> str:
"""Хеширует пароль с использованием bcrypt"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Проверяет пароль"""
return pwd_context.verify(plain_password, hashed_password)
Требования к паролям
import re
def validate_password_strength(password: str) -> bool:
"""Проверка силы пароля"""
if len(password) < 8:
return False
# Проверки
has_upper = re.search(r'[A-Z]', password)
has_lower = re.search(r'[a-z]', password)
has_digit = re.search(r'\d', password)
has_special = re.search(r'[!@#$%^&*(),.?":{}|<>]', password)
return all([has_upper, has_lower, has_digit, has_special])
🚫 Защита от брутфорса
Account Lockout
async def handle_login_attempt(author: Author, success: bool) -> None:
"""Обрабатывает попытку входа"""
if not success:
# Увеличиваем счетчик неудачных попыток
author.failed_login_attempts += 1
if author.failed_login_attempts >= 5:
# Блокируем аккаунт на 30 минут
author.account_locked_until = int(time.time()) + 1800
logger.warning(f"Аккаунт {author.email} заблокирован")
else:
# Сбрасываем счетчик при успешном входе
author.failed_login_attempts = 0
author.account_locked_until = None
Rate Limiting
from collections import defaultdict
import time
# Rate limiter
request_counts = defaultdict(list)
async def rate_limit_check(
identifier: str,
max_requests: int = 10,
window_seconds: int = 60
) -> bool:
"""Проверка rate limiting"""
current_time = time.time()
user_requests = request_counts[identifier]
# Удаляем старые запросы
user_requests[:] = [
req_time for req_time in user_requests
if current_time - req_time < window_seconds
]
# Проверяем лимит
if len(user_requests) >= max_requests:
return False
# Добавляем текущий запрос
user_requests.append(current_time)
return True
🔒 Redis Security
Secure Configuration
# Redis настройки безопасности
REDIS_CONFIG = {
"socket_keepalive": True,
"socket_keepalive_options": {},
"health_check_interval": 30,
"retry_on_timeout": True,
"socket_timeout": 5,
"socket_connect_timeout": 5
}
TTL для всех ключей
async def secure_redis_set(key: str, value: str, ttl: int = 3600):
"""Безопасная установка значения с обязательным TTL"""
await redis.setex(key, ttl, value)
# Проверяем, что TTL установлен
actual_ttl = await redis.ttl(key)
if actual_ttl <= 0:
logger.error(f"TTL не установлен для ключа: {key}")
await redis.delete(key)
Атомарные операции
async def atomic_session_update(user_id: str, token: str, data: dict):
"""Атомарное обновление сессии"""
async with redis.pipeline(transaction=True) as pipe:
try:
# Начинаем транзакцию
await pipe.multi()
# Обновляем данные сессии
session_key = f"session:{user_id}:{token}"
await pipe.hset(session_key, mapping=data)
await pipe.expire(session_key, 30 * 24 * 60 * 60)
# Обновляем список активных сессий
sessions_key = f"user_sessions:{user_id}"
await pipe.sadd(sessions_key, token)
await pipe.expire(sessions_key, 30 * 24 * 60 * 60)
# Выполняем транзакцию
await pipe.execute()
except Exception as e:
logger.error(f"Ошибка атомарной операции: {e}")
raise
🛡️ OAuth Security
State Parameter Protection
import secrets
def generate_oauth_state() -> str:
"""Генерация криптографически стойкого state"""
return secrets.token_urlsafe(32)
async def validate_oauth_state(received_state: str, stored_state: str) -> bool:
"""Безопасная проверка state"""
if not received_state or not stored_state:
return False
# Используем constant-time comparison
return secrets.compare_digest(received_state, stored_state)
PKCE Support
import base64
import hashlib
def generate_code_verifier() -> str:
"""Генерация code verifier для PKCE"""
return base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').rstrip('=')
def generate_code_challenge(verifier: str) -> str:
"""Генерация code challenge"""
digest = hashlib.sha256(verifier.encode('utf-8')).digest()
return base64.urlsafe_b64encode(digest).decode('utf-8').rstrip('=')
Redirect URI Validation
from urllib.parse import urlparse
def validate_redirect_uri(uri: str) -> bool:
"""Валидация redirect URI"""
allowed_domains = [
"localhost:3000",
"discours.io",
"new.discours.io"
]
try:
parsed = urlparse(uri)
# Проверяем схему
if parsed.scheme not in ['http', 'https']:
return False
# Проверяем домен
if not any(domain in parsed.netloc for domain in allowed_domains):
return False
# Проверяем на открытые редиректы
if parsed.netloc != parsed.netloc.lower():
return False
return True
except Exception:
return False
🔍 Input Validation
Request Validation
from pydantic import BaseModel, EmailStr, validator
class LoginRequest(BaseModel):
email: EmailStr
password: str
@validator('password')
def validate_password(cls, v):
if len(v) < 8:
raise ValueError('Password too short')
return v
class RegisterRequest(BaseModel):
email: EmailStr
password: str
name: str
@validator('name')
def validate_name(cls, v):
if len(v.strip()) < 2:
raise ValueError('Name too short')
# Защита от XSS
if '<' in v or '>' in v:
raise ValueError('Invalid characters in name')
return v.strip()
SQL Injection Prevention
# Используем ORM и параметризованные запросы
from sqlalchemy import text
# ✅ Безопасно
async def get_user_by_email(email: str):
query = text("SELECT * FROM authors WHERE email = :email")
result = await db.execute(query, {"email": email})
return result.fetchone()
# ❌ Небезопасно
async def unsafe_query(email: str):
query = f"SELECT * FROM authors WHERE email = '{email}'" # SQL Injection!
return await db.execute(query)
🚨 Security Headers
HTTP Security Headers
def add_security_headers(response):
"""Добавляет заголовки безопасности"""
response.headers.update({
# XSS Protection
"X-XSS-Protection": "1; mode=block",
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": "DENY",
# HTTPS Enforcement
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
# Content Security Policy
"Content-Security-Policy": (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"connect-src 'self' https://api.discours.io"
),
# Referrer Policy
"Referrer-Policy": "strict-origin-when-cross-origin",
# Permissions Policy
"Permissions-Policy": "geolocation=(), microphone=(), camera=()"
})
📊 Security Monitoring
Audit Logging
import json
from datetime import datetime
async def log_security_event(
event_type: str,
user_id: str = None,
ip_address: str = None,
user_agent: str = None,
success: bool = True,
details: dict = None
):
"""Логирование событий безопасности"""
event = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": event_type,
"user_id": user_id,
"ip_address": ip_address,
"user_agent": user_agent,
"success": success,
"details": details or {}
}
# Логируем в файл аудита
logger.info("security_event", extra=event)
# Отправляем критические события в SIEM
if event_type in ["login_failed", "account_locked", "token_stolen"]:
await send_to_siem(event)
Anomaly Detection
from collections import defaultdict
import asyncio
# Детектор аномалий
anomaly_tracker = defaultdict(list)
async def detect_anomalies(user_id: str, event_type: str, ip_address: str):
"""Детекция аномальной активности"""
current_time = time.time()
user_events = anomaly_tracker[user_id]
# Добавляем событие
user_events.append({
"type": event_type,
"ip": ip_address,
"time": current_time
})
# Очищаем старые события (последний час)
user_events[:] = [
event for event in user_events
if current_time - event["time"] < 3600
]
# Проверяем аномалии
if len(user_events) > 50: # Слишком много событий
await log_security_event(
"anomaly_detected",
user_id=user_id,
details={"reason": "too_many_events", "count": len(user_events)}
)
# Проверяем множественные IP
unique_ips = set(event["ip"] for event in user_events)
if len(unique_ips) > 5: # Слишком много IP адресов
await log_security_event(
"anomaly_detected",
user_id=user_id,
details={"reason": "multiple_ips", "ips": list(unique_ips)}
)
🔧 Security Configuration
Environment Variables
# JWT Security
JWT_SECRET=your_super_secret_key_minimum_256_bits
JWT_ALGORITHM=HS256
JWT_EXPIRATION_HOURS=720
# Cookie Security
SESSION_COOKIE_SECURE=true
SESSION_COOKIE_HTTPONLY=true
SESSION_COOKIE_SAMESITE=lax
# Rate Limiting
RATE_LIMIT_ENABLED=true
RATE_LIMIT_REQUESTS=100
RATE_LIMIT_WINDOW=3600
# Security Features
ACCOUNT_LOCKOUT_ENABLED=true
MAX_LOGIN_ATTEMPTS=5
LOCKOUT_DURATION=1800
# HTTPS Enforcement
FORCE_HTTPS=true
HSTS_MAX_AGE=31536000
Production Checklist
Authentication Security
- JWT secret минимум 256 бит
- Короткое время жизни токенов (≤ 30 дней)
- httpOnly cookies включены
- Secure cookies для HTTPS
- SameSite cookies настроены
Password Security
- bcrypt с rounds ≥ 12
- Требования к сложности паролей
- Защита от брутфорса
- Account lockout настроен
OAuth Security
- State parameter валидация
- PKCE поддержка включена
- Redirect URI валидация
- Secure client secrets
Infrastructure Security
- HTTPS принудительно
- Security headers настроены
- Rate limiting включен
- Audit logging работает
Redis Security
- TTL для всех ключей
- Атомарные операции
- Connection pooling
- Health checks
🚨 Incident Response
Security Incident Types
- Token Compromise: Подозрение на кражу токенов
- Brute Force Attack: Массовые попытки входа
- Account Takeover: Несанкционированный доступ
- Data Breach: Утечка данных
- System Compromise: Компрометация системы
Response Procedures
Token Compromise
async def handle_token_compromise(user_id: str, reason: str):
"""Обработка компрометации токена"""
# 1. Отзываем все токены пользователя
sessions = SessionTokenManager()
revoked_count = await sessions.revoke_user_sessions(user_id)
# 2. Блокируем аккаунт
author = await Author.get(user_id)
author.account_locked_until = int(time.time()) + 3600 # 1 час
await author.save()
# 3. Логируем инцидент
await log_security_event(
"token_compromise",
user_id=user_id,
details={
"reason": reason,
"revoked_tokens": revoked_count,
"account_locked": True
}
)
# 4. Уведомляем пользователя
await send_security_notification(user_id, "token_compromise")
Brute Force Response
async def handle_brute_force(ip_address: str, attempts: int):
"""Обработка брутфорс атаки"""
# 1. Блокируем IP
await block_ip_address(ip_address, duration=3600)
# 2. Логируем атаку
await log_security_event(
"brute_force_attack",
ip_address=ip_address,
details={"attempts": attempts}
)
# 3. Уведомляем администраторов
await notify_admins("brute_force_detected", {
"ip": ip_address,
"attempts": attempts
})
📚 Security Best Practices
Development
- Secure by Default: Безопасные настройки по умолчанию
- Fail Securely: При ошибках блокируем доступ
- Defense in Depth: Многоуровневая защита
- Principle of Least Privilege: Минимальные права
Operations
- Regular Updates: Обновление зависимостей
- Security Monitoring: Постоянный мониторинг
- Incident Response: Готовность к инцидентам
- Regular Audits: Регулярные аудиты безопасности
Compliance
- GDPR: Защита персональных данных
- OWASP: Следование рекомендациям OWASP
- Security Standards: Соответствие стандартам
- Documentation: Документирование процедур