published-at-fix
Some checks failed
Deploy on push / deploy (push) Failing after 1m0s

This commit is contained in:
2025-08-23 15:06:53 +03:00
parent 00a866876c
commit e60b97a5c5
2 changed files with 217 additions and 1 deletions

View File

@@ -0,0 +1,213 @@
"""
🧪 Тест для исправления бага публикации черновиков
Проверяет, что после публикации черновика shout корректно отображается
как в единичных запросах по slug, так и в списках load_shouts_by.
"""
import time
from typing import Any
import pytest
from sqlalchemy.orm import Session
from orm.author import Author
from orm.community import Community
from orm.draft import Draft
from orm.shout import Shout
from resolvers.draft import publish_draft
from resolvers.reader import get_shout, load_shouts_by
from storage.db import local_session
from tests.conftest import GraphQLResolveInfoMock
@pytest.fixture
def test_data() -> dict[str, Any]:
"""Создает тестовые данные для проверки публикации"""
with local_session() as session:
# Создаем автора
author = Author(
name="Test Author",
slug="test-author",
email="test@example.com",
)
session.add(author)
session.flush()
# Создаем сообщество
community = Community(
name="Test Community",
slug="test-community",
created_by=author.id,
)
session.add(community)
session.flush()
# Создаем черновик
draft = Draft(
title="Test Draft Title",
body="<p>Test draft content</p>",
slug="test-draft-slug",
created_by=author.id,
community=community.id,
)
session.add(draft)
session.flush()
# Создаем существующий shout для тестирования обновления
existing_shout = Shout(
title="Old Title",
body="<p>Old content</p>",
slug="existing-shout-slug",
created_by=author.id,
community=community.id,
created_at=int(time.time()),
published_at=None, # Важно: изначально не опубликован
)
session.add(existing_shout)
session.flush()
# Связываем черновик с существующим shout
draft_with_shout = Draft(
title="Updated Draft Title",
body="<p>Updated draft content</p>",
slug="updated-draft-slug",
created_by=author.id,
community=community.id,
shout=existing_shout.id, # Связываем с существующим shout
)
session.add(draft_with_shout)
session.flush()
session.commit()
return {
"author_id": author.id,
"community_id": community.id,
"draft_id": draft.id,
"draft_with_shout_id": draft_with_shout.id,
"existing_shout_id": existing_shout.id,
}
@pytest.mark.asyncio
async def test_new_draft_publication_visibility(test_data: dict[str, Any]) -> None:
"""
🧪 Тест публикации нового черновика
Проверяет, что новый опубликованный черновик виден как в единичных запросах,
так и в списках.
"""
# Подготавливаем контекст
info = GraphQLResolveInfoMock()
info.context = {"author": {"id": test_data["author_id"]}}
# Публикуем черновик
result = await publish_draft(None, info, test_data["draft_id"])
assert "error" not in result or result["error"] is None
assert "draft" in result
shout_info = result["draft"]["shout"]
shout_id = shout_info["id"]
shout_slug = shout_info["slug"]
# Проверяем, что published_at установлен
assert shout_info["published_at"] is not None
# Проверяем видимость в единичном запросе
single_shout = await get_shout(None, info, slug=shout_slug)
assert single_shout is not None
assert single_shout["id"] == shout_id
# Проверяем видимость в списке
shouts_list = await load_shouts_by(None, info, {"limit": 10, "offset": 0})
shout_ids_in_list = [shout["id"] for shout in shouts_list]
assert shout_id in shout_ids_in_list
@pytest.mark.asyncio
async def test_existing_shout_update_visibility(test_data: dict[str, Any]) -> None:
"""
🧪 Тест обновления существующего shout через черновик
Проверяет, что при обновлении существующего shout через публикацию черновика
shout становится видимым в списках (устанавливается published_at).
"""
# Подготавливаем контекст
info = GraphQLResolveInfoMock()
info.context = {"author": {"id": test_data["author_id"]}}
# Проверяем, что изначально shout не виден в списках (published_at = None)
with local_session() as session:
existing_shout = session.query(Shout).filter(
Shout.id == test_data["existing_shout_id"]
).first()
assert existing_shout is not None
assert existing_shout.published_at is None
# Публикуем черновик, который обновляет существующий shout
result = await publish_draft(None, info, test_data["draft_with_shout_id"])
assert "error" not in result or result["error"] is None
assert "draft" in result
shout_info = result["draft"]["shout"]
shout_id = shout_info["id"]
shout_slug = shout_info["slug"]
# 🎯 Критическая проверка: published_at должен быть установлен
assert shout_info["published_at"] is not None
# Проверяем, что в базе данных published_at действительно установлен
with local_session() as session:
updated_shout = session.query(Shout).filter(Shout.id == shout_id).first()
assert updated_shout is not None
assert updated_shout.published_at is not None
assert updated_shout.title == "Updated Draft Title" # Контент обновлен
# Проверяем видимость в единичном запросе
single_shout = await get_shout(None, info, slug=shout_slug)
assert single_shout is not None
assert single_shout["id"] == shout_id
# 🎯 Главная проверка: видимость в списке (это и было проблемой)
shouts_list = await load_shouts_by(None, info, {"limit": 10, "offset": 0})
shout_ids_in_list = [shout["id"] for shout in shouts_list]
assert shout_id in shout_ids_in_list, f"Shout {shout_id} должен быть виден в списке после публикации"
@pytest.mark.asyncio
async def test_unpublish_draft_removes_from_lists(test_data: dict[str, Any]) -> None:
"""
🧪 Тест снятия с публикации
Проверяет, что после снятия с публикации shout исчезает из списков,
но остается доступным по прямому запросу (если это поведение нужно изменить).
"""
from resolvers.draft import unpublish_draft
# Подготавливаем контекст
info = GraphQLResolveInfoMock()
info.context = {"author": {"id": test_data["author_id"]}}
# Сначала публикуем черновик
publish_result = await publish_draft(None, info, test_data["draft_id"])
assert "error" not in publish_result or publish_result["error"] is None
shout_info = publish_result["draft"]["shout"]
shout_id = shout_info["id"]
# Проверяем, что shout виден в списке
shouts_list_before = await load_shouts_by(None, info, {"limit": 10, "offset": 0})
shout_ids_before = [shout["id"] for shout in shouts_list_before]
assert shout_id in shout_ids_before
# Снимаем с публикации
unpublish_result = await unpublish_draft(None, info, test_data["draft_id"])
assert "error" not in unpublish_result or unpublish_result["error"] is None
# Проверяем, что shout исчез из списка
shouts_list_after = await load_shouts_by(None, info, {"limit": 10, "offset": 0})
shout_ids_after = [shout["id"] for shout in shouts_list_after]
assert shout_id not in shout_ids_after, "Shout должен исчезнуть из списка после снятия с публикации"