Files
core/tests/test_draft_publication_fix.py
Untone 2a6fcc3f45
Some checks failed
Deploy on push / deploy (push) Failing after 2m54s
[0.9.12] - 2025-08-26
### 🚨 Исправлено
- **Лимит топиков API**: Убрано жесткое ограничение в 100 топиков, теперь поддерживается до 1000 топиков
  - Обновлен лимит функции `get_topics_with_stats` с 100 до 1000
  - Обновлен лимит по умолчанию резолвера `get_topics_by_community` с 100 до 1000
  - Это решает проблему, когда API искусственно ограничивал получение топиков

### 🧪 Исправлено
- **Тест-сьют**: Исправлены все падающие тесты для достижения 100% прохождения
  - Исправлено утверждение теста уведомлений для невалидных действий (fallback к CREATE)
  - Исправлены тесты публикации черновиков путем добавления обязательных топиков
  - Исправлен контекст авторизации в тестах черновиков (добавлены роли и токен)
  - Установлены браузеры Playwright для решения проблем с браузерными тестами
  - Все тесты теперь проходят: 361 пройден, 31 пропущен, 0 провален

### 🔧 Техническое
- Улучшены тестовые фикстуры с правильным созданием топиков для черновиков
- Улучшено тестовое мокирование для GraphQL контекста с требуемыми данными авторизации
- Добавлена правильная обработка ошибок для требований публикации черновиков
2025-08-26 13:28:28 +03:00

293 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
🧪 Тест для исправления бага публикации черновиков
Проверяет, что после публикации черновика shout корректно отображается
как в единичных запросах по slug, так и в списках load_shouts_by.
"""
import time
from typing import Any
from unittest.mock import patch, AsyncMock
import pytest
from sqlalchemy.orm import Session
from orm.author import Author
from orm.community import Community
from orm.draft import Draft, DraftTopic
from orm.shout import Shout
from orm.topic import Topic
from resolvers.draft import publish_draft
from resolvers.reader import get_shout, load_shouts_by
from storage.db import local_session
# from tests.conftest import GraphQLResolveInfoMock
class GraphQLResolveInfoMock:
"""Mock объект для GraphQLResolveInfo в тестах"""
def __init__(self, context: dict | None = None):
from unittest.mock import MagicMock
self.context = context or {}
self.field_nodes = [MagicMock()]
self.field_nodes[0].selection_set = None
self.field_name = "test_field"
self.return_type = MagicMock()
self.parent_type = MagicMock()
self.path = MagicMock()
self.schema = MagicMock()
self.fragments = {}
self.root_value = None
self.operation = MagicMock()
self.variable_values = {}
self.is_awaitable = False
@pytest.fixture(scope="function")
def test_data() -> dict[str, Any]:
"""Создает тестовые данные для проверки публикации"""
with local_session() as session:
# 🔍 Отладка: проверяем схему таблицы draft и добавляем недостающую колонку
from sqlalchemy import inspect, text
inspector = inspect(session.bind)
draft_columns = [col['name'] for col in inspector.get_columns('draft')]
print(f"🔍 Draft table columns in test: {draft_columns}")
if 'shout' not in draft_columns:
print("🔧 Adding missing 'shout' column to draft table")
session.execute(text("ALTER TABLE draft ADD COLUMN shout INTEGER"))
session.commit()
# Создаем автора с уникальным email
import time
import random
timestamp = int(time.time()) + random.randint(1, 10000)
author = Author(
name="Test Author",
slug=f"test-author-{timestamp}",
email=f"test-{timestamp}@example.com",
)
session.add(author)
session.flush()
# Создаем сообщество с уникальным slug
community = Community(
name="Test Community",
slug=f"test-community-{timestamp}",
created_by=author.id,
)
session.add(community)
session.flush()
# Создаем черновик с уникальным slug
draft = Draft(
title="Test Draft Title",
body="<p>Test draft content</p>",
slug=f"test-draft-slug-{timestamp}",
created_by=author.id,
community=community.id,
)
session.add(draft)
session.flush()
# Создаем топик для черновика
topic = Topic(
title="Test Topic",
slug=f"test-topic-{timestamp}",
community=community.id,
)
session.add(topic)
session.flush()
# Связываем черновик с топиком
draft_topic = DraftTopic(draft=draft.id, topic=topic.id, main=True)
session.add(draft_topic)
# Создаем существующий shout для тестирования обновления
existing_shout = Shout(
title="Old Title",
body="<p>Old content</p>",
slug=f"existing-shout-slug-{timestamp}",
created_by=author.id,
community=community.id,
created_at=int(time.time()),
published_at=None, # Важно: изначально не опубликован
)
session.add(existing_shout)
session.flush()
# Связываем черновик с существующим shout
draft_with_shout = Draft(
title="Updated Draft Title",
body="<p>Updated draft content</p>",
slug=f"updated-draft-slug-{timestamp}",
created_by=author.id,
community=community.id,
shout=existing_shout.id, # Связываем с существующим shout
)
session.add(draft_with_shout)
session.flush()
# Создаем топик для второго черновика
topic2 = Topic(
title="Updated Topic",
slug=f"updated-topic-{timestamp}",
community=community.id,
)
session.add(topic2)
session.flush()
# Связываем второй черновик с топиком
draft_topic2 = DraftTopic(draft=draft_with_shout.id, topic=topic2.id, main=True)
session.add(draft_topic2)
session.commit()
return {
"author_id": author.id,
"community_id": community.id,
"draft_id": draft.id,
"draft_with_shout_id": draft_with_shout.id,
"existing_shout_id": existing_shout.id,
}
@pytest.mark.asyncio
@patch('resolvers.draft.notify_shout', new=AsyncMock())
@patch('resolvers.draft.invalidate_shouts_cache', new=AsyncMock())
@patch('resolvers.draft.invalidate_shout_related_cache', new=AsyncMock())
async def test_new_draft_publication_visibility(test_data: dict[str, Any]) -> None:
"""
🧪 Тест публикации нового черновика
Проверяет, что новый опубликованный черновик виден как в единичных запросах,
так и в списках.
"""
# Подготавливаем контекст
info = GraphQLResolveInfoMock()
info.context = {
"author": {"id": test_data["author_id"]},
"roles": ["author", "reader"]
}
# Публикуем черновик
result = await publish_draft(None, info, test_data["draft_id"])
assert "error" not in result or result["error"] is None
assert "draft" in result
shout_info = result["draft"]["shout"]
shout_id = shout_info["id"]
shout_slug = shout_info["slug"]
# Проверяем, что published_at установлен
assert shout_info["published_at"] is not None
# Проверяем видимость в единичном запросе
single_shout = await get_shout(None, info, slug=shout_slug)
assert single_shout is not None
assert single_shout["id"] == shout_id
# Проверяем видимость в списке
shouts_list = await load_shouts_by(None, info, {"limit": 10, "offset": 0})
shout_ids_in_list = [shout["id"] for shout in shouts_list]
assert shout_id in shout_ids_in_list
@pytest.mark.asyncio
@patch('resolvers.draft.notify_shout', new=AsyncMock())
@patch('resolvers.draft.invalidate_shouts_cache', new=AsyncMock())
@patch('resolvers.draft.invalidate_shout_related_cache', new=AsyncMock())
async def test_existing_shout_update_visibility(test_data: dict[str, Any]) -> None:
"""
🧪 Тест обновления существующего shout через черновик
Проверяет, что при обновлении существующего shout через публикацию черновика
shout становится видимым в списках (устанавливается published_at).
"""
# Подготавливаем контекст
info = GraphQLResolveInfoMock()
info.context = {
"author": {"id": test_data["author_id"]},
"roles": ["author", "reader"]
}
# Проверяем, что изначально shout не виден в списках (published_at = None)
with local_session() as session:
existing_shout = session.query(Shout).filter(
Shout.id == test_data["existing_shout_id"]
).first()
assert existing_shout is not None
assert existing_shout.published_at is None
# Публикуем черновик, который обновляет существующий shout
result = await publish_draft(None, info, test_data["draft_with_shout_id"])
assert "error" not in result or result["error"] is None
assert "draft" in result
shout_info = result["draft"]["shout"]
shout_id = shout_info["id"]
shout_slug = shout_info["slug"]
# 🎯 Критическая проверка: published_at должен быть установлен
assert shout_info["published_at"] is not None
# Проверяем, что в базе данных published_at действительно установлен
with local_session() as session:
updated_shout = session.query(Shout).filter(Shout.id == shout_id).first()
assert updated_shout is not None
assert updated_shout.published_at is not None
assert updated_shout.title == "Updated Draft Title" # Контент обновлен
# Проверяем видимость в единичном запросе
single_shout = await get_shout(None, info, slug=shout_slug)
assert single_shout is not None
assert single_shout["id"] == shout_id
# 🎯 Главная проверка: видимость в списке (это и было проблемой)
shouts_list = await load_shouts_by(None, info, {"limit": 10, "offset": 0})
shout_ids_in_list = [shout["id"] for shout in shouts_list]
assert shout_id in shout_ids_in_list, f"Shout {shout_id} должен быть виден в списке после публикации"
@pytest.mark.asyncio
@patch('resolvers.draft.notify_shout', new=AsyncMock())
@patch('resolvers.draft.invalidate_shouts_cache', new=AsyncMock())
@patch('resolvers.draft.invalidate_shout_related_cache', new=AsyncMock())
async def test_unpublish_draft_removes_from_lists(test_data: dict[str, Any]) -> None:
"""
🧪 Тест снятия с публикации
Проверяет, что после снятия с публикации shout исчезает из списков,
но остается доступным по прямому запросу (если это поведение нужно изменить).
"""
from resolvers.draft import unpublish_draft
# Подготавливаем контекст
info = GraphQLResolveInfoMock()
info.context = {
"author": {"id": test_data["author_id"]},
"roles": ["author", "reader"]
}
# Сначала публикуем черновик
publish_result = await publish_draft(None, info, test_data["draft_id"])
assert "error" not in publish_result or publish_result["error"] is None
shout_info = publish_result["draft"]["shout"]
shout_id = shout_info["id"]
# Проверяем, что shout виден в списке
shouts_list_before = await load_shouts_by(None, info, {"limit": 10, "offset": 0})
shout_ids_before = [shout["id"] for shout in shouts_list_before]
assert shout_id in shout_ids_before
# Снимаем с публикации
unpublish_result = await unpublish_draft(None, info, test_data["draft_id"])
assert "error" not in unpublish_result or unpublish_result["error"] is None
# Проверяем, что shout исчез из списка
shouts_list_after = await load_shouts_by(None, info, {"limit": 10, "offset": 0})
shout_ids_after = [shout["id"] for shout in shouts_list_after]
assert shout_id not in shout_ids_after, "Shout должен исчезнуть из списка после снятия с публикации"