291 lines
11 KiB
Python
291 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Тест мутации updateSecurity для смены пароля и email.
|
||
Проверяет различные сценарии:
|
||
- Смена пароля
|
||
- Смена email
|
||
- Одновременная смена пароля и email
|
||
- Валидация и обработка ошибок
|
||
"""
|
||
|
||
import asyncio
|
||
import logging
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
sys.path.append(str(Path(__file__).parent))
|
||
|
||
from auth.orm import Author
|
||
from resolvers.auth import update_security
|
||
from services.db import local_session
|
||
|
||
# Настройка логгера
|
||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class MockInfo:
|
||
"""Мок для GraphQL info контекста"""
|
||
|
||
def __init__(self, author_id: int) -> None:
|
||
self.context = {
|
||
"author": {"id": author_id},
|
||
"roles": ["reader", "author"], # Добавляем необходимые роли
|
||
}
|
||
|
||
|
||
async def test_password_change() -> None:
|
||
"""Тестируем смену пароля"""
|
||
logger.info("🔐 Тестирование смены пароля")
|
||
|
||
# Создаем тестового пользователя
|
||
with local_session() as session:
|
||
# Проверяем, есть ли тестовый пользователь
|
||
test_user = session.query(Author).filter(Author.email == "test@example.com").first()
|
||
|
||
if not test_user:
|
||
test_user = Author(email="test@example.com", name="Test User", slug="test-user")
|
||
test_user.set_password("old_password123")
|
||
session.add(test_user)
|
||
session.commit()
|
||
logger.info(f" Создан тестовый пользователь с ID {test_user.id}")
|
||
else:
|
||
test_user.set_password("old_password123")
|
||
session.add(test_user)
|
||
session.commit()
|
||
logger.info(f" Используется существующий пользователь с ID {test_user.id}")
|
||
|
||
# Тест 1: Успешная смена пароля
|
||
logger.info(" 📝 Тест 1: Успешная смена пароля")
|
||
info = MockInfo(test_user.id)
|
||
|
||
result = await update_security(
|
||
None,
|
||
info,
|
||
email=None,
|
||
old_password="old_password123",
|
||
new_password="new_password456",
|
||
)
|
||
|
||
if result["success"]:
|
||
logger.info(" ✅ Смена пароля успешна")
|
||
|
||
# Проверяем, что новый пароль работает
|
||
with local_session() as session:
|
||
updated_user = session.query(Author).filter(Author.id == test_user.id).first()
|
||
if updated_user.verify_password("new_password456"):
|
||
logger.info(" ✅ Новый пароль работает")
|
||
else:
|
||
logger.error(" ❌ Новый пароль не работает")
|
||
else:
|
||
logger.error(f" ❌ Ошибка смены пароля: {result['error']}")
|
||
|
||
# Тест 2: Неверный старый пароль
|
||
logger.info(" 📝 Тест 2: Неверный старый пароль")
|
||
|
||
result = await update_security(
|
||
None,
|
||
info,
|
||
email=None,
|
||
old_password="wrong_password",
|
||
new_password="another_password789",
|
||
)
|
||
|
||
if not result["success"] and result["error"] == "incorrect old password":
|
||
logger.info(" ✅ Корректно отклонен неверный старый пароль")
|
||
else:
|
||
logger.error(f" ❌ Неожиданный результат: {result}")
|
||
|
||
# Тест 3: Пароли не совпадают
|
||
logger.info(" 📝 Тест 3: Пароли не совпадают")
|
||
|
||
result = await update_security(
|
||
None,
|
||
info,
|
||
email=None,
|
||
old_password="new_password456",
|
||
new_password="password1",
|
||
)
|
||
|
||
if not result["success"] and result["error"] == "PASSWORDS_NOT_MATCH":
|
||
logger.info(" ✅ Корректно отклонены несовпадающие пароли")
|
||
else:
|
||
logger.error(f" ❌ Неожиданный результат: {result}")
|
||
|
||
|
||
async def test_email_change() -> None:
|
||
"""Тестируем смену email"""
|
||
logger.info("📧 Тестирование смены email")
|
||
|
||
with local_session() as session:
|
||
test_user = session.query(Author).filter(Author.email == "test@example.com").first()
|
||
if not test_user:
|
||
logger.error(" ❌ Тестовый пользователь не найден")
|
||
return
|
||
|
||
# Тест 1: Успешная инициация смены email
|
||
logger.info(" 📝 Тест 1: Инициация смены email")
|
||
info = MockInfo(test_user.id)
|
||
|
||
result = await update_security(
|
||
None,
|
||
info,
|
||
email="newemail@example.com",
|
||
old_password="new_password456",
|
||
new_password=None,
|
||
)
|
||
|
||
if result["success"]:
|
||
logger.info(" ✅ Смена email инициирована")
|
||
else:
|
||
logger.error(f" ❌ Ошибка инициации смены email: {result['error']}")
|
||
|
||
# Тест 2: Email уже существует
|
||
logger.info(" 📝 Тест 2: Email уже существует")
|
||
|
||
# Создаем другого пользователя с новым email
|
||
with local_session() as session:
|
||
existing_user = session.query(Author).filter(Author.email == "existing@example.com").first()
|
||
if not existing_user:
|
||
existing_user = Author(email="existing@example.com", name="Existing User", slug="existing-user")
|
||
existing_user.set_password("password123")
|
||
session.add(existing_user)
|
||
session.commit()
|
||
|
||
result = await update_security(
|
||
None,
|
||
info,
|
||
email="existing@example.com",
|
||
old_password="new_password456",
|
||
new_password=None,
|
||
)
|
||
|
||
if not result["success"] and result["error"] == "email already exists":
|
||
logger.info(" ✅ Корректно отклонен существующий email")
|
||
else:
|
||
logger.error(f" ❌ Неожиданный результат: {result}")
|
||
|
||
|
||
async def test_combined_changes() -> None:
|
||
"""Тестируем одновременную смену пароля и email"""
|
||
logger.info("🔄 Тестирование одновременной смены пароля и email")
|
||
|
||
with local_session() as session:
|
||
test_user = session.query(Author).filter(Author.email == "test@example.com").first()
|
||
if not test_user:
|
||
logger.error(" ❌ Тестовый пользователь не найден")
|
||
return
|
||
|
||
info = MockInfo(test_user.id)
|
||
|
||
result = await update_security(
|
||
None,
|
||
info,
|
||
email="combined@example.com",
|
||
old_password="new_password456",
|
||
new_password="combined_password789",
|
||
)
|
||
|
||
if result["success"]:
|
||
logger.info(" ✅ Одновременная смена успешна")
|
||
|
||
# Проверяем изменения
|
||
with local_session() as session:
|
||
updated_user = session.query(Author).filter(Author.id == test_user.id).first()
|
||
|
||
# Проверяем пароль
|
||
if updated_user.verify_password("combined_password789"):
|
||
logger.info(" ✅ Новый пароль работает")
|
||
else:
|
||
logger.error(" ❌ Новый пароль не работает")
|
||
else:
|
||
logger.error(f" ❌ Ошибка одновременной смены: {result['error']}")
|
||
|
||
|
||
async def test_validation_errors() -> None:
|
||
"""Тестируем различные ошибки валидации"""
|
||
logger.info("⚠️ Тестирование ошибок валидации")
|
||
|
||
with local_session() as session:
|
||
test_user = session.query(Author).filter(Author.email == "test@example.com").first()
|
||
if not test_user:
|
||
logger.error(" ❌ Тестовый пользователь не найден")
|
||
return
|
||
|
||
info = MockInfo(test_user.id)
|
||
|
||
# Тест 1: Нет параметров для изменения
|
||
logger.info(" 📝 Тест 1: Нет параметров для изменения")
|
||
result = await update_security(None, info, email=None, old_password="combined_password789", new_password=None)
|
||
|
||
if not result["success"] and result["error"] == "VALIDATION_ERROR":
|
||
logger.info(" ✅ Корректно отклонен запрос без параметров")
|
||
else:
|
||
logger.error(f" ❌ Неожиданный результат: {result}")
|
||
|
||
# Тест 2: Слабый пароль
|
||
logger.info(" 📝 Тест 2: Слабый пароль")
|
||
result = await update_security(None, info, email=None, old_password="combined_password789", new_password="123")
|
||
|
||
if not result["success"] and result["error"] == "WEAK_PASSWORD":
|
||
logger.info(" ✅ Корректно отклонен слабый пароль")
|
||
else:
|
||
logger.error(f" ❌ Неожиданный результат: {result}")
|
||
|
||
# Тест 3: Неверный формат email
|
||
logger.info(" 📝 Тест 3: Неверный формат email")
|
||
result = await update_security(
|
||
None,
|
||
info,
|
||
email="invalid-email",
|
||
old_password="combined_password789",
|
||
new_password=None,
|
||
)
|
||
|
||
if not result["success"] and result["error"] == "INVALID_EMAIL":
|
||
logger.info(" ✅ Корректно отклонен неверный email")
|
||
else:
|
||
logger.error(f" ❌ Неожиданный результат: {result}")
|
||
|
||
|
||
async def cleanup_test_data() -> None:
|
||
"""Очищает тестовые данные"""
|
||
logger.info("🧹 Очистка тестовых данных")
|
||
|
||
with local_session() as session:
|
||
# Удаляем тестовых пользователей
|
||
test_emails = ["test@example.com", "existing@example.com"]
|
||
for email in test_emails:
|
||
user = session.query(Author).filter(Author.email == email).first()
|
||
if user:
|
||
session.delete(user)
|
||
|
||
session.commit()
|
||
|
||
logger.info("Тестовые данные очищены")
|
||
|
||
|
||
async def main() -> None:
|
||
"""Главная функция теста"""
|
||
try:
|
||
logger.info("🚀 Начало тестирования updateSecurity")
|
||
|
||
await test_password_change()
|
||
await test_email_change()
|
||
await test_combined_changes()
|
||
await test_validation_errors()
|
||
|
||
logger.info("🎉 Все тесты updateSecurity прошли успешно!")
|
||
|
||
except Exception:
|
||
logger.exception("❌ Тест провалился")
|
||
import traceback
|
||
|
||
traceback.print_exc()
|
||
finally:
|
||
await cleanup_test_data()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|