Files
core/tests/test_update_security.py
Untone 1b48675b92
Some checks failed
Deploy on push / deploy (push) Failing after 2m22s
[0.9.7] - 2025-08-18
### 🔄 Изменения
- **SQLAlchemy KeyError** - исправление ошибки `KeyError: Reaction` при инициализации
- **Исправлена ошибка SQLAlchemy**: Устранена проблема `InvalidRequestError: When initializing mapper Mapper[Shout(shout)], expression Reaction failed to locate a name (Reaction)`

### 🧪 Тестирование
- **Исправление тестов** - адаптация к новой структуре моделей
- **RBAC инициализация** - добавление `rbac.initialize_rbac()` в `conftest.py`
- **Создан тест для getSession**: Добавлен комплексный тест `test_getSession_cookies.py` с проверкой всех сценариев
- **Покрытие edge cases**: Тесты проверяют работу с валидными/невалидными токенами, отсутствующими пользователями
- **Мокирование зависимостей**: Использование unittest.mock для изоляции тестируемого кода

### 🔧 Рефакторинг
- **Упрощена архитектура**: Убраны сложные конструкции с отложенными импортами, заменены на чистую архитектуру
- **Перемещение моделей** - `Author` и связанные модели перенесены в `orm/author.py`: Вынесены базовые модели пользователей (`Author`, `AuthorFollower`, `AuthorBookmark`, `AuthorRating`) из `orm.author` в отдельный модуль
- **Устранены циклические импорты**: Разорван цикл между `auth.core` → `orm.community` → `orm.author` через реструктуризацию архитектуры
- **Создан модуль `utils/password.py`**: Класс `Password` вынесен в utils для избежания циклических зависимостей
- **Оптимизированы импорты моделей**: Убран прямой импорт `Shout` из `orm/community.py`, заменен на строковые ссылки

### 🔧 Авторизация с cookies
- **getSession теперь работает с cookies**: Мутация `getSession` теперь может получать токен из httpOnly cookies даже без заголовка Authorization
- **Убрано требование авторизации**: `getSession` больше не требует декоратор `@login_required`, работает автономно
- **Поддержка dual-авторизации**: Токен может быть получен как из заголовка Authorization, так и из cookie `session_token`
- **Автоматическая установка cookies**: Middleware автоматически устанавливает httpOnly cookies при успешном `getSession`
- **Обновлена GraphQL схема**: `SessionInfo` теперь содержит поля `success`, `error` и опциональные `token`, `author`
- **Единообразная обработка токенов**: Все модули теперь используют централизованные функции для работы с токенами
- **Улучшена обработка ошибок**: Добавлена детальная валидация токенов и пользователей в `getSession`
- **Логирование операций**: Добавлены подробные логи для отслеживания процесса авторизации

### 📝 Документация
- **Обновлена схема GraphQL**: `SessionInfo` тип теперь соответствует новому формату ответа
- Обновлена документация RBAC
- Обновлена документация авторизации с cookies
2025-08-18 14:25:25 +03:00

295 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Тест мутации updateSecurity для смены пароля и email.
Проверяет различные сценарии:
- Смена пароля
- Смена email
- Одновременная смена пароля и email
- Валидация и обработка ошибок
"""
import asyncio
import logging
import sys
from pathlib import Path
from typing import Any
sys.path.append(str(Path(__file__).parent))
from orm.author import Author
from resolvers.auth import update_security
from storage.db import local_session
# Настройка логгера
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
class MockInfo:
"""Мок для GraphQL info контекста"""
def __init__(self, author_id: int) -> None:
self.context = {
"author": {"id": author_id},
"roles": ["reader", "author"], # Добавляем необходимые роли
}
async def test_password_change() -> None:
"""Тестируем смену пароля"""
logger.info("🔐 Тестирование смены пароля")
# Создаем тестового пользователя
with local_session() as session:
# Проверяем, есть ли тестовый пользователь
test_user = session.query(Author).where(Author.email == "test@example.com").first()
if not test_user:
# Используем уникальный slug для избежания конфликтов
import uuid
unique_slug = f"test-user-{uuid.uuid4().hex[:8]}"
test_user = Author(email="test@example.com", name="Test User", slug=unique_slug)
test_user.set_password("old_password123")
session.add(test_user)
session.commit()
logger.info(f" Создан тестовый пользователь с ID {test_user.id}")
else:
test_user.set_password("old_password123")
session.add(test_user)
session.commit()
logger.info(f" Используется существующий пользователь с ID {test_user.id}")
# Тест 1: Успешная смена пароля
logger.info(" 📝 Тест 1: Успешная смена пароля")
info = MockInfo(test_user.id)
result = await update_security(
None,
info,
email=None,
old_password="old_password123",
new_password="new_password456",
)
if result["success"]:
logger.info(" ✅ Смена пароля успешна")
# Проверяем, что новый пароль работает
with local_session() as session:
updated_user = session.query(Author).where(Author.id == test_user.id).first()
if updated_user.verify_password("new_password456"):
logger.info(" ✅ Новый пароль работает")
else:
logger.error(" ❌ Новый пароль не работает")
else:
logger.error(f" ❌ Ошибка смены пароля: {result['error']}")
# Тест 2: Неверный старый пароль
logger.info(" 📝 Тест 2: Неверный старый пароль")
result = await update_security(
None,
info,
email=None,
old_password="wrong_password",
new_password="another_password789",
)
if not result["success"] and result["error"] == "incorrect old password":
logger.info(" ✅ Корректно отклонен неверный старый пароль")
else:
logger.error(f" ❌ Неожиданный результат: {result}")
# Тест 3: Пароли не совпадают
logger.info(" 📝 Тест 3: Пароли не совпадают")
result = await update_security(
None,
info,
email=None,
old_password="new_password456",
new_password="password1",
)
if not result["success"] and result["error"] == "PASSWORDS_NOT_MATCH":
logger.info(" ✅ Корректно отклонены несовпадающие пароли")
else:
logger.error(f" ❌ Неожиданный результат: {result}")
async def test_email_change() -> None:
"""Тестируем смену email"""
logger.info("📧 Тестирование смены email")
with local_session() as session:
test_user = session.query(Author).where(Author.email == "test@example.com").first()
if not test_user:
logger.error(" ❌ Тестовый пользователь не найден")
return
# Тест 1: Успешная инициация смены email
logger.info(" 📝 Тест 1: Инициация смены email")
info = MockInfo(test_user.id)
result = await update_security(
None,
info,
email="newemail@example.com",
old_password="new_password456",
new_password=None,
)
if result["success"]:
logger.info(" ✅ Смена email инициирована")
else:
logger.error(f" ❌ Ошибка инициации смены email: {result['error']}")
# Тест 2: Email уже существует
logger.info(" 📝 Тест 2: Email уже существует")
# Создаем другого пользователя с новым email
with local_session() as session:
existing_user = session.query(Author).where(Author.email == "existing@example.com").first()
if not existing_user:
existing_user = Author(email="existing@example.com", name="Existing User", slug="existing-user")
existing_user.set_password("password123")
session.add(existing_user)
session.commit()
result = await update_security(
None,
info,
email="existing@example.com",
old_password="new_password456",
new_password=None,
)
if not result["success"] and result["error"] == "email already exists":
logger.info(" ✅ Корректно отклонен существующий email")
else:
logger.error(f" ❌ Неожиданный результат: {result}")
async def test_combined_changes() -> None:
"""Тестируем одновременную смену пароля и email"""
logger.info("🔄 Тестирование одновременной смены пароля и email")
with local_session() as session:
test_user = session.query(Author).where(Author.email == "test@example.com").first()
if not test_user:
logger.error(" ❌ Тестовый пользователь не найден")
return
info = MockInfo(test_user.id)
result = await update_security(
None,
info,
email="combined@example.com",
old_password="new_password456",
new_password="combined_password789",
)
if result["success"]:
logger.info(" ✅ Одновременная смена успешна")
# Проверяем изменения
with local_session() as session:
updated_user = session.query(Author).where(Author.id == test_user.id).first()
# Проверяем пароль
if updated_user.verify_password("combined_password789"):
logger.info(" ✅ Новый пароль работает")
else:
logger.error(" ❌ Новый пароль не работает")
else:
logger.error(f" ❌ Ошибка одновременной смены: {result['error']}")
async def test_validation_errors() -> None:
"""Тестируем различные ошибки валидации"""
logger.info("⚠️ Тестирование ошибок валидации")
with local_session() as session:
test_user = session.query(Author).where(Author.email == "test@example.com").first()
if not test_user:
logger.error(" ❌ Тестовый пользователь не найден")
return
info = MockInfo(test_user.id)
# Тест 1: Нет параметров для изменения
logger.info(" 📝 Тест 1: Нет параметров для изменения")
result = await update_security(None, info, email=None, old_password="combined_password789", new_password=None)
if not result["success"] and result["error"] == "VALIDATION_ERROR":
logger.info(" ✅ Корректно отклонен запрос без параметров")
else:
logger.error(f" ❌ Неожиданный результат: {result}")
# Тест 2: Слабый пароль
logger.info(" 📝 Тест 2: Слабый пароль")
result = await update_security(None, info, email=None, old_password="combined_password789", new_password="123")
if not result["success"] and result["error"] == "WEAK_PASSWORD":
logger.info(" ✅ Корректно отклонен слабый пароль")
else:
logger.error(f" ❌ Неожиданный результат: {result}")
# Тест 3: Неверный формат email
logger.info(" 📝 Тест 3: Неверный формат email")
result = await update_security(
None,
info,
email="invalid-email",
old_password="combined_password789",
new_password=None,
)
if not result["success"] and result["error"] == "INVALID_EMAIL":
logger.info(" ✅ Корректно отклонен неверный email")
else:
logger.error(f" ❌ Неожиданный результат: {result}")
async def cleanup_test_data() -> None:
"""Очищает тестовые данные"""
logger.info("🧹 Очистка тестовых данных")
with local_session() as session:
# Удаляем тестовых пользователей
test_emails = ["test@example.com", "existing@example.com"]
for email in test_emails:
user = session.query(Author).where(Author.email == email).first()
if user:
session.delete(user)
session.commit()
logger.info("Тестовые данные очищены")
async def main() -> None:
"""Главная функция теста"""
try:
logger.info("🚀 Начало тестирования updateSecurity")
await test_password_change()
await test_email_change()
await test_combined_changes()
await test_validation_errors()
logger.info("🎉 Все тесты updateSecurity прошли успешно!")
except Exception:
logger.exception("❌ Тест провалился")
import traceback
traceback.print_exc()
finally:
await cleanup_test_data()
if __name__ == "__main__":
asyncio.run(main())