""" Тест консистентности кеша подписок после операций follow/unfollow """ from __future__ import annotations import asyncio import pytest import time from unittest.mock import patch from cache.cache import get_cached_follower_authors from orm.author import Author, AuthorFollower from resolvers.follower import follow, unfollow from storage.db import local_session from storage.redis import redis class MockRequest: """Mock объект для HTTP request""" def __init__(self): self.method = "POST" self.url = type('MockURL', (), {"path": "/graphql"})() class MockGraphQLResolveInfo: """Mock объект для GraphQL resolve info""" def __init__(self, author_id: int): self.context = { "author": {"id": author_id}, "request": MockRequest() } @pytest.mark.asyncio async def test_follow_cache_consistency(): """ Тест консистентности кеша после операции подписки: 1. Подписываемся на автора 2. Проверяем, что кеш корректно инвалидирован и обновлен 3. Проверяем, что следующий запрос возвращает актуальные данные """ # Создаем тестовых пользователей with local_session() as session: # Создаем подписчика follower = Author( name="Test Follower", slug=f"test-follower-{int(time.time())}", email=f"follower-{int(time.time())}@test.com" ) session.add(follower) # Создаем автора для подписки target_author = Author( name="Target Author", slug=f"target-author-{int(time.time())}", email=f"target-{int(time.time())}@test.com" ) session.add(target_author) session.commit() follower_id = follower.id target_author_id = target_author.id target_author_slug = target_author.slug try: # Очищаем кеш перед тестом cache_key = f"author:follows-authors:{follower_id}" await redis.execute("DEL", cache_key) # 1. Проверяем начальное состояние (пустой список подписок) initial_follows = await get_cached_follower_authors(follower_id) assert len(initial_follows) == 0, "Изначально подписок быть не должно" # 2. Выполняем подписку (обходим авторизацию) mock_info = MockGraphQLResolveInfo(follower_id) # Патчим декоратор авторизации with patch('resolvers.follower.login_required', lambda func: func): result = await follow( None, mock_info, what="AUTHOR", slug=target_author_slug ) # 3. Проверяем результат операции assert result.get("error") is None, f"Операция подписки завершилась с ошибкой: {result.get('error')}" returned_authors = result.get("authors", []) assert len(returned_authors) == 1, "Должен вернуться 1 автор в списке подписок" assert any( author.get("id") == target_author_id for author in returned_authors ), "Целевой автор должен быть в списке подписок" # 4. Проверяем кеш напрямую (должен быть обновлен) fresh_follows = await get_cached_follower_authors(follower_id) assert len(fresh_follows) == 1, "Кеш должен содержать 1 подписку" assert fresh_follows[0]["id"] == target_author_id, "ID автора в кеше должен совпадать" # 5. Проверяем консистентность с БД with local_session() as session: db_follows = session.query(AuthorFollower).filter( AuthorFollower.follower == follower_id, AuthorFollower.following == target_author_id ).all() assert len(db_follows) == 1, "В БД должна быть запись о подписке" # 6. Тестируем отписку (обходим авторизацию) with patch('resolvers.follower.login_required', lambda func: func): unfollow_result = await unfollow( None, mock_info, what="AUTHOR", slug=target_author_slug ) assert unfollow_result.get("error") is None, f"Операция отписки завершилась с ошибкой: {unfollow_result.get('error')}" # 7. Проверяем кеш после отписки after_unfollow_follows = await get_cached_follower_authors(follower_id) assert len(after_unfollow_follows) == 0, "После отписки кеш должен быть пустым" # 8. Проверяем БД после отписки with local_session() as session: db_follows_after = session.query(AuthorFollower).filter( AuthorFollower.follower == follower_id, AuthorFollower.following == target_author_id ).all() assert len(db_follows_after) == 0, "В БД не должно быть записи о подписке после отписки" finally: # Очистка тестовых данных with local_session() as session: # Удаляем подписки session.query(AuthorFollower).filter( AuthorFollower.follower == follower_id ).delete() # Удаляем авторов session.query(Author).filter(Author.id.in_([follower_id, target_author_id])).delete() session.commit() # Очищаем кеш await redis.execute("DEL", f"author:follows-authors:{follower_id}") @pytest.mark.asyncio async def test_follow_already_following(): """ Тест попытки повторной подписки на того же автора """ # Создаем тестовых пользователей with local_session() as session: follower = Author( name="Test Follower 2", slug=f"test-follower-2-{int(time.time())}", email=f"follower-2-{int(time.time())}@test.com" ) session.add(follower) target_author = Author( name="Target Author 2", slug=f"target-author-2-{int(time.time())}", email=f"target-2-{int(time.time())}@test.com" ) session.add(target_author) session.commit() follower_id = follower.id target_author_id = target_author.id target_author_slug = target_author.slug # Создаем изначальную подписку subscription = AuthorFollower( follower=follower_id, following=target_author_id ) session.add(subscription) session.commit() try: # Очищаем кеш cache_key = f"author:follows-authors:{follower_id}" await redis.execute("DEL", cache_key) # Пытаемся подписаться повторно (обходим авторизацию) mock_info = MockGraphQLResolveInfo(follower_id) with patch('resolvers.follower.login_required', lambda func: func): result = await follow( None, mock_info, what="AUTHOR", slug=target_author_slug ) # Должна вернуться ошибка "already following" assert result.get("error") == "already following", "Должна быть ошибка повторной подписки" # Но список авторов должен содержать целевого автора returned_authors = result.get("authors", []) assert len(returned_authors) == 1, "Должен вернуться список с 1 автором" assert any( author.get("id") == target_author_id for author in returned_authors ), "Целевой автор должен быть в списке" finally: # Очистка with local_session() as session: session.query(AuthorFollower).filter( AuthorFollower.follower == follower_id ).delete() session.query(Author).filter(Author.id.in_([follower_id, target_author_id])).delete() session.commit() await redis.execute("DEL", f"author:follows-authors:{follower_id}") if __name__ == "__main__": # Запуск тестов напрямую asyncio.run(test_follow_cache_consistency()) asyncio.run(test_follow_already_following()) print("✅ Все тесты консистентности кеша прошли успешно!")