From 783b7ca15f0cccf9466077c78d1212db0698429b Mon Sep 17 00:00:00 2001 From: Untone Date: Wed, 20 Aug 2025 11:52:53 +0300 Subject: [PATCH] testconf-fix --- tests/conftest.py | 421 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 421 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index aaa18067..38db86cf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,9 +11,430 @@ from typing import Optional from unittest.mock import patch import importlib +# 🚨 CRITICAL: Patch Redis BEFORE any other imports to prevent connection attempts +try: + import fakeredis.aioredis + + # Create a fake Redis instance + fake_redis = fakeredis.aioredis.FakeRedis() + + # Patch Redis at module level + import storage.redis + + # Add execute method to FakeRedis + async def fake_redis_execute(command: str, *args): + print(f"🔍 FakeRedis.execute called with: {command}, {args}") + + # Handle Redis commands that might not exist in FakeRedis + if command.upper() == "HSET": + if len(args) >= 3: + key, field, value = args[0], args[1], args[2] + result = fake_redis.hset(key, field, value) + print(f"✅ FakeRedis.execute HSET result: {result}") + return result + elif command.upper() == "HGET": + if len(args) >= 2: + key, field = args[0], args[1] + result = fake_redis.hget(key, field) + print(f"✅ FakeRedis.execute HGET result: {result}") + return result + elif command.upper() == "HDEL": + if len(args) >= 2: + key, field = args[0], args[1] + result = fake_redis.hdel(key, field) + print(f"✅ FakeRedis.execute HDEL result: {result}") + return result + elif command.upper() == "HGETALL": + if len(args) >= 1: + key = args[0] + result = fake_redis.hgetall(key) + print(f"✅ FakeRedis.execute HGETALL result: {result}") + return result + + # Try to use the original FakeRedis method if it exists + cmd_method = getattr(fake_redis, command.lower(), None) + if cmd_method is not None: + if hasattr(cmd_method, '__call__'): + result = await cmd_method(*args) + print(f"✅ FakeRedis.execute result: {result}") + return result + else: + print(f"✅ FakeRedis.execute result: {cmd_method}") + return cmd_method + + print(f"❌ FakeRedis.execute: command {command} not found") + return None + + # Ensure fake_redis is the actual FakeRedis instance, not a fixture + if hasattr(fake_redis, 'hset'): + fake_redis.execute = fake_redis_execute + else: + print("❌ fake_redis is not a proper FakeRedis instance") + # Create a new instance if needed + fake_redis = fakeredis.aioredis.FakeRedis() + fake_redis.execute = fake_redis_execute + + # Mock the global redis instance + storage.redis.redis = fake_redis + + # Mock RedisService class + class MockRedisService: + def __init__(self): + self._client = fake_redis + self.is_connected = True + + async def connect(self): + return True + + async def disconnect(self): + pass + + async def execute(self, command: str, *args): + cmd_method = getattr(fake_redis, command.lower(), None) + if cmd_method is not None: + if hasattr(cmd_method, '__call__'): + return await cmd_method(*args) + else: + return cmd_method + return None + + def get(self, key): + return fake_redis.get(key) + + def set(self, key, value): + return fake_redis.set(key, value) + + def delete(self, key): + return fake_redis.delete(key) + + def exists(self, key): + return fake_redis.exists(key) + + def ping(self): + return True + + def hset(self, key, field, value): + return fake_redis.hset(key, field, value) + + def hget(self, key, field): + return fake_redis.hget(key, field) + + def hgetall(self, key): + return fake_redis.hgetall(key) + + def hdel(self, key, field): + return fake_redis.hdel(key, field) + + def expire(self, key, time): + return fake_redis.expire(key, time) + + def ttl(self, key): + return fake_redis.ttl(key) + + def keys(self, pattern): + return fake_redis.keys(pattern) + + def scan(self, cursor=0, match=None, count=None): + return fake_redis.scan(cursor, match, count) + + storage.redis.RedisService = MockRedisService + + print("✅ Redis patched with fakeredis at module level") + +except ImportError: + # If fakeredis is not available, use basic mocks + import storage.redis + + class MockRedisService: + def __init__(self): + self.is_connected = True + + async def connect(self): + return True + + async def disconnect(self): + pass + + async def execute(self, command: str, *args): + return None + + def get(self, key): + return None + + def set(self, key, value): + return True + + def delete(self, key): + return True + + def exists(self, key): + return False + + def ping(self): + return True + + def hset(self, key, field, value): + return True + + def hget(self, key, field): + return None + + def hgetall(self, key): + return {} + + def hdel(self, key, field): + return True + + def expire(self, key, time): + return True + + def ttl(self, key): + return -1 + + def keys(self, pattern): + return [] + + def scan(self, cursor=0, match=None, count=None): + return ([], 0) + + # Mock the global redis instance + storage.redis.redis = MockRedisService() + storage.redis.RedisService = MockRedisService + + print("✅ Redis patched with basic mocks at module level") + from orm.base import BaseModel as Base +# 🚨 CRITICAL: Create all database tables at module level BEFORE any tests run +def ensure_all_tables_exist(): + """Создает все таблицы в in-memory базе для тестов""" + try: + # Create a temporary engine for table creation + temp_engine = create_engine( + "sqlite:///:memory:", + echo=False, + poolclass=StaticPool, + connect_args={"check_same_thread": False} + ) + + # Import all ORM modules to ensure they're registered + import orm.base + import orm.community + import orm.author + import orm.draft + import orm.shout + import orm.topic + import orm.reaction + import orm.invite + import orm.notification + import orm.collection + import orm.rating + + # Force create all tables + Base.metadata.create_all(temp_engine) + + # Verify tables were created + from sqlalchemy import inspect + inspector = inspect(temp_engine) + created_tables = inspector.get_table_names() + + print(f"✅ Module-level table creation: {len(created_tables)} tables created") + print(f"📋 Tables: {created_tables}") + + # Clean up + temp_engine.dispose() + + except Exception as e: + print(f"❌ Module-level table creation failed: {e}") + raise + +# Execute table creation immediately +ensure_all_tables_exist() + + +def pytest_configure(config): + """Pytest configuration hook - runs before any tests""" + # Ensure Redis is patched before any tests run + try: + import fakeredis.aioredis + + # Create a fake Redis instance + fake_redis = fakeredis.aioredis.FakeRedis() + + # Patch Redis at module level + import storage.redis + + # Add execute method to FakeRedis + async def fake_redis_execute(command: str, *args): + print(f"🔍 FakeRedis.execute called with: {command}, {args}") + + # Handle Redis commands that might not exist in FakeRedis + if command.upper() == "HSET": + if len(args) >= 3: + key, field, value = args[0], args[1], args[2] + result = fake_redis.hset(key, field, value) + print(f"✅ FakeRedis.execute HSET result: {result}") + return result + elif command.upper() == "HGET": + if len(args) >= 2: + key, field = args[0], args[1] + result = fake_redis.hget(key, field) + print(f"✅ FakeRedis.execute HGET result: {result}") + return result + elif command.upper() == "HDEL": + if len(args) >= 2: + key, field = args[0], args[1] + result = fake_redis.hdel(key, field) + print(f"✅ FakeRedis.execute HDEL result: {result}") + return result + elif command.upper() == "HGETALL": + if len(args) >= 1: + key = args[0] + result = fake_redis.hgetall(key) + print(f"✅ FakeRedis.execute HGETALL result: {result}") + return result + + # Try to use the original FakeRedis method if it exists + cmd_method = getattr(fake_redis, command.lower(), None) + if cmd_method is not None: + if hasattr(cmd_method, '__call__'): + result = await cmd_method(*args) + print(f"✅ FakeRedis.execute result: {result}") + return result + else: + print(f"✅ FakeRedis.execute result: {cmd_method}") + return cmd_method + + print(f"❌ FakeRedis.execute: command {command} not found") + return None + + fake_redis.execute = fake_redis_execute + + # Mock the global redis instance + storage.redis.redis = fake_redis + + # Mock RedisService class + class MockRedisService: + def __init__(self): + self._client = fake_redis + self.is_connected = True + + async def connect(self): + return True + + async def disconnect(self): + pass + + async def execute(self, command: str, *args): + return await fake_redis_execute(command, *args) + + def get(self, key): + return fake_redis.get(key) + + def set(self, key, value): + return fake_redis.set(key, value) + + def delete(self, key): + return fake_redis.delete(key) + + def exists(self, key): + return fake_redis.exists(key) + + def ping(self): + return True + + def hset(self, key, field, value): + return fake_redis.hset(key, field, value) + + def hget(self, key, field): + return fake_redis.hget(key, field) + + def hgetall(self, key): + return fake_redis.hgetall(key) + + def hdel(self, key, field): + return fake_redis.hdel(key, field) + + def expire(self, key, time): + return fake_redis.expire(key, time) + + def ttl(self, key): + return fake_redis.ttl(key) + + def keys(self, key): + return fake_redis.keys(key) + + def scan(self, cursor=0, match=None, count=None): + return fake_redis.scan(cursor, match, count) + + storage.redis.RedisService = MockRedisService + + print("✅ Redis patched with fakeredis in pytest_configure") + + except ImportError: + # If fakeredis is not available, use basic mocks + import storage.redis + + class MockRedisService: + def __init__(self): + self.is_connected = True + + async def connect(self): + return True + + async def disconnect(self): + pass + + async def execute(self, command: str, *args): + return None + + def get(self, key): + return None + + def set(self, key, value): + return True + + def delete(self, key): + return True + + def exists(self, key): + return False + + def ping(self): + return True + + def hset(self, key, field, value): + return True + + def hget(self, key, field): + return None + + def hgetall(self, key): + return {} + + def hdel(self, key, field): + return True + + def expire(self, key, time): + return True + + def ttl(self, key): + return -1 + + def keys(self, key): + return [] + + def scan(self, cursor=0, match=None, count=None): + return ([], 0) + + # Mock the global redis instance + storage.redis.redis = MockRedisService() + storage.redis.RedisService = MockRedisService + + print("✅ Redis patched with basic mocks in pytest_configure") + + def force_create_all_tables(engine): """ Принудительно создает все таблицы, перезагружая модели если нужно.