#!/usr/bin/env python3 """ Тест мутации unpublishShout для снятия поста с публикации. Проверяет различные сценарии: - Успешное снятие публикации автором - Снятие публикации редактором - Отказ в доступе неавторизованному пользователю - Отказ в доступе не-автору без прав редактора - Обработку несуществующих публикаций """ import asyncio import logging import sys import time from pathlib import Path sys.path.append(str(Path(__file__).parent)) from auth.orm import Author, AuthorRole, Role from orm.shout import Shout from resolvers.editor import unpublish_shout from services.db import local_session # Настройка логгера logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) def ensure_roles_exist(): """Создает стандартные роли в БД если их нет""" with local_session() as session: # Создаем базовые роли если их нет roles_to_create = [ ("reader", "Читатель"), ("author", "Автор"), ("editor", "Редактор"), ("admin", "Администратор"), ] for role_id, role_name in roles_to_create: role = session.query(Role).filter(Role.id == role_id).first() if not role: role = Role(id=role_id, name=role_name) session.add(role) session.commit() def add_roles_to_author(author_id: int, roles: list[str]): """Добавляет роли пользователю в БД""" with local_session() as session: # Удаляем старые роли session.query(AuthorRole).filter(AuthorRole.author == author_id).delete() # Добавляем новые роли for role_id in roles: author_role = AuthorRole( community=1, # Основное сообщество author=author_id, role=role_id, ) session.add(author_role) session.commit() class MockInfo: """Мок для GraphQL info контекста""" def __init__(self, author_id: int, roles: list[str] | None = None) -> None: if author_id: self.context = { "author": {"id": author_id}, "roles": roles or ["reader", "author"], "request": None, # Важно: указываем None для тестового режима } else: # Для неавторизованного пользователя self.context = { "author": {}, "roles": [], "request": None, } async def setup_test_data() -> tuple[Author, Shout, Author]: """Создаем тестовые данные: автора, публикацию и другого автора""" logger.info("🔧 Настройка тестовых данных") # Создаем роли в БД ensure_roles_exist() current_time = int(time.time()) with local_session() as session: # Создаем первого автора (владельца публикации) test_author = session.query(Author).filter(Author.email == "test_author@example.com").first() if not test_author: test_author = Author(email="test_author@example.com", name="Test Author", slug="test-author") test_author.set_password("password123") session.add(test_author) session.flush() # Получаем ID # Создаем второго автора (не владельца) other_author = session.query(Author).filter(Author.email == "other_author@example.com").first() if not other_author: other_author = Author(email="other_author@example.com", name="Other Author", slug="other-author") other_author.set_password("password456") session.add(other_author) session.flush() # Создаем опубликованную публикацию test_shout = session.query(Shout).filter(Shout.slug == "test-shout-published").first() if not test_shout: test_shout = Shout( title="Test Published Shout", slug="test-shout-published", body="This is a test published shout content", layout="article", created_by=test_author.id, created_at=current_time, published_at=current_time, # Публикация опубликована community=1, seo="Test shout for unpublish testing", ) session.add(test_shout) else: # Убедимся что публикация опубликована test_shout.published_at = current_time session.add(test_shout) session.commit() # Добавляем роли пользователям в БД add_roles_to_author(test_author.id, ["reader", "author"]) add_roles_to_author(other_author.id, ["reader", "author"]) logger.info( f" ✅ Созданы: автор {test_author.id}, другой автор {other_author.id}, публикация {test_shout.id}" ) return test_author, test_shout, other_author async def test_successful_unpublish_by_author() -> None: """Тестируем успешное снятие публикации автором""" logger.info("📰 Тестирование успешного снятия публикации автором") test_author, test_shout, _ = await setup_test_data() # Тест 1: Успешное снятие публикации автором logger.info(" 📝 Тест 1: Снятие публикации автором") info = MockInfo(test_author.id) result = await unpublish_shout(None, info, test_shout.id) if not result.error: logger.info(" ✅ Снятие публикации успешно") # Проверяем, что published_at теперь None with local_session() as session: updated_shout = session.query(Shout).filter(Shout.id == test_shout.id).first() if updated_shout and updated_shout.published_at is None: logger.info(" ✅ published_at корректно установлен в None") else: logger.error( f" ❌ published_at неверен: {updated_shout.published_at if updated_shout else 'shout not found'}" ) if result.shout and result.shout.id == test_shout.id: logger.info(" ✅ Возвращен корректный объект публикации") else: logger.error(" ❌ Возвращен неверный объект публикации") else: logger.error(f" ❌ Ошибка снятия публикации: {result.error}") async def test_unpublish_by_editor() -> None: """Тестируем снятие публикации редактором""" logger.info("👨‍💼 Тестирование снятия публикации редактором") test_author, test_shout, other_author = await setup_test_data() # Восстанавливаем публикацию для теста with local_session() as session: shout = session.query(Shout).filter(Shout.id == test_shout.id).first() if shout: shout.published_at = int(time.time()) session.add(shout) session.commit() # Добавляем роль "editor" другому автору в БД add_roles_to_author(other_author.id, ["reader", "author", "editor"]) logger.info(" 📝 Тест: Снятие публикации редактором") info = MockInfo(other_author.id, roles=["reader", "author", "editor"]) # Другой автор с ролью редактора result = await unpublish_shout(None, info, test_shout.id) if not result.error: logger.info(" ✅ Редактор успешно снял публикацию") with local_session() as session: updated_shout = session.query(Shout).filter(Shout.id == test_shout.id).first() if updated_shout and updated_shout.published_at is None: logger.info(" ✅ published_at корректно установлен в None редактором") else: logger.error( f" ❌ published_at неверен после действий редактора: {updated_shout.published_at if updated_shout else 'shout not found'}" ) else: logger.error(f" ❌ Ошибка снятия публикации редактором: {result.error}") async def test_access_denied_scenarios() -> None: """Тестируем сценарии отказа в доступе""" logger.info("🚫 Тестирование отказа в доступе") test_author, test_shout, other_author = await setup_test_data() # Восстанавливаем публикацию для теста with local_session() as session: shout = session.query(Shout).filter(Shout.id == test_shout.id).first() if shout: shout.published_at = int(time.time()) session.add(shout) session.commit() # Тест 1: Неавторизованный пользователь logger.info(" 📝 Тест 1: Неавторизованный пользователь") info = MockInfo(0) # Нет author_id try: result = await unpublish_shout(None, info, test_shout.id) logger.error(" ❌ Неожиданный результат для неавторизованного: ошибка не была выброшена") except Exception as e: if "Требуется авторизация" in str(e): logger.info(" ✅ Корректно отклонен неавторизованный пользователь") else: logger.error(f" ❌ Неожиданная ошибка для неавторизованного: {e}") # Тест 2: Не-автор без прав редактора logger.info(" 📝 Тест 2: Не-автор без прав редактора") # Убеждаемся что у other_author нет роли editor add_roles_to_author(other_author.id, ["reader", "author"]) # Только базовые роли info = MockInfo(other_author.id, roles=["reader", "author"]) # Другой автор без прав редактора result = await unpublish_shout(None, info, test_shout.id) if result.error == "Access denied": logger.info(" ✅ Корректно отклонен не-автор без прав редактора") else: logger.error(f" ❌ Неожиданный результат для не-автора: {result.error}") async def test_nonexistent_shout() -> None: """Тестируем обработку несуществующих публикаций""" logger.info("👻 Тестирование несуществующих публикаций") test_author, _, _ = await setup_test_data() logger.info(" 📝 Тест: Несуществующая публикация") info = MockInfo(test_author.id) # Используем заведомо несуществующий ID nonexistent_id = 999999 result = await unpublish_shout(None, info, nonexistent_id) if result.error == "Shout not found": logger.info(" ✅ Корректно обработана несуществующая публикация") else: logger.error(f" ❌ Неожиданный результат для несуществующей публикации: {result.error}") async def test_already_unpublished_shout() -> None: """Тестируем снятие публикации с уже неопубликованной публикации""" logger.info("📝 Тестирование уже неопубликованной публикации") test_author, test_shout, _ = await setup_test_data() # Убеждаемся что публикация не опубликована with local_session() as session: shout = session.query(Shout).filter(Shout.id == test_shout.id).first() if shout: shout.published_at = None session.add(shout) session.commit() logger.info(" 📝 Тест: Снятие публикации с уже неопубликованной") info = MockInfo(test_author.id) result = await unpublish_shout(None, info, test_shout.id) # Функция должна отработать нормально даже для уже неопубликованной публикации if not result.error: logger.info(" ✅ Операция с уже неопубликованной публикацией прошла успешно") with local_session() as session: updated_shout = session.query(Shout).filter(Shout.id == test_shout.id).first() if updated_shout and updated_shout.published_at is None: logger.info(" ✅ published_at остался None") else: logger.error( f" ❌ published_at изменился неожиданно: {updated_shout.published_at if updated_shout else 'shout not found'}" ) else: logger.error(f" ❌ Неожиданная ошибка для уже неопубликованной публикации: {result.error}") async def cleanup_test_data() -> None: """Очистка тестовых данных""" logger.info("🧹 Очистка тестовых данных") try: with local_session() as session: # Удаляем роли тестовых авторов test_author = session.query(Author).filter(Author.email == "test_author@example.com").first() if test_author: session.query(AuthorRole).filter(AuthorRole.author == test_author.id).delete() other_author = session.query(Author).filter(Author.email == "other_author@example.com").first() if other_author: session.query(AuthorRole).filter(AuthorRole.author == other_author.id).delete() # Удаляем тестовую публикацию test_shout = session.query(Shout).filter(Shout.slug == "test-shout-published").first() if test_shout: session.delete(test_shout) # Удаляем тестовых авторов if test_author: session.delete(test_author) if other_author: session.delete(other_author) session.commit() logger.info(" ✅ Тестовые данные очищены") except Exception as e: logger.warning(f" ⚠️ Ошибка при очистке: {e}") async def main() -> None: """Главная функция теста""" logger.info("🚀 Запуск тестов unpublish_shout") try: await test_successful_unpublish_by_author() await test_unpublish_by_editor() await test_access_denied_scenarios() await test_nonexistent_shout() await test_already_unpublished_shout() logger.info("✅ Все тесты unpublish_shout завершены успешно") except Exception as e: logger.error(f"❌ Ошибка в тестах: {e}") import traceback traceback.print_exc() finally: await cleanup_test_data() if __name__ == "__main__": asyncio.run(main())