2025-06-02 02:56:11 +03:00
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
"""
|
|
|
|
|
|
Тест мутации unpublishShout для снятия поста с публикации.
|
|
|
|
|
|
Проверяет различные сценарии:
|
|
|
|
|
|
- Успешное снятие публикации автором
|
|
|
|
|
|
- Снятие публикации редактором
|
|
|
|
|
|
- Отказ в доступе неавторизованному пользователю
|
|
|
|
|
|
- Отказ в доступе не-автору без прав редактора
|
|
|
|
|
|
- Обработку несуществующих публикаций
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
|
import logging
|
|
|
|
|
|
import sys
|
|
|
|
|
|
import time
|
|
|
|
|
|
from pathlib import Path
|
2025-08-12 13:41:31 +03:00
|
|
|
|
import pytest
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
|
|
|
|
|
sys.path.append(str(Path(__file__).parent))
|
|
|
|
|
|
|
[0.9.7] - 2025-08-18
### 🔄 Изменения
- **SQLAlchemy KeyError** - исправление ошибки `KeyError: Reaction` при инициализации
- **Исправлена ошибка SQLAlchemy**: Устранена проблема `InvalidRequestError: When initializing mapper Mapper[Shout(shout)], expression Reaction failed to locate a name (Reaction)`
### 🧪 Тестирование
- **Исправление тестов** - адаптация к новой структуре моделей
- **RBAC инициализация** - добавление `rbac.initialize_rbac()` в `conftest.py`
- **Создан тест для getSession**: Добавлен комплексный тест `test_getSession_cookies.py` с проверкой всех сценариев
- **Покрытие edge cases**: Тесты проверяют работу с валидными/невалидными токенами, отсутствующими пользователями
- **Мокирование зависимостей**: Использование unittest.mock для изоляции тестируемого кода
### 🔧 Рефакторинг
- **Упрощена архитектура**: Убраны сложные конструкции с отложенными импортами, заменены на чистую архитектуру
- **Перемещение моделей** - `Author` и связанные модели перенесены в `orm/author.py`: Вынесены базовые модели пользователей (`Author`, `AuthorFollower`, `AuthorBookmark`, `AuthorRating`) из `orm.author` в отдельный модуль
- **Устранены циклические импорты**: Разорван цикл между `auth.core` → `orm.community` → `orm.author` через реструктуризацию архитектуры
- **Создан модуль `utils/password.py`**: Класс `Password` вынесен в utils для избежания циклических зависимостей
- **Оптимизированы импорты моделей**: Убран прямой импорт `Shout` из `orm/community.py`, заменен на строковые ссылки
### 🔧 Авторизация с cookies
- **getSession теперь работает с cookies**: Мутация `getSession` теперь может получать токен из httpOnly cookies даже без заголовка Authorization
- **Убрано требование авторизации**: `getSession` больше не требует декоратор `@login_required`, работает автономно
- **Поддержка dual-авторизации**: Токен может быть получен как из заголовка Authorization, так и из cookie `session_token`
- **Автоматическая установка cookies**: Middleware автоматически устанавливает httpOnly cookies при успешном `getSession`
- **Обновлена GraphQL схема**: `SessionInfo` теперь содержит поля `success`, `error` и опциональные `token`, `author`
- **Единообразная обработка токенов**: Все модули теперь используют централизованные функции для работы с токенами
- **Улучшена обработка ошибок**: Добавлена детальная валидация токенов и пользователей в `getSession`
- **Логирование операций**: Добавлены подробные логи для отслеживания процесса авторизации
### 📝 Документация
- **Обновлена схема GraphQL**: `SessionInfo` тип теперь соответствует новому формату ответа
- Обновлена документация RBAC
- Обновлена документация авторизации с cookies
2025-08-18 14:25:25 +03:00
|
|
|
|
from orm.author import Author
|
2025-06-02 02:56:11 +03:00
|
|
|
|
from orm.shout import Shout
|
2025-08-20 18:33:58 +03:00
|
|
|
|
from rbac.api import assign_role_to_user
|
2025-06-02 02:56:11 +03:00
|
|
|
|
from resolvers.editor import unpublish_shout
|
2025-08-17 17:56:31 +03:00
|
|
|
|
from storage.db import local_session
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
|
|
|
|
|
# Настройка логгера
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MockInfo:
|
|
|
|
|
|
"""Мок для GraphQL info контекста"""
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, author_id: int, roles: list[str] | None = None) -> None:
|
|
|
|
|
|
if author_id:
|
|
|
|
|
|
self.context = {
|
|
|
|
|
|
"author": {"id": author_id},
|
|
|
|
|
|
"roles": roles or ["reader", "author"],
|
|
|
|
|
|
"request": None, # Важно: указываем None для тестового режима
|
|
|
|
|
|
}
|
|
|
|
|
|
else:
|
|
|
|
|
|
# Для неавторизованного пользователя
|
|
|
|
|
|
self.context = {
|
|
|
|
|
|
"author": {},
|
|
|
|
|
|
"roles": [],
|
|
|
|
|
|
"request": None,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
async def setup_test_data(db_session) -> tuple[Author, Shout, Author]:
|
2025-06-02 02:56:11 +03:00
|
|
|
|
"""Создаем тестовые данные: автора, публикацию и другого автора"""
|
|
|
|
|
|
logger.info("🔧 Настройка тестовых данных")
|
|
|
|
|
|
|
|
|
|
|
|
current_time = int(time.time())
|
|
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
# Создаем первого автора (владельца публикации)
|
|
|
|
|
|
test_author = db_session.query(Author).where(Author.email == "test_author@example.com").first()
|
|
|
|
|
|
if not test_author:
|
|
|
|
|
|
test_author = Author(email="test_author@example.com", name="Test Author", slug="test-author")
|
|
|
|
|
|
test_author.set_password("password123")
|
|
|
|
|
|
db_session.add(test_author)
|
|
|
|
|
|
db_session.flush() # Получаем ID
|
|
|
|
|
|
|
|
|
|
|
|
# Создаем второго автора (не владельца)
|
|
|
|
|
|
other_author = db_session.query(Author).where(Author.email == "other_author@example.com").first()
|
|
|
|
|
|
if not other_author:
|
|
|
|
|
|
other_author = Author(email="other_author@example.com", name="Other Author", slug="other-author")
|
|
|
|
|
|
other_author.set_password("password456")
|
|
|
|
|
|
db_session.add(other_author)
|
|
|
|
|
|
db_session.flush()
|
|
|
|
|
|
|
|
|
|
|
|
# Создаем опубликованную публикацию
|
|
|
|
|
|
test_shout = db_session.query(Shout).where(Shout.slug == "test-shout-published").first()
|
|
|
|
|
|
if not test_shout:
|
|
|
|
|
|
test_shout = Shout(
|
|
|
|
|
|
title="Test Published Shout",
|
|
|
|
|
|
slug="test-shout-published",
|
|
|
|
|
|
body="This is a test published shout content",
|
|
|
|
|
|
layout="article",
|
|
|
|
|
|
created_by=test_author.id,
|
|
|
|
|
|
created_at=current_time,
|
|
|
|
|
|
published_at=current_time, # Публикация опубликована
|
|
|
|
|
|
community=1,
|
|
|
|
|
|
seo="Test shout for unpublish testing",
|
|
|
|
|
|
)
|
|
|
|
|
|
db_session.add(test_shout)
|
|
|
|
|
|
else:
|
|
|
|
|
|
# Убедимся что публикация опубликована
|
|
|
|
|
|
test_shout.published_at = current_time
|
|
|
|
|
|
db_session.add(test_shout)
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
db_session.commit()
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
# Добавляем роли пользователям в БД
|
|
|
|
|
|
assign_role_to_user(test_author.id, "reader")
|
|
|
|
|
|
assign_role_to_user(test_author.id, "author")
|
|
|
|
|
|
assign_role_to_user(other_author.id, "reader")
|
|
|
|
|
|
assign_role_to_user(other_author.id, "author")
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
logger.info(
|
|
|
|
|
|
f" ✅ Созданы: автор {test_author.id}, другой автор {other_author.id}, публикация {test_shout.id}"
|
|
|
|
|
|
)
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
return test_author, test_shout, other_author
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
|
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
|
async def test_successful_unpublish_by_author(db_session) -> None:
|
2025-06-02 02:56:11 +03:00
|
|
|
|
"""Тестируем успешное снятие публикации автором"""
|
|
|
|
|
|
logger.info("📰 Тестирование успешного снятия публикации автором")
|
|
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
test_author, test_shout, _ = await setup_test_data(db_session)
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
|
|
|
|
|
# Тест 1: Успешное снятие публикации автором
|
|
|
|
|
|
logger.info(" 📝 Тест 1: Снятие публикации автором")
|
|
|
|
|
|
info = MockInfo(test_author.id)
|
|
|
|
|
|
|
|
|
|
|
|
result = await unpublish_shout(None, info, test_shout.id)
|
|
|
|
|
|
|
|
|
|
|
|
if not result.error:
|
|
|
|
|
|
logger.info(" ✅ Снятие публикации успешно")
|
|
|
|
|
|
|
|
|
|
|
|
# Проверяем, что published_at теперь None
|
|
|
|
|
|
with local_session() as session:
|
2025-07-31 18:55:59 +03:00
|
|
|
|
updated_shout = session.query(Shout).where(Shout.id == test_shout.id).first()
|
2025-06-02 02:56:11 +03:00
|
|
|
|
if updated_shout and updated_shout.published_at is None:
|
|
|
|
|
|
logger.info(" ✅ published_at корректно установлен в None")
|
|
|
|
|
|
else:
|
|
|
|
|
|
logger.error(
|
|
|
|
|
|
f" ❌ published_at неверен: {updated_shout.published_at if updated_shout else 'shout not found'}"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
if result.shout and result.shout.id == test_shout.id:
|
|
|
|
|
|
logger.info(" ✅ Возвращен корректный объект публикации")
|
|
|
|
|
|
else:
|
|
|
|
|
|
logger.error(" ❌ Возвращен неверный объект публикации")
|
|
|
|
|
|
else:
|
|
|
|
|
|
logger.error(f" ❌ Ошибка снятия публикации: {result.error}")
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
|
async def test_unpublish_by_editor(db_session) -> None:
|
2025-06-02 02:56:11 +03:00
|
|
|
|
"""Тестируем снятие публикации редактором"""
|
|
|
|
|
|
logger.info("👨💼 Тестирование снятия публикации редактором")
|
|
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
test_author, test_shout, other_author = await setup_test_data(db_session)
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
|
|
|
|
|
# Восстанавливаем публикацию для теста
|
|
|
|
|
|
with local_session() as session:
|
2025-07-31 18:55:59 +03:00
|
|
|
|
shout = session.query(Shout).where(Shout.id == test_shout.id).first()
|
2025-06-02 02:56:11 +03:00
|
|
|
|
if shout:
|
|
|
|
|
|
shout.published_at = int(time.time())
|
|
|
|
|
|
session.add(shout)
|
|
|
|
|
|
session.commit()
|
|
|
|
|
|
|
|
|
|
|
|
# Добавляем роль "editor" другому автору в БД
|
2025-07-02 22:30:21 +03:00
|
|
|
|
assign_role_to_user(other_author.id, "reader")
|
|
|
|
|
|
assign_role_to_user(other_author.id, "author")
|
|
|
|
|
|
assign_role_to_user(other_author.id, "editor")
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
|
|
|
|
|
logger.info(" 📝 Тест: Снятие публикации редактором")
|
|
|
|
|
|
info = MockInfo(other_author.id, roles=["reader", "author", "editor"]) # Другой автор с ролью редактора
|
|
|
|
|
|
|
|
|
|
|
|
result = await unpublish_shout(None, info, test_shout.id)
|
|
|
|
|
|
|
|
|
|
|
|
if not result.error:
|
|
|
|
|
|
logger.info(" ✅ Редактор успешно снял публикацию")
|
|
|
|
|
|
|
|
|
|
|
|
with local_session() as session:
|
2025-07-31 18:55:59 +03:00
|
|
|
|
updated_shout = session.query(Shout).where(Shout.id == test_shout.id).first()
|
2025-06-02 02:56:11 +03:00
|
|
|
|
if updated_shout and updated_shout.published_at is None:
|
|
|
|
|
|
logger.info(" ✅ published_at корректно установлен в None редактором")
|
|
|
|
|
|
else:
|
|
|
|
|
|
logger.error(
|
|
|
|
|
|
f" ❌ published_at неверен после действий редактора: {updated_shout.published_at if updated_shout else 'shout not found'}"
|
|
|
|
|
|
)
|
|
|
|
|
|
else:
|
|
|
|
|
|
logger.error(f" ❌ Ошибка снятия публикации редактором: {result.error}")
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
|
async def test_access_denied_scenarios(db_session) -> None:
|
2025-06-02 02:56:11 +03:00
|
|
|
|
"""Тестируем сценарии отказа в доступе"""
|
|
|
|
|
|
logger.info("🚫 Тестирование отказа в доступе")
|
|
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
test_author, test_shout, other_author = await setup_test_data(db_session)
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
|
|
|
|
|
# Восстанавливаем публикацию для теста
|
|
|
|
|
|
with local_session() as session:
|
2025-07-31 18:55:59 +03:00
|
|
|
|
shout = session.query(Shout).where(Shout.id == test_shout.id).first()
|
2025-06-02 02:56:11 +03:00
|
|
|
|
if shout:
|
|
|
|
|
|
shout.published_at = int(time.time())
|
|
|
|
|
|
session.add(shout)
|
|
|
|
|
|
session.commit()
|
|
|
|
|
|
|
|
|
|
|
|
# Тест 1: Неавторизованный пользователь
|
|
|
|
|
|
logger.info(" 📝 Тест 1: Неавторизованный пользователь")
|
|
|
|
|
|
info = MockInfo(0) # Нет author_id
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
result = await unpublish_shout(None, info, test_shout.id)
|
|
|
|
|
|
logger.error(" ❌ Неожиданный результат для неавторизованного: ошибка не была выброшена")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
if "Требуется авторизация" in str(e):
|
|
|
|
|
|
logger.info(" ✅ Корректно отклонен неавторизованный пользователь")
|
|
|
|
|
|
else:
|
|
|
|
|
|
logger.error(f" ❌ Неожиданная ошибка для неавторизованного: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
# Тест 2: Не-автор без прав редактора
|
|
|
|
|
|
logger.info(" 📝 Тест 2: Не-автор без прав редактора")
|
|
|
|
|
|
# Убеждаемся что у other_author нет роли editor
|
2025-07-02 22:30:21 +03:00
|
|
|
|
assign_role_to_user(other_author.id, "reader")
|
|
|
|
|
|
assign_role_to_user(other_author.id, "author")
|
2025-06-02 02:56:11 +03:00
|
|
|
|
info = MockInfo(other_author.id, roles=["reader", "author"]) # Другой автор без прав редактора
|
|
|
|
|
|
|
|
|
|
|
|
result = await unpublish_shout(None, info, test_shout.id)
|
|
|
|
|
|
|
|
|
|
|
|
if result.error == "Access denied":
|
|
|
|
|
|
logger.info(" ✅ Корректно отклонен не-автор без прав редактора")
|
|
|
|
|
|
else:
|
|
|
|
|
|
logger.error(f" ❌ Неожиданный результат для не-автора: {result.error}")
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
|
async def test_nonexistent_shout(db_session) -> None:
|
2025-06-02 02:56:11 +03:00
|
|
|
|
"""Тестируем обработку несуществующих публикаций"""
|
|
|
|
|
|
logger.info("👻 Тестирование несуществующих публикаций")
|
|
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
test_author, _, _ = await setup_test_data(db_session)
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
|
|
|
|
|
logger.info(" 📝 Тест: Несуществующая публикация")
|
|
|
|
|
|
info = MockInfo(test_author.id)
|
|
|
|
|
|
|
|
|
|
|
|
# Используем заведомо несуществующий ID
|
|
|
|
|
|
nonexistent_id = 999999
|
|
|
|
|
|
result = await unpublish_shout(None, info, nonexistent_id)
|
|
|
|
|
|
|
|
|
|
|
|
if result.error == "Shout not found":
|
|
|
|
|
|
logger.info(" ✅ Корректно обработана несуществующая публикация")
|
|
|
|
|
|
else:
|
|
|
|
|
|
logger.error(f" ❌ Неожиданный результат для несуществующей публикации: {result.error}")
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
|
async def test_already_unpublished_shout(db_session) -> None:
|
2025-06-02 02:56:11 +03:00
|
|
|
|
"""Тестируем снятие публикации с уже неопубликованной публикации"""
|
|
|
|
|
|
logger.info("📝 Тестирование уже неопубликованной публикации")
|
|
|
|
|
|
|
2025-08-12 13:41:31 +03:00
|
|
|
|
test_author, test_shout, _ = await setup_test_data(db_session)
|
2025-06-02 02:56:11 +03:00
|
|
|
|
|
|
|
|
|
|
# Убеждаемся что публикация не опубликована
|
|
|
|
|
|
with local_session() as session:
|
2025-07-31 18:55:59 +03:00
|
|
|
|
shout = session.query(Shout).where(Shout.id == test_shout.id).first()
|
2025-06-02 02:56:11 +03:00
|
|
|
|
if shout:
|
|
|
|
|
|
shout.published_at = None
|
|
|
|
|
|
session.add(shout)
|
|
|
|
|
|
session.commit()
|
|
|
|
|
|
|
|
|
|
|
|
logger.info(" 📝 Тест: Снятие публикации с уже неопубликованной")
|
|
|
|
|
|
info = MockInfo(test_author.id)
|
|
|
|
|
|
|
|
|
|
|
|
result = await unpublish_shout(None, info, test_shout.id)
|
|
|
|
|
|
|
|
|
|
|
|
# Функция должна отработать нормально даже для уже неопубликованной публикации
|
|
|
|
|
|
if not result.error:
|
|
|
|
|
|
logger.info(" ✅ Операция с уже неопубликованной публикацией прошла успешно")
|
|
|
|
|
|
|
|
|
|
|
|
with local_session() as session:
|
2025-07-31 18:55:59 +03:00
|
|
|
|
updated_shout = session.query(Shout).where(Shout.id == test_shout.id).first()
|
2025-06-02 02:56:11 +03:00
|
|
|
|
if updated_shout and updated_shout.published_at is None:
|
|
|
|
|
|
logger.info(" ✅ published_at остался None")
|
|
|
|
|
|
else:
|
|
|
|
|
|
logger.error(
|
|
|
|
|
|
f" ❌ published_at изменился неожиданно: {updated_shout.published_at if updated_shout else 'shout not found'}"
|
|
|
|
|
|
)
|
|
|
|
|
|
else:
|
|
|
|
|
|
logger.error(f" ❌ Неожиданная ошибка для уже неопубликованной публикации: {result.error}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def cleanup_test_data() -> None:
|
|
|
|
|
|
"""Очистка тестовых данных"""
|
|
|
|
|
|
logger.info("🧹 Очистка тестовых данных")
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
with local_session() as session:
|
|
|
|
|
|
# Удаляем тестовую публикацию
|
2025-07-31 18:55:59 +03:00
|
|
|
|
test_shout = session.query(Shout).where(Shout.slug == "test-shout-published").first()
|
2025-06-02 02:56:11 +03:00
|
|
|
|
if test_shout:
|
|
|
|
|
|
session.delete(test_shout)
|
|
|
|
|
|
|
|
|
|
|
|
logger.info(" ✅ Тестовые данные очищены")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.warning(f" ⚠️ Ошибка при очистке: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main() -> None:
|
|
|
|
|
|
"""Главная функция теста"""
|
|
|
|
|
|
logger.info("🚀 Запуск тестов unpublish_shout")
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
await test_successful_unpublish_by_author()
|
|
|
|
|
|
await test_unpublish_by_editor()
|
|
|
|
|
|
await test_access_denied_scenarios()
|
|
|
|
|
|
await test_nonexistent_shout()
|
|
|
|
|
|
await test_already_unpublished_shout()
|
|
|
|
|
|
|
|
|
|
|
|
logger.info("✅ Все тесты unpublish_shout завершены успешно")
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"❌ Ошибка в тестах: {e}")
|
|
|
|
|
|
import traceback
|
|
|
|
|
|
|
|
|
|
|
|
traceback.print_exc()
|
|
|
|
|
|
finally:
|
|
|
|
|
|
await cleanup_test_data()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
|
asyncio.run(main())
|