Files
core/auth/oauth_security.py
Untone 05c188df62
Some checks failed
Deploy on push / deploy (push) Failing after 39s
[0.9.29] - 2025-09-26
### 🚨 CRITICAL Security Fixes
- **🔒 Open Redirect Protection**: Добавлена строгая валидация redirect_uri против whitelist доменов
- **🔒 Rate Limiting**: Защита OAuth endpoints от брутфорса (10 попыток за 5 минут на IP)
- **🔒 Logout Endpoint**: Критически важный endpoint для безопасного отзыва httpOnly cookies
- **🔒 Provider Validation**: Усиленная валидация OAuth провайдеров с логированием атак
- **🚨 GlitchTip Alerts**: Автоматические алерты безопасности в GlitchTip при критических событиях

### 🛡️ Security Modules
- **auth/oauth_security.py**: Модуль безопасности OAuth с валидацией и rate limiting + GlitchTip алерты
- **auth/logout.py**: Безопасный logout с поддержкой JSON API и browser redirect
- **tests/test_oauth_security.py**: Комплексные тесты безопасности (11 тестов)
- **tests/test_oauth_glitchtip_alerts.py**: Тесты интеграции с GlitchTip (8 тестов)

### 🔧 OAuth Improvements
- **Minimal Flow**: Упрощен до минимума - только httpOnly cookie, нет JWT в URL
- **Simple Logic**: Нет error параметра = успех, максимальная простота
- **DRY Refactoring**: Устранено дублирование кода в logout и валидации

### 🎯 OAuth Endpoints
- **Старт**: `v3.dscrs.site/oauth/{provider}` - с rate limiting и валидацией
- **Callback**: `v3.dscrs.site/oauth/{provider}/callback` - безопасный redirect_uri
- **Logout**: `v3.dscrs.site/auth/logout` - отзыв httpOnly cookies
- **Финализация**: `testing.discours.io/oauth?redirect_url=...` - минимальная схема

### 📊 Security Test Coverage
-  Open redirect attack prevention
-  Rate limiting protection
-  Provider validation
-  Safe fallback mechanisms
-  Cookie security (httpOnly + Secure + SameSite)
-  GlitchTip integration (8 тестов алертов)

### 📝 Documentation
- Создан `docs/oauth-minimal-flow.md` - полное описание минимального flow
- Обновлена документация OAuth в `docs/auth/oauth.md`
- Добавлены security best practices
2025-09-26 21:03:45 +03:00

301 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
🔒 OAuth Security Enhancements - Критические исправления безопасности
Исправляет найденные уязвимости в OAuth реализации.
"""
import re
import time
from typing import Dict, List
from urllib.parse import urlparse
from utils.logger import root_logger as logger
def _send_security_alert_to_glitchtip(event_type: str, details: Dict) -> None:
"""
🚨 Отправка алертов безопасности в GlitchTip
Args:
event_type: Тип события безопасности
details: Детали события
"""
try:
import sentry_sdk
# Определяем уровень критичности
critical_events = [
"open_redirect_attempt",
"rate_limit_exceeded",
"invalid_provider",
"suspicious_redirect_uri",
"brute_force_detected",
]
# Создаем контекст для GlitchTip
with sentry_sdk.configure_scope() as scope:
scope.set_tag("security_event", event_type)
scope.set_tag("component", "oauth")
scope.set_context("security_details", details)
# Добавляем дополнительные теги для фильтрации
if "ip" in details:
scope.set_tag("client_ip", details["ip"])
if "provider" in details:
scope.set_tag("oauth_provider", details["provider"])
if "redirect_uri" in details:
scope.set_tag("has_redirect_uri", "true")
# Отправляем в зависимости от критичности
if event_type in critical_events:
# Критичные события как ERROR
sentry_sdk.capture_message(f"🚨 CRITICAL OAuth Security Event: {event_type}", level="error")
logger.error(f"🚨 CRITICAL security alert sent to GlitchTip: {event_type}")
else:
# Обычные события как WARNING
sentry_sdk.capture_message(f"⚠️ OAuth Security Event: {event_type}", level="warning")
logger.info(f"⚠️ Security alert sent to GlitchTip: {event_type}")
except Exception as e:
# Не ломаем основную логику если GlitchTip недоступен
logger.error(f"❌ Failed to send security alert to GlitchTip: {e}")
def send_rate_limit_alert(client_ip: str, attempts: int) -> None:
"""
🚨 Специальный алерт для превышения rate limit
Args:
client_ip: IP адрес нарушителя
attempts: Количество попыток
"""
log_oauth_security_event(
"rate_limit_exceeded",
{
"ip": client_ip,
"attempts": attempts,
"limit": OAUTH_RATE_LIMIT,
"window_seconds": OAUTH_RATE_WINDOW,
"severity": "high",
},
)
def send_open_redirect_alert(malicious_uri: str, client_ip: str = "") -> None:
"""
🚨 Специальный алерт для попытки open redirect атаки
Args:
malicious_uri: Подозрительный URI
client_ip: IP адрес атакующего
"""
log_oauth_security_event(
"open_redirect_attempt",
{"malicious_uri": malicious_uri, "ip": client_ip, "severity": "critical", "attack_type": "open_redirect"},
)
# 🔒 Whitelist разрешенных redirect URI
ALLOWED_REDIRECT_DOMAINS = [
"testing.discours.io",
"new.discours.io",
"discours.io",
"localhost", # Только для разработки
]
# 🔒 Rate limiting для OAuth endpoints
oauth_rate_limits: Dict[str, List[float]] = {}
OAUTH_RATE_LIMIT = 10 # Максимум 10 попыток
OAUTH_RATE_WINDOW = 300 # За 5 минут
def validate_redirect_uri(redirect_uri: str) -> bool:
"""
🔒 Строгая валидация redirect URI против open redirect атак
Args:
redirect_uri: URI для валидации
Returns:
bool: True если URI безопасен
"""
if not redirect_uri:
return False
try:
parsed = urlparse(redirect_uri)
# 1. Проверяем схему (только HTTPS в продакшене)
if parsed.scheme not in ["https", "http"]: # http только для localhost
logger.warning(f"🚨 Invalid scheme in redirect_uri: {parsed.scheme}")
return False
# 2. Проверяем домен против whitelist
hostname = parsed.hostname
if not hostname:
logger.warning(f"🚨 No hostname in redirect_uri: {redirect_uri}")
return False
# 3. Проверяем против разрешенных доменов
is_allowed = False
for allowed_domain in ALLOWED_REDIRECT_DOMAINS:
if hostname == allowed_domain or hostname.endswith(f".{allowed_domain}"):
is_allowed = True
break
if not is_allowed:
logger.warning(f"🚨 Unauthorized domain in redirect_uri: {hostname}")
# 🚨 Отправляем алерт о попытке open redirect атаки
send_open_redirect_alert(redirect_uri)
return False
# 4. Дополнительные проверки безопасности
if len(redirect_uri) > 2048: # Слишком длинный URL
logger.warning(f"🚨 Redirect URI too long: {len(redirect_uri)}")
return False
# 5. Проверяем на подозрительные паттерны
suspicious_patterns = [
r"javascript:",
r"data:",
r"vbscript:",
r"file:",
r"ftp:",
]
for pattern in suspicious_patterns:
if re.search(pattern, redirect_uri, re.IGNORECASE):
logger.warning(f"🚨 Suspicious pattern in redirect_uri: {pattern}")
return False
return True
except Exception as e:
logger.error(f"🚨 Error validating redirect_uri: {e}")
return False
def check_oauth_rate_limit(client_ip: str) -> bool:
"""
🔒 Rate limiting для OAuth endpoints
Args:
client_ip: IP адрес клиента
Returns:
bool: True если запрос разрешен
"""
current_time = time.time()
# Получаем историю запросов для IP
if client_ip not in oauth_rate_limits:
oauth_rate_limits[client_ip] = []
requests = oauth_rate_limits[client_ip]
# Удаляем старые запросы
requests[:] = [req_time for req_time in requests if current_time - req_time < OAUTH_RATE_WINDOW]
# Проверяем лимит
if len(requests) >= OAUTH_RATE_LIMIT:
logger.warning(f"🚨 OAuth rate limit exceeded for IP: {client_ip}")
# 🚨 Отправляем алерт о превышении rate limit
send_rate_limit_alert(client_ip, len(requests))
return False
# Добавляем текущий запрос
requests.append(current_time)
return True
def get_safe_redirect_uri(request, fallback: str = "https://testing.discours.io") -> str:
"""
🔒 Безопасное получение redirect_uri с валидацией
Args:
request: HTTP запрос
fallback: Безопасный fallback URI
Returns:
str: Валидный redirect URI
"""
# Приоритет источников (БЕЗ Referer header!)
candidates = [
request.query_params.get("redirect_uri"),
request.path_params.get("redirect_uri"),
fallback, # Безопасный fallback
]
for candidate in candidates:
if candidate and validate_redirect_uri(candidate):
logger.info(f"✅ Valid redirect_uri: {candidate}")
return candidate
# Если ничего не подошло - используем безопасный fallback
logger.warning(f"🚨 No valid redirect_uri found, using fallback: {fallback}")
return fallback
def log_oauth_security_event(event_type: str, details: Dict) -> None:
"""
🔒 Логирование событий безопасности OAuth
Args:
event_type: Тип события
details: Детали события
"""
logger.warning(f"🚨 OAuth Security Event: {event_type}")
logger.warning(f" Details: {details}")
# 🚨 Отправляем критические события в GlitchTip
_send_security_alert_to_glitchtip(event_type, details)
def validate_oauth_provider(provider: str, log_security_events: bool = True) -> bool:
"""
🔒 Валидация OAuth провайдера
Args:
provider: Название провайдера
log_security_events: Логировать события безопасности
Returns:
bool: True если провайдер валидный
"""
# Импортируем здесь чтобы избежать циклических импортов
from auth.oauth import PROVIDER_CONFIGS
if not provider:
return False
if provider not in PROVIDER_CONFIGS:
if log_security_events:
log_oauth_security_event(
"invalid_provider", {"provider": provider, "available": list(PROVIDER_CONFIGS.keys())}
)
return False
return True
def sanitize_oauth_logs(data: Dict) -> Dict:
"""
🔒 Очистка логов от чувствительной информации
Args:
data: Данные для логирования
Returns:
Dict: Очищенные данные
"""
sensitive_keys = ["state", "code", "access_token", "refresh_token", "client_secret"]
sanitized = {}
for key, value in data.items():
if key.lower() in sensitive_keys:
sanitized[key] = f"***{str(value)[-4:]}" if value else None
else:
sanitized[key] = value
return sanitized