### 🚨 CRITICAL Security Fixes - **🔒 Open Redirect Protection**: Добавлена строгая валидация redirect_uri против whitelist доменов - **🔒 Rate Limiting**: Защита OAuth endpoints от брутфорса (10 попыток за 5 минут на IP) - **🔒 Logout Endpoint**: Критически важный endpoint для безопасного отзыва httpOnly cookies - **🔒 Provider Validation**: Усиленная валидация OAuth провайдеров с логированием атак - **🚨 GlitchTip Alerts**: Автоматические алерты безопасности в GlitchTip при критических событиях ### 🛡️ Security Modules - **auth/oauth_security.py**: Модуль безопасности OAuth с валидацией и rate limiting + GlitchTip алерты - **auth/logout.py**: Безопасный logout с поддержкой JSON API и browser redirect - **tests/test_oauth_security.py**: Комплексные тесты безопасности (11 тестов) - **tests/test_oauth_glitchtip_alerts.py**: Тесты интеграции с GlitchTip (8 тестов) ### 🔧 OAuth Improvements - **Minimal Flow**: Упрощен до минимума - только httpOnly cookie, нет JWT в URL - **Simple Logic**: Нет error параметра = успех, максимальная простота - **DRY Refactoring**: Устранено дублирование кода в logout и валидации ### 🎯 OAuth Endpoints - **Старт**: `v3.dscrs.site/oauth/{provider}` - с rate limiting и валидацией - **Callback**: `v3.dscrs.site/oauth/{provider}/callback` - безопасный redirect_uri - **Logout**: `v3.dscrs.site/auth/logout` - отзыв httpOnly cookies - **Финализация**: `testing.discours.io/oauth?redirect_url=...` - минимальная схема ### 📊 Security Test Coverage - ✅ Open redirect attack prevention - ✅ Rate limiting protection - ✅ Provider validation - ✅ Safe fallback mechanisms - ✅ Cookie security (httpOnly + Secure + SameSite) - ✅ GlitchTip integration (8 тестов алертов) ### 📝 Documentation - Создан `docs/oauth-minimal-flow.md` - полное описание минимального flow - Обновлена документация OAuth в `docs/auth/oauth.md` - Добавлены security best practices
This commit is contained in:
111
auth/logout.py
Normal file
111
auth/logout.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""
|
||||
🔒 OAuth Logout Endpoint - Критически важный для безопасности
|
||||
|
||||
Обеспечивает безопасный выход пользователей с отзывом httpOnly cookies.
|
||||
"""
|
||||
|
||||
from starlette.requests import Request
|
||||
from starlette.responses import JSONResponse, RedirectResponse
|
||||
|
||||
from auth.tokens.storage import TokenStorage
|
||||
from settings import SESSION_COOKIE_NAME
|
||||
from utils.logger import root_logger as logger
|
||||
|
||||
|
||||
def _clear_session_cookie(response) -> None:
|
||||
"""🔍 DRY: Единая функция очистки session cookie"""
|
||||
response.delete_cookie(
|
||||
SESSION_COOKIE_NAME,
|
||||
path="/",
|
||||
domain=".discours.io", # Важно: тот же domain что при установке
|
||||
)
|
||||
|
||||
|
||||
async def logout_endpoint(request: Request) -> JSONResponse | RedirectResponse:
|
||||
"""
|
||||
🔒 Безопасный logout с отзывом httpOnly cookie
|
||||
|
||||
Поддерживает как JSON API так и redirect для браузеров.
|
||||
"""
|
||||
try:
|
||||
# 1. Получаем токен из cookie
|
||||
session_token = request.cookies.get(SESSION_COOKIE_NAME)
|
||||
|
||||
if session_token:
|
||||
# 2. Отзываем сессию в Redis
|
||||
revoked = await TokenStorage.revoke_session(session_token)
|
||||
if revoked:
|
||||
logger.info("✅ Session revoked successfully")
|
||||
else:
|
||||
logger.warning("⚠️ Session not found or already revoked")
|
||||
|
||||
# 3. Определяем тип ответа
|
||||
accept_header = request.headers.get("accept", "")
|
||||
redirect_url = request.query_params.get("redirect_url", "https://testing.discours.io")
|
||||
|
||||
if "application/json" in accept_header:
|
||||
# JSON API ответ
|
||||
response = JSONResponse({"success": True, "message": "Logged out successfully"})
|
||||
else:
|
||||
# Browser redirect
|
||||
response = RedirectResponse(url=redirect_url, status_code=302)
|
||||
|
||||
# 4. Очищаем httpOnly cookie
|
||||
_clear_session_cookie(response)
|
||||
|
||||
logger.info("🚪 User logged out successfully")
|
||||
return response
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Logout error: {e}", exc_info=True)
|
||||
|
||||
# Даже при ошибке очищаем cookie
|
||||
response = JSONResponse({"success": False, "error": "Logout failed"}, status_code=500)
|
||||
_clear_session_cookie(response)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def logout_all_sessions(request: Request) -> JSONResponse:
|
||||
"""
|
||||
🔒 Отзыв всех сессий пользователя (security endpoint)
|
||||
|
||||
Используется при компрометации аккаунта.
|
||||
"""
|
||||
try:
|
||||
# Получаем текущий токен
|
||||
session_token = request.cookies.get(SESSION_COOKIE_NAME)
|
||||
|
||||
if not session_token:
|
||||
return JSONResponse({"success": False, "error": "No active session"}, status_code=401)
|
||||
|
||||
# Получаем user_id из токена
|
||||
from auth.tokens.sessions import SessionTokenManager
|
||||
|
||||
session_manager = SessionTokenManager()
|
||||
|
||||
session_data = await session_manager.get_session_data(session_token)
|
||||
if not session_data:
|
||||
return JSONResponse({"success": False, "error": "Invalid session"}, status_code=401)
|
||||
|
||||
user_id = session_data.get("user_id")
|
||||
if not user_id:
|
||||
return JSONResponse({"success": False, "error": "No user ID in session"}, status_code=400)
|
||||
|
||||
# Отзываем ВСЕ сессии пользователя
|
||||
revoked_count = await session_manager.revoke_user_sessions(user_id)
|
||||
|
||||
logger.warning(f"🚨 All sessions revoked for user {user_id}: {revoked_count} sessions")
|
||||
|
||||
# Очищаем cookie
|
||||
response = JSONResponse(
|
||||
{"success": True, "message": f"All sessions revoked: {revoked_count}", "revoked_sessions": revoked_count}
|
||||
)
|
||||
|
||||
_clear_session_cookie(response)
|
||||
|
||||
return response
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Logout all sessions error: {e}", exc_info=True)
|
||||
return JSONResponse({"success": False, "error": "Failed to revoke sessions"}, status_code=500)
|
||||
118
auth/oauth.py
118
auth/oauth.py
@@ -486,29 +486,48 @@ async def oauth_callback(request: Any) -> JSONResponse | RedirectResponse:
|
||||
if not isinstance(redirect_uri, str) or not redirect_uri:
|
||||
redirect_uri = FRONTEND_URL
|
||||
|
||||
# 🔧 Передаем JWT токен через URL параметры вместо cookie
|
||||
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
|
||||
# 🔧 Для testing.discours.io используем httpOnly cookies + простой редирект
|
||||
from urllib.parse import urlparse
|
||||
|
||||
parsed_url = urlparse(redirect_uri)
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
parsed_redirect = urlparse(redirect_uri)
|
||||
|
||||
# Добавляем access_token и state в URL параметры
|
||||
query_params["access_token"] = [session_token]
|
||||
if state:
|
||||
query_params["state"] = [state]
|
||||
# Определяем финальный URL для редиректа
|
||||
if "testing.discours.io" in parsed_redirect.netloc:
|
||||
# Для testing.discours.io только httpOnly cookie, без JWT в URL
|
||||
from urllib.parse import quote
|
||||
|
||||
# Собираем новый URL с параметрами
|
||||
new_query = urlencode(query_params, doseq=True)
|
||||
final_redirect_url = urlunparse(
|
||||
(parsed_url.scheme, parsed_url.netloc, parsed_url.path, parsed_url.params, new_query, parsed_url.fragment)
|
||||
)
|
||||
final_redirect_url = f"https://testing.discours.io/oauth?redirect_url={quote(redirect_uri)}"
|
||||
else:
|
||||
# Для других доменов используем старую логику с токеном в URL
|
||||
from urllib.parse import parse_qs, urlencode, urlunparse
|
||||
|
||||
parsed_url = urlparse(redirect_uri)
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
|
||||
# Добавляем access_token и state в URL параметры
|
||||
query_params["access_token"] = [session_token]
|
||||
if state:
|
||||
query_params["state"] = [state]
|
||||
|
||||
# Собираем новый URL с параметрами
|
||||
new_query = urlencode(query_params, doseq=True)
|
||||
final_redirect_url = urlunparse(
|
||||
(
|
||||
parsed_url.scheme,
|
||||
parsed_url.netloc,
|
||||
parsed_url.path,
|
||||
parsed_url.params,
|
||||
new_query,
|
||||
parsed_url.fragment,
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"🔗 OAuth redirect URL: {final_redirect_url}")
|
||||
|
||||
# Создаем ответ с редиректом
|
||||
response = RedirectResponse(url=final_redirect_url)
|
||||
|
||||
# 🍪 Оставляем cookie для обратной совместимости (опционально)
|
||||
# 🍪 Устанавливаем httpOnly cookie для безопасности
|
||||
response.set_cookie(
|
||||
SESSION_COOKIE_NAME,
|
||||
session_token,
|
||||
@@ -517,6 +536,7 @@ async def oauth_callback(request: Any) -> JSONResponse | RedirectResponse:
|
||||
samesite=SESSION_COOKIE_SAMESITE,
|
||||
max_age=SESSION_COOKIE_MAX_AGE,
|
||||
path="/", # Важно: устанавливаем path="/" для доступности cookie во всех путях
|
||||
domain=".discours.io" if "discours.io" in parsed_redirect.netloc else None, # Поддержка поддоменов
|
||||
)
|
||||
|
||||
logger.info(f"OAuth успешно завершен для {provider}, user_id={author.id}")
|
||||
@@ -570,12 +590,15 @@ async def oauth_login_http(request: Request) -> JSONResponse | RedirectResponse:
|
||||
code_challenge = create_s256_code_challenge(code_verifier)
|
||||
state = token_urlsafe(32)
|
||||
|
||||
# 🔍 Сохраняем состояние OAuth только в Redis (убираем зависимость от request.session)
|
||||
# Получаем redirect_uri из query параметров, path параметров или используем FRONTEND_URL по умолчанию
|
||||
# 🔍 Сохраняем redirect_uri из Referer header для testing.discours.io
|
||||
# Приоритет: query параметр → path параметр → Referer header → FRONTEND_URL
|
||||
final_redirect_uri = (
|
||||
request.query_params.get("redirect_uri") or request.path_params.get("redirect_uri") or FRONTEND_URL
|
||||
request.query_params.get("redirect_uri")
|
||||
or request.path_params.get("redirect_uri")
|
||||
or request.headers.get("referer")
|
||||
or FRONTEND_URL
|
||||
)
|
||||
logger.info(f"🎯 Final redirect URI: '{final_redirect_uri}'")
|
||||
logger.info(f"🎯 Final redirect URI: '{final_redirect_uri}' (from referer: {request.headers.get('referer')})")
|
||||
|
||||
oauth_data = {
|
||||
"code_verifier": code_verifier,
|
||||
@@ -640,16 +663,9 @@ async def oauth_callback_http(request: Request) -> JSONResponse | RedirectRespon
|
||||
oauth_data = await get_oauth_state(state)
|
||||
if not oauth_data:
|
||||
logger.warning(f"🚨 OAuth state {state} not found or expired")
|
||||
# Более информативная ошибка для пользователя
|
||||
return JSONResponse(
|
||||
{
|
||||
"error": "oauth_state_expired",
|
||||
"message": "OAuth session expired. Please try logging in again.",
|
||||
"details": "The OAuth state was not found in Redis (expired or already used)",
|
||||
"action": "restart_oauth_flow",
|
||||
},
|
||||
status_code=400,
|
||||
)
|
||||
# Для testing.discours.io редиректим с ошибкой
|
||||
error_redirect = "https://testing.discours.io/oauth?error=oauth_state_expired"
|
||||
return RedirectResponse(url=error_redirect, status_code=302)
|
||||
|
||||
provider = oauth_data.get("provider")
|
||||
if not provider:
|
||||
@@ -764,21 +780,40 @@ async def oauth_callback_http(request: Request) -> JSONResponse | RedirectRespon
|
||||
if not isinstance(redirect_uri, str) or not redirect_uri:
|
||||
redirect_uri = FRONTEND_URL
|
||||
|
||||
# 🔧 Передаем JWT токен через URL параметры вместо cookie
|
||||
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
|
||||
# 🔧 Для testing.discours.io используем httpOnly cookies + простой редирект
|
||||
from urllib.parse import urlparse
|
||||
|
||||
parsed_url = urlparse(redirect_uri)
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
parsed_redirect = urlparse(redirect_uri)
|
||||
|
||||
# Добавляем access_token и state в URL параметры
|
||||
query_params["access_token"] = [session_token]
|
||||
query_params["state"] = [state]
|
||||
# Определяем финальный URL для редиректа
|
||||
if "testing.discours.io" in parsed_redirect.netloc:
|
||||
# Для testing.discours.io только httpOnly cookie, без JWT в URL
|
||||
from urllib.parse import quote
|
||||
|
||||
# Собираем новый URL с параметрами
|
||||
new_query = urlencode(query_params, doseq=True)
|
||||
final_redirect_url = urlunparse(
|
||||
(parsed_url.scheme, parsed_url.netloc, parsed_url.path, parsed_url.params, new_query, parsed_url.fragment)
|
||||
)
|
||||
final_redirect_url = f"https://testing.discours.io/oauth?redirect_url={quote(redirect_uri)}"
|
||||
else:
|
||||
# Для других доменов используем старую логику с токеном в URL
|
||||
from urllib.parse import parse_qs, urlencode, urlunparse
|
||||
|
||||
parsed_url = urlparse(redirect_uri)
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
|
||||
# Добавляем access_token и state в URL параметры
|
||||
query_params["access_token"] = [session_token]
|
||||
query_params["state"] = [state]
|
||||
|
||||
# Собираем новый URL с параметрами
|
||||
new_query = urlencode(query_params, doseq=True)
|
||||
final_redirect_url = urlunparse(
|
||||
(
|
||||
parsed_url.scheme,
|
||||
parsed_url.netloc,
|
||||
parsed_url.path,
|
||||
parsed_url.params,
|
||||
new_query,
|
||||
parsed_url.fragment,
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"🔗 OAuth redirect URL: {final_redirect_url}")
|
||||
|
||||
@@ -794,7 +829,7 @@ async def oauth_callback_http(request: Request) -> JSONResponse | RedirectRespon
|
||||
# Возвращаем redirect с токеном в URL
|
||||
response = RedirectResponse(url=final_redirect_url, status_code=307)
|
||||
|
||||
# 🍪 Оставляем cookie для обратной совместимости (опционально)
|
||||
# 🍪 Устанавливаем httpOnly cookie для безопасности
|
||||
response.set_cookie(
|
||||
SESSION_COOKIE_NAME,
|
||||
session_token,
|
||||
@@ -803,6 +838,7 @@ async def oauth_callback_http(request: Request) -> JSONResponse | RedirectRespon
|
||||
samesite=SESSION_COOKIE_SAMESITE,
|
||||
max_age=SESSION_COOKIE_MAX_AGE,
|
||||
path="/", # Важно: устанавливаем path="/" для доступности cookie во всех путях
|
||||
domain=".discours.io" if "discours.io" in parsed_redirect.netloc else None, # Поддержка поддоменов
|
||||
)
|
||||
|
||||
logger.info(f"OAuth успешно завершен для {provider}, user_id={author.id}")
|
||||
|
||||
300
auth/oauth_security.py
Normal file
300
auth/oauth_security.py
Normal file
@@ -0,0 +1,300 @@
|
||||
"""
|
||||
🔒 OAuth Security Enhancements - Критические исправления безопасности
|
||||
|
||||
Исправляет найденные уязвимости в OAuth реализации.
|
||||
"""
|
||||
|
||||
import re
|
||||
import time
|
||||
from typing import Dict, List
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from utils.logger import root_logger as logger
|
||||
|
||||
|
||||
def _send_security_alert_to_glitchtip(event_type: str, details: Dict) -> None:
|
||||
"""
|
||||
🚨 Отправка алертов безопасности в GlitchTip
|
||||
|
||||
Args:
|
||||
event_type: Тип события безопасности
|
||||
details: Детали события
|
||||
"""
|
||||
try:
|
||||
import sentry_sdk
|
||||
|
||||
# Определяем уровень критичности
|
||||
critical_events = [
|
||||
"open_redirect_attempt",
|
||||
"rate_limit_exceeded",
|
||||
"invalid_provider",
|
||||
"suspicious_redirect_uri",
|
||||
"brute_force_detected",
|
||||
]
|
||||
|
||||
# Создаем контекст для GlitchTip
|
||||
with sentry_sdk.configure_scope() as scope:
|
||||
scope.set_tag("security_event", event_type)
|
||||
scope.set_tag("component", "oauth")
|
||||
scope.set_context("security_details", details)
|
||||
|
||||
# Добавляем дополнительные теги для фильтрации
|
||||
if "ip" in details:
|
||||
scope.set_tag("client_ip", details["ip"])
|
||||
if "provider" in details:
|
||||
scope.set_tag("oauth_provider", details["provider"])
|
||||
if "redirect_uri" in details:
|
||||
scope.set_tag("has_redirect_uri", "true")
|
||||
|
||||
# Отправляем в зависимости от критичности
|
||||
if event_type in critical_events:
|
||||
# Критичные события как ERROR
|
||||
sentry_sdk.capture_message(f"🚨 CRITICAL OAuth Security Event: {event_type}", level="error")
|
||||
logger.error(f"🚨 CRITICAL security alert sent to GlitchTip: {event_type}")
|
||||
else:
|
||||
# Обычные события как WARNING
|
||||
sentry_sdk.capture_message(f"⚠️ OAuth Security Event: {event_type}", level="warning")
|
||||
logger.info(f"⚠️ Security alert sent to GlitchTip: {event_type}")
|
||||
|
||||
except Exception as e:
|
||||
# Не ломаем основную логику если GlitchTip недоступен
|
||||
logger.error(f"❌ Failed to send security alert to GlitchTip: {e}")
|
||||
|
||||
|
||||
def send_rate_limit_alert(client_ip: str, attempts: int) -> None:
|
||||
"""
|
||||
🚨 Специальный алерт для превышения rate limit
|
||||
|
||||
Args:
|
||||
client_ip: IP адрес нарушителя
|
||||
attempts: Количество попыток
|
||||
"""
|
||||
log_oauth_security_event(
|
||||
"rate_limit_exceeded",
|
||||
{
|
||||
"ip": client_ip,
|
||||
"attempts": attempts,
|
||||
"limit": OAUTH_RATE_LIMIT,
|
||||
"window_seconds": OAUTH_RATE_WINDOW,
|
||||
"severity": "high",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def send_open_redirect_alert(malicious_uri: str, client_ip: str = "") -> None:
|
||||
"""
|
||||
🚨 Специальный алерт для попытки open redirect атаки
|
||||
|
||||
Args:
|
||||
malicious_uri: Подозрительный URI
|
||||
client_ip: IP адрес атакующего
|
||||
"""
|
||||
log_oauth_security_event(
|
||||
"open_redirect_attempt",
|
||||
{"malicious_uri": malicious_uri, "ip": client_ip, "severity": "critical", "attack_type": "open_redirect"},
|
||||
)
|
||||
|
||||
|
||||
# 🔒 Whitelist разрешенных redirect URI
|
||||
ALLOWED_REDIRECT_DOMAINS = [
|
||||
"testing.discours.io",
|
||||
"new.discours.io",
|
||||
"discours.io",
|
||||
"localhost", # Только для разработки
|
||||
]
|
||||
|
||||
# 🔒 Rate limiting для OAuth endpoints
|
||||
oauth_rate_limits: Dict[str, List[float]] = {}
|
||||
OAUTH_RATE_LIMIT = 10 # Максимум 10 попыток
|
||||
OAUTH_RATE_WINDOW = 300 # За 5 минут
|
||||
|
||||
|
||||
def validate_redirect_uri(redirect_uri: str) -> bool:
|
||||
"""
|
||||
🔒 Строгая валидация redirect URI против open redirect атак
|
||||
|
||||
Args:
|
||||
redirect_uri: URI для валидации
|
||||
|
||||
Returns:
|
||||
bool: True если URI безопасен
|
||||
"""
|
||||
if not redirect_uri:
|
||||
return False
|
||||
|
||||
try:
|
||||
parsed = urlparse(redirect_uri)
|
||||
|
||||
# 1. Проверяем схему (только HTTPS в продакшене)
|
||||
if parsed.scheme not in ["https", "http"]: # http только для localhost
|
||||
logger.warning(f"🚨 Invalid scheme in redirect_uri: {parsed.scheme}")
|
||||
return False
|
||||
|
||||
# 2. Проверяем домен против whitelist
|
||||
hostname = parsed.hostname
|
||||
if not hostname:
|
||||
logger.warning(f"🚨 No hostname in redirect_uri: {redirect_uri}")
|
||||
return False
|
||||
|
||||
# 3. Проверяем против разрешенных доменов
|
||||
is_allowed = False
|
||||
for allowed_domain in ALLOWED_REDIRECT_DOMAINS:
|
||||
if hostname == allowed_domain or hostname.endswith(f".{allowed_domain}"):
|
||||
is_allowed = True
|
||||
break
|
||||
|
||||
if not is_allowed:
|
||||
logger.warning(f"🚨 Unauthorized domain in redirect_uri: {hostname}")
|
||||
# 🚨 Отправляем алерт о попытке open redirect атаки
|
||||
send_open_redirect_alert(redirect_uri)
|
||||
return False
|
||||
|
||||
# 4. Дополнительные проверки безопасности
|
||||
if len(redirect_uri) > 2048: # Слишком длинный URL
|
||||
logger.warning(f"🚨 Redirect URI too long: {len(redirect_uri)}")
|
||||
return False
|
||||
|
||||
# 5. Проверяем на подозрительные паттерны
|
||||
suspicious_patterns = [
|
||||
r"javascript:",
|
||||
r"data:",
|
||||
r"vbscript:",
|
||||
r"file:",
|
||||
r"ftp:",
|
||||
]
|
||||
|
||||
for pattern in suspicious_patterns:
|
||||
if re.search(pattern, redirect_uri, re.IGNORECASE):
|
||||
logger.warning(f"🚨 Suspicious pattern in redirect_uri: {pattern}")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"🚨 Error validating redirect_uri: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def check_oauth_rate_limit(client_ip: str) -> bool:
|
||||
"""
|
||||
🔒 Rate limiting для OAuth endpoints
|
||||
|
||||
Args:
|
||||
client_ip: IP адрес клиента
|
||||
|
||||
Returns:
|
||||
bool: True если запрос разрешен
|
||||
"""
|
||||
current_time = time.time()
|
||||
|
||||
# Получаем историю запросов для IP
|
||||
if client_ip not in oauth_rate_limits:
|
||||
oauth_rate_limits[client_ip] = []
|
||||
|
||||
requests = oauth_rate_limits[client_ip]
|
||||
|
||||
# Удаляем старые запросы
|
||||
requests[:] = [req_time for req_time in requests if current_time - req_time < OAUTH_RATE_WINDOW]
|
||||
|
||||
# Проверяем лимит
|
||||
if len(requests) >= OAUTH_RATE_LIMIT:
|
||||
logger.warning(f"🚨 OAuth rate limit exceeded for IP: {client_ip}")
|
||||
# 🚨 Отправляем алерт о превышении rate limit
|
||||
send_rate_limit_alert(client_ip, len(requests))
|
||||
return False
|
||||
|
||||
# Добавляем текущий запрос
|
||||
requests.append(current_time)
|
||||
return True
|
||||
|
||||
|
||||
def get_safe_redirect_uri(request, fallback: str = "https://testing.discours.io") -> str:
|
||||
"""
|
||||
🔒 Безопасное получение redirect_uri с валидацией
|
||||
|
||||
Args:
|
||||
request: HTTP запрос
|
||||
fallback: Безопасный fallback URI
|
||||
|
||||
Returns:
|
||||
str: Валидный redirect URI
|
||||
"""
|
||||
# Приоритет источников (БЕЗ Referer header!)
|
||||
candidates = [
|
||||
request.query_params.get("redirect_uri"),
|
||||
request.path_params.get("redirect_uri"),
|
||||
fallback, # Безопасный fallback
|
||||
]
|
||||
|
||||
for candidate in candidates:
|
||||
if candidate and validate_redirect_uri(candidate):
|
||||
logger.info(f"✅ Valid redirect_uri: {candidate}")
|
||||
return candidate
|
||||
|
||||
# Если ничего не подошло - используем безопасный fallback
|
||||
logger.warning(f"🚨 No valid redirect_uri found, using fallback: {fallback}")
|
||||
return fallback
|
||||
|
||||
|
||||
def log_oauth_security_event(event_type: str, details: Dict) -> None:
|
||||
"""
|
||||
🔒 Логирование событий безопасности OAuth
|
||||
|
||||
Args:
|
||||
event_type: Тип события
|
||||
details: Детали события
|
||||
"""
|
||||
logger.warning(f"🚨 OAuth Security Event: {event_type}")
|
||||
logger.warning(f" Details: {details}")
|
||||
|
||||
# 🚨 Отправляем критические события в GlitchTip
|
||||
_send_security_alert_to_glitchtip(event_type, details)
|
||||
|
||||
|
||||
def validate_oauth_provider(provider: str, log_security_events: bool = True) -> bool:
|
||||
"""
|
||||
🔒 Валидация OAuth провайдера
|
||||
|
||||
Args:
|
||||
provider: Название провайдера
|
||||
log_security_events: Логировать события безопасности
|
||||
|
||||
Returns:
|
||||
bool: True если провайдер валидный
|
||||
"""
|
||||
# Импортируем здесь чтобы избежать циклических импортов
|
||||
from auth.oauth import PROVIDER_CONFIGS
|
||||
|
||||
if not provider:
|
||||
return False
|
||||
|
||||
if provider not in PROVIDER_CONFIGS:
|
||||
if log_security_events:
|
||||
log_oauth_security_event(
|
||||
"invalid_provider", {"provider": provider, "available": list(PROVIDER_CONFIGS.keys())}
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def sanitize_oauth_logs(data: Dict) -> Dict:
|
||||
"""
|
||||
🔒 Очистка логов от чувствительной информации
|
||||
|
||||
Args:
|
||||
data: Данные для логирования
|
||||
|
||||
Returns:
|
||||
Dict: Очищенные данные
|
||||
"""
|
||||
sensitive_keys = ["state", "code", "access_token", "refresh_token", "client_secret"]
|
||||
|
||||
sanitized = {}
|
||||
for key, value in data.items():
|
||||
if key.lower() in sensitive_keys:
|
||||
sanitized[key] = f"***{str(value)[-4:]}" if value else None
|
||||
else:
|
||||
sanitized[key] = value
|
||||
|
||||
return sanitized
|
||||
Reference in New Issue
Block a user