Files
core/auth/oauth_security.py

301 lines
10 KiB
Python
Raw Normal View History

[0.9.29] - 2025-09-26 ### 🚨 CRITICAL Security Fixes - **🔒 Open Redirect Protection**: Добавлена строгая валидация redirect_uri против whitelist доменов - **🔒 Rate Limiting**: Защита OAuth endpoints от брутфорса (10 попыток за 5 минут на IP) - **🔒 Logout Endpoint**: Критически важный endpoint для безопасного отзыва httpOnly cookies - **🔒 Provider Validation**: Усиленная валидация OAuth провайдеров с логированием атак - **🚨 GlitchTip Alerts**: Автоматические алерты безопасности в GlitchTip при критических событиях ### 🛡️ Security Modules - **auth/oauth_security.py**: Модуль безопасности OAuth с валидацией и rate limiting + GlitchTip алерты - **auth/logout.py**: Безопасный logout с поддержкой JSON API и browser redirect - **tests/test_oauth_security.py**: Комплексные тесты безопасности (11 тестов) - **tests/test_oauth_glitchtip_alerts.py**: Тесты интеграции с GlitchTip (8 тестов) ### 🔧 OAuth Improvements - **Minimal Flow**: Упрощен до минимума - только httpOnly cookie, нет JWT в URL - **Simple Logic**: Нет error параметра = успех, максимальная простота - **DRY Refactoring**: Устранено дублирование кода в logout и валидации ### 🎯 OAuth Endpoints - **Старт**: `v3.dscrs.site/oauth/{provider}` - с rate limiting и валидацией - **Callback**: `v3.dscrs.site/oauth/{provider}/callback` - безопасный redirect_uri - **Logout**: `v3.dscrs.site/auth/logout` - отзыв httpOnly cookies - **Финализация**: `testing.discours.io/oauth?redirect_url=...` - минимальная схема ### 📊 Security Test Coverage - ✅ Open redirect attack prevention - ✅ Rate limiting protection - ✅ Provider validation - ✅ Safe fallback mechanisms - ✅ Cookie security (httpOnly + Secure + SameSite) - ✅ GlitchTip integration (8 тестов алертов) ### 📝 Documentation - Создан `docs/oauth-minimal-flow.md` - полное описание минимального flow - Обновлена документация OAuth в `docs/auth/oauth.md` - Добавлены security best practices
2025-09-26 21:03:45 +03:00
"""
🔒 OAuth Security Enhancements - Критические исправления безопасности
Исправляет найденные уязвимости в OAuth реализации.
"""
import re
import time
from typing import Dict, List
from urllib.parse import urlparse
from utils.logger import root_logger as logger
def _send_security_alert_to_glitchtip(event_type: str, details: Dict) -> None:
"""
🚨 Отправка алертов безопасности в GlitchTip
Args:
event_type: Тип события безопасности
details: Детали события
"""
try:
import sentry_sdk
# Определяем уровень критичности
critical_events = [
"open_redirect_attempt",
"rate_limit_exceeded",
"invalid_provider",
"suspicious_redirect_uri",
"brute_force_detected",
]
# Создаем контекст для GlitchTip
with sentry_sdk.configure_scope() as scope:
scope.set_tag("security_event", event_type)
scope.set_tag("component", "oauth")
scope.set_context("security_details", details)
# Добавляем дополнительные теги для фильтрации
if "ip" in details:
scope.set_tag("client_ip", details["ip"])
if "provider" in details:
scope.set_tag("oauth_provider", details["provider"])
if "redirect_uri" in details:
scope.set_tag("has_redirect_uri", "true")
# Отправляем в зависимости от критичности
if event_type in critical_events:
# Критичные события как ERROR
sentry_sdk.capture_message(f"🚨 CRITICAL OAuth Security Event: {event_type}", level="error")
logger.error(f"🚨 CRITICAL security alert sent to GlitchTip: {event_type}")
else:
# Обычные события как WARNING
sentry_sdk.capture_message(f"⚠️ OAuth Security Event: {event_type}", level="warning")
logger.info(f"⚠️ Security alert sent to GlitchTip: {event_type}")
except Exception as e:
# Не ломаем основную логику если GlitchTip недоступен
logger.error(f"❌ Failed to send security alert to GlitchTip: {e}")
def send_rate_limit_alert(client_ip: str, attempts: int) -> None:
"""
🚨 Специальный алерт для превышения rate limit
Args:
client_ip: IP адрес нарушителя
attempts: Количество попыток
"""
log_oauth_security_event(
"rate_limit_exceeded",
{
"ip": client_ip,
"attempts": attempts,
"limit": OAUTH_RATE_LIMIT,
"window_seconds": OAUTH_RATE_WINDOW,
"severity": "high",
},
)
def send_open_redirect_alert(malicious_uri: str, client_ip: str = "") -> None:
"""
🚨 Специальный алерт для попытки open redirect атаки
Args:
malicious_uri: Подозрительный URI
client_ip: IP адрес атакующего
"""
log_oauth_security_event(
"open_redirect_attempt",
{"malicious_uri": malicious_uri, "ip": client_ip, "severity": "critical", "attack_type": "open_redirect"},
)
# 🔒 Whitelist разрешенных redirect URI
ALLOWED_REDIRECT_DOMAINS = [
"testing.discours.io",
"new.discours.io",
"discours.io",
"localhost", # Только для разработки
]
# 🔒 Rate limiting для OAuth endpoints
oauth_rate_limits: Dict[str, List[float]] = {}
OAUTH_RATE_LIMIT = 10 # Максимум 10 попыток
OAUTH_RATE_WINDOW = 300 # За 5 минут
def validate_redirect_uri(redirect_uri: str) -> bool:
"""
🔒 Строгая валидация redirect URI против open redirect атак
Args:
redirect_uri: URI для валидации
Returns:
bool: True если URI безопасен
"""
if not redirect_uri:
return False
try:
parsed = urlparse(redirect_uri)
# 1. Проверяем схему (только HTTPS в продакшене)
if parsed.scheme not in ["https", "http"]: # http только для localhost
logger.warning(f"🚨 Invalid scheme in redirect_uri: {parsed.scheme}")
return False
# 2. Проверяем домен против whitelist
hostname = parsed.hostname
if not hostname:
logger.warning(f"🚨 No hostname in redirect_uri: {redirect_uri}")
return False
# 3. Проверяем против разрешенных доменов
is_allowed = False
for allowed_domain in ALLOWED_REDIRECT_DOMAINS:
if hostname == allowed_domain or hostname.endswith(f".{allowed_domain}"):
is_allowed = True
break
if not is_allowed:
logger.warning(f"🚨 Unauthorized domain in redirect_uri: {hostname}")
# 🚨 Отправляем алерт о попытке open redirect атаки
send_open_redirect_alert(redirect_uri)
return False
# 4. Дополнительные проверки безопасности
if len(redirect_uri) > 2048: # Слишком длинный URL
logger.warning(f"🚨 Redirect URI too long: {len(redirect_uri)}")
return False
# 5. Проверяем на подозрительные паттерны
suspicious_patterns = [
r"javascript:",
r"data:",
r"vbscript:",
r"file:",
r"ftp:",
]
for pattern in suspicious_patterns:
if re.search(pattern, redirect_uri, re.IGNORECASE):
logger.warning(f"🚨 Suspicious pattern in redirect_uri: {pattern}")
return False
return True
except Exception as e:
logger.error(f"🚨 Error validating redirect_uri: {e}")
return False
def check_oauth_rate_limit(client_ip: str) -> bool:
"""
🔒 Rate limiting для OAuth endpoints
Args:
client_ip: IP адрес клиента
Returns:
bool: True если запрос разрешен
"""
current_time = time.time()
# Получаем историю запросов для IP
if client_ip not in oauth_rate_limits:
oauth_rate_limits[client_ip] = []
requests = oauth_rate_limits[client_ip]
# Удаляем старые запросы
requests[:] = [req_time for req_time in requests if current_time - req_time < OAUTH_RATE_WINDOW]
# Проверяем лимит
if len(requests) >= OAUTH_RATE_LIMIT:
logger.warning(f"🚨 OAuth rate limit exceeded for IP: {client_ip}")
# 🚨 Отправляем алерт о превышении rate limit
send_rate_limit_alert(client_ip, len(requests))
return False
# Добавляем текущий запрос
requests.append(current_time)
return True
def get_safe_redirect_uri(request, fallback: str = "https://testing.discours.io") -> str:
"""
🔒 Безопасное получение redirect_uri с валидацией
Args:
request: HTTP запрос
fallback: Безопасный fallback URI
Returns:
str: Валидный redirect URI
"""
# Приоритет источников (БЕЗ Referer header!)
candidates = [
request.query_params.get("redirect_uri"),
request.path_params.get("redirect_uri"),
fallback, # Безопасный fallback
]
for candidate in candidates:
if candidate and validate_redirect_uri(candidate):
logger.info(f"✅ Valid redirect_uri: {candidate}")
return candidate
# Если ничего не подошло - используем безопасный fallback
logger.warning(f"🚨 No valid redirect_uri found, using fallback: {fallback}")
return fallback
def log_oauth_security_event(event_type: str, details: Dict) -> None:
"""
🔒 Логирование событий безопасности OAuth
Args:
event_type: Тип события
details: Детали события
"""
logger.warning(f"🚨 OAuth Security Event: {event_type}")
logger.warning(f" Details: {details}")
# 🚨 Отправляем критические события в GlitchTip
_send_security_alert_to_glitchtip(event_type, details)
def validate_oauth_provider(provider: str, log_security_events: bool = True) -> bool:
"""
🔒 Валидация OAuth провайдера
Args:
provider: Название провайдера
log_security_events: Логировать события безопасности
Returns:
bool: True если провайдер валидный
"""
# Импортируем здесь чтобы избежать циклических импортов
from auth.oauth import PROVIDER_CONFIGS
if not provider:
return False
if provider not in PROVIDER_CONFIGS:
if log_security_events:
log_oauth_security_event(
"invalid_provider", {"provider": provider, "available": list(PROVIDER_CONFIGS.keys())}
)
return False
return True
def sanitize_oauth_logs(data: Dict) -> Dict:
"""
🔒 Очистка логов от чувствительной информации
Args:
data: Данные для логирования
Returns:
Dict: Очищенные данные
"""
sensitive_keys = ["state", "code", "access_token", "refresh_token", "client_secret"]
sanitized = {}
for key, value in data.items():
if key.lower() in sensitive_keys:
sanitized[key] = f"***{str(value)[-4:]}" if value else None
else:
sanitized[key] = value
return sanitized