Files
core/tests/test_follow_cache_consistency.py
Untone f891b73608
All checks were successful
Deploy on push / deploy (push) Successful in 5m46s
following-debug
2025-08-30 18:23:15 +03:00

222 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Тест консистентности кеша подписок после операций follow/unfollow
"""
from __future__ import annotations
import asyncio
import pytest
import time
from unittest.mock import patch
from cache.cache import get_cached_follower_authors
from orm.author import Author, AuthorFollower
from resolvers.follower import follow, unfollow
from storage.db import local_session
from storage.redis import redis
class MockRequest:
"""Mock объект для HTTP request"""
def __init__(self):
self.method = "POST"
self.url = type('MockURL', (), {"path": "/graphql"})()
class MockGraphQLResolveInfo:
"""Mock объект для GraphQL resolve info"""
def __init__(self, author_id: int):
self.context = {
"author": {"id": author_id},
"request": MockRequest()
}
@pytest.mark.asyncio
async def test_follow_cache_consistency():
"""
Тест консистентности кеша после операции подписки:
1. Подписываемся на автора
2. Проверяем, что кеш корректно инвалидирован и обновлен
3. Проверяем, что следующий запрос возвращает актуальные данные
"""
# Создаем тестовых пользователей
with local_session() as session:
# Создаем подписчика
follower = Author(
name="Test Follower",
slug=f"test-follower-{int(time.time())}",
email=f"follower-{int(time.time())}@test.com"
)
session.add(follower)
# Создаем автора для подписки
target_author = Author(
name="Target Author",
slug=f"target-author-{int(time.time())}",
email=f"target-{int(time.time())}@test.com"
)
session.add(target_author)
session.commit()
follower_id = follower.id
target_author_id = target_author.id
target_author_slug = target_author.slug
try:
# Очищаем кеш перед тестом
cache_key = f"author:follows-authors:{follower_id}"
await redis.execute("DEL", cache_key)
# 1. Проверяем начальное состояние (пустой список подписок)
initial_follows = await get_cached_follower_authors(follower_id)
assert len(initial_follows) == 0, "Изначально подписок быть не должно"
# 2. Выполняем подписку (обходим авторизацию)
mock_info = MockGraphQLResolveInfo(follower_id)
# Патчим декоратор авторизации
with patch('resolvers.follower.login_required', lambda func: func):
result = await follow(
None,
mock_info,
what="AUTHOR",
slug=target_author_slug
)
# 3. Проверяем результат операции
assert result.get("error") is None, f"Операция подписки завершилась с ошибкой: {result.get('error')}"
returned_authors = result.get("authors", [])
assert len(returned_authors) == 1, "Должен вернуться 1 автор в списке подписок"
assert any(
author.get("id") == target_author_id for author in returned_authors
), "Целевой автор должен быть в списке подписок"
# 4. Проверяем кеш напрямую (должен быть обновлен)
fresh_follows = await get_cached_follower_authors(follower_id)
assert len(fresh_follows) == 1, "Кеш должен содержать 1 подписку"
assert fresh_follows[0]["id"] == target_author_id, "ID автора в кеше должен совпадать"
# 5. Проверяем консистентность с БД
with local_session() as session:
db_follows = session.query(AuthorFollower).filter(
AuthorFollower.follower == follower_id,
AuthorFollower.following == target_author_id
).all()
assert len(db_follows) == 1, "В БД должна быть запись о подписке"
# 6. Тестируем отписку (обходим авторизацию)
with patch('resolvers.follower.login_required', lambda func: func):
unfollow_result = await unfollow(
None,
mock_info,
what="AUTHOR",
slug=target_author_slug
)
assert unfollow_result.get("error") is None, f"Операция отписки завершилась с ошибкой: {unfollow_result.get('error')}"
# 7. Проверяем кеш после отписки
after_unfollow_follows = await get_cached_follower_authors(follower_id)
assert len(after_unfollow_follows) == 0, "После отписки кеш должен быть пустым"
# 8. Проверяем БД после отписки
with local_session() as session:
db_follows_after = session.query(AuthorFollower).filter(
AuthorFollower.follower == follower_id,
AuthorFollower.following == target_author_id
).all()
assert len(db_follows_after) == 0, "В БД не должно быть записи о подписке после отписки"
finally:
# Очистка тестовых данных
with local_session() as session:
# Удаляем подписки
session.query(AuthorFollower).filter(
AuthorFollower.follower == follower_id
).delete()
# Удаляем авторов
session.query(Author).filter(Author.id.in_([follower_id, target_author_id])).delete()
session.commit()
# Очищаем кеш
await redis.execute("DEL", f"author:follows-authors:{follower_id}")
@pytest.mark.asyncio
async def test_follow_already_following():
"""
Тест попытки повторной подписки на того же автора
"""
# Создаем тестовых пользователей
with local_session() as session:
follower = Author(
name="Test Follower 2",
slug=f"test-follower-2-{int(time.time())}",
email=f"follower-2-{int(time.time())}@test.com"
)
session.add(follower)
target_author = Author(
name="Target Author 2",
slug=f"target-author-2-{int(time.time())}",
email=f"target-2-{int(time.time())}@test.com"
)
session.add(target_author)
session.commit()
follower_id = follower.id
target_author_id = target_author.id
target_author_slug = target_author.slug
# Создаем изначальную подписку
subscription = AuthorFollower(
follower=follower_id,
following=target_author_id
)
session.add(subscription)
session.commit()
try:
# Очищаем кеш
cache_key = f"author:follows-authors:{follower_id}"
await redis.execute("DEL", cache_key)
# Пытаемся подписаться повторно (обходим авторизацию)
mock_info = MockGraphQLResolveInfo(follower_id)
with patch('resolvers.follower.login_required', lambda func: func):
result = await follow(
None,
mock_info,
what="AUTHOR",
slug=target_author_slug
)
# Должна вернуться ошибка "already following"
assert result.get("error") == "already following", "Должна быть ошибка повторной подписки"
# Но список авторов должен содержать целевого автора
returned_authors = result.get("authors", [])
assert len(returned_authors) == 1, "Должен вернуться список с 1 автором"
assert any(
author.get("id") == target_author_id for author in returned_authors
), "Целевой автор должен быть в списке"
finally:
# Очистка
with local_session() as session:
session.query(AuthorFollower).filter(
AuthorFollower.follower == follower_id
).delete()
session.query(Author).filter(Author.id.in_([follower_id, target_author_id])).delete()
session.commit()
await redis.execute("DEL", f"author:follows-authors:{follower_id}")
if __name__ == "__main__":
# Запуск тестов напрямую
asyncio.run(test_follow_cache_consistency())
asyncio.run(test_follow_already_following())
print("Все тесты консистентности кеша прошли успешно!")