""" 🔒 OAuth Logout Endpoint - Критически важный для безопасности Обеспечивает безопасный выход пользователей с отзывом httpOnly cookies. """ from starlette.requests import Request from starlette.responses import JSONResponse, RedirectResponse from auth.tokens.storage import TokenStorage from settings import SESSION_COOKIE_NAME from utils.logger import root_logger as logger def _clear_session_cookie(response) -> None: """🔍 DRY: Единая функция очистки session cookie""" response.delete_cookie( SESSION_COOKIE_NAME, path="/", domain=".discours.io", # Важно: тот же domain что при установке ) async def logout_endpoint(request: Request) -> JSONResponse | RedirectResponse: """ 🔒 Безопасный logout с отзывом httpOnly cookie Поддерживает как JSON API так и redirect для браузеров. """ try: # 1. Получаем токен из cookie session_token = request.cookies.get(SESSION_COOKIE_NAME) if session_token: # 2. Отзываем сессию в Redis revoked = await TokenStorage.revoke_session(session_token) if revoked: logger.info("✅ Session revoked successfully") else: logger.warning("⚠️ Session not found or already revoked") # 3. Определяем тип ответа accept_header = request.headers.get("accept", "") redirect_url = request.query_params.get("redirect_url", "https://testing.discours.io") if "application/json" in accept_header: # JSON API ответ response: JSONResponse | RedirectResponse = JSONResponse( {"success": True, "message": "Logged out successfully"} ) else: # Browser redirect response = RedirectResponse(url=redirect_url, status_code=302) # 4. Очищаем httpOnly cookie _clear_session_cookie(response) logger.info("🚪 User logged out successfully") return response except Exception as e: logger.error(f"❌ Logout error: {e}", exc_info=True) # Даже при ошибке очищаем cookie response = JSONResponse({"success": False, "error": "Logout failed"}, status_code=500) _clear_session_cookie(response) return response async def logout_all_sessions(request: Request) -> JSONResponse: """ 🔒 Отзыв всех сессий пользователя (security endpoint) Используется при компрометации аккаунта. """ try: # Получаем текущий токен session_token = request.cookies.get(SESSION_COOKIE_NAME) if not session_token: return JSONResponse({"success": False, "error": "No active session"}, status_code=401) # Получаем user_id из токена from auth.tokens.sessions import SessionTokenManager session_manager = SessionTokenManager() session_data = await session_manager.get_session_data(session_token) if not session_data: return JSONResponse({"success": False, "error": "Invalid session"}, status_code=401) user_id = session_data.get("user_id") if not user_id: return JSONResponse({"success": False, "error": "No user ID in session"}, status_code=400) # Отзываем ВСЕ сессии пользователя revoked_count = await session_manager.revoke_user_sessions(user_id) logger.warning(f"🚨 All sessions revoked for user {user_id}: {revoked_count} sessions") # Очищаем cookie response = JSONResponse( {"success": True, "message": f"All sessions revoked: {revoked_count}", "revoked_sessions": revoked_count} ) _clear_session_cookie(response) return response except Exception as e: logger.error(f"❌ Logout all sessions error: {e}", exc_info=True) return JSONResponse({"success": False, "error": "Failed to revoke sessions"}, status_code=500)