import pytest import os from settings import FRONTEND_URL from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import StaticPool import time import requests import subprocess from typing import Optional from unittest.mock import patch import importlib # 🚨 CRITICAL: Patch Redis BEFORE any other imports to prevent connection attempts try: import fakeredis.aioredis # Create a fake Redis instance fake_redis = fakeredis.aioredis.FakeRedis() # Patch Redis at module level import storage.redis # Mock the global redis instance storage.redis.redis = fake_redis print("✅ Redis patched with fakeredis at module level") except ImportError: print("❌ fakeredis not available, tests may fail") from orm.base import BaseModel as Base # 🚨 CRITICAL: Create all database tables at module level BEFORE any tests run def ensure_all_tables_exist(): """Создает все таблицы в in-memory базе для тестов""" try: # Create a temporary engine for table creation temp_engine = create_engine( "sqlite:///:memory:", echo=False, poolclass=StaticPool, connect_args={"check_same_thread": False} ) # Import all ORM modules to ensure they're registered import orm.base import orm.community import orm.author import orm.draft import orm.shout import orm.topic import orm.reaction import orm.invite import orm.notification import orm.collection import orm.rating # Force create all tables Base.metadata.create_all(temp_engine) # Verify tables were created from sqlalchemy import inspect inspector = inspect(temp_engine) created_tables = inspector.get_table_names() print(f"✅ Module-level table creation: {len(created_tables)} tables created") print(f"📋 Tables: {created_tables}") # Clean up temp_engine.dispose() except Exception as e: print(f"❌ Module-level table creation failed: {e}") raise # Execute table creation immediately ensure_all_tables_exist() def pytest_configure(config): """Pytest configuration hook - runs before any tests""" # Ensure Redis is patched before any tests run try: import fakeredis.aioredis # Create a fake Redis instance fake_redis = fakeredis.aioredis.FakeRedis() # Patch Redis at module level import storage.redis # Mock the global redis instance storage.redis.redis = fake_redis print("✅ Redis patched with fakeredis in pytest_configure") except ImportError: print("❌ fakeredis not available in pytest_configure") def force_create_all_tables(engine): """ Принудительно создает все таблицы, перезагружая модели если нужно. Это помогает в CI среде где могут быть проблемы с метаданными. """ from sqlalchemy import inspect import orm # Перезагружаем все модули ORM для гарантии актуальности метаданных importlib.reload(orm.base) importlib.reload(orm.community) importlib.reload(orm.author) importlib.reload(orm.draft) importlib.reload(orm.shout) importlib.reload(orm.topic) importlib.reload(orm.reaction) importlib.reload(orm.invite) importlib.reload(orm.notification) importlib.reload(orm.collection) importlib.reload(orm.rating) # Получаем обновленную Base from orm.base import BaseModel as Base # Создаем все таблицы Base.metadata.create_all(engine) # Проверяем результат inspector = inspect(engine) created_tables = inspector.get_table_names() print(f"🔧 Force created tables: {created_tables}") return created_tables def get_test_client(): """ Создает и возвращает тестовый клиент для интеграционных тестов. Returns: TestClient: Клиент для выполнения тестовых запросов """ from starlette.testclient import TestClient # Отложенный импорт для предотвращения циклических зависимостей def _import_app(): from main import app return app return TestClient(_import_app()) @pytest.fixture(autouse=True, scope="session") def _set_requests_default_timeout(): """Глобально задаем таймаут по умолчанию для requests в тестах, чтобы исключить зависания. 🪓 Упрощение: мокаем методы requests, добавляя timeout=10, если он не указан. """ original_request = requests.sessions.Session.request def request_with_default_timeout(self, method, url, **kwargs): # type: ignore[override] if "timeout" not in kwargs: kwargs["timeout"] = 10 return original_request(self, method, url, **kwargs) requests.sessions.Session.request = request_with_default_timeout # type: ignore[assignment] yield requests.sessions.Session.request = original_request # type: ignore[assignment] @pytest.fixture(scope="session") def test_engine(): """ Создает тестовый engine для всей сессии тестирования. Использует in-memory SQLite для быстрых тестов. """ # Принудительно импортируем ВСЕ модели чтобы они были зарегистрированы в Base.metadata import orm.base import orm.community import orm.author import orm.draft import orm.shout import orm.topic import orm.reaction import orm.invite import orm.notification import orm.collection # Явно импортируем классы для гарантии регистрации from orm.base import BaseModel as Base from orm.community import Community, CommunityAuthor, CommunityFollower from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark from orm.draft import Draft, DraftAuthor, DraftTopic from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower from orm.topic import Topic, TopicFollower from orm.reaction import Reaction from orm.invite import Invite from orm.notification import Notification from orm.collection import Collection # Инициализируем RBAC систему import rbac rbac.initialize_rbac() engine = create_engine( "sqlite:///:memory:", echo=False, poolclass=StaticPool, connect_args={"check_same_thread": False} ) # Принудительно удаляем все таблицы и создаем заново Base.metadata.drop_all(engine) Base.metadata.create_all(engine) # Debug: проверяем какие таблицы созданы на уровне сессии from sqlalchemy import inspect inspector = inspect(engine) session_tables = inspector.get_table_names() print(f"🔍 Created tables in test_engine fixture: {session_tables}") # Проверяем что все критические таблицы созданы required_tables = [ 'author', 'community', 'community_author', 'community_follower', 'draft', 'draft_author', 'draft_topic', 'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers', 'topic', 'topic_followers', 'reaction', 'invite', 'notification', 'collection', 'author_follower', 'author_rating', 'author_bookmark' ] missing_tables = [table for table in required_tables if table not in session_tables] if missing_tables: print(f"❌ Missing tables in test_engine: {missing_tables}") print(f"Available tables: {session_tables}") # Fallback: попробуем создать отсутствующие таблицы явно print("🔄 Attempting to create missing tables explicitly in test_engine...") try: # Создаем все таблицы снова с принудительным импортом моделей Base.metadata.create_all(engine) # Проверяем снова inspector = inspect(engine) updated_tables = inspector.get_table_names() still_missing = [table for table in required_tables if table not in updated_tables] if still_missing: print(f"❌ Still missing tables after explicit creation: {still_missing}") # Последняя попытка: создаем таблицы по одной print("🔄 Last attempt: creating tables one by one...") for table_name in still_missing: try: if table_name == 'community_author': CommunityAuthor.__table__.create(engine, checkfirst=True) elif table_name == 'community_follower': CommunityFollower.__table__.create(engine, checkfirst=True) elif table_name == 'author_follower': AuthorFollower.__table__.create(engine, checkfirst=True) elif table_name == 'author_rating': AuthorRating.__table__.create(engine, checkfirst=True) elif table_name == 'author_bookmark': AuthorBookmark.__table__.create(engine, checkfirst=True) elif table_name == 'draft_author': DraftAuthor.__table__.create(engine, checkfirst=True) elif table_name == 'draft_topic': DraftTopic.__table__.create(engine, checkfirst=True) elif table_name == 'shout_author': ShoutAuthor.__table__.create(engine, checkfirst=True) elif table_name == 'shout_topic': ShoutTopic.__table__.create(engine, checkfirst=True) elif table_name == 'shout_reactions_followers': ShoutReactionsFollower.__table__.create(engine, checkfirst=True) elif table_name == 'collection': Collection.__table__.create(engine, checkfirst=True) elif table_name == 'topic_followers': TopicFollower.__table__.create(engine, checkfirst=True) print(f"✅ Created table {table_name}") except Exception as e: print(f"❌ Failed to create table {table_name}: {e}") # Финальная проверка inspector = inspect(engine) final_tables = inspector.get_table_names() final_missing = [table for table in required_tables if table not in final_tables] if final_missing: print(f"❌ Still missing tables after individual creation: {final_missing}") print("🔄 Last resort: forcing table creation with module reload...") try: final_tables = force_create_all_tables(engine) final_missing = [table for table in required_tables if table not in final_tables] if final_missing: raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}") else: print("✅ All missing tables created successfully with force creation") except Exception as e: print(f"❌ Force creation failed: {e}") raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}") else: print("✅ All missing tables created successfully in test_engine") else: print("✅ All missing tables created successfully in test_engine") except Exception as e: print(f"❌ Failed to create missing tables in test_engine: {e}") raise else: print("✅ All required tables created in test_engine") yield engine # Cleanup после всех тестов Base.metadata.drop_all(engine) @pytest.fixture(scope="session") def test_session_factory(test_engine): """ Создает фабрику сессий для тестирования. """ return sessionmaker(bind=test_engine, expire_on_commit=False) @pytest.fixture def db_session(test_session_factory, test_engine): """ Создает новую сессию БД для каждого теста. Простая реализация без вложенных транзакций. """ # Принудительно пересоздаем таблицы для каждого теста from orm.base import BaseModel as Base from sqlalchemy import inspect # Убеждаемся что все модели импортированы перед созданием таблиц # Явно импортируем все модели чтобы они были зарегистрированы в Base.metadata from orm.community import Community, CommunityAuthor, CommunityFollower from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark from orm.draft import Draft, DraftAuthor, DraftTopic from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower from orm.topic import Topic, TopicFollower from orm.reaction import Reaction from orm.invite import Invite from orm.notification import Notification from orm.collection import Collection # Проверяем что все модели зарегистрированы print(f"🔍 Registered tables in Base.metadata: {list(Base.metadata.tables.keys())}") # Убеждаемся что все критические таблицы зарегистрированы в metadata required_tables = [ 'author', 'community', 'community_author', 'community_follower', 'draft', 'draft_author', 'draft_topic', 'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers', 'topic', 'topic_followers', 'reaction', 'invite', 'notification', 'collection', 'author_follower', 'author_rating', 'author_bookmark' ] missing_metadata_tables = [table for table in required_tables if table not in Base.metadata.tables] if missing_metadata_tables: print(f"❌ Missing tables in Base.metadata: {missing_metadata_tables}") print("Available tables:", list(Base.metadata.tables.keys())) raise RuntimeError(f"Critical tables not registered in Base.metadata: {missing_metadata_tables}") else: print("✅ All required tables registered in Base.metadata") # Удаляем все таблицы Base.metadata.drop_all(test_engine) # Создаем таблицы заново Base.metadata.create_all(test_engine) # Debug: проверяем какие таблицы созданы inspector = inspect(test_engine) created_tables = inspector.get_table_names() print(f"🔍 Created tables in db_session fixture: {created_tables}") # Проверяем что все критические таблицы созданы required_tables = [ 'author', 'community', 'community_author', 'community_follower', 'draft', 'draft_author', 'draft_topic', 'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers', 'topic', 'topic_followers', 'reaction', 'invite', 'notification', 'collection', 'author_follower', 'author_rating', 'author_bookmark' ] missing_tables = [table for table in required_tables if table not in created_tables] if missing_tables: print(f"❌ Missing tables in db_session: {missing_tables}") print(f"Available tables: {created_tables}") # Fallback: попробуем создать отсутствующие таблицы явно print("🔄 Attempting to create missing tables explicitly in db_session...") try: # Создаем все таблицы снова с принудительным импортом моделей Base.metadata.create_all(test_engine) # Проверяем снова inspector = inspect(test_engine) updated_tables = inspector.get_table_names() still_missing = [table for table in required_tables if table not in updated_tables] if still_missing: print(f"❌ Still missing tables after explicit creation: {still_missing}") raise RuntimeError(f"Failed to create required tables: {still_missing}") else: print("✅ All missing tables created successfully in db_session") except Exception as e: print(f"❌ Failed to create missing tables in db_session: {e}") raise else: print("✅ All required tables created in db_session") # Проверяем что таблица draft создана с правильной схемой inspector = inspect(test_engine) draft_columns = [col['name'] for col in inspector.get_columns('draft')] print(f"Draft table columns: {draft_columns}") # Убеждаемся что колонка shout существует if 'shout' not in draft_columns: print("WARNING: Column 'shout' not found in draft table!") session = test_session_factory() # Создаем дефолтное сообщество для тестов from orm.community import Community from orm.author import Author import time # Создаем системного автора если его нет system_author = session.query(Author).where(Author.slug == "system").first() if not system_author: system_author = Author( name="System", slug="system", email="system@test.local", created_at=int(time.time()), updated_at=int(time.time()), last_seen=int(time.time()) ) session.add(system_author) session.flush() # Создаем дефолтное сообщество если его нет default_community = session.query(Community).where(Community.id == 1).first() if not default_community: default_community = Community( id=1, name="Главное сообщество", slug="main", desc="Основное сообщество для тестов", pic="", created_at=int(time.time()), created_by=system_author.id, settings={"default_roles": ["reader", "author"], "available_roles": ["reader", "author", "artist", "expert", "editor", "admin"]}, private=False ) session.add(default_community) session.commit() yield session # Очищаем все данные после теста try: for table in reversed(Base.metadata.sorted_tables): session.execute(table.delete()) session.commit() except Exception: session.rollback() finally: session.close() @pytest.fixture def db_session_commit(test_session_factory): """ Создает сессию БД с реальными commit'ами для интеграционных тестов. Используется когда нужно тестировать реальные транзакции. """ session = test_session_factory() # Создаем дефолтное сообщество для тестов from orm.community import Community from orm.author import Author # Создаем системного автора если его нет system_author = session.query(Author).where(Author.slug == "system").first() if not system_author: system_author = Author( name="System", slug="system", email="system@test.local", created_at=int(time.time()), updated_at=int(time.time()), last_seen=int(time.time()) ) session.add(system_author) session.commit() # Создаем дефолтное сообщество если его нет default_community = session.query(Community).where(Community.id == 1).first() if not default_community: default_community = Community( id=1, name="Главное сообщество", slug="main", desc="Основное сообщество для тестов", pic="", created_at=int(time.time()), created_by=system_author.id, settings={"default_roles": ["reader", "author"], "available_roles": ["reader", "author", "artist", "expert", "editor", "admin"]}, private=False ) session.add(default_community) session.commit() yield session # Очищаем все данные после теста try: for table in reversed(Base.metadata.sorted_tables): session.execute(table.delete()) session.commit() except Exception: session.rollback() finally: session.close() @pytest.fixture def frontend_url(): """ Возвращает URL фронтенда для тестов. """ return FRONTEND_URL or "http://localhost:3000" @pytest.fixture def backend_url(): """ Возвращает URL бэкенда для тестов. """ return "http://localhost:8000" @pytest.fixture(scope="session") def backend_server(): """ 🚀 Фикстура для автоматического запуска/остановки бэкенд сервера. Запускает сервер только если он не запущен. """ backend_process: Optional[subprocess.Popen] = None backend_running = False # Проверяем, не запущен ли уже сервер try: response = requests.get("http://localhost:8000/", timeout=2) if response.status_code == 200: print("✅ Бэкенд сервер уже запущен") backend_running = True else: backend_running = False except: backend_running = False if not backend_running: print("🔄 Запускаем бэкенд сервер для тестов...") try: # Запускаем бэкенд сервер с тестовой базой данных env = os.environ.copy() env["DATABASE_URL"] = "sqlite:///test_e2e.db" # Используем тестовую БД для e2e env["TESTING"] = "true" backend_process = subprocess.Popen( ["uv", "run", "python", "dev.py"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=env, cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) # Ждем запуска бэкенда print("⏳ Ждем запуска бэкенда...") for i in range(30): # Ждем максимум 30 секунд try: response = requests.get("http://localhost:8000/", timeout=2) if response.status_code == 200: print("✅ Бэкенд сервер запущен") backend_running = True break except: pass time.sleep(1) else: print("❌ Бэкенд сервер не запустился за 30 секунд") if backend_process: backend_process.terminate() backend_process.wait() raise Exception("Бэкенд сервер не запустился за 30 секунд") except Exception as e: print(f"❌ Ошибка запуска сервера: {e}") if backend_process: backend_process.terminate() backend_process.wait() raise Exception(f"Не удалось запустить бэкенд сервер: {e}") yield backend_running # Cleanup: останавливаем сервер только если мы его запускали if backend_process and not backend_running: print("🛑 Останавливаем бэкенд сервер...") try: backend_process.terminate() backend_process.wait(timeout=10) except subprocess.TimeoutExpired: backend_process.kill() backend_process.wait() @pytest.fixture def test_client(backend_server): """ 🧪 Создает тестовый клиент для API тестов. Требует запущенный бэкенд сервер. """ return get_test_client() @pytest.fixture async def browser_context(): """ 🌐 Создает контекст браузера для e2e тестов. Автоматически управляет жизненным циклом браузера. """ try: from playwright.async_api import async_playwright except ImportError: pytest.skip("Playwright не установлен") async with async_playwright() as p: # Определяем headless режим headless = os.getenv("PLAYWRIGHT_HEADLESS", "true").lower() == "true" browser = await p.chromium.launch( headless=headless, args=[ "--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu", "--disable-web-security", "--disable-features=VizDisplayCompositor" ] ) context = await browser.new_context( viewport={"width": 1280, "height": 720}, ignore_https_errors=True, java_script_enabled=True ) yield context await context.close() await browser.close() @pytest.fixture async def page(browser_context): """ 📄 Создает новую страницу для каждого теста. """ page = await browser_context.new_page() # Устанавливаем таймауты page.set_default_timeout(30000) page.set_default_navigation_timeout(30000) yield page await page.close() @pytest.fixture def api_base_url(backend_server): """ 🔗 Возвращает базовый URL для API тестов. """ return "http://localhost:8000/graphql" @pytest.fixture def test_user_credentials(): """ 👤 Возвращает тестовые учетные данные для авторизации. """ return { "email": "test_admin@discours.io", "password": "password123" } @pytest.fixture def auth_headers(api_base_url, test_user_credentials): """ 🔐 Создает заголовки авторизации для API тестов. """ def _get_auth_headers(token: Optional[str] = None): headers = {"Content-Type": "application/json"} if token: headers["Authorization"] = f"Bearer {token}" return headers return _get_auth_headers @pytest.fixture def wait_for_server(): """ ⏳ Утилита для ожидания готовности сервера. """ def _wait_for_server(url: str, max_attempts: int = 30, delay: float = 1.0): """Ждет готовности сервера по указанному URL.""" for attempt in range(max_attempts): try: response = requests.get(url, timeout=2) if response.status_code == 200: return True except: pass time.sleep(delay) return False return _wait_for_server @pytest.fixture def test_users(db_session): """Создает тестовых пользователей для тестов""" from orm.author import Author # Создаем первого пользователя (администратор) admin_user = Author( slug="test-admin", email="test_admin@discours.io", password="hashed_password_123", name="Test Admin", bio="Test admin user for testing", pic="https://example.com/avatar1.jpg", oauth={} ) db_session.add(admin_user) # Создаем второго пользователя (обычный пользователь) regular_user = Author( slug="test-user", email="test_user@discours.io", password="hashed_password_456", name="Test User", bio="Test regular user for testing", pic="https://example.com/avatar2.jpg", oauth={} ) db_session.add(regular_user) # Создаем третьего пользователя (только читатель) reader_user = Author( slug="test-reader", email="test_reader@discours.io", password="hashed_password_789", name="Test Reader", bio="Test reader user for testing", pic="https://example.com/avatar3.jpg", oauth={} ) db_session.add(reader_user) db_session.commit() return [admin_user, regular_user, reader_user] @pytest.fixture def test_community(db_session, test_users): """Создает тестовое сообщество для тестов""" from orm.community import Community community = Community( name="Test Community", slug="test-community", desc="A test community for testing purposes", created_by=test_users[0].id, # Администратор создает сообщество settings={ "default_roles": ["reader", "author"], "custom_setting": "custom_value" } ) db_session.add(community) db_session.commit() return community @pytest.fixture def community_with_creator(db_session, test_users): """Создает сообщество с создателем""" from orm.community import Community community = Community( name="Community With Creator", slug="community-with-creator", desc="A test community with a creator", created_by=test_users[0].id, settings={"default_roles": ["reader", "author"]} ) db_session.add(community) db_session.commit() return community @pytest.fixture def community_without_creator(db_session): """Создает сообщество без создателя""" from orm.community import Community community = Community( name="Community Without Creator", slug="community-without-creator", desc="A test community without a creator", created_by=None, # Без создателя settings={"default_roles": ["reader"]} ) db_session.add(community) db_session.commit() return community @pytest.fixture def admin_user_with_roles(db_session, test_users, test_community): """Создает администратора с ролями в сообществе""" from orm.community import CommunityAuthor ca = CommunityAuthor( community_id=test_community.id, author_id=test_users[0].id, roles="admin,author,reader" ) db_session.add(ca) db_session.commit() return test_users[0] @pytest.fixture def regular_user_with_roles(db_session, test_users, test_community): """Создает обычного пользователя с ролями в сообществе""" from orm.community import CommunityAuthor ca = CommunityAuthor( community_id=test_community.id, author_id=test_users[1].id, roles="author,reader" ) db_session.add(ca) db_session.commit() return test_users[1] @pytest.fixture def mock_verify(monkeypatch): """Мокает функцию верификации для тестов""" from unittest.mock import AsyncMock mock = AsyncMock() # Здесь можно настроить возвращаемые значения по умолчанию return mock @pytest.fixture def redis_client(): """Создает Redis клиент для тестов токенов""" from storage.redis import RedisService redis_service = RedisService() return redis_service._client @pytest.fixture def fake_redis(): """Создает fakeredis экземпляр для тестов""" try: import fakeredis.aioredis return fakeredis.aioredis.FakeRedis() except ImportError: pytest.skip("fakeredis не установлен - установите: pip install fakeredis[aioredis]") @pytest.fixture(autouse=True) def ensure_rbac_initialized(): """Обеспечивает инициализацию RBAC системы для каждого теста""" import rbac rbac.initialize_rbac() yield @pytest.fixture(autouse=True) def mock_redis_globally(): """Глобально мокает Redis для всех тестов, включая e2e""" try: import fakeredis.aioredis # Создаем fakeredis сервер fake_redis = fakeredis.aioredis.FakeRedis() # Патчим глобальный redis экземпляр with patch('storage.redis.redis') as mock_redis: # Эмулируем RedisService.execute метод async def mock_execute(command: str, *args): cmd_method = getattr(fake_redis, command.lower(), None) if cmd_method is not None: if hasattr(cmd_method, '__call__'): return await cmd_method(*args) else: return cmd_method return None # Патчим все основные методы Redis mock_redis.execute = mock_execute mock_redis.get = fake_redis.get mock_redis.set = fake_redis.set mock_redis.delete = fake_redis.delete mock_redis.exists = fake_redis.exists mock_redis.ping = fake_redis.ping mock_redis.hset = fake_redis.hset mock_redis.hget = fake_redis.hget mock_redis.hgetall = fake_redis.hgetall mock_redis.hdel = fake_redis.hdel mock_redis.expire = fake_redis.expire mock_redis.ttl = fake_redis.ttl mock_redis.keys = fake_redis.keys mock_redis.scan = fake_redis.scan mock_redis.is_connected = True # Async методы для connect/disconnect async def mock_connect(): return True async def mock_disconnect(): pass mock_redis.connect = mock_connect mock_redis.disconnect = mock_disconnect yield except ImportError: # Если fakeredis не доступен, используем базовый mock with patch('storage.redis.redis') as mock_redis: mock_redis.execute.return_value = None mock_redis.get.return_value = None mock_redis.set.return_value = True mock_redis.delete.return_value = True mock_redis.exists.return_value = False mock_redis.ping.return_value = True mock_redis.hset.return_value = True mock_redis.hget.return_value = None mock_redis.hgetall.return_value = {} mock_redis.hdel.return_value = True mock_redis.expire.return_value = True mock_redis.ttl.return_value = -1 mock_redis.keys.return_value = [] mock_redis.scan.return_value = ([], 0) mock_redis.is_connected = True async def mock_connect(): return True async def mock_disconnect(): pass mock_redis.connect = mock_connect mock_redis.disconnect = mock_disconnect yield @pytest.fixture(autouse=True) def mock_redis_service_globally(): """Глобально мокает RedisService для всех тестов, включая e2e""" try: import fakeredis.aioredis # Создаем fakeredis сервер fake_redis = fakeredis.aioredis.FakeRedis() # Патчим RedisService класс with patch('storage.redis.RedisService') as mock_service_class: # Создаем mock экземпляр mock_service = mock_service_class.return_value # Эмулируем RedisService.execute метод async def mock_execute(command: str, *args): cmd_method = getattr(fake_redis, command.lower(), None) if cmd_method is not None: if hasattr(cmd_method, '__call__'): return await cmd_method(*args) else: return cmd_method return None # Патчим все основные методы mock_service.execute = mock_execute mock_service.get = fake_redis.get mock_service.set = fake_redis.set mock_service.delete = fake_redis.delete mock_service.exists = fake_redis.exists mock_service.ping = fake_redis.ping mock_service.hset = fake_redis.hset mock_service.hget = fake_redis.hget mock_service.hgetall = fake_redis.hgetall mock_service.hdel = fake_redis.hdel mock_service.expire = fake_redis.expire mock_service.ttl = fake_redis.ttl mock_service.keys = fake_redis.keys mock_service.scan = fake_redis.scan mock_service._client = fake_redis mock_service.is_connected = True # Async методы для connect/disconnect async def mock_connect(): return True async def mock_disconnect(): pass mock_service.connect = mock_connect mock_service.disconnect = mock_disconnect yield except ImportError: # Если fakeredis не доступен, используем базовый mock with patch('storage.redis.RedisService') as mock_service_class: mock_service = mock_service_class.return_value mock_service.execute.return_value = None mock_service.get.return_value = None mock_service.set.return_value = True mock_service.delete.return_value = True mock_service.exists.return_value = False mock_service.ping.return_value = True mock_service.hset.return_value = True mock_service.hget.return_value = None mock_service.hgetall.return_value = {} mock_service.hdel.return_value = True mock_service.expire.return_value = True mock_service.ttl.return_value = -1 mock_service.keys.return_value = [] mock_service.scan.return_value = ([], 0) mock_service.is_connected = True async def mock_connect(): return True async def mock_disconnect(): pass mock_service.connect = mock_connect mock_service.disconnect = mock_disconnect yield