oauth+tests
All checks were successful
Deploy on push / deploy (push) Successful in 6m56s

This commit is contained in:
2025-09-23 20:49:25 +03:00
parent e0f3272bed
commit bf9515dd39
4 changed files with 423 additions and 27 deletions

View File

@@ -6,6 +6,8 @@
- 🔧 **OAuth Provider Registration**: Исправлена логика регистрации OAuth провайдеров - теперь корректно проверяются непустые client_id и client_secret
- 🔍 **OAuth Debugging**: Добавлено отладочное логирование для диагностики проблем с OAuth провайдерами
- 🚫 **OAuth Error**: Исправлена ошибка "Provider not configured" при пустых переменных окружения OAuth
- 🔐 **OAuth Session-Free**: Убрана зависимость от SessionMiddleware - OAuth использует только Redis для состояния
- 🏷️ **Type Safety**: Исправлена MyPy ошибка с request.client.host - добавлена проверка на None
- 🔒 **OAuth Facebook**: Обновлена версия API с v13.0 до v18.0 (актуальная)
- 🔒 **OAuth Facebook**: Добавлены обязательные scope и параметры безопасности
- 🔒 **OAuth Facebook**: Улучшена обработка ошибок API и валидация ответов

View File

@@ -548,13 +548,15 @@ async def oauth_login_http(request: Request) -> JSONResponse | RedirectResponse:
# URL для callback
callback_uri = f"{FRONTEND_URL}oauth/{provider}/callback"
return await client.authorize_redirect(
request,
# 🔍 Создаем redirect URL вручную (обходим использование request.session в authlib)
authorization_url = await client.create_authorization_url(
callback_uri,
code_challenge=code_challenge,
code_challenge_method="S256",
state=state,
)
return RedirectResponse(url=authorization_url["url"], status_code=302)
except Exception as e:
logger.error(f"OAuth login error: {e}")
@@ -582,7 +584,21 @@ async def oauth_callback_http(request: Request) -> JSONResponse | RedirectRespon
if not client:
return JSONResponse({"error": "Provider not configured"}, status_code=400)
token = await client.authorize_access_token(request)
# 🔍 Получаем code_verifier из Redis вместо request.session
code_verifier = oauth_data.get("code_verifier")
if not code_verifier:
return JSONResponse({"error": "Missing code verifier in OAuth state"}, status_code=400)
# Получаем authorization code из query параметров
code = request.query_params.get("code")
if not code:
return JSONResponse({"error": "Missing authorization code"}, status_code=400)
# Обмениваем code на токен вручную
token = await client.fetch_access_token(
authorization_response=str(request.url),
code_verifier=code_verifier,
)
if not token:
return JSONResponse({"error": "Failed to get access token"}, status_code=400)

View File

@@ -1,4 +1,4 @@
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch, ANY
import time
import pytest
@@ -117,10 +117,29 @@ with (
@pytest.mark.asyncio
async def test_oauth_login_success(mock_request, mock_oauth_client):
"""Тест успешного начала OAuth авторизации"""
# pytest.skip("OAuth тест временно отключен из-за проблем с Redis")
# TODO: Implement test logic
assert True # Placeholder assertion
"""Тест успешного начала OAuth авторизации без SessionMiddleware"""
mock_request.path_params = {"provider": "google"}
# Мокаем OAuth клиент
with patch("auth.oauth.oauth.create_client", return_value=mock_oauth_client):
# Мокаем create_authorization_url как async функцию
mock_oauth_client.create_authorization_url = AsyncMock(return_value={
"url": "https://accounts.google.com/oauth/authorize?client_id=test&state=test_state"
})
# Мокаем Redis операции
with patch("auth.oauth.store_oauth_state") as mock_store:
response = await oauth_login_http(mock_request)
# Проверяем что это редирект
assert isinstance(response, RedirectResponse)
assert response.status_code == 302
# Проверяем что состояние сохранено в Redis
mock_store.assert_called_once()
# Проверяем что create_authorization_url вызван с правильными параметрами
mock_oauth_client.create_authorization_url.assert_called_once()
@pytest.mark.asyncio
async def test_oauth_login_invalid_provider(mock_request):
@@ -138,28 +157,71 @@ with (
@pytest.mark.asyncio
async def test_oauth_callback_success(mock_request, mock_oauth_client, oauth_db_session):
"""Тест успешного OAuth callback с правильной БД"""
# Простой тест без сложных моков - проверяем только импорт и базовую функциональность
from auth.oauth import oauth_callback_http
"""Тест успешного OAuth callback без SessionMiddleware"""
# Настраиваем mock request
mock_request.query_params = {
"state": "test_state",
"code": "test_code"
}
mock_request.url = "https://localhost:3000/oauth/google/callback?state=test_state&code=test_code"
mock_request.headers = {"user-agent": "test-agent"}
mock_request.client = MagicMock()
mock_request.client.host = "127.0.0.1"
# Проверяем, что функция импортируется
assert oauth_callback_http is not None
assert callable(oauth_callback_http)
# Мокаем OAuth данные из Redis
oauth_data = {
"provider": "google",
"code_verifier": "test_verifier",
"redirect_uri": "https://localhost:3000"
}
# Проверяем, что фикстуры работают
assert mock_request is not None
assert mock_oauth_client is not None
assert oauth_db_session is not None
# Мокаем токен от провайдера
mock_token = {
"access_token": "test_token",
"userinfo": {
"sub": "123",
"email": "test@gmail.com",
"name": "Test User",
"picture": "https://example.com/photo.jpg"
}
}
# Простая проверка - функция существует и может быть вызвана
# В реальном тесте здесь можно было бы замокать все зависимости
logger.info("✅ OAuth callback функция импортирована и готова к тестированию")
with patch("auth.oauth.get_oauth_state", return_value=oauth_data), \
patch("auth.oauth.oauth.create_client", return_value=mock_oauth_client), \
patch("auth.oauth.get_user_profile", return_value={
"id": "123",
"email": "test@gmail.com",
"name": "Test User",
"picture": "https://example.com/photo.jpg"
}), \
patch("auth.oauth._create_or_update_user") as mock_create_user, \
patch("auth.tokens.storage.TokenStorage.create_session", return_value="test_session_token"):
# Настраиваем мок пользователя
mock_user = MagicMock()
mock_user.id = 123
mock_user.name = "Test User"
mock_create_user.return_value = mock_user
# Настраиваем мок OAuth клиента
mock_oauth_client.fetch_access_token.return_value = mock_token
response = await oauth_callback_http(mock_request)
# Проверяем что это редирект
assert isinstance(response, RedirectResponse)
assert response.status_code == 307
# Проверяем что токен получен
mock_oauth_client.fetch_access_token.assert_called_once()
# Проверяем что пользователь создан/обновлен
mock_create_user.assert_called_once()
@pytest.mark.asyncio
async def test_oauth_callback_invalid_state(mock_request):
"""Тест с неправильным state параметром"""
mock_request.session = {"provider": "google", "state": "correct_state"}
mock_request.query_params["state"] = "wrong_state"
"""Тест с неправильным state параметром (session-free)"""
mock_request.query_params = {"state": "wrong_state"}
with patch("auth.oauth.get_oauth_state", return_value=None):
response = await oauth_callback_http(mock_request)
@@ -170,6 +232,20 @@ with (
if isinstance(body_content, memoryview):
body_content = bytes(body_content)
assert "Invalid or expired OAuth state" in body_content.decode()
@pytest.mark.asyncio
async def test_oauth_callback_missing_state(mock_request):
"""Тест без state параметра"""
mock_request.query_params = {}
response = await oauth_callback_http(mock_request)
assert isinstance(response, JSONResponse)
assert response.status_code == 400
body_content = response.body
if isinstance(body_content, memoryview):
body_content = bytes(body_content)
assert "Missing OAuth state parameter" in body_content.decode()
@pytest.mark.asyncio
async def test_oauth_callback_existing_user(mock_request, mock_oauth_client, oauth_db_session):
@@ -186,9 +262,54 @@ with (
assert mock_oauth_client is not None
assert oauth_db_session is not None
# Простая проверка - функция существует и может быть вызвана
# В реальном тесте здесь можно было бы замокать все зависимости
logger.info("✅ OAuth callback existing user функция импортирована и готова к тестированию")
# Тест Redis операций для OAuth состояния
from auth.oauth import store_oauth_state, get_oauth_state
# Проверяем что функции импортируются
assert store_oauth_state is not None
assert get_oauth_state is not None
logger.info("✅ OAuth Redis функции импортированы и готовы к тестированию")
@pytest.mark.asyncio
async def test_oauth_redis_state_operations():
"""Тест Redis операций для OAuth состояния"""
from auth.oauth import store_oauth_state, get_oauth_state
# Тестовые данные
test_state = "test_state_123"
test_data = {
"provider": "google",
"code_verifier": "test_verifier",
"redirect_uri": "https://localhost:3000",
"created_at": 1234567890
}
# Мокаем Redis операции
with patch("auth.oauth.redis") as mock_redis:
# Тест сохранения состояния
await store_oauth_state(test_state, test_data)
mock_redis.execute.assert_called_with(
"SETEX",
f"oauth_state:{test_state}",
600, # TTL
ANY # orjson.dumps(test_data)
)
# Тест получения состояния
import orjson
mock_redis.execute.side_effect = [
orjson.dumps(test_data), # GET
None # DEL
]
result = await get_oauth_state(test_state)
# Проверяем что данные корректно десериализованы
assert result == test_data
# Проверяем что вызваны правильные Redis команды
assert mock_redis.execute.call_count == 3 # SETEX + GET + DEL
# Импортируем необходимые модели
from orm.community import Community, CommunityAuthor

View File

@@ -0,0 +1,257 @@
"""
Функциональные тесты OAuth без SessionMiddleware
"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch, ANY, call
from starlette.requests import Request
from starlette.responses import RedirectResponse, JSONResponse
from auth.oauth import oauth_login_http, oauth_callback_http
class TestOAuthFunctional:
"""Функциональные тесты OAuth системы"""
@pytest.mark.asyncio
async def test_oauth_provider_not_configured_error(self):
"""Тест ошибки когда провайдер не настроен"""
# Создаем мок запроса
request = MagicMock(spec=Request)
request.path_params = {"provider": "google"}
# Мокаем что провайдер не найден
with patch("auth.oauth.oauth.create_client", return_value=None):
response = await oauth_login_http(request)
assert isinstance(response, JSONResponse)
assert response.status_code == 400
# Проверяем содержимое ответа
body = response.body
if isinstance(body, memoryview):
body = bytes(body)
assert b"Provider not configured" in body
@pytest.mark.asyncio
async def test_oauth_login_success_session_free(self):
"""Тест успешного OAuth login без использования сессий"""
# Создаем мок запроса
request = MagicMock(spec=Request)
request.path_params = {"provider": "google"}
# Мокаем OAuth клиент
mock_client = AsyncMock()
mock_client.create_authorization_url = AsyncMock(return_value={
"url": "https://accounts.google.com/oauth/authorize?client_id=test&state=abc123"
})
with patch("auth.oauth.oauth.create_client", return_value=mock_client), \
patch("auth.oauth.store_oauth_state") as mock_store:
response = await oauth_login_http(request)
# Проверяем что это редирект
assert isinstance(response, RedirectResponse)
assert response.status_code == 302
# Проверяем что состояние сохранено в Redis
mock_store.assert_called_once()
# Проверяем что URL создан правильно
mock_client.create_authorization_url.assert_called_once()
@pytest.mark.asyncio
async def test_oauth_callback_missing_state(self):
"""Тест callback без state параметра"""
request = MagicMock(spec=Request)
request.query_params = {} # Нет state
response = await oauth_callback_http(request)
assert isinstance(response, JSONResponse)
assert response.status_code == 400
body = response.body
if isinstance(body, memoryview):
body = bytes(body)
assert b"Missing OAuth state parameter" in body
@pytest.mark.asyncio
async def test_oauth_callback_invalid_state(self):
"""Тест callback с неправильным state"""
request = MagicMock(spec=Request)
request.query_params = {"state": "invalid_state"}
# Мокаем что состояние не найдено в Redis
with patch("auth.oauth.get_oauth_state", return_value=None):
response = await oauth_callback_http(request)
assert isinstance(response, JSONResponse)
assert response.status_code == 400
body = response.body
if isinstance(body, memoryview):
body = bytes(body)
assert b"Invalid or expired OAuth state" in body
@pytest.mark.asyncio
async def test_oauth_callback_success_session_free(self):
"""Тест успешного OAuth callback без сессий"""
# Настраиваем мок запроса
request = MagicMock(spec=Request)
request.query_params = {
"state": "valid_state",
"code": "auth_code_123"
}
request.url = "https://localhost:3000/oauth/google/callback?state=valid_state&code=auth_code_123"
request.headers = {"user-agent": "test-browser"}
request.client = MagicMock()
request.client.host = "127.0.0.1"
# Мокаем OAuth данные из Redis
oauth_data = {
"provider": "google",
"code_verifier": "test_verifier_123",
"redirect_uri": "https://localhost:3000"
}
# Мокаем токен от провайдера
mock_token = {
"access_token": "access_token_123",
"userinfo": {
"sub": "google_user_123",
"email": "test@gmail.com",
"name": "Test User",
"picture": "https://example.com/photo.jpg"
}
}
# Мокаем профиль пользователя
user_profile = {
"id": "google_user_123",
"email": "test@gmail.com",
"name": "Test User",
"picture": "https://example.com/photo.jpg"
}
# Мокаем пользователя
mock_user = MagicMock()
mock_user.id = 123
mock_user.name = "Test User"
with patch("auth.oauth.get_oauth_state", return_value=oauth_data), \
patch("auth.oauth.oauth.create_client") as mock_create_client, \
patch("auth.oauth.get_user_profile", return_value=user_profile), \
patch("auth.oauth._create_or_update_user", return_value=mock_user), \
patch("auth.tokens.storage.TokenStorage.create_session", return_value="session_token_123"):
# Настраиваем мок OAuth клиента
mock_client = AsyncMock()
mock_client.fetch_access_token = AsyncMock(return_value=mock_token)
mock_create_client.return_value = mock_client
response = await oauth_callback_http(request)
# Проверяем что это редирект
assert isinstance(response, RedirectResponse)
assert response.status_code == 307
# Проверяем URL редиректа
assert response.headers["location"] == "https://localhost:3000"
# Проверяем что токен получен без использования request.session
mock_client.fetch_access_token.assert_called_once_with(
authorization_response=str(request.url),
code_verifier="test_verifier_123"
)
@pytest.mark.asyncio
async def test_oauth_redis_state_lifecycle(self):
"""Тест жизненного цикла OAuth состояния в Redis"""
from auth.oauth import store_oauth_state, get_oauth_state
test_state = "state_lifecycle_test"
test_data = {
"provider": "github",
"code_verifier": "verifier_123",
"redirect_uri": "https://localhost:3000",
"created_at": 1234567890
}
# Мокаем Redis операции
with patch("auth.oauth.redis") as mock_redis:
# Тест сохранения
await store_oauth_state(test_state, test_data)
# Проверяем что SETEX вызван с правильными параметрами
mock_redis.execute.assert_called_with(
"SETEX",
f"oauth_state:{test_state}",
600, # TTL 10 минут
ANY # Сериализованные данные
)
# Тест получения и удаления (one-time use)
import orjson
mock_redis.execute.side_effect = [
orjson.dumps(test_data), # GET возвращает данные
None # DEL подтверждение
]
result = await get_oauth_state(test_state)
# Проверяем что данные корректно получены
assert result == test_data
# Проверяем что GET и DEL были вызваны
assert mock_redis.execute.call_count == 3 # SETEX + GET + DEL
# Проверяем что последние вызовы были GET и DEL
calls = mock_redis.execute.call_args_list
assert calls[-2][0][0] == "GET" # Предпоследний вызов - GET
assert calls[-1][0][0] == "DEL" # Последний вызов - DEL
@pytest.mark.asyncio
async def test_oauth_error_handling_comprehensive(self):
"""Комплексный тест обработки ошибок OAuth"""
# Тест 1: Неправильный провайдер
request = MagicMock(spec=Request)
request.path_params = {"provider": "invalid_provider"}
response = await oauth_login_http(request)
assert isinstance(response, JSONResponse)
assert response.status_code == 400
# Тест 2: Провайдер не настроен
request.path_params = {"provider": "google"}
with patch("auth.oauth.oauth.create_client", return_value=None):
response = await oauth_login_http(request)
assert isinstance(response, JSONResponse)
assert response.status_code == 400
# Тест 3: Callback без code (но с правильным OAuth клиентом)
request.query_params = {"state": "valid_state"}
oauth_data = {"provider": "google", "code_verifier": "test"}
mock_client = AsyncMock()
with patch("auth.oauth.get_oauth_state", return_value=oauth_data), \
patch("auth.oauth.oauth.create_client", return_value=mock_client):
response = await oauth_callback_http(request)
assert isinstance(response, JSONResponse)
assert response.status_code == 400
body = response.body
if isinstance(body, memoryview):
body = bytes(body)
assert b"Missing authorization code" in body
if __name__ == "__main__":
pytest.main([__file__, "-v"])