""" 🔒 OAuth Security Enhancements - Критические исправления безопасности Исправляет найденные уязвимости в OAuth реализации. """ import re import time from typing import Dict, List from urllib.parse import urlparse from utils.logger import root_logger as logger def _send_security_alert_to_glitchtip(event_type: str, details: Dict) -> None: """ 🚨 Отправка алертов безопасности в GlitchTip Args: event_type: Тип события безопасности details: Детали события """ try: import sentry_sdk # Определяем уровень критичности critical_events = [ "open_redirect_attempt", "rate_limit_exceeded", "invalid_provider", "suspicious_redirect_uri", "brute_force_detected", ] # Создаем контекст для GlitchTip with sentry_sdk.configure_scope() as scope: scope.set_tag("security_event", event_type) scope.set_tag("component", "oauth") scope.set_context("security_details", details) # Добавляем дополнительные теги для фильтрации if "ip" in details: scope.set_tag("client_ip", details["ip"]) if "provider" in details: scope.set_tag("oauth_provider", details["provider"]) if "redirect_uri" in details: scope.set_tag("has_redirect_uri", "true") # Отправляем в зависимости от критичности if event_type in critical_events: # Критичные события как ERROR sentry_sdk.capture_message(f"🚨 CRITICAL OAuth Security Event: {event_type}", level="error") logger.error(f"🚨 CRITICAL security alert sent to GlitchTip: {event_type}") else: # Обычные события как WARNING sentry_sdk.capture_message(f"⚠️ OAuth Security Event: {event_type}", level="warning") logger.info(f"⚠️ Security alert sent to GlitchTip: {event_type}") except Exception as e: # Не ломаем основную логику если GlitchTip недоступен logger.error(f"❌ Failed to send security alert to GlitchTip: {e}") def send_rate_limit_alert(client_ip: str, attempts: int) -> None: """ 🚨 Специальный алерт для превышения rate limit Args: client_ip: IP адрес нарушителя attempts: Количество попыток """ log_oauth_security_event( "rate_limit_exceeded", { "ip": client_ip, "attempts": attempts, "limit": OAUTH_RATE_LIMIT, "window_seconds": OAUTH_RATE_WINDOW, "severity": "high", }, ) def send_open_redirect_alert(malicious_uri: str, client_ip: str = "") -> None: """ 🚨 Специальный алерт для попытки open redirect атаки Args: malicious_uri: Подозрительный URI client_ip: IP адрес атакующего """ log_oauth_security_event( "open_redirect_attempt", {"malicious_uri": malicious_uri, "ip": client_ip, "severity": "critical", "attack_type": "open_redirect"}, ) # 🔒 Whitelist разрешенных redirect URI ALLOWED_REDIRECT_DOMAINS = [ "testing.discours.io", "new.discours.io", "discours.io", "localhost", # Только для разработки ] # 🔒 Rate limiting для OAuth endpoints oauth_rate_limits: Dict[str, List[float]] = {} OAUTH_RATE_LIMIT = 10 # Максимум 10 попыток OAUTH_RATE_WINDOW = 300 # За 5 минут def validate_redirect_uri(redirect_uri: str) -> bool: """ 🔒 Строгая валидация redirect URI против open redirect атак Args: redirect_uri: URI для валидации Returns: bool: True если URI безопасен """ if not redirect_uri: return False try: parsed = urlparse(redirect_uri) # 1. Проверяем схему (только HTTPS в продакшене) if parsed.scheme not in ["https", "http"]: # http только для localhost logger.warning(f"🚨 Invalid scheme in redirect_uri: {parsed.scheme}") return False # 2. Проверяем домен против whitelist hostname = parsed.hostname if not hostname: logger.warning(f"🚨 No hostname in redirect_uri: {redirect_uri}") return False # 3. Проверяем против разрешенных доменов is_allowed = False for allowed_domain in ALLOWED_REDIRECT_DOMAINS: if hostname == allowed_domain or hostname.endswith(f".{allowed_domain}"): is_allowed = True break if not is_allowed: logger.warning(f"🚨 Unauthorized domain in redirect_uri: {hostname}") # 🚨 Отправляем алерт о попытке open redirect атаки send_open_redirect_alert(redirect_uri) return False # 4. Дополнительные проверки безопасности if len(redirect_uri) > 2048: # Слишком длинный URL logger.warning(f"🚨 Redirect URI too long: {len(redirect_uri)}") return False # 5. Проверяем на подозрительные паттерны suspicious_patterns = [ r"javascript:", r"data:", r"vbscript:", r"file:", r"ftp:", ] for pattern in suspicious_patterns: if re.search(pattern, redirect_uri, re.IGNORECASE): logger.warning(f"🚨 Suspicious pattern in redirect_uri: {pattern}") return False return True except Exception as e: logger.error(f"🚨 Error validating redirect_uri: {e}") return False def check_oauth_rate_limit(client_ip: str) -> bool: """ 🔒 Rate limiting для OAuth endpoints Args: client_ip: IP адрес клиента Returns: bool: True если запрос разрешен """ current_time = time.time() # Получаем историю запросов для IP if client_ip not in oauth_rate_limits: oauth_rate_limits[client_ip] = [] requests = oauth_rate_limits[client_ip] # Удаляем старые запросы requests[:] = [req_time for req_time in requests if current_time - req_time < OAUTH_RATE_WINDOW] # Проверяем лимит if len(requests) >= OAUTH_RATE_LIMIT: logger.warning(f"🚨 OAuth rate limit exceeded for IP: {client_ip}") # 🚨 Отправляем алерт о превышении rate limit send_rate_limit_alert(client_ip, len(requests)) return False # Добавляем текущий запрос requests.append(current_time) return True def get_safe_redirect_uri(request, fallback: str = "https://testing.discours.io") -> str: """ 🔒 Безопасное получение redirect_uri с валидацией Args: request: HTTP запрос fallback: Безопасный fallback URI Returns: str: Валидный redirect URI """ # Приоритет источников (БЕЗ Referer header!) candidates = [ request.query_params.get("redirect_uri"), request.path_params.get("redirect_uri"), fallback, # Безопасный fallback ] for candidate in candidates: if candidate and validate_redirect_uri(candidate): logger.info(f"✅ Valid redirect_uri: {candidate}") return candidate # Если ничего не подошло - используем безопасный fallback logger.warning(f"🚨 No valid redirect_uri found, using fallback: {fallback}") return fallback def log_oauth_security_event(event_type: str, details: Dict) -> None: """ 🔒 Логирование событий безопасности OAuth Args: event_type: Тип события details: Детали события """ logger.warning(f"🚨 OAuth Security Event: {event_type}") logger.warning(f" Details: {details}") # 🚨 Отправляем критические события в GlitchTip _send_security_alert_to_glitchtip(event_type, details) def validate_oauth_provider(provider: str, log_security_events: bool = True) -> bool: """ 🔒 Валидация OAuth провайдера Args: provider: Название провайдера log_security_events: Логировать события безопасности Returns: bool: True если провайдер валидный """ # Импортируем здесь чтобы избежать циклических импортов from auth.oauth import PROVIDER_CONFIGS if not provider: return False if provider not in PROVIDER_CONFIGS: if log_security_events: log_oauth_security_event( "invalid_provider", {"provider": provider, "available": list(PROVIDER_CONFIGS.keys())} ) return False return True def sanitize_oauth_logs(data: Dict) -> Dict: """ 🔒 Очистка логов от чувствительной информации Args: data: Данные для логирования Returns: Dict: Очищенные данные """ sensitive_keys = ["state", "code", "access_token", "refresh_token", "client_secret"] sanitized = {} for key, value in data.items(): if key.lower() in sensitive_keys: sanitized[key] = f"***{str(value)[-4:]}" if value else None else: sanitized[key] = value return sanitized