core/tests/test_unpublish_shout.py

368 lines
16 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Тест мутации unpublishShout для снятия поста с публикации.
Проверяет различные сценарии:
- Успешное снятие публикации автором
- Снятие публикации редактором
- Отказ в доступе неавторизованному пользователю
- Отказ в доступе не-автору без прав редактора
- Обработку несуществующих публикаций
"""
import asyncio
import logging
import sys
import time
from pathlib import Path
sys.path.append(str(Path(__file__).parent))
from auth.orm import Author, AuthorRole, Role
from orm.shout import Shout
from resolvers.editor import unpublish_shout
from services.db import local_session
# Настройка логгера
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
def ensure_roles_exist():
"""Создает стандартные роли в БД если их нет"""
with local_session() as session:
# Создаем базовые роли если их нет
roles_to_create = [
("reader", "Читатель"),
("author", "Автор"),
("editor", "Редактор"),
("admin", "Администратор"),
]
for role_id, role_name in roles_to_create:
role = session.query(Role).filter(Role.id == role_id).first()
if not role:
role = Role(id=role_id, name=role_name)
session.add(role)
session.commit()
def add_roles_to_author(author_id: int, roles: list[str]):
"""Добавляет роли пользователю в БД"""
with local_session() as session:
# Удаляем старые роли
session.query(AuthorRole).filter(AuthorRole.author == author_id).delete()
# Добавляем новые роли
for role_id in roles:
author_role = AuthorRole(
community=1, # Основное сообщество
author=author_id,
role=role_id,
)
session.add(author_role)
session.commit()
class MockInfo:
"""Мок для GraphQL info контекста"""
def __init__(self, author_id: int, roles: list[str] | None = None) -> None:
if author_id:
self.context = {
"author": {"id": author_id},
"roles": roles or ["reader", "author"],
"request": None, # Важно: указываем None для тестового режима
}
else:
# Для неавторизованного пользователя
self.context = {
"author": {},
"roles": [],
"request": None,
}
async def setup_test_data() -> tuple[Author, Shout, Author]:
"""Создаем тестовые данные: автора, публикацию и другого автора"""
logger.info("🔧 Настройка тестовых данных")
# Создаем роли в БД
ensure_roles_exist()
current_time = int(time.time())
with local_session() as session:
# Создаем первого автора (владельца публикации)
test_author = session.query(Author).filter(Author.email == "test_author@example.com").first()
if not test_author:
test_author = Author(email="test_author@example.com", name="Test Author", slug="test-author")
test_author.set_password("password123")
session.add(test_author)
session.flush() # Получаем ID
# Создаем второго автора (не владельца)
other_author = session.query(Author).filter(Author.email == "other_author@example.com").first()
if not other_author:
other_author = Author(email="other_author@example.com", name="Other Author", slug="other-author")
other_author.set_password("password456")
session.add(other_author)
session.flush()
# Создаем опубликованную публикацию
test_shout = session.query(Shout).filter(Shout.slug == "test-shout-published").first()
if not test_shout:
test_shout = Shout(
title="Test Published Shout",
slug="test-shout-published",
body="This is a test published shout content",
layout="article",
created_by=test_author.id,
created_at=current_time,
published_at=current_time, # Публикация опубликована
community=1,
seo="Test shout for unpublish testing",
)
session.add(test_shout)
else:
# Убедимся что публикация опубликована
test_shout.published_at = current_time
session.add(test_shout)
session.commit()
# Добавляем роли пользователям в БД
add_roles_to_author(test_author.id, ["reader", "author"])
add_roles_to_author(other_author.id, ["reader", "author"])
logger.info(
f" ✅ Созданы: автор {test_author.id}, другой автор {other_author.id}, публикация {test_shout.id}"
)
return test_author, test_shout, other_author
async def test_successful_unpublish_by_author() -> None:
"""Тестируем успешное снятие публикации автором"""
logger.info("📰 Тестирование успешного снятия публикации автором")
test_author, test_shout, _ = await setup_test_data()
# Тест 1: Успешное снятие публикации автором
logger.info(" 📝 Тест 1: Снятие публикации автором")
info = MockInfo(test_author.id)
result = await unpublish_shout(None, info, test_shout.id)
if not result.error:
logger.info(" ✅ Снятие публикации успешно")
# Проверяем, что published_at теперь None
with local_session() as session:
updated_shout = session.query(Shout).filter(Shout.id == test_shout.id).first()
if updated_shout and updated_shout.published_at is None:
logger.info(" ✅ published_at корректно установлен в None")
else:
logger.error(
f" ❌ published_at неверен: {updated_shout.published_at if updated_shout else 'shout not found'}"
)
if result.shout and result.shout.id == test_shout.id:
logger.info(" ✅ Возвращен корректный объект публикации")
else:
logger.error(" ❌ Возвращен неверный объект публикации")
else:
logger.error(f" ❌ Ошибка снятия публикации: {result.error}")
async def test_unpublish_by_editor() -> None:
"""Тестируем снятие публикации редактором"""
logger.info("👨‍💼 Тестирование снятия публикации редактором")
test_author, test_shout, other_author = await setup_test_data()
# Восстанавливаем публикацию для теста
with local_session() as session:
shout = session.query(Shout).filter(Shout.id == test_shout.id).first()
if shout:
shout.published_at = int(time.time())
session.add(shout)
session.commit()
# Добавляем роль "editor" другому автору в БД
add_roles_to_author(other_author.id, ["reader", "author", "editor"])
logger.info(" 📝 Тест: Снятие публикации редактором")
info = MockInfo(other_author.id, roles=["reader", "author", "editor"]) # Другой автор с ролью редактора
result = await unpublish_shout(None, info, test_shout.id)
if not result.error:
logger.info(" ✅ Редактор успешно снял публикацию")
with local_session() as session:
updated_shout = session.query(Shout).filter(Shout.id == test_shout.id).first()
if updated_shout and updated_shout.published_at is None:
logger.info(" ✅ published_at корректно установлен в None редактором")
else:
logger.error(
f" ❌ published_at неверен после действий редактора: {updated_shout.published_at if updated_shout else 'shout not found'}"
)
else:
logger.error(f" ❌ Ошибка снятия публикации редактором: {result.error}")
async def test_access_denied_scenarios() -> None:
"""Тестируем сценарии отказа в доступе"""
logger.info("🚫 Тестирование отказа в доступе")
test_author, test_shout, other_author = await setup_test_data()
# Восстанавливаем публикацию для теста
with local_session() as session:
shout = session.query(Shout).filter(Shout.id == test_shout.id).first()
if shout:
shout.published_at = int(time.time())
session.add(shout)
session.commit()
# Тест 1: Неавторизованный пользователь
logger.info(" 📝 Тест 1: Неавторизованный пользователь")
info = MockInfo(0) # Нет author_id
try:
result = await unpublish_shout(None, info, test_shout.id)
logger.error(" ❌ Неожиданный результат для неавторизованного: ошибка не была выброшена")
except Exception as e:
if "Требуется авторизация" in str(e):
logger.info(" ✅ Корректно отклонен неавторизованный пользователь")
else:
logger.error(f" ❌ Неожиданная ошибка для неавторизованного: {e}")
# Тест 2: Не-автор без прав редактора
logger.info(" 📝 Тест 2: Не-автор без прав редактора")
# Убеждаемся что у other_author нет роли editor
add_roles_to_author(other_author.id, ["reader", "author"]) # Только базовые роли
info = MockInfo(other_author.id, roles=["reader", "author"]) # Другой автор без прав редактора
result = await unpublish_shout(None, info, test_shout.id)
if result.error == "Access denied":
logger.info(" ✅ Корректно отклонен не-автор без прав редактора")
else:
logger.error(f" ❌ Неожиданный результат для не-автора: {result.error}")
async def test_nonexistent_shout() -> None:
"""Тестируем обработку несуществующих публикаций"""
logger.info("👻 Тестирование несуществующих публикаций")
test_author, _, _ = await setup_test_data()
logger.info(" 📝 Тест: Несуществующая публикация")
info = MockInfo(test_author.id)
# Используем заведомо несуществующий ID
nonexistent_id = 999999
result = await unpublish_shout(None, info, nonexistent_id)
if result.error == "Shout not found":
logger.info(" ✅ Корректно обработана несуществующая публикация")
else:
logger.error(f" ❌ Неожиданный результат для несуществующей публикации: {result.error}")
async def test_already_unpublished_shout() -> None:
"""Тестируем снятие публикации с уже неопубликованной публикации"""
logger.info("📝 Тестирование уже неопубликованной публикации")
test_author, test_shout, _ = await setup_test_data()
# Убеждаемся что публикация не опубликована
with local_session() as session:
shout = session.query(Shout).filter(Shout.id == test_shout.id).first()
if shout:
shout.published_at = None
session.add(shout)
session.commit()
logger.info(" 📝 Тест: Снятие публикации с уже неопубликованной")
info = MockInfo(test_author.id)
result = await unpublish_shout(None, info, test_shout.id)
# Функция должна отработать нормально даже для уже неопубликованной публикации
if not result.error:
logger.info(" ✅ Операция с уже неопубликованной публикацией прошла успешно")
with local_session() as session:
updated_shout = session.query(Shout).filter(Shout.id == test_shout.id).first()
if updated_shout and updated_shout.published_at is None:
logger.info(" ✅ published_at остался None")
else:
logger.error(
f" ❌ published_at изменился неожиданно: {updated_shout.published_at if updated_shout else 'shout not found'}"
)
else:
logger.error(f" ❌ Неожиданная ошибка для уже неопубликованной публикации: {result.error}")
async def cleanup_test_data() -> None:
"""Очистка тестовых данных"""
logger.info("🧹 Очистка тестовых данных")
try:
with local_session() as session:
# Удаляем роли тестовых авторов
test_author = session.query(Author).filter(Author.email == "test_author@example.com").first()
if test_author:
session.query(AuthorRole).filter(AuthorRole.author == test_author.id).delete()
other_author = session.query(Author).filter(Author.email == "other_author@example.com").first()
if other_author:
session.query(AuthorRole).filter(AuthorRole.author == other_author.id).delete()
# Удаляем тестовую публикацию
test_shout = session.query(Shout).filter(Shout.slug == "test-shout-published").first()
if test_shout:
session.delete(test_shout)
# Удаляем тестовых авторов
if test_author:
session.delete(test_author)
if other_author:
session.delete(other_author)
session.commit()
logger.info(" ✅ Тестовые данные очищены")
except Exception as e:
logger.warning(f" ⚠️ Ошибка при очистке: {e}")
async def main() -> None:
"""Главная функция теста"""
logger.info("🚀 Запуск тестов unpublish_shout")
try:
await test_successful_unpublish_by_author()
await test_unpublish_by_editor()
await test_access_denied_scenarios()
await test_nonexistent_shout()
await test_already_unpublished_shout()
logger.info("Все тесты unpublish_shout завершены успешно")
except Exception as e:
logger.error(f"❌ Ошибка в тестах: {e}")
import traceback
traceback.print_exc()
finally:
await cleanup_test_data()
if __name__ == "__main__":
asyncio.run(main())