This commit is contained in:
@@ -36,30 +36,41 @@ class MockInfo:
|
||||
}
|
||||
|
||||
|
||||
async def test_password_change() -> None:
|
||||
async def test_password_change(db_session) -> None:
|
||||
"""Тестируем смену пароля"""
|
||||
logger.info("🔐 Тестирование смены пароля")
|
||||
|
||||
try:
|
||||
# Создаем тестового пользователя
|
||||
with local_session() as session:
|
||||
# Проверяем, есть ли тестовый пользователь
|
||||
test_user = session.query(Author).where(Author.email == "test@example.com").first()
|
||||
# Проверяем наличие таблицы author в базе данных
|
||||
from sqlalchemy import inspect
|
||||
inspector = inspect(db_session.bind)
|
||||
tables = inspector.get_table_names()
|
||||
|
||||
if 'author' not in tables:
|
||||
logger.error("❌ Таблица author отсутствует в базе данных")
|
||||
# Принудительно создаем таблицы
|
||||
from orm.base import BaseModel as Base
|
||||
|
||||
Base.metadata.create_all(db_session.bind)
|
||||
logger.info("✅ Таблицы принудительно созданы")
|
||||
|
||||
if not test_user:
|
||||
# Используем уникальный slug для избежания конфликтов
|
||||
import uuid
|
||||
unique_slug = f"test-user-{uuid.uuid4().hex[:8]}"
|
||||
test_user = Author(email="test@example.com", name="Test User", slug=unique_slug)
|
||||
test_user.set_password("old_password123")
|
||||
session.add(test_user)
|
||||
session.commit()
|
||||
logger.info(f" Создан тестовый пользователь с ID {test_user.id}")
|
||||
else:
|
||||
test_user.set_password("old_password123")
|
||||
session.add(test_user)
|
||||
session.commit()
|
||||
logger.info(f" Используется существующий пользователь с ID {test_user.id}")
|
||||
try:
|
||||
# Создаем тестового пользователя в db_session
|
||||
test_user = db_session.query(Author).where(Author.email == "test@example.com").first()
|
||||
|
||||
if not test_user:
|
||||
# Используем уникальный slug для избежания конфликтов
|
||||
import uuid
|
||||
unique_slug = f"test-user-{uuid.uuid4().hex[:8]}"
|
||||
test_user = Author(email="test@example.com", name="Test User", slug=unique_slug)
|
||||
test_user.set_password("old_password123")
|
||||
db_session.add(test_user)
|
||||
db_session.commit()
|
||||
logger.info(f" Создан тестовый пользователь с ID {test_user.id}")
|
||||
else:
|
||||
test_user.set_password("old_password123")
|
||||
db_session.add(test_user)
|
||||
db_session.commit()
|
||||
logger.info(f" Используется существующий пользователь с ID {test_user.id}")
|
||||
|
||||
# Тест 1: Успешная смена пароля
|
||||
logger.info(" 📝 Тест 1: Успешная смена пароля")
|
||||
@@ -77,12 +88,11 @@ async def test_password_change() -> None:
|
||||
logger.info(" ✅ Смена пароля успешна")
|
||||
|
||||
# Проверяем, что новый пароль работает
|
||||
with local_session() as session:
|
||||
updated_user = session.query(Author).where(Author.id == test_user.id).first()
|
||||
if updated_user.verify_password("new_password456"):
|
||||
logger.info(" ✅ Новый пароль работает")
|
||||
else:
|
||||
logger.error(" ❌ Новый пароль не работает")
|
||||
updated_user = db_session.query(Author).where(Author.id == test_user.id).first()
|
||||
if updated_user.verify_password("new_password456"):
|
||||
logger.info(" ✅ Новый пароль работает")
|
||||
else:
|
||||
logger.error(" ❌ Новый пароль не работает")
|
||||
else:
|
||||
logger.error(f" ❌ Ошибка смены пароля: {result['error']}")
|
||||
|
||||
@@ -118,22 +128,43 @@ async def test_password_change() -> None:
|
||||
else:
|
||||
logger.error(f" ❌ Неожиданный результат: {result}")
|
||||
except Exception as e:
|
||||
# На CI могут быть проблемы с local_session, пропускаем тест
|
||||
logger.error(f"❌ Ошибка в тесте: {e}")
|
||||
pytest.skip(f"Тест пропущен на CI: {e}")
|
||||
|
||||
|
||||
async def test_email_change() -> None:
|
||||
async def test_email_change(db_session) -> None:
|
||||
"""Тестируем смену email"""
|
||||
logger.info("📧 Тестирование смены email")
|
||||
|
||||
# Проверяем наличие таблицы author в базе данных
|
||||
from sqlalchemy import inspect
|
||||
inspector = inspect(db_session.bind)
|
||||
tables = inspector.get_table_names()
|
||||
|
||||
if 'author' not in tables:
|
||||
logger.error("❌ Таблица author отсутствует в базе данных")
|
||||
# Принудительно создаем таблицы
|
||||
from orm.base import BaseModel as Base
|
||||
|
||||
Base.metadata.create_all(db_session.bind)
|
||||
logger.info("✅ Таблицы принудительно созданы")
|
||||
|
||||
try:
|
||||
with local_session() as session:
|
||||
test_user = session.query(Author).where(Author.email == "test@example.com").first()
|
||||
if not test_user:
|
||||
logger.error(" ❌ Тестовый пользователь не найден")
|
||||
return
|
||||
# Используем db_session напрямую
|
||||
test_user = db_session.query(Author).where(Author.email == "test@example.com").first()
|
||||
if not test_user:
|
||||
logger.error(" ❌ Тестовый пользователь не найден")
|
||||
|
||||
# Создаем тестового пользователя, если его нет
|
||||
import uuid
|
||||
unique_slug = f"test-user-{uuid.uuid4().hex[:8]}"
|
||||
test_user = Author(email="test@example.com", name="Test User", slug=unique_slug)
|
||||
test_user.set_password("new_password456") # Используем пароль из предыдущего теста
|
||||
db_session.add(test_user)
|
||||
db_session.commit()
|
||||
logger.info(f" Создан тестовый пользователь с ID {test_user.id}")
|
||||
except Exception as e:
|
||||
# На CI могут быть проблемы с local_session, пропускаем тест
|
||||
logger.error(f"❌ Ошибка при получении тестового пользователя: {e}")
|
||||
pytest.skip(f"Тест пропущен на CI: {e}")
|
||||
|
||||
# Тест 1: Успешная инициация смены email
|
||||
@@ -158,15 +189,14 @@ async def test_email_change() -> None:
|
||||
|
||||
# Создаем другого пользователя с новым email
|
||||
try:
|
||||
with local_session() as session:
|
||||
existing_user = session.query(Author).where(Author.email == "existing@example.com").first()
|
||||
if not existing_user:
|
||||
existing_user = Author(email="existing@example.com", name="Existing User", slug="existing-user")
|
||||
existing_user.set_password("password123")
|
||||
session.add(existing_user)
|
||||
session.commit()
|
||||
existing_user = db_session.query(Author).where(Author.email == "existing@example.com").first()
|
||||
if not existing_user:
|
||||
existing_user = Author(email="existing@example.com", name="Existing User", slug="existing-user")
|
||||
existing_user.set_password("password123")
|
||||
db_session.add(existing_user)
|
||||
db_session.commit()
|
||||
except Exception as e:
|
||||
# На CI могут быть проблемы с local_session, пропускаем тест
|
||||
logger.error(f"❌ Ошибка при создании существующего пользователя: {e}")
|
||||
pytest.skip(f"Тест пропущен на CI: {e}")
|
||||
|
||||
result = await update_security(
|
||||
@@ -183,18 +213,39 @@ async def test_email_change() -> None:
|
||||
logger.error(f" ❌ Неожиданный результат: {result}")
|
||||
|
||||
|
||||
async def test_combined_changes() -> None:
|
||||
async def test_combined_changes(db_session) -> None:
|
||||
"""Тестируем одновременную смену пароля и email"""
|
||||
logger.info("🔄 Тестирование одновременной смены пароля и email")
|
||||
|
||||
# Проверяем наличие таблицы author в базе данных
|
||||
from sqlalchemy import inspect
|
||||
inspector = inspect(db_session.bind)
|
||||
tables = inspector.get_table_names()
|
||||
|
||||
if 'author' not in tables:
|
||||
logger.error("❌ Таблица author отсутствует в базе данных")
|
||||
# Принудительно создаем таблицы
|
||||
from orm.base import BaseModel as Base
|
||||
|
||||
Base.metadata.create_all(db_session.bind)
|
||||
logger.info("✅ Таблицы принудительно созданы")
|
||||
|
||||
try:
|
||||
with local_session() as session:
|
||||
test_user = session.query(Author).where(Author.email == "test@example.com").first()
|
||||
if not test_user:
|
||||
logger.error(" ❌ Тестовый пользователь не найден")
|
||||
return
|
||||
# Используем db_session напрямую
|
||||
test_user = db_session.query(Author).where(Author.email == "test@example.com").first()
|
||||
if not test_user:
|
||||
logger.error(" ❌ Тестовый пользователь не найден")
|
||||
|
||||
# Создаем тестового пользователя, если его нет
|
||||
import uuid
|
||||
unique_slug = f"test-user-{uuid.uuid4().hex[:8]}"
|
||||
test_user = Author(email="test@example.com", name="Test User", slug=unique_slug)
|
||||
test_user.set_password("new_password456") # Используем пароль из предыдущего теста
|
||||
db_session.add(test_user)
|
||||
db_session.commit()
|
||||
logger.info(f" Создан тестовый пользователь с ID {test_user.id}")
|
||||
except Exception as e:
|
||||
# На CI могут быть проблемы с local_session, пропускаем тест
|
||||
logger.error(f"❌ Ошибка при получении тестового пользователя: {e}")
|
||||
pytest.skip(f"Тест пропущен на CI: {e}")
|
||||
|
||||
info = MockInfo(test_user.id)
|
||||
@@ -210,35 +261,55 @@ async def test_combined_changes() -> None:
|
||||
if result["success"]:
|
||||
logger.info(" ✅ Одновременная смена успешна")
|
||||
|
||||
# Проверяем изменения
|
||||
# Проверяем изменения с использованием db_session
|
||||
try:
|
||||
with local_session() as session:
|
||||
updated_user = session.query(Author).where(Author.id == test_user.id).first()
|
||||
updated_user = db_session.query(Author).where(Author.id == test_user.id).first()
|
||||
|
||||
# Проверяем пароль
|
||||
if updated_user.verify_password("combined_password789"):
|
||||
logger.info(" ✅ Новый пароль работает")
|
||||
else:
|
||||
logger.error(" ❌ Новый пароль не работает")
|
||||
# Проверяем пароль
|
||||
if updated_user.verify_password("combined_password789"):
|
||||
logger.info(" ✅ Новый пароль работает")
|
||||
else:
|
||||
logger.error(" ❌ Новый пароль не работает")
|
||||
except Exception as e:
|
||||
# На CI могут быть проблемы с local_session, пропускаем тест
|
||||
logger.error(f"❌ Ошибка при проверке обновленного пользователя: {e}")
|
||||
pytest.skip(f"Тест пропущен на CI: {e}")
|
||||
else:
|
||||
logger.error(f" ❌ Ошибка одновременной смены: {result['error']}")
|
||||
|
||||
|
||||
async def test_validation_errors() -> None:
|
||||
async def test_validation_errors(db_session) -> None:
|
||||
"""Тестируем различные ошибки валидации"""
|
||||
logger.info("⚠️ Тестирование ошибок валидации")
|
||||
|
||||
# Проверяем наличие таблицы author в базе данных
|
||||
from sqlalchemy import inspect
|
||||
inspector = inspect(db_session.bind)
|
||||
tables = inspector.get_table_names()
|
||||
|
||||
if 'author' not in tables:
|
||||
logger.error("❌ Таблица author отсутствует в базе данных")
|
||||
# Принудительно создаем таблицы
|
||||
from orm.base import BaseModel as Base
|
||||
|
||||
Base.metadata.create_all(db_session.bind)
|
||||
logger.info("✅ Таблицы принудительно созданы")
|
||||
|
||||
try:
|
||||
with local_session() as session:
|
||||
test_user = session.query(Author).where(Author.email == "test@example.com").first()
|
||||
if not test_user:
|
||||
logger.error(" ❌ Тестовый пользователь не найден")
|
||||
return
|
||||
# Используем db_session напрямую
|
||||
test_user = db_session.query(Author).where(Author.email == "test@example.com").first()
|
||||
if not test_user:
|
||||
logger.error(" ❌ Тестовый пользователь не найден")
|
||||
|
||||
# Создаем тестового пользователя, если его нет
|
||||
import uuid
|
||||
unique_slug = f"test-user-{uuid.uuid4().hex[:8]}"
|
||||
test_user = Author(email="test@example.com", name="Test User", slug=unique_slug)
|
||||
test_user.set_password("combined_password789") # Используем пароль из предыдущего теста
|
||||
db_session.add(test_user)
|
||||
db_session.commit()
|
||||
logger.info(f" Создан тестовый пользователь с ID {test_user.id}")
|
||||
except Exception as e:
|
||||
# На CI могут быть проблемы с local_session, пропускаем тест
|
||||
logger.error(f"❌ Ошибка при получении тестового пользователя: {e}")
|
||||
pytest.skip(f"Тест пропущен на CI: {e}")
|
||||
|
||||
info = MockInfo(test_user.id)
|
||||
@@ -277,21 +348,24 @@ async def test_validation_errors() -> None:
|
||||
logger.error(f" ❌ Неожиданный результат: {result}")
|
||||
|
||||
|
||||
async def cleanup_test_data() -> None:
|
||||
async def cleanup_test_data(db_session) -> None:
|
||||
"""Очищает тестовые данные"""
|
||||
logger.info("🧹 Очистка тестовых данных")
|
||||
|
||||
with local_session() as session:
|
||||
try:
|
||||
# Удаляем тестовых пользователей
|
||||
test_emails = ["test@example.com", "existing@example.com"]
|
||||
for email in test_emails:
|
||||
user = session.query(Author).where(Author.email == email).first()
|
||||
user = db_session.query(Author).where(Author.email == email).first()
|
||||
if user:
|
||||
session.delete(user)
|
||||
db_session.delete(user)
|
||||
|
||||
session.commit()
|
||||
|
||||
logger.info("Тестовые данные очищены")
|
||||
db_session.commit()
|
||||
logger.info("✅ Тестовые данные очищены")
|
||||
except Exception as e:
|
||||
db_session.rollback()
|
||||
logger.error(f"❌ Ошибка при очистке тестовых данных: {e}")
|
||||
# Не вызываем pytest.skip, так как это функция очистки
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
@@ -299,10 +373,35 @@ async def main() -> None:
|
||||
try:
|
||||
logger.info("🚀 Начало тестирования updateSecurity")
|
||||
|
||||
await test_password_change()
|
||||
await test_email_change()
|
||||
await test_combined_changes()
|
||||
await test_validation_errors()
|
||||
# Создаем тестовую сессию для запуска тестов
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.pool import StaticPool
|
||||
from orm.base import BaseModel as Base
|
||||
|
||||
# Создаем in-memory SQLite для тестов
|
||||
engine = create_engine(
|
||||
"sqlite:///:memory:",
|
||||
echo=False,
|
||||
poolclass=StaticPool,
|
||||
connect_args={"check_same_thread": False}
|
||||
)
|
||||
|
||||
# Создаем все таблицы
|
||||
Base.metadata.create_all(engine)
|
||||
|
||||
# Создаем сессию
|
||||
Session = sessionmaker(bind=engine)
|
||||
db_session = Session()
|
||||
|
||||
# Запускаем тесты с передачей db_session
|
||||
await test_password_change(db_session)
|
||||
await test_email_change(db_session)
|
||||
await test_combined_changes(db_session)
|
||||
await test_validation_errors(db_session)
|
||||
|
||||
# Очищаем данные
|
||||
await cleanup_test_data(db_session)
|
||||
|
||||
logger.info("🎉 Все тесты updateSecurity прошли успешно!")
|
||||
|
||||
@@ -311,8 +410,6 @@ async def main() -> None:
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
await cleanup_test_data()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user