Files
core/docs/auth/security.md

580 lines
17 KiB
Markdown
Raw Normal View History

# 🔒 Безопасность системы аутентификации
## 🎯 Обзор
Комплексная система безопасности с многоуровневой защитой от различных типов атак.
## 🛡️ Основные принципы безопасности
### 1. Defense in Depth
- **Многоуровневая защита**: JWT + Redis + RBAC + Rate Limiting
- **Fail Secure**: При ошибках система блокирует доступ
- **Principle of Least Privilege**: Минимальные необходимые права
### 2. Zero Trust Architecture
- **Verify Everything**: Каждый запрос проверяется
- **Never Trust, Always Verify**: Нет доверенных зон
- **Continuous Validation**: Постоянная проверка токенов
## 🔐 JWT Security
### Алгоритм и ключи
```python
# settings.py
JWT_ALGORITHM = "HS256" # HMAC with SHA-256
2025-09-30 21:48:29 +03:00
JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY") # Минимум 256 бит
JWT_EXPIRATION_DELTA = 30 * 24 * 60 * 60 # 30 дней
```
### Структура токена
```python
# JWT Payload
{
"user_id": "123",
"username": "john_doe",
"iat": 1640995200, # Issued At
"exp": 1643587200 # Expiration
}
```
### Лучшие практики JWT
- **Короткое время жизни**: Максимум 30 дней
- **Secure Secret**: Криптографически стойкий ключ
- **No Sensitive Data**: Только необходимые данные в payload
- **Revocation Support**: Redis для отзыва токенов
## 🍪 Cookie Security
### httpOnly Cookies
```python
# Настройки cookie
SESSION_COOKIE_NAME = "session_token"
SESSION_COOKIE_HTTPONLY = True # Защита от XSS
SESSION_COOKIE_SECURE = True # Только HTTPS
SESSION_COOKIE_SAMESITE = "lax" # CSRF защита
SESSION_COOKIE_MAX_AGE = 30 * 24 * 60 * 60
```
### Защита от атак
- **XSS Protection**: httpOnly cookies недоступны JavaScript
- **CSRF Protection**: SameSite=lax предотвращает CSRF
- **Secure Flag**: Передача только по HTTPS
- **Path Restriction**: Ограничение области действия
## 🔑 Password Security
### Хеширование паролей
```python
from passlib.context import CryptContext
pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=12 # Увеличенная сложность
)
def hash_password(password: str) -> str:
"""Хеширует пароль с использованием bcrypt"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Проверяет пароль"""
return pwd_context.verify(plain_password, hashed_password)
```
### Требования к паролям
```python
import re
def validate_password_strength(password: str) -> bool:
"""Проверка силы пароля"""
if len(password) < 8:
return False
# Проверки
has_upper = re.search(r'[A-Z]', password)
has_lower = re.search(r'[a-z]', password)
has_digit = re.search(r'\d', password)
has_special = re.search(r'[!@#$%^&*(),.?":{}|<>]', password)
return all([has_upper, has_lower, has_digit, has_special])
```
## 🚫 Защита от брутфорса
### Account Lockout
```python
async def handle_login_attempt(author: Author, success: bool) -> None:
"""Обрабатывает попытку входа"""
if not success:
# Увеличиваем счетчик неудачных попыток
author.failed_login_attempts += 1
if author.failed_login_attempts >= 5:
# Блокируем аккаунт на 30 минут
author.account_locked_until = int(time.time()) + 1800
logger.warning(f"Аккаунт {author.email} заблокирован")
else:
# Сбрасываем счетчик при успешном входе
author.failed_login_attempts = 0
author.account_locked_until = None
```
### Rate Limiting
```python
from collections import defaultdict
import time
# Rate limiter
request_counts = defaultdict(list)
async def rate_limit_check(
identifier: str,
max_requests: int = 10,
window_seconds: int = 60
) -> bool:
"""Проверка rate limiting"""
current_time = time.time()
user_requests = request_counts[identifier]
# Удаляем старые запросы
user_requests[:] = [
req_time for req_time in user_requests
if current_time - req_time < window_seconds
]
# Проверяем лимит
if len(user_requests) >= max_requests:
return False
# Добавляем текущий запрос
user_requests.append(current_time)
return True
```
## 🔒 Redis Security
### Secure Configuration
```python
# Redis настройки безопасности
REDIS_CONFIG = {
"socket_keepalive": True,
"socket_keepalive_options": {},
"health_check_interval": 30,
"retry_on_timeout": True,
"socket_timeout": 5,
"socket_connect_timeout": 5
}
```
### TTL для всех ключей
```python
async def secure_redis_set(key: str, value: str, ttl: int = 3600):
"""Безопасная установка значения с обязательным TTL"""
await redis.setex(key, ttl, value)
# Проверяем, что TTL установлен
actual_ttl = await redis.ttl(key)
if actual_ttl <= 0:
logger.error(f"TTL не установлен для ключа: {key}")
await redis.delete(key)
```
### Атомарные операции
```python
async def atomic_session_update(user_id: str, token: str, data: dict):
"""Атомарное обновление сессии"""
async with redis.pipeline(transaction=True) as pipe:
try:
# Начинаем транзакцию
await pipe.multi()
# Обновляем данные сессии
session_key = f"session:{user_id}:{token}"
await pipe.hset(session_key, mapping=data)
await pipe.expire(session_key, 30 * 24 * 60 * 60)
# Обновляем список активных сессий
sessions_key = f"user_sessions:{user_id}"
await pipe.sadd(sessions_key, token)
await pipe.expire(sessions_key, 30 * 24 * 60 * 60)
# Выполняем транзакцию
await pipe.execute()
except Exception as e:
logger.error(f"Ошибка атомарной операции: {e}")
raise
```
## 🛡️ OAuth Security
### State Parameter Protection
```python
import secrets
def generate_oauth_state() -> str:
"""Генерация криптографически стойкого state"""
return secrets.token_urlsafe(32)
async def validate_oauth_state(received_state: str, stored_state: str) -> bool:
"""Безопасная проверка state"""
if not received_state or not stored_state:
return False
# Используем constant-time comparison
return secrets.compare_digest(received_state, stored_state)
```
### PKCE Support
```python
import base64
import hashlib
def generate_code_verifier() -> str:
"""Генерация code verifier для PKCE"""
return base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').rstrip('=')
def generate_code_challenge(verifier: str) -> str:
"""Генерация code challenge"""
digest = hashlib.sha256(verifier.encode('utf-8')).digest()
return base64.urlsafe_b64encode(digest).decode('utf-8').rstrip('=')
```
### Redirect URI Validation
```python
from urllib.parse import urlparse
def validate_redirect_uri(uri: str) -> bool:
"""Валидация redirect URI"""
allowed_domains = [
"localhost:3000",
"discours.io",
"new.discours.io"
]
try:
parsed = urlparse(uri)
# Проверяем схему
if parsed.scheme not in ['http', 'https']:
return False
# Проверяем домен
if not any(domain in parsed.netloc for domain in allowed_domains):
return False
# Проверяем на открытые редиректы
if parsed.netloc != parsed.netloc.lower():
return False
return True
except Exception:
return False
```
## 🔍 Input Validation
### Request Validation
```python
from pydantic import BaseModel, EmailStr, validator
class LoginRequest(BaseModel):
email: EmailStr
password: str
@validator('password')
def validate_password(cls, v):
if len(v) < 8:
raise ValueError('Password too short')
return v
class RegisterRequest(BaseModel):
email: EmailStr
password: str
name: str
@validator('name')
def validate_name(cls, v):
if len(v.strip()) < 2:
raise ValueError('Name too short')
# Защита от XSS
if '<' in v or '>' in v:
raise ValueError('Invalid characters in name')
return v.strip()
```
### SQL Injection Prevention
```python
# Используем ORM и параметризованные запросы
from sqlalchemy import text
# ✅ Безопасно
async def get_user_by_email(email: str):
query = text("SELECT * FROM authors WHERE email = :email")
result = await db.execute(query, {"email": email})
return result.fetchone()
# ❌ Небезопасно
async def unsafe_query(email: str):
query = f"SELECT * FROM authors WHERE email = '{email}'" # SQL Injection!
return await db.execute(query)
```
## 🚨 Security Headers
### HTTP Security Headers
```python
def add_security_headers(response):
"""Добавляет заголовки безопасности"""
response.headers.update({
# XSS Protection
"X-XSS-Protection": "1; mode=block",
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": "DENY",
# HTTPS Enforcement
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
# Content Security Policy
"Content-Security-Policy": (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"connect-src 'self' https://api.discours.io"
),
# Referrer Policy
"Referrer-Policy": "strict-origin-when-cross-origin",
# Permissions Policy
"Permissions-Policy": "geolocation=(), microphone=(), camera=()"
})
```
## 📊 Security Monitoring
### Audit Logging
```python
import json
from datetime import datetime
async def log_security_event(
event_type: str,
user_id: str = None,
ip_address: str = None,
user_agent: str = None,
success: bool = True,
details: dict = None
):
"""Логирование событий безопасности"""
event = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": event_type,
"user_id": user_id,
"ip_address": ip_address,
"user_agent": user_agent,
"success": success,
"details": details or {}
}
# Логируем в файл аудита
logger.info("security_event", extra=event)
# Отправляем критические события в SIEM
if event_type in ["login_failed", "account_locked", "token_stolen"]:
await send_to_siem(event)
```
### Anomaly Detection
```python
from collections import defaultdict
import asyncio
# Детектор аномалий
anomaly_tracker = defaultdict(list)
async def detect_anomalies(user_id: str, event_type: str, ip_address: str):
"""Детекция аномальной активности"""
current_time = time.time()
user_events = anomaly_tracker[user_id]
# Добавляем событие
user_events.append({
"type": event_type,
"ip": ip_address,
"time": current_time
})
# Очищаем старые события (последний час)
user_events[:] = [
event for event in user_events
if current_time - event["time"] < 3600
]
# Проверяем аномалии
if len(user_events) > 50: # Слишком много событий
await log_security_event(
"anomaly_detected",
user_id=user_id,
details={"reason": "too_many_events", "count": len(user_events)}
)
# Проверяем множественные IP
unique_ips = set(event["ip"] for event in user_events)
if len(unique_ips) > 5: # Слишком много IP адресов
await log_security_event(
"anomaly_detected",
user_id=user_id,
details={"reason": "multiple_ips", "ips": list(unique_ips)}
)
```
## 🔧 Security Configuration
### Environment Variables
```bash
# JWT Security
2025-09-30 21:48:29 +03:00
JWT_SECRET_KEY=your_super_secret_key_minimum_256_bits
JWT_ALGORITHM=HS256
JWT_EXPIRATION_HOURS=720
# Cookie Security
SESSION_COOKIE_SECURE=true
SESSION_COOKIE_HTTPONLY=true
SESSION_COOKIE_SAMESITE=lax
# Rate Limiting
RATE_LIMIT_ENABLED=true
RATE_LIMIT_REQUESTS=100
RATE_LIMIT_WINDOW=3600
# Security Features
ACCOUNT_LOCKOUT_ENABLED=true
MAX_LOGIN_ATTEMPTS=5
LOCKOUT_DURATION=1800
# HTTPS Enforcement
FORCE_HTTPS=true
HSTS_MAX_AGE=31536000
```
### Production Checklist
#### Authentication Security
- [ ] JWT secret минимум 256 бит
- [ ] Короткое время жизни токенов (≤ 30 дней)
- [ ] httpOnly cookies включены
- [ ] Secure cookies для HTTPS
- [ ] SameSite cookies настроены
#### Password Security
- [ ] bcrypt с rounds ≥ 12
- [ ] Требования к сложности паролей
- [ ] Защита от брутфорса
- [ ] Account lockout настроен
#### OAuth Security
- [ ] State parameter валидация
- [ ] PKCE поддержка включена
- [ ] Redirect URI валидация
- [ ] Secure client secrets
#### Infrastructure Security
- [ ] HTTPS принудительно
- [ ] Security headers настроены
- [ ] Rate limiting включен
- [ ] Audit logging работает
#### Redis Security
- [ ] TTL для всех ключей
- [ ] Атомарные операции
- [ ] Connection pooling
- [ ] Health checks
## 🚨 Incident Response
### Security Incident Types
1. **Token Compromise**: Подозрение на кражу токенов
2. **Brute Force Attack**: Массовые попытки входа
3. **Account Takeover**: Несанкционированный доступ
4. **Data Breach**: Утечка данных
5. **System Compromise**: Компрометация системы
### Response Procedures
#### Token Compromise
```python
async def handle_token_compromise(user_id: str, reason: str):
"""Обработка компрометации токена"""
# 1. Отзываем все токены пользователя
sessions = SessionTokenManager()
revoked_count = await sessions.revoke_user_sessions(user_id)
# 2. Блокируем аккаунт
author = await Author.get(user_id)
author.account_locked_until = int(time.time()) + 3600 # 1 час
await author.save()
# 3. Логируем инцидент
await log_security_event(
"token_compromise",
user_id=user_id,
details={
"reason": reason,
"revoked_tokens": revoked_count,
"account_locked": True
}
)
# 4. Уведомляем пользователя
await send_security_notification(user_id, "token_compromise")
```
#### Brute Force Response
```python
async def handle_brute_force(ip_address: str, attempts: int):
"""Обработка брутфорс атаки"""
# 1. Блокируем IP
await block_ip_address(ip_address, duration=3600)
# 2. Логируем атаку
await log_security_event(
"brute_force_attack",
ip_address=ip_address,
details={"attempts": attempts}
)
# 3. Уведомляем администраторов
await notify_admins("brute_force_detected", {
"ip": ip_address,
"attempts": attempts
})
```
## 📚 Security Best Practices
### Development
- **Secure by Default**: Безопасные настройки по умолчанию
- **Fail Securely**: При ошибках блокируем доступ
- **Defense in Depth**: Многоуровневая защита
- **Principle of Least Privilege**: Минимальные права
### Operations
- **Regular Updates**: Обновление зависимостей
- **Security Monitoring**: Постоянный мониторинг
- **Incident Response**: Готовность к инцидентам
- **Regular Audits**: Регулярные аудиты безопасности
### Compliance
- **GDPR**: Защита персональных данных
- **OWASP**: Следование рекомендациям OWASP
- **Security Standards**: Соответствие стандартам
- **Documentation**: Документирование процедур