core/tests/conftest.py
import pytest
import os
from settings import FRONTEND_URL
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
import time
import requests
import subprocess
from typing import Optional
from unittest.mock import patch
import importlib
# 🚨 CRITICAL: Patch Redis BEFORE any other imports to prevent connection attempts
try:
    import fakeredis.aioredis

    # Create a fake Redis instance
    fake_redis = fakeredis.aioredis.FakeRedis()

    # Patch Redis at module level
    import storage.redis

    # Add an execute method to FakeRedis
    async def fake_redis_execute(command: str, *args):
        print(f"🔍 FakeRedis.execute called with: {command}, {args}")
        # Handle Redis commands that might not exist in FakeRedis
        if command.upper() == "HSET":
            if len(args) >= 3:
                key, field, value = args[0], args[1], args[2]
                result = await fake_redis.hset(key, field, value)
                print(f"✅ FakeRedis.execute HSET result: {result}")
                return result
        elif command.upper() == "HGET":
            if len(args) >= 2:
                key, field = args[0], args[1]
                result = await fake_redis.hget(key, field)
                print(f"✅ FakeRedis.execute HGET result: {result}")
                return result
        elif command.upper() == "HDEL":
            if len(args) >= 2:
                key, field = args[0], args[1]
                result = await fake_redis.hdel(key, field)
                print(f"✅ FakeRedis.execute HDEL result: {result}")
                return result
        elif command.upper() == "HGETALL":
            if len(args) >= 1:
                key = args[0]
                result = await fake_redis.hgetall(key)
                print(f"✅ FakeRedis.execute HGETALL result: {result}")
                return result
        # Fall back to the matching FakeRedis method if it exists
        cmd_method = getattr(fake_redis, command.lower(), None)
        if cmd_method is not None:
            if callable(cmd_method):
                result = await cmd_method(*args)
                print(f"✅ FakeRedis.execute result: {result}")
                return result
            print(f"✅ FakeRedis.execute result: {cmd_method}")
            return cmd_method
        print(f"❌ FakeRedis.execute: command {command} not found")
        return None

    # Ensure fake_redis is the actual FakeRedis instance, not a fixture
    if hasattr(fake_redis, "hset"):
        fake_redis.execute = fake_redis_execute
    else:
        print("❌ fake_redis is not a proper FakeRedis instance")
        # Create a new instance if needed
        fake_redis = fakeredis.aioredis.FakeRedis()
        fake_redis.execute = fake_redis_execute

    # Mock the global redis instance
    storage.redis.redis = fake_redis

    # Mock RedisService class
    class MockRedisService:
        def __init__(self):
            self._client = fake_redis
            self.is_connected = True

        async def connect(self):
            return True

        async def disconnect(self):
            pass

        async def execute(self, command: str, *args):
            cmd_method = getattr(fake_redis, command.lower(), None)
            if cmd_method is not None:
                if callable(cmd_method):
                    return await cmd_method(*args)
                return cmd_method
            return None

        def get(self, key):
            return fake_redis.get(key)

        def set(self, key, value):
            return fake_redis.set(key, value)

        def delete(self, key):
            return fake_redis.delete(key)

        def exists(self, key):
            return fake_redis.exists(key)

        def ping(self):
            return True

        def hset(self, key, field, value):
            return fake_redis.hset(key, field, value)

        def hget(self, key, field):
            return fake_redis.hget(key, field)

        def hgetall(self, key):
            return fake_redis.hgetall(key)

        def hdel(self, key, field):
            return fake_redis.hdel(key, field)

        def expire(self, key, time):
            return fake_redis.expire(key, time)

        def ttl(self, key):
            return fake_redis.ttl(key)

        def keys(self, pattern):
            return fake_redis.keys(pattern)

        def scan(self, cursor=0, match=None, count=None):
            return fake_redis.scan(cursor, match, count)

    storage.redis.RedisService = MockRedisService
    print("✅ Redis patched with fakeredis at module level")
except ImportError:
    # If fakeredis is not available, use basic mocks
    import storage.redis

    class MockRedisService:
        def __init__(self):
            self.is_connected = True

        async def connect(self):
            return True

        async def disconnect(self):
            pass

        async def execute(self, command: str, *args):
            return None

        def get(self, key):
            return None

        def set(self, key, value):
            return True

        def delete(self, key):
            return True

        def exists(self, key):
            return False

        def ping(self):
            return True

        def hset(self, key, field, value):
            return True

        def hget(self, key, field):
            return None

        def hgetall(self, key):
            return {}

        def hdel(self, key, field):
            return True

        def expire(self, key, time):
            return True

        def ttl(self, key):
            return -1

        def keys(self, pattern):
            return []

        def scan(self, cursor=0, match=None, count=None):
            return ([], 0)

    # Mock the global redis instance
    storage.redis.redis = MockRedisService()
    storage.redis.RedisService = MockRedisService
    print("✅ Redis patched with basic mocks at module level")
from orm.base import BaseModel as Base
# 🚨 CRITICAL: Create all database tables at module level BEFORE any tests run
def ensure_all_tables_exist():
    """Creates all tables in the in-memory database for the tests."""
    try:
        # Create a temporary engine for table creation
        temp_engine = create_engine(
            "sqlite:///:memory:",
            echo=False,
            poolclass=StaticPool,
            connect_args={"check_same_thread": False},
        )
        # Import all ORM modules to ensure they're registered
        import orm.base
        import orm.community
        import orm.author
        import orm.draft
        import orm.shout
        import orm.topic
        import orm.reaction
        import orm.invite
        import orm.notification
        import orm.collection
        import orm.rating

        # Force create all tables
        Base.metadata.create_all(temp_engine)

        # Verify tables were created
        from sqlalchemy import inspect
        inspector = inspect(temp_engine)
        created_tables = inspector.get_table_names()
        print(f"✅ Module-level table creation: {len(created_tables)} tables created")
        print(f"📋 Tables: {created_tables}")

        # Clean up
        temp_engine.dispose()
    except Exception as e:
        print(f"❌ Module-level table creation failed: {e}")
        raise
# Execute table creation immediately
ensure_all_tables_exist()
def pytest_configure(config):
    """Pytest configuration hook - runs before any tests."""
    # Ensure Redis is patched before any tests run
    try:
        import fakeredis.aioredis

        # Create a fake Redis instance
        fake_redis = fakeredis.aioredis.FakeRedis()

        # Patch Redis before tests import it
        import storage.redis

        # Add an execute method to FakeRedis
        async def fake_redis_execute(command: str, *args):
            print(f"🔍 FakeRedis.execute called with: {command}, {args}")
            # Handle Redis commands that might not exist in FakeRedis
            if command.upper() == "HSET":
                if len(args) >= 3:
                    key, field, value = args[0], args[1], args[2]
                    result = await fake_redis.hset(key, field, value)
                    print(f"✅ FakeRedis.execute HSET result: {result}")
                    return result
            elif command.upper() == "HGET":
                if len(args) >= 2:
                    key, field = args[0], args[1]
                    result = await fake_redis.hget(key, field)
                    print(f"✅ FakeRedis.execute HGET result: {result}")
                    return result
            elif command.upper() == "HDEL":
                if len(args) >= 2:
                    key, field = args[0], args[1]
                    result = await fake_redis.hdel(key, field)
                    print(f"✅ FakeRedis.execute HDEL result: {result}")
                    return result
            elif command.upper() == "HGETALL":
                if len(args) >= 1:
                    key = args[0]
                    result = await fake_redis.hgetall(key)
                    print(f"✅ FakeRedis.execute HGETALL result: {result}")
                    return result
            # Fall back to the matching FakeRedis method if it exists
            cmd_method = getattr(fake_redis, command.lower(), None)
            if cmd_method is not None:
                if callable(cmd_method):
                    result = await cmd_method(*args)
                    print(f"✅ FakeRedis.execute result: {result}")
                    return result
                print(f"✅ FakeRedis.execute result: {cmd_method}")
                return cmd_method
            print(f"❌ FakeRedis.execute: command {command} not found")
            return None

        fake_redis.execute = fake_redis_execute

        # Mock the global redis instance
        storage.redis.redis = fake_redis

        # Mock RedisService class
        class MockRedisService:
            def __init__(self):
                self._client = fake_redis
                self.is_connected = True

            async def connect(self):
                return True

            async def disconnect(self):
                pass

            async def execute(self, command: str, *args):
                return await fake_redis_execute(command, *args)

            def get(self, key):
                return fake_redis.get(key)

            def set(self, key, value):
                return fake_redis.set(key, value)

            def delete(self, key):
                return fake_redis.delete(key)

            def exists(self, key):
                return fake_redis.exists(key)

            def ping(self):
                return True

            def hset(self, key, field, value):
                return fake_redis.hset(key, field, value)

            def hget(self, key, field):
                return fake_redis.hget(key, field)

            def hgetall(self, key):
                return fake_redis.hgetall(key)

            def hdel(self, key, field):
                return fake_redis.hdel(key, field)

            def expire(self, key, time):
                return fake_redis.expire(key, time)

            def ttl(self, key):
                return fake_redis.ttl(key)

            def keys(self, pattern):
                return fake_redis.keys(pattern)

            def scan(self, cursor=0, match=None, count=None):
                return fake_redis.scan(cursor, match, count)

        storage.redis.RedisService = MockRedisService
        print("✅ Redis patched with fakeredis in pytest_configure")
    except ImportError:
        # If fakeredis is not available, use basic mocks
        import storage.redis

        class MockRedisService:
            def __init__(self):
                self.is_connected = True

            async def connect(self):
                return True

            async def disconnect(self):
                pass

            async def execute(self, command: str, *args):
                return None

            def get(self, key):
                return None

            def set(self, key, value):
                return True

            def delete(self, key):
                return True

            def exists(self, key):
                return False

            def ping(self):
                return True

            def hset(self, key, field, value):
                return True

            def hget(self, key, field):
                return None

            def hgetall(self, key):
                return {}

            def hdel(self, key, field):
                return True

            def expire(self, key, time):
                return True

            def ttl(self, key):
                return -1

            def keys(self, pattern):
                return []

            def scan(self, cursor=0, match=None, count=None):
                return ([], 0)

        # Mock the global redis instance
        storage.redis.redis = MockRedisService()
        storage.redis.RedisService = MockRedisService
        print("✅ Redis patched with basic mocks in pytest_configure")
def force_create_all_tables(engine):
    """
    Forcibly creates all tables, reloading the ORM models if necessary.
    This helps in CI environments where the metadata can get out of sync.
    """
    from sqlalchemy import inspect
    import orm

    # Reload every ORM module to guarantee the metadata is up to date
    importlib.reload(orm.base)
    importlib.reload(orm.community)
    importlib.reload(orm.author)
    importlib.reload(orm.draft)
    importlib.reload(orm.shout)
    importlib.reload(orm.topic)
    importlib.reload(orm.reaction)
    importlib.reload(orm.invite)
    importlib.reload(orm.notification)
    importlib.reload(orm.collection)
    importlib.reload(orm.rating)

    # Get the refreshed Base
    from orm.base import BaseModel as Base

    # Create all tables
    Base.metadata.create_all(engine)

    # Check the result
    inspector = inspect(engine)
    created_tables = inspector.get_table_names()
    print(f"🔧 Force created tables: {created_tables}")
    return created_tables
def get_test_client():
    """
    Creates and returns a test client for integration tests.

    Returns:
        TestClient: client for issuing test requests
    """
    from starlette.testclient import TestClient

    # Deferred import to avoid circular dependencies
    def _import_app():
        from main import app
        return app

    return TestClient(_import_app())
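# Illustrative usage only (hypothetical test, not part of this suite): thanks to the
# deferred import above, `main.app` is loaded the first time a client is requested.
#
#     def _example_health_check():
#         client = get_test_client()
#         response = client.get("/")
#         assert response.status_code == 200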
@pytest.fixture(autouse=True, scope="session")
def _set_requests_default_timeout():
    """Globally sets a default timeout for requests in tests to rule out hangs.

    🪓 Simplification: wrap requests' Session.request and add timeout=10 when none is given.
    """
    original_request = requests.sessions.Session.request

    def request_with_default_timeout(self, method, url, **kwargs):  # type: ignore[override]
        if "timeout" not in kwargs:
            kwargs["timeout"] = 10
        return original_request(self, method, url, **kwargs)

    requests.sessions.Session.request = request_with_default_timeout  # type: ignore[assignment]
    yield
    requests.sessions.Session.request = original_request  # type: ignore[assignment]
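# Sketch of the effect of the session-wide timeout patch, assuming the caller does not
# pass an explicit timeout; the wrapper injects timeout=10, while explicit values win:
#
#     requests.get("http://localhost:8000/")             # becomes timeout=10
#     requests.get("http://localhost:8000/", timeout=2)  # stays timeout=2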
@pytest.fixture(scope="session")
def test_engine():
    """
    Creates the test engine for the whole test session.
    Uses in-memory SQLite for fast tests.
    """
    # Forcibly import ALL models so they are registered in Base.metadata.
    # This is critical for CI environments where imports may behave differently.
    import orm.base
    import orm.community
    import orm.author
    import orm.draft
    import orm.shout
    import orm.topic
    import orm.reaction
    import orm.invite
    import orm.notification
    import orm.collection

    # Import the classes explicitly to guarantee registration
    from orm.base import BaseModel as Base
    from orm.community import Community, CommunityAuthor, CommunityFollower
    from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark
    from orm.draft import Draft, DraftAuthor, DraftTopic
    from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
    from orm.topic import Topic, TopicFollower
    from orm.reaction import Reaction
    from orm.invite import Invite
    from orm.notification import Notification
    from orm.collection import Collection

    # Initialize the RBAC system
    import rbac
    rbac.initialize_rbac()

    engine = create_engine(
        "sqlite:///:memory:", echo=False, poolclass=StaticPool, connect_args={"check_same_thread": False}
    )

    # Forcibly drop all tables and recreate them
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)

    # Debug: check which tables were created at the session level
    from sqlalchemy import inspect
    inspector = inspect(engine)
    session_tables = inspector.get_table_names()
    print(f"🔍 Created tables in test_engine fixture: {session_tables}")

    # Verify that all critical tables were created
    required_tables = [
        'author', 'community', 'community_author', 'community_follower',
        'draft', 'draft_author', 'draft_topic',
        'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers', 'shout_collection',
        'topic', 'topic_followers', 'reaction', 'invite', 'notification', 'notification_seen',
        'collection', 'author_follower', 'author_rating', 'author_bookmark'
    ]
    missing_tables = [table for table in required_tables if table not in session_tables]
    if missing_tables:
        print(f"❌ Missing tables in test_engine: {missing_tables}")
        print(f"Available tables: {session_tables}")
        # Fallback: try to create the missing tables explicitly
        print("🔄 Attempting to create missing tables explicitly in test_engine...")
        try:
            # Create all tables again with the models force-imported
            Base.metadata.create_all(engine)
            # Check again
            inspector = inspect(engine)
            updated_tables = inspector.get_table_names()
            still_missing = [table for table in required_tables if table not in updated_tables]
            if still_missing:
                print(f"❌ Still missing tables after explicit creation: {still_missing}")
                # Last attempt: create the tables one by one
                print("🔄 Last attempt: creating tables one by one...")
                for table_name in still_missing:
                    try:
                        if table_name == 'community_author':
                            CommunityAuthor.__table__.create(engine, checkfirst=True)
                        elif table_name == 'community_follower':
                            CommunityFollower.__table__.create(engine, checkfirst=True)
                        elif table_name == 'author_follower':
                            AuthorFollower.__table__.create(engine, checkfirst=True)
                        elif table_name == 'author_rating':
                            AuthorRating.__table__.create(engine, checkfirst=True)
                        elif table_name == 'author_bookmark':
                            AuthorBookmark.__table__.create(engine, checkfirst=True)
                        elif table_name == 'draft_author':
                            DraftAuthor.__table__.create(engine, checkfirst=True)
                        elif table_name == 'draft_topic':
                            DraftTopic.__table__.create(engine, checkfirst=True)
                        elif table_name == 'shout_author':
                            ShoutAuthor.__table__.create(engine, checkfirst=True)
                        elif table_name == 'shout_topic':
                            ShoutTopic.__table__.create(engine, checkfirst=True)
                        elif table_name == 'shout_reactions_followers':
                            ShoutReactionsFollower.__table__.create(engine, checkfirst=True)
                        elif table_name == 'collection':
                            Collection.__table__.create(engine, checkfirst=True)
                        elif table_name == 'topic_followers':
                            TopicFollower.__table__.create(engine, checkfirst=True)
                        elif table_name == 'notification_seen':
                            # notification_seen may be part of the notification model
                            pass
                        print(f"✅ Created table {table_name}")
                    except Exception as e:
                        print(f"❌ Failed to create table {table_name}: {e}")
                # Final check
                inspector = inspect(engine)
                final_tables = inspector.get_table_names()
                final_missing = [table for table in required_tables if table not in final_tables]
                if final_missing:
                    print(f"❌ Still missing tables after individual creation: {final_missing}")
                    print("🔄 Last resort: forcing table creation with module reload...")
                    try:
                        final_tables = force_create_all_tables(engine)
                        final_missing = [table for table in required_tables if table not in final_tables]
                        if final_missing:
                            raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}")
                        else:
                            print("✅ All missing tables created successfully with force creation")
                    except Exception as e:
                        print(f"❌ Force creation failed: {e}")
                        raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}")
                else:
                    print("✅ All missing tables created successfully in test_engine")
            else:
                print("✅ All missing tables created successfully in test_engine")
        except Exception as e:
            print(f"❌ Failed to create missing tables in test_engine: {e}")
            raise
    else:
        print("✅ All required tables created in test_engine")

    yield engine

    # Cleanup after all tests
    Base.metadata.drop_all(engine)
@pytest.fixture(scope="session")
def test_session_factory(test_engine):
    """
    Creates the session factory for testing.
    """
    return sessionmaker(bind=test_engine, expire_on_commit=False)
@pytest.fixture
def db_session(test_session_factory, test_engine):
    """
    Creates a fresh DB session for every test.
    Simple implementation without nested transactions.
    """
    # Forcibly recreate the tables for every test
    from orm.base import BaseModel as Base
    from sqlalchemy import inspect

    # Make sure all models are imported before creating the tables:
    # import every model explicitly so it is registered in Base.metadata.
    from orm.community import Community, CommunityAuthor, CommunityFollower
    from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark
    from orm.draft import Draft, DraftAuthor, DraftTopic
    from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
    from orm.topic import Topic, TopicFollower
    from orm.reaction import Reaction
    from orm.invite import Invite
    from orm.notification import Notification
    from orm.collection import Collection

    # Check that all models are registered
    print(f"🔍 Registered tables in Base.metadata: {list(Base.metadata.tables.keys())}")

    # Make sure every critical table is registered in the metadata
    required_tables = [
        'author', 'community', 'community_author', 'community_follower',
        'draft', 'draft_author', 'draft_topic',
        'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
        'topic', 'topic_followers', 'reaction', 'invite', 'notification',
        'collection', 'author_follower', 'author_rating', 'author_bookmark'
    ]
    missing_metadata_tables = [table for table in required_tables if table not in Base.metadata.tables]
    if missing_metadata_tables:
        print(f"❌ Missing tables in Base.metadata: {missing_metadata_tables}")
        print("Available tables:", list(Base.metadata.tables.keys()))
        raise RuntimeError(f"Critical tables not registered in Base.metadata: {missing_metadata_tables}")
    else:
        print("✅ All required tables registered in Base.metadata")

    # Drop all tables
    Base.metadata.drop_all(test_engine)
    # Create the tables again
    Base.metadata.create_all(test_engine)

    # Debug: check which tables were created
    inspector = inspect(test_engine)
    created_tables = inspector.get_table_names()
    print(f"🔍 Created tables in db_session fixture: {created_tables}")

    # Verify that all critical tables were created
    required_tables = [
        'author', 'community', 'community_author', 'community_follower',
        'draft', 'draft_author', 'draft_topic',
        'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
        'topic', 'topic_followers', 'reaction', 'invite', 'notification',
        'collection', 'author_follower', 'author_rating', 'author_bookmark'
    ]
    missing_tables = [table for table in required_tables if table not in created_tables]
    if missing_tables:
        print(f"❌ Missing tables in db_session: {missing_tables}")
        print(f"Available tables: {created_tables}")
        # Fallback: try to create the missing tables explicitly
        print("🔄 Attempting to create missing tables explicitly in db_session...")
        try:
            # Create all tables again with the models force-imported
            Base.metadata.create_all(test_engine)
            # Check again
            inspector = inspect(test_engine)
            updated_tables = inspector.get_table_names()
            still_missing = [table for table in required_tables if table not in updated_tables]
            if still_missing:
                print(f"❌ Still missing tables after explicit creation: {still_missing}")
                raise RuntimeError(f"Failed to create required tables: {still_missing}")
            else:
                print("✅ All missing tables created successfully in db_session")
        except Exception as e:
            print(f"❌ Failed to create missing tables in db_session: {e}")
            raise
    else:
        print("✅ All required tables created in db_session")

    # Check that the draft table was created with the expected schema
    inspector = inspect(test_engine)
    draft_columns = [col['name'] for col in inspector.get_columns('draft')]
    print(f"Draft table columns: {draft_columns}")
    # Make sure the shout column exists
    if 'shout' not in draft_columns:
        print("WARNING: Column 'shout' not found in draft table!")

    session = test_session_factory()

    # Create the default community for the tests
    from orm.community import Community
    from orm.author import Author
    import time

    # Create the system author if it does not exist yet
    system_author = session.query(Author).where(Author.slug == "system").first()
    if not system_author:
        system_author = Author(
            name="System",
            slug="system",
            email="system@test.local",
            created_at=int(time.time()),
            updated_at=int(time.time()),
            last_seen=int(time.time())
        )
        session.add(system_author)
        session.flush()

    # Create the default community if it does not exist yet
    default_community = session.query(Community).where(Community.id == 1).first()
    if not default_community:
        default_community = Community(
            id=1,
            name="Главное сообщество",
            slug="main",
            desc="Основное сообщество для тестов",
            pic="",
            created_at=int(time.time()),
            created_by=system_author.id,
            settings={"default_roles": ["reader", "author"], "available_roles": ["reader", "author", "artist", "expert", "editor", "admin"]},
            private=False
        )
        session.add(default_community)
        session.commit()

    yield session

    # Clean up all data after the test
    try:
        for table in reversed(Base.metadata.sorted_tables):
            session.execute(table.delete())
        session.commit()
    except Exception:
        session.rollback()
    finally:
        session.close()
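# Hypothetical example of how a test would consume db_session (the test name below is
# illustrative, not taken from this repository): the fixture hands out a fresh session
# with the schema recreated and the default community and "system" author seeded.
#
#     def _example_count_authors(db_session):
#         from orm.author import Author
#         assert db_session.query(Author).count() >= 1  # the seeded "system" author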
@pytest.fixture
def db_session_commit(test_session_factory):
    """
    Creates a DB session with real commits for integration tests.
    Used when real transactions need to be exercised.
    """
    session = test_session_factory()

    # Create the default community for the tests
    from orm.community import Community
    from orm.author import Author

    # Create the system author if it does not exist yet
    system_author = session.query(Author).where(Author.slug == "system").first()
    if not system_author:
        system_author = Author(
            name="System",
            slug="system",
            email="system@test.local",
            created_at=int(time.time()),
            updated_at=int(time.time()),
            last_seen=int(time.time())
        )
        session.add(system_author)
        session.commit()

    # Create the default community if it does not exist yet
    default_community = session.query(Community).where(Community.id == 1).first()
    if not default_community:
        default_community = Community(
            id=1,
            name="Главное сообщество",
            slug="main",
            desc="Основное сообщество для тестов",
            pic="",
            created_at=int(time.time()),
            created_by=system_author.id,
            settings={"default_roles": ["reader", "author"], "available_roles": ["reader", "author", "artist", "expert", "editor", "admin"]},
            private=False
        )
        session.add(default_community)
        session.commit()

    yield session

    # Clean up all data after the test
    try:
        for table in reversed(Base.metadata.sorted_tables):
            session.execute(table.delete())
        session.commit()
    except Exception:
        session.rollback()
    finally:
        session.close()
@pytest.fixture
def frontend_url():
    """
    Returns the frontend URL for the tests.
    """
    return FRONTEND_URL or "http://localhost:3000"


@pytest.fixture
def backend_url():
    """
    Returns the backend URL for the tests.
    """
    return "http://localhost:8000"
@pytest.fixture(scope="session")
def backend_server():
    """
    🚀 Fixture that starts/stops the backend server automatically.
    Starts the server only if it is not already running.
    """
    backend_process: Optional[subprocess.Popen] = None
    backend_running = False

    # Check whether the server is already running
    try:
        response = requests.get("http://localhost:8000/", timeout=2)
        if response.status_code == 200:
            print("✅ Backend server is already running")
            backend_running = True
        else:
            backend_running = False
    except Exception:
        backend_running = False

    if not backend_running:
        print("🔄 Starting the backend server for the tests...")
        try:
            # Start the backend server with the test database
            env = os.environ.copy()
            env["DATABASE_URL"] = "sqlite:///test_e2e.db"  # Use the test DB for e2e
            env["TESTING"] = "true"
            backend_process = subprocess.Popen(
                ["uv", "run", "python", "dev.py"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                env=env,
                cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            )
            # Wait for the backend to come up
            print("⏳ Waiting for the backend to start...")
            for i in range(30):  # Wait at most 30 seconds
                try:
                    response = requests.get("http://localhost:8000/", timeout=2)
                    if response.status_code == 200:
                        print("✅ Backend server started")
                        backend_running = True
                        break
                except Exception:
                    pass
                time.sleep(1)
            else:
                print("❌ Backend server did not start within 30 seconds")
                if backend_process:
                    backend_process.terminate()
                    backend_process.wait()
                raise Exception("Backend server did not start within 30 seconds")
        except Exception as e:
            print(f"❌ Failed to start the server: {e}")
            if backend_process:
                backend_process.terminate()
                backend_process.wait()
            raise Exception(f"Could not start the backend server: {e}")

    yield backend_running

    # Cleanup: stop the server only if we started it ourselves
    # (backend_process is only set when this fixture spawned the server)
    if backend_process:
        print("🛑 Stopping the backend server...")
        try:
            backend_process.terminate()
            backend_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            backend_process.kill()
            backend_process.wait()
@pytest.fixture
def test_client(backend_server):
    """
    🧪 Creates a test client for API tests.
    Requires a running backend server.
    """
    return get_test_client()
@pytest.fixture
async def browser_context():
    """
    🌐 Creates a browser context for e2e tests.
    Manages the browser lifecycle automatically.
    """
    try:
        from playwright.async_api import async_playwright
    except ImportError:
        pytest.skip("Playwright is not installed")

    async with async_playwright() as p:
        # Decide whether to run headless
        headless = os.getenv("PLAYWRIGHT_HEADLESS", "true").lower() == "true"
        browser = await p.chromium.launch(
            headless=headless,
            args=[
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--disable-web-security",
                "--disable-features=VizDisplayCompositor"
            ]
        )
        context = await browser.new_context(
            viewport={"width": 1280, "height": 720},
            ignore_https_errors=True,
            java_script_enabled=True
        )
        yield context
        await context.close()
        await browser.close()
@pytest.fixture
async def page(browser_context):
    """
    📄 Creates a new page for every test.
    """
    page = await browser_context.new_page()
    # Set default timeouts
    page.set_default_timeout(30000)
    page.set_default_navigation_timeout(30000)
    yield page
    await page.close()
@pytest.fixture
def api_base_url(backend_server):
    """
    🔗 Returns the base URL for API tests.
    """
    return "http://localhost:8000/graphql"


@pytest.fixture
def test_user_credentials():
    """
    👤 Returns test credentials for authorization.
    """
    return {
        "email": "test_admin@discours.io",
        "password": "password123"
    }
@pytest.fixture
def auth_headers(api_base_url, test_user_credentials):
    """
    🔐 Builds authorization headers for API tests.
    """
    def _get_auth_headers(token: Optional[str] = None):
        headers = {"Content-Type": "application/json"}
        if token:
            headers["Authorization"] = f"Bearer {token}"
        return headers
    return _get_auth_headers
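# Illustrative usage of the auth_headers factory inside a test (sketch only):
#
#     headers = auth_headers()                # {"Content-Type": "application/json"}
#     headers = auth_headers(token="abc123")  # adds "Authorization: Bearer abc123"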
@pytest.fixture
def wait_for_server():
    """
    ⏳ Utility for waiting until a server is ready.
    """
    def _wait_for_server(url: str, max_attempts: int = 30, delay: float = 1.0):
        """Waits until the server at the given URL is ready."""
        for attempt in range(max_attempts):
            try:
                response = requests.get(url, timeout=2)
                if response.status_code == 200:
                    return True
            except Exception:
                pass
            time.sleep(delay)
        return False
    return _wait_for_server
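# Sketch of using the wait_for_server helper in a test (hypothetical URL and test name):
#
#     def _example_wait(wait_for_server):
#         assert wait_for_server("http://localhost:8000/", max_attempts=10, delay=0.5)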
@pytest.fixture
def test_users(db_session):
    """Creates test users for the tests."""
    from orm.author import Author

    # Create the first user (administrator)
    admin_user = Author(
        slug="test-admin",
        email="test_admin@discours.io",
        password="hashed_password_123",
        name="Test Admin",
        bio="Test admin user for testing",
        pic="https://example.com/avatar1.jpg",
        oauth={}
    )
    db_session.add(admin_user)

    # Create the second user (regular user)
    regular_user = Author(
        slug="test-user",
        email="test_user@discours.io",
        password="hashed_password_456",
        name="Test User",
        bio="Test regular user for testing",
        pic="https://example.com/avatar2.jpg",
        oauth={}
    )
    db_session.add(regular_user)

    # Create the third user (reader only)
    reader_user = Author(
        slug="test-reader",
        email="test_reader@discours.io",
        password="hashed_password_789",
        name="Test Reader",
        bio="Test reader user for testing",
        pic="https://example.com/avatar3.jpg",
        oauth={}
    )
    db_session.add(reader_user)

    db_session.commit()
    return [admin_user, regular_user, reader_user]
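# Hypothetical test sketch using the seeded users (indices follow the list order above:
# 0 = admin, 1 = regular user, 2 = reader):
#
#     def _example_admin_slug(test_users):
#         assert test_users[0].slug == "test-admin"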
@pytest.fixture
def test_community(db_session, test_users):
    """Creates a test community."""
    from orm.community import Community
    community = Community(
        name="Test Community",
        slug="test-community",
        desc="A test community for testing purposes",
        created_by=test_users[0].id,  # The administrator creates the community
        settings={
            "default_roles": ["reader", "author"],
            "custom_setting": "custom_value"
        }
    )
    db_session.add(community)
    db_session.commit()
    return community


@pytest.fixture
def community_with_creator(db_session, test_users):
    """Creates a community with a creator."""
    from orm.community import Community
    community = Community(
        name="Community With Creator",
        slug="community-with-creator",
        desc="A test community with a creator",
        created_by=test_users[0].id,
        settings={"default_roles": ["reader", "author"]}
    )
    db_session.add(community)
    db_session.commit()
    return community


@pytest.fixture
def community_without_creator(db_session):
    """Creates a community without a creator."""
    from orm.community import Community
    community = Community(
        name="Community Without Creator",
        slug="community-without-creator",
        desc="A test community without a creator",
        created_by=None,  # No creator
        settings={"default_roles": ["reader"]}
    )
    db_session.add(community)
    db_session.commit()
    return community
@pytest.fixture
def admin_user_with_roles(db_session, test_users, test_community):
    """Creates an administrator with roles in the community."""
    from orm.community import CommunityAuthor
    ca = CommunityAuthor(
        community_id=test_community.id,
        author_id=test_users[0].id,
        roles="admin,author,reader"
    )
    db_session.add(ca)
    db_session.commit()
    return test_users[0]


@pytest.fixture
def regular_user_with_roles(db_session, test_users, test_community):
    """Creates a regular user with roles in the community."""
    from orm.community import CommunityAuthor
    ca = CommunityAuthor(
        community_id=test_community.id,
        author_id=test_users[1].id,
        roles="author,reader"
    )
    db_session.add(ca)
    db_session.commit()
    return test_users[1]
@pytest.fixture
def mock_verify(monkeypatch):
    """Mocks the verification function for tests."""
    from unittest.mock import AsyncMock
    mock = AsyncMock()
    # Default return values can be configured here
    return mock
@pytest.fixture
def redis_client():
    """Creates a Redis client for token tests."""
    from storage.redis import RedisService
    redis_service = RedisService()
    return redis_service._client
@pytest.fixture
def fake_redis():
    """Creates a fakeredis instance for tests."""
    try:
        import fakeredis.aioredis
        return fakeredis.aioredis.FakeRedis()
    except ImportError:
        pytest.skip("fakeredis is not installed - install it with: pip install fakeredis[aioredis]")
@pytest.fixture
def redis_service_mock(fake_redis):
    """Creates a RedisService mock backed by fakeredis."""
    try:
        import fakeredis.aioredis
        with patch('storage.redis.RedisService') as mock_service:
            # Create the instance with fakeredis
            mock_service.return_value._client = fake_redis

            # Emulate the execute method
            async def mock_execute(command: str, *args):
                cmd_method = getattr(fake_redis, command.lower(), None)
                if cmd_method is not None:
                    if callable(cmd_method):
                        return await cmd_method(*args)
                    return cmd_method
                return None

            mock_service.return_value.execute = mock_execute
            mock_service.return_value.get = fake_redis.get
            mock_service.return_value.set = fake_redis.set
            mock_service.return_value.delete = fake_redis.delete
            mock_service.return_value.exists = fake_redis.exists
            mock_service.return_value.hset = fake_redis.hset
            mock_service.return_value.hget = fake_redis.hget
            mock_service.return_value.hgetall = fake_redis.hgetall
            mock_service.return_value.hdel = fake_redis.hdel
            mock_service.return_value.expire = fake_redis.expire
            mock_service.return_value.ttl = fake_redis.ttl
            mock_service.return_value.keys = fake_redis.keys
            mock_service.return_value.scan = fake_redis.scan
            yield mock_service.return_value
    except ImportError:
        pytest.skip("fakeredis is not installed - install it with: pip install fakeredis[aioredis]")
# Use fakeredis for Redis tests
@pytest.fixture
def mock_redis_if_unavailable():
    """Replaces Redis with fakeredis for tests - a more realistic Redis emulation."""
    try:
        import fakeredis.aioredis

        # Create the fakeredis server
        fake_redis = fakeredis.aioredis.FakeRedis()

        # Patch the global redis instance
        with patch('storage.redis.redis') as mock_redis:
            # Emulate the RedisService.execute method
            async def mock_execute(command: str, *args):
                cmd_method = getattr(fake_redis, command.lower(), None)
                if cmd_method is not None:
                    if callable(cmd_method):
                        return await cmd_method(*args)
                    return cmd_method
                return None

            # Patch all the main Redis methods
            mock_redis.execute = mock_execute
            mock_redis.get = fake_redis.get
            mock_redis.set = fake_redis.set
            mock_redis.delete = fake_redis.delete
            mock_redis.exists = fake_redis.exists
            mock_redis.ping = fake_redis.ping
            mock_redis.hset = fake_redis.hset
            mock_redis.hget = fake_redis.hget
            mock_redis.hgetall = fake_redis.hgetall
            mock_redis.hdel = fake_redis.hdel
            mock_redis.expire = fake_redis.expire
            mock_redis.ttl = fake_redis.ttl
            mock_redis.keys = fake_redis.keys
            mock_redis.scan = fake_redis.scan
            mock_redis.is_connected = True

            # Async methods for connect/disconnect
            async def mock_connect():
                return True

            async def mock_disconnect():
                pass

            mock_redis.connect = mock_connect
            mock_redis.disconnect = mock_disconnect
            yield
    except ImportError:
        pytest.skip("fakeredis is not installed - install it with: pip install fakeredis[aioredis]")
@pytest.fixture(autouse=True)
def ensure_rbac_initialized():
    """Makes sure the RBAC system is initialized for every test."""
    import rbac
    rbac.initialize_rbac()
    yield
@pytest.fixture(autouse=True)
def mock_redis_globally():
    """Globally mocks Redis for all tests, including e2e."""
    try:
        import fakeredis.aioredis

        # Create the fakeredis server
        fake_redis = fakeredis.aioredis.FakeRedis()

        # Patch the global redis instance
        with patch('storage.redis.redis') as mock_redis:
            # Emulate the RedisService.execute method
            async def mock_execute(command: str, *args):
                cmd_method = getattr(fake_redis, command.lower(), None)
                if cmd_method is not None:
                    if callable(cmd_method):
                        return await cmd_method(*args)
                    return cmd_method
                return None

            # Patch all the main Redis methods
            mock_redis.execute = mock_execute
            mock_redis.get = fake_redis.get
            mock_redis.set = fake_redis.set
            mock_redis.delete = fake_redis.delete
            mock_redis.exists = fake_redis.exists
            mock_redis.ping = fake_redis.ping
            mock_redis.hset = fake_redis.hset
            mock_redis.hget = fake_redis.hget
            mock_redis.hgetall = fake_redis.hgetall
            mock_redis.hdel = fake_redis.hdel
            mock_redis.expire = fake_redis.expire
            mock_redis.ttl = fake_redis.ttl
            mock_redis.keys = fake_redis.keys
            mock_redis.scan = fake_redis.scan
            mock_redis.is_connected = True

            # Async methods for connect/disconnect
            async def mock_connect():
                return True

            async def mock_disconnect():
                pass

            mock_redis.connect = mock_connect
            mock_redis.disconnect = mock_disconnect
            yield
    except ImportError:
        # If fakeredis is not available, fall back to a basic mock
        with patch('storage.redis.redis') as mock_redis:
            mock_redis.execute.return_value = None
            mock_redis.get.return_value = None
            mock_redis.set.return_value = True
            mock_redis.delete.return_value = True
            mock_redis.exists.return_value = False
            mock_redis.ping.return_value = True
            mock_redis.hset.return_value = True
            mock_redis.hget.return_value = None
            mock_redis.hgetall.return_value = {}
            mock_redis.hdel.return_value = True
            mock_redis.expire.return_value = True
            mock_redis.ttl.return_value = -1
            mock_redis.keys.return_value = []
            mock_redis.scan.return_value = ([], 0)
            mock_redis.is_connected = True

            async def mock_connect():
                return True

            async def mock_disconnect():
                pass

            mock_redis.connect = mock_connect
            mock_redis.disconnect = mock_disconnect
            yield
@pytest.fixture(autouse=True)
def mock_redis_service_globally():
    """Globally mocks RedisService for all tests, including e2e."""
    try:
        import fakeredis.aioredis

        # Create the fakeredis server
        fake_redis = fakeredis.aioredis.FakeRedis()

        # Patch the RedisService class
        with patch('storage.redis.RedisService') as mock_service_class:
            # Create the mock instance
            mock_service = mock_service_class.return_value

            # Emulate the RedisService.execute method
            async def mock_execute(command: str, *args):
                cmd_method = getattr(fake_redis, command.lower(), None)
                if cmd_method is not None:
                    if callable(cmd_method):
                        return await cmd_method(*args)
                    return cmd_method
                return None

            # Patch all the main methods
            mock_service.execute = mock_execute
            mock_service.get = fake_redis.get
            mock_service.set = fake_redis.set
            mock_service.delete = fake_redis.delete
            mock_service.exists = fake_redis.exists
            mock_service.ping = fake_redis.ping
            mock_service.hset = fake_redis.hset
            mock_service.hget = fake_redis.hget
            mock_service.hgetall = fake_redis.hgetall
            mock_service.hdel = fake_redis.hdel
            mock_service.expire = fake_redis.expire
            mock_service.ttl = fake_redis.ttl
            mock_service.keys = fake_redis.keys
            mock_service.scan = fake_redis.scan
            mock_service._client = fake_redis
            mock_service.is_connected = True

            # Async methods for connect/disconnect
            async def mock_connect():
                return True

            async def mock_disconnect():
                pass

            mock_service.connect = mock_connect
            mock_service.disconnect = mock_disconnect
            yield
    except ImportError:
        # If fakeredis is not available, fall back to a basic mock
        with patch('storage.redis.RedisService') as mock_service_class:
            mock_service = mock_service_class.return_value
            mock_service.execute.return_value = None
            mock_service.get.return_value = None
            mock_service.set.return_value = True
            mock_service.delete.return_value = True
            mock_service.exists.return_value = False
            mock_service.ping.return_value = True
            mock_service.hset.return_value = True
            mock_service.hget.return_value = None
            mock_service.hgetall.return_value = {}
            mock_service.hdel.return_value = True
            mock_service.expire.return_value = True
            mock_service.ttl.return_value = -1
            mock_service.keys.return_value = []
            mock_service.scan.return_value = ([], 0)
            mock_service.is_connected = True

            async def mock_connect():
                return True

            async def mock_disconnect():
                pass

            mock_service.connect = mock_connect
            mock_service.disconnect = mock_disconnect
            yield
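# Note: the three autouse fixtures above (ensure_rbac_initialized, mock_redis_globally,
# mock_redis_service_globally) apply to every test automatically; individual tests do not
# need to request them. A test only opts in explicitly when it needs the fixture object
# itself, e.g. the redis_service_mock or fake_redis fixtures defined earlier in this file.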