Files
core/tests/test_follow_sse_notifications.py

146 lines
5.0 KiB
Python
Raw Normal View History

2025-08-30 18:47:27 +03:00
"""
Тест SSE уведомлений о новых подписчиках
"""
from __future__ import annotations
from unittest.mock import patch
import orjson
import pytest
from services.notify import notify_follower
from storage.redis import redis
@pytest.mark.asyncio
async def test_follow_sse_notification_format():
"""
Тест формата SSE уведомления о новой подписке
"""
# Мокаем Redis publish, чтобы перехватить отправляемые данные
published_data = []
async def mock_publish(channel: str, data: bytes) -> None:
published_data.append((channel, orjson.loads(data)))
with patch.object(redis, 'publish', side_effect=mock_publish):
# Данные подписавшегося пользователя
follower_data = {
"id": 123,
"name": "Test Follower",
"slug": "test-follower",
"pic": "https://example.com/avatar.jpg"
}
target_author_id = 456
# Отправляем уведомление
await notify_follower(
follower=follower_data,
author_id=target_author_id,
action="follow"
)
# Проверяем, что данные отправлены правильно
assert len(published_data) == 1
channel, data = published_data[0]
# Проверяем канал
assert channel == f"follower:{target_author_id}"
# Проверяем формат данных согласно обновленной спецификации фронтенда
assert data["action"] == "create"
assert data["entity"] == "follower"
assert data["payload"]["follower_id"] == 123
assert data["payload"]["following_id"] == target_author_id
assert "id" in data["payload"]
assert "created_at" in data["payload"]
print(f"✅ SSE уведомление отправлено правильно: {data}")
@pytest.mark.asyncio
async def test_unfollow_sse_notification_format():
"""
Тест формата SSE уведомления об отписке
"""
published_data = []
async def mock_publish(channel: str, data: bytes) -> None:
published_data.append((channel, orjson.loads(data)))
with patch.object(redis, 'publish', side_effect=mock_publish):
# Данные отписавшегося пользователя
follower_data = {
"id": 789,
"name": "Test Unfollower",
"slug": "test-unfollower",
"pic": "https://example.com/avatar2.jpg"
}
target_author_id = 101
# Отправляем уведомление об отписке
await notify_follower(
follower=follower_data,
author_id=target_author_id,
action="unfollow"
)
# Проверяем формат для отписки
assert len(published_data) == 1
channel, data = published_data[0]
assert channel == f"follower:{target_author_id}"
# Для отписки action должен быть "delete"
assert data["action"] == "delete"
assert data["entity"] == "follower"
assert data["payload"]["follower_id"] == 789
assert data["payload"]["following_id"] == target_author_id
assert "id" in data["payload"]
assert "created_at" in data["payload"]
print(f"✅ SSE уведомление об отписке отправлено правильно: {data}")
@pytest.mark.asyncio
async def test_custom_subscription_id():
"""
Тест передачи пользовательского ID подписки
"""
published_data = []
async def mock_publish(channel: str, data: bytes) -> None:
published_data.append((channel, orjson.loads(data)))
with patch.object(redis, 'publish', side_effect=mock_publish):
# Данные подписчика
follower_data = {
"id": 777,
"name": "Test User",
"slug": "test-user",
"pic": "https://example.com/avatar.jpg"
}
target_author_id = 333
custom_subscription_id = 12345
# Отправляем уведомление с пользовательским ID
await notify_follower(
follower=follower_data,
author_id=target_author_id,
action="follow",
subscription_id=custom_subscription_id
)
# Проверяем, что передается правильный subscription_id
assert len(published_data) == 1
channel, data = published_data[0]
assert data["payload"]["id"] == custom_subscription_id
assert data["payload"]["follower_id"] == 777
assert data["payload"]["following_id"] == target_author_id
print(f"✅ Передан пользовательский subscription_id: {custom_subscription_id}")