1074 lines
40 KiB
Python
1074 lines
40 KiB
Python
import pytest
|
||
import os
|
||
from settings import FRONTEND_URL
|
||
from sqlalchemy import create_engine
|
||
from sqlalchemy.orm import sessionmaker
|
||
from sqlalchemy.pool import StaticPool
|
||
import time
|
||
import requests
|
||
import subprocess
|
||
from typing import Optional
|
||
from unittest.mock import patch
|
||
import importlib
|
||
|
||
# 🚨 CRITICAL: Patch Redis BEFORE any other imports to prevent connection attempts
|
||
try:
|
||
import fakeredis.aioredis
|
||
|
||
# Create a fake Redis instance
|
||
fake_redis = fakeredis.aioredis.FakeRedis()
|
||
|
||
# Patch Redis at module level
|
||
import storage.redis
|
||
|
||
# Mock the global redis instance
|
||
storage.redis.redis = fake_redis
|
||
|
||
print("✅ Redis patched with fakeredis at module level")
|
||
|
||
except ImportError:
|
||
print("❌ fakeredis not available, tests may fail")
|
||
|
||
from orm.base import BaseModel as Base
|
||
|
||
|
||
# 🚨 CRITICAL: Create all database tables at module level BEFORE any tests run
|
||
def ensure_all_tables_exist():
|
||
"""Создает все таблицы в in-memory базе для тестов"""
|
||
try:
|
||
# Create a temporary engine for table creation
|
||
temp_engine = create_engine(
|
||
"sqlite:///:memory:",
|
||
echo=False,
|
||
poolclass=StaticPool,
|
||
connect_args={"check_same_thread": False}
|
||
)
|
||
|
||
# Import all ORM modules to ensure they're registered
|
||
import orm.base
|
||
import orm.community
|
||
import orm.author
|
||
import orm.draft
|
||
import orm.shout
|
||
import orm.topic
|
||
import orm.reaction
|
||
import orm.invite
|
||
import orm.notification
|
||
import orm.collection
|
||
import orm.rating
|
||
|
||
# Force create all tables
|
||
Base.metadata.create_all(temp_engine)
|
||
|
||
# Verify tables were created
|
||
from sqlalchemy import inspect
|
||
inspector = inspect(temp_engine)
|
||
created_tables = inspector.get_table_names()
|
||
|
||
print(f"✅ Module-level table creation: {len(created_tables)} tables created")
|
||
print(f"📋 Tables: {created_tables}")
|
||
|
||
# Clean up
|
||
temp_engine.dispose()
|
||
|
||
except Exception as e:
|
||
print(f"❌ Module-level table creation failed: {e}")
|
||
raise
|
||
|
||
# Execute table creation immediately
|
||
ensure_all_tables_exist()
|
||
|
||
|
||
def pytest_configure(config):
|
||
"""Pytest configuration hook - runs before any tests"""
|
||
# Ensure Redis is patched before any tests run
|
||
try:
|
||
import fakeredis.aioredis
|
||
|
||
# Create a fake Redis instance
|
||
fake_redis = fakeredis.aioredis.FakeRedis()
|
||
|
||
# Patch Redis at module level
|
||
import storage.redis
|
||
|
||
# Mock the global redis instance
|
||
storage.redis.redis = fake_redis
|
||
|
||
print("✅ Redis patched with fakeredis in pytest_configure")
|
||
|
||
except ImportError:
|
||
print("❌ fakeredis not available in pytest_configure")
|
||
|
||
|
||
def force_create_all_tables(engine):
|
||
"""
|
||
Принудительно создает все таблицы, перезагружая модели если нужно.
|
||
Это помогает в CI среде где могут быть проблемы с метаданными.
|
||
"""
|
||
from sqlalchemy import inspect
|
||
import orm
|
||
|
||
# Перезагружаем все модули ORM для гарантии актуальности метаданных
|
||
importlib.reload(orm.base)
|
||
importlib.reload(orm.community)
|
||
importlib.reload(orm.author)
|
||
importlib.reload(orm.draft)
|
||
importlib.reload(orm.shout)
|
||
importlib.reload(orm.topic)
|
||
importlib.reload(orm.reaction)
|
||
importlib.reload(orm.invite)
|
||
importlib.reload(orm.notification)
|
||
importlib.reload(orm.collection)
|
||
importlib.reload(orm.rating)
|
||
|
||
# Получаем обновленную Base
|
||
from orm.base import BaseModel as Base
|
||
|
||
# Создаем все таблицы
|
||
Base.metadata.create_all(engine)
|
||
|
||
# Проверяем результат
|
||
inspector = inspect(engine)
|
||
created_tables = inspector.get_table_names()
|
||
print(f"🔧 Force created tables: {created_tables}")
|
||
|
||
return created_tables
|
||
|
||
|
||
def get_test_client():
|
||
"""
|
||
Создает и возвращает тестовый клиент для интеграционных тестов.
|
||
|
||
Returns:
|
||
TestClient: Клиент для выполнения тестовых запросов
|
||
"""
|
||
from starlette.testclient import TestClient
|
||
|
||
# Отложенный импорт для предотвращения циклических зависимостей
|
||
def _import_app():
|
||
from main import app
|
||
return app
|
||
|
||
return TestClient(_import_app())
|
||
|
||
|
||
@pytest.fixture(autouse=True, scope="session")
|
||
def _set_requests_default_timeout():
|
||
"""Глобально задаем таймаут по умолчанию для requests в тестах, чтобы исключить зависания.
|
||
|
||
🪓 Упрощение: мокаем методы requests, добавляя timeout=10, если он не указан.
|
||
"""
|
||
original_request = requests.sessions.Session.request
|
||
|
||
def request_with_default_timeout(self, method, url, **kwargs): # type: ignore[override]
|
||
if "timeout" not in kwargs:
|
||
kwargs["timeout"] = 10
|
||
return original_request(self, method, url, **kwargs)
|
||
|
||
requests.sessions.Session.request = request_with_default_timeout # type: ignore[assignment]
|
||
yield
|
||
requests.sessions.Session.request = original_request # type: ignore[assignment]
|
||
|
||
|
||
@pytest.fixture(scope="session")
|
||
def test_engine():
|
||
"""
|
||
Создает тестовый engine для всей сессии тестирования.
|
||
Использует in-memory SQLite для быстрых тестов.
|
||
"""
|
||
# Принудительно импортируем ВСЕ модели чтобы они были зарегистрированы в Base.metadata
|
||
import orm.base
|
||
import orm.community
|
||
import orm.author
|
||
import orm.draft
|
||
import orm.shout
|
||
import orm.topic
|
||
import orm.reaction
|
||
import orm.invite
|
||
import orm.notification
|
||
import orm.collection
|
||
|
||
# Явно импортируем классы для гарантии регистрации
|
||
from orm.base import BaseModel as Base
|
||
from orm.community import Community, CommunityAuthor, CommunityFollower
|
||
from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark
|
||
from orm.draft import Draft, DraftAuthor, DraftTopic
|
||
from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
|
||
from orm.topic import Topic, TopicFollower
|
||
from orm.reaction import Reaction
|
||
from orm.invite import Invite
|
||
from orm.notification import Notification
|
||
from orm.collection import Collection
|
||
|
||
# Инициализируем RBAC систему
|
||
import rbac
|
||
rbac.initialize_rbac()
|
||
|
||
engine = create_engine(
|
||
"sqlite:///:memory:", echo=False, poolclass=StaticPool, connect_args={"check_same_thread": False}
|
||
)
|
||
|
||
# Принудительно удаляем все таблицы и создаем заново
|
||
Base.metadata.drop_all(engine)
|
||
Base.metadata.create_all(engine)
|
||
|
||
# Debug: проверяем какие таблицы созданы на уровне сессии
|
||
from sqlalchemy import inspect
|
||
inspector = inspect(engine)
|
||
session_tables = inspector.get_table_names()
|
||
print(f"🔍 Created tables in test_engine fixture: {session_tables}")
|
||
|
||
# Проверяем что все критические таблицы созданы
|
||
required_tables = [
|
||
'author', 'community', 'community_author', 'community_follower',
|
||
'draft', 'draft_author', 'draft_topic',
|
||
'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
|
||
'topic', 'topic_followers', 'reaction', 'invite', 'notification',
|
||
'collection', 'author_follower', 'author_rating', 'author_bookmark'
|
||
]
|
||
|
||
missing_tables = [table for table in required_tables if table not in session_tables]
|
||
|
||
if missing_tables:
|
||
print(f"❌ Missing tables in test_engine: {missing_tables}")
|
||
print(f"Available tables: {session_tables}")
|
||
|
||
# Fallback: попробуем создать отсутствующие таблицы явно
|
||
print("🔄 Attempting to create missing tables explicitly in test_engine...")
|
||
try:
|
||
# Создаем все таблицы снова с принудительным импортом моделей
|
||
Base.metadata.create_all(engine)
|
||
|
||
# Проверяем снова
|
||
inspector = inspect(engine)
|
||
updated_tables = inspector.get_table_names()
|
||
still_missing = [table for table in required_tables if table not in updated_tables]
|
||
|
||
if still_missing:
|
||
print(f"❌ Still missing tables after explicit creation: {still_missing}")
|
||
# Последняя попытка: создаем таблицы по одной
|
||
print("🔄 Last attempt: creating tables one by one...")
|
||
for table_name in still_missing:
|
||
try:
|
||
if table_name == 'community_author':
|
||
CommunityAuthor.__table__.create(engine, checkfirst=True)
|
||
elif table_name == 'community_follower':
|
||
CommunityFollower.__table__.create(engine, checkfirst=True)
|
||
elif table_name == 'author_follower':
|
||
AuthorFollower.__table__.create(engine, checkfirst=True)
|
||
elif table_name == 'author_rating':
|
||
AuthorRating.__table__.create(engine, checkfirst=True)
|
||
elif table_name == 'author_bookmark':
|
||
AuthorBookmark.__table__.create(engine, checkfirst=True)
|
||
elif table_name == 'draft_author':
|
||
DraftAuthor.__table__.create(engine, checkfirst=True)
|
||
elif table_name == 'draft_topic':
|
||
DraftTopic.__table__.create(engine, checkfirst=True)
|
||
elif table_name == 'shout_author':
|
||
ShoutAuthor.__table__.create(engine, checkfirst=True)
|
||
elif table_name == 'shout_topic':
|
||
ShoutTopic.__table__.create(engine, checkfirst=True)
|
||
elif table_name == 'shout_reactions_followers':
|
||
ShoutReactionsFollower.__table__.create(engine, checkfirst=True)
|
||
elif table_name == 'collection':
|
||
Collection.__table__.create(engine, checkfirst=True)
|
||
elif table_name == 'topic_followers':
|
||
TopicFollower.__table__.create(engine, checkfirst=True)
|
||
print(f"✅ Created table {table_name}")
|
||
except Exception as e:
|
||
print(f"❌ Failed to create table {table_name}: {e}")
|
||
|
||
# Финальная проверка
|
||
inspector = inspect(engine)
|
||
final_tables = inspector.get_table_names()
|
||
final_missing = [table for table in required_tables if table not in final_tables]
|
||
if final_missing:
|
||
print(f"❌ Still missing tables after individual creation: {final_missing}")
|
||
print("🔄 Last resort: forcing table creation with module reload...")
|
||
try:
|
||
final_tables = force_create_all_tables(engine)
|
||
final_missing = [table for table in required_tables if table not in final_tables]
|
||
if final_missing:
|
||
raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}")
|
||
else:
|
||
print("✅ All missing tables created successfully with force creation")
|
||
except Exception as e:
|
||
print(f"❌ Force creation failed: {e}")
|
||
raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}")
|
||
else:
|
||
print("✅ All missing tables created successfully in test_engine")
|
||
else:
|
||
print("✅ All missing tables created successfully in test_engine")
|
||
except Exception as e:
|
||
print(f"❌ Failed to create missing tables in test_engine: {e}")
|
||
raise
|
||
else:
|
||
print("✅ All required tables created in test_engine")
|
||
|
||
yield engine
|
||
|
||
# Cleanup после всех тестов
|
||
Base.metadata.drop_all(engine)
|
||
|
||
|
||
@pytest.fixture(scope="session")
|
||
def test_session_factory(test_engine):
|
||
"""
|
||
Создает фабрику сессий для тестирования.
|
||
"""
|
||
return sessionmaker(bind=test_engine, expire_on_commit=False)
|
||
|
||
|
||
@pytest.fixture
|
||
def db_session(test_session_factory, test_engine):
|
||
"""
|
||
Создает новую сессию БД для каждого теста.
|
||
Простая реализация без вложенных транзакций.
|
||
"""
|
||
# Принудительно пересоздаем таблицы для каждого теста
|
||
from orm.base import BaseModel as Base
|
||
from sqlalchemy import inspect
|
||
|
||
# Убеждаемся что все модели импортированы перед созданием таблиц
|
||
# Явно импортируем все модели чтобы они были зарегистрированы в Base.metadata
|
||
from orm.community import Community, CommunityAuthor, CommunityFollower
|
||
from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark
|
||
from orm.draft import Draft, DraftAuthor, DraftTopic
|
||
from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
|
||
from orm.topic import Topic, TopicFollower
|
||
from orm.reaction import Reaction
|
||
from orm.invite import Invite
|
||
from orm.notification import Notification
|
||
from orm.collection import Collection
|
||
|
||
# Проверяем что все модели зарегистрированы
|
||
print(f"🔍 Registered tables in Base.metadata: {list(Base.metadata.tables.keys())}")
|
||
|
||
# Убеждаемся что все критические таблицы зарегистрированы в metadata
|
||
required_tables = [
|
||
'author', 'community', 'community_author', 'community_follower',
|
||
'draft', 'draft_author', 'draft_topic',
|
||
'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
|
||
'topic', 'topic_followers', 'reaction', 'invite', 'notification',
|
||
'collection', 'author_follower', 'author_rating', 'author_bookmark'
|
||
]
|
||
|
||
missing_metadata_tables = [table for table in required_tables if table not in Base.metadata.tables]
|
||
|
||
if missing_metadata_tables:
|
||
print(f"❌ Missing tables in Base.metadata: {missing_metadata_tables}")
|
||
print("Available tables:", list(Base.metadata.tables.keys()))
|
||
raise RuntimeError(f"Critical tables not registered in Base.metadata: {missing_metadata_tables}")
|
||
else:
|
||
print("✅ All required tables registered in Base.metadata")
|
||
|
||
# Удаляем все таблицы
|
||
Base.metadata.drop_all(test_engine)
|
||
|
||
# Создаем таблицы заново
|
||
Base.metadata.create_all(test_engine)
|
||
|
||
# Debug: проверяем какие таблицы созданы
|
||
inspector = inspect(test_engine)
|
||
created_tables = inspector.get_table_names()
|
||
print(f"🔍 Created tables in db_session fixture: {created_tables}")
|
||
|
||
# Проверяем что все критические таблицы созданы
|
||
required_tables = [
|
||
'author', 'community', 'community_author', 'community_follower',
|
||
'draft', 'draft_author', 'draft_topic',
|
||
'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
|
||
'topic', 'topic_followers', 'reaction', 'invite', 'notification',
|
||
'collection', 'author_follower', 'author_rating', 'author_bookmark'
|
||
]
|
||
|
||
missing_tables = [table for table in required_tables if table not in created_tables]
|
||
|
||
if missing_tables:
|
||
print(f"❌ Missing tables in db_session: {missing_tables}")
|
||
print(f"Available tables: {created_tables}")
|
||
|
||
# Fallback: попробуем создать отсутствующие таблицы явно
|
||
print("🔄 Attempting to create missing tables explicitly in db_session...")
|
||
try:
|
||
# Создаем все таблицы снова с принудительным импортом моделей
|
||
Base.metadata.create_all(test_engine)
|
||
|
||
# Проверяем снова
|
||
inspector = inspect(test_engine)
|
||
updated_tables = inspector.get_table_names()
|
||
still_missing = [table for table in required_tables if table not in updated_tables]
|
||
|
||
if still_missing:
|
||
print(f"❌ Still missing tables after explicit creation: {still_missing}")
|
||
raise RuntimeError(f"Failed to create required tables: {still_missing}")
|
||
else:
|
||
print("✅ All missing tables created successfully in db_session")
|
||
except Exception as e:
|
||
print(f"❌ Failed to create missing tables in db_session: {e}")
|
||
raise
|
||
else:
|
||
print("✅ All required tables created in db_session")
|
||
|
||
# Проверяем что таблица draft создана с правильной схемой
|
||
inspector = inspect(test_engine)
|
||
draft_columns = [col['name'] for col in inspector.get_columns('draft')]
|
||
print(f"Draft table columns: {draft_columns}")
|
||
|
||
# Убеждаемся что колонка shout существует
|
||
if 'shout' not in draft_columns:
|
||
print("WARNING: Column 'shout' not found in draft table!")
|
||
|
||
session = test_session_factory()
|
||
|
||
# Создаем дефолтное сообщество для тестов
|
||
from orm.community import Community
|
||
from orm.author import Author
|
||
import time
|
||
|
||
# Создаем системного автора если его нет
|
||
system_author = session.query(Author).where(Author.slug == "system").first()
|
||
if not system_author:
|
||
system_author = Author(
|
||
name="System",
|
||
slug="system",
|
||
email="system@test.local",
|
||
created_at=int(time.time()),
|
||
updated_at=int(time.time()),
|
||
last_seen=int(time.time())
|
||
)
|
||
session.add(system_author)
|
||
session.flush()
|
||
|
||
# Создаем дефолтное сообщество если его нет
|
||
default_community = session.query(Community).where(Community.id == 1).first()
|
||
if not default_community:
|
||
default_community = Community(
|
||
id=1,
|
||
name="Главное сообщество",
|
||
slug="main",
|
||
desc="Основное сообщество для тестов",
|
||
pic="",
|
||
created_at=int(time.time()),
|
||
created_by=system_author.id,
|
||
settings={"default_roles": ["reader", "author"], "available_roles": ["reader", "author", "artist", "expert", "editor", "admin"]},
|
||
private=False
|
||
)
|
||
session.add(default_community)
|
||
session.commit()
|
||
|
||
yield session
|
||
|
||
# Очищаем все данные после теста
|
||
try:
|
||
for table in reversed(Base.metadata.sorted_tables):
|
||
session.execute(table.delete())
|
||
session.commit()
|
||
except Exception:
|
||
session.rollback()
|
||
finally:
|
||
session.close()
|
||
|
||
|
||
@pytest.fixture
|
||
def db_session_commit(test_session_factory):
|
||
"""
|
||
Создает сессию БД с реальными commit'ами для интеграционных тестов.
|
||
Используется когда нужно тестировать реальные транзакции.
|
||
"""
|
||
session = test_session_factory()
|
||
|
||
# Создаем дефолтное сообщество для тестов
|
||
from orm.community import Community
|
||
from orm.author import Author
|
||
|
||
# Создаем системного автора если его нет
|
||
system_author = session.query(Author).where(Author.slug == "system").first()
|
||
if not system_author:
|
||
system_author = Author(
|
||
name="System",
|
||
slug="system",
|
||
email="system@test.local",
|
||
created_at=int(time.time()),
|
||
updated_at=int(time.time()),
|
||
last_seen=int(time.time())
|
||
)
|
||
session.add(system_author)
|
||
session.commit()
|
||
|
||
# Создаем дефолтное сообщество если его нет
|
||
default_community = session.query(Community).where(Community.id == 1).first()
|
||
if not default_community:
|
||
default_community = Community(
|
||
id=1,
|
||
name="Главное сообщество",
|
||
slug="main",
|
||
desc="Основное сообщество для тестов",
|
||
pic="",
|
||
created_at=int(time.time()),
|
||
created_by=system_author.id,
|
||
settings={"default_roles": ["reader", "author"], "available_roles": ["reader", "author", "artist", "expert", "editor", "admin"]},
|
||
private=False
|
||
)
|
||
session.add(default_community)
|
||
session.commit()
|
||
|
||
yield session
|
||
|
||
# Очищаем все данные после теста
|
||
try:
|
||
for table in reversed(Base.metadata.sorted_tables):
|
||
session.execute(table.delete())
|
||
session.commit()
|
||
except Exception:
|
||
session.rollback()
|
||
finally:
|
||
session.close()
|
||
|
||
|
||
@pytest.fixture
|
||
def frontend_url():
|
||
"""
|
||
Возвращает URL фронтенда для тестов.
|
||
"""
|
||
return FRONTEND_URL or "http://localhost:3000"
|
||
|
||
|
||
@pytest.fixture
|
||
def backend_url():
|
||
"""
|
||
Возвращает URL бэкенда для тестов.
|
||
"""
|
||
return "http://localhost:8000"
|
||
|
||
|
||
@pytest.fixture(scope="session")
|
||
def backend_server():
|
||
"""
|
||
🚀 Фикстура для автоматического запуска/остановки бэкенд сервера.
|
||
Запускает сервер только если он не запущен.
|
||
"""
|
||
backend_process: Optional[subprocess.Popen] = None
|
||
backend_running = False
|
||
|
||
# Проверяем, не запущен ли уже сервер
|
||
try:
|
||
response = requests.get("http://localhost:8000/", timeout=2)
|
||
if response.status_code == 200:
|
||
print("✅ Бэкенд сервер уже запущен")
|
||
backend_running = True
|
||
else:
|
||
backend_running = False
|
||
except:
|
||
backend_running = False
|
||
|
||
if not backend_running:
|
||
print("🔄 Запускаем бэкенд сервер для тестов...")
|
||
try:
|
||
# Запускаем бэкенд сервер с тестовой базой данных
|
||
env = os.environ.copy()
|
||
env["DATABASE_URL"] = "sqlite:///test_e2e.db" # Используем тестовую БД для e2e
|
||
env["TESTING"] = "true"
|
||
|
||
backend_process = subprocess.Popen(
|
||
["uv", "run", "python", "dev.py"],
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
env=env,
|
||
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||
)
|
||
|
||
# Ждем запуска бэкенда
|
||
print("⏳ Ждем запуска бэкенда...")
|
||
for i in range(30): # Ждем максимум 30 секунд
|
||
try:
|
||
response = requests.get("http://localhost:8000/", timeout=2)
|
||
if response.status_code == 200:
|
||
print("✅ Бэкенд сервер запущен")
|
||
backend_running = True
|
||
break
|
||
except:
|
||
pass
|
||
time.sleep(1)
|
||
else:
|
||
print("❌ Бэкенд сервер не запустился за 30 секунд")
|
||
if backend_process:
|
||
backend_process.terminate()
|
||
backend_process.wait()
|
||
raise Exception("Бэкенд сервер не запустился за 30 секунд")
|
||
|
||
except Exception as e:
|
||
print(f"❌ Ошибка запуска сервера: {e}")
|
||
if backend_process:
|
||
backend_process.terminate()
|
||
backend_process.wait()
|
||
raise Exception(f"Не удалось запустить бэкенд сервер: {e}")
|
||
|
||
yield backend_running
|
||
|
||
# Cleanup: останавливаем сервер только если мы его запускали
|
||
if backend_process and not backend_running:
|
||
print("🛑 Останавливаем бэкенд сервер...")
|
||
try:
|
||
backend_process.terminate()
|
||
backend_process.wait(timeout=10)
|
||
except subprocess.TimeoutExpired:
|
||
backend_process.kill()
|
||
backend_process.wait()
|
||
|
||
|
||
@pytest.fixture
|
||
def test_client(backend_server):
|
||
"""
|
||
🧪 Создает тестовый клиент для API тестов.
|
||
Требует запущенный бэкенд сервер.
|
||
"""
|
||
return get_test_client()
|
||
|
||
|
||
@pytest.fixture
|
||
async def browser_context():
|
||
"""
|
||
🌐 Создает контекст браузера для e2e тестов.
|
||
Автоматически управляет жизненным циклом браузера.
|
||
"""
|
||
try:
|
||
from playwright.async_api import async_playwright
|
||
except ImportError:
|
||
pytest.skip("Playwright не установлен")
|
||
|
||
async with async_playwright() as p:
|
||
# Определяем headless режим
|
||
headless = os.getenv("PLAYWRIGHT_HEADLESS", "true").lower() == "true"
|
||
|
||
browser = await p.chromium.launch(
|
||
headless=headless,
|
||
args=[
|
||
"--no-sandbox",
|
||
"--disable-dev-shm-usage",
|
||
"--disable-gpu",
|
||
"--disable-web-security",
|
||
"--disable-features=VizDisplayCompositor"
|
||
]
|
||
)
|
||
|
||
context = await browser.new_context(
|
||
viewport={"width": 1280, "height": 720},
|
||
ignore_https_errors=True,
|
||
java_script_enabled=True
|
||
)
|
||
|
||
yield context
|
||
|
||
await context.close()
|
||
await browser.close()
|
||
|
||
|
||
@pytest.fixture
|
||
async def page(browser_context):
|
||
"""
|
||
📄 Создает новую страницу для каждого теста.
|
||
"""
|
||
page = await browser_context.new_page()
|
||
|
||
# Устанавливаем таймауты
|
||
page.set_default_timeout(30000)
|
||
page.set_default_navigation_timeout(30000)
|
||
|
||
yield page
|
||
|
||
await page.close()
|
||
|
||
|
||
@pytest.fixture
|
||
def api_base_url(backend_server):
|
||
"""
|
||
🔗 Возвращает базовый URL для API тестов.
|
||
"""
|
||
return "http://localhost:8000/graphql"
|
||
|
||
|
||
@pytest.fixture
|
||
def test_user_credentials():
|
||
"""
|
||
👤 Возвращает тестовые учетные данные для авторизации.
|
||
"""
|
||
return {
|
||
"email": "test_admin@discours.io",
|
||
"password": "password123"
|
||
}
|
||
|
||
|
||
@pytest.fixture
|
||
def auth_headers(api_base_url, test_user_credentials):
|
||
"""
|
||
🔐 Создает заголовки авторизации для API тестов.
|
||
"""
|
||
def _get_auth_headers(token: Optional[str] = None):
|
||
headers = {"Content-Type": "application/json"}
|
||
if token:
|
||
headers["Authorization"] = f"Bearer {token}"
|
||
return headers
|
||
|
||
return _get_auth_headers
|
||
|
||
|
||
@pytest.fixture
|
||
def wait_for_server():
|
||
"""
|
||
⏳ Утилита для ожидания готовности сервера.
|
||
"""
|
||
def _wait_for_server(url: str, max_attempts: int = 30, delay: float = 1.0):
|
||
"""Ждет готовности сервера по указанному URL."""
|
||
for attempt in range(max_attempts):
|
||
try:
|
||
response = requests.get(url, timeout=2)
|
||
if response.status_code == 200:
|
||
return True
|
||
except:
|
||
pass
|
||
time.sleep(delay)
|
||
return False
|
||
|
||
return _wait_for_server
|
||
|
||
|
||
@pytest.fixture
|
||
def test_users(db_session):
|
||
"""Создает тестовых пользователей для тестов"""
|
||
from orm.author import Author
|
||
|
||
# Создаем первого пользователя (администратор)
|
||
admin_user = Author(
|
||
slug="test-admin",
|
||
email="test_admin@discours.io",
|
||
password="hashed_password_123",
|
||
name="Test Admin",
|
||
bio="Test admin user for testing",
|
||
pic="https://example.com/avatar1.jpg",
|
||
oauth={}
|
||
)
|
||
db_session.add(admin_user)
|
||
|
||
# Создаем второго пользователя (обычный пользователь)
|
||
regular_user = Author(
|
||
slug="test-user",
|
||
email="test_user@discours.io",
|
||
password="hashed_password_456",
|
||
name="Test User",
|
||
bio="Test regular user for testing",
|
||
pic="https://example.com/avatar2.jpg",
|
||
oauth={}
|
||
)
|
||
db_session.add(regular_user)
|
||
|
||
# Создаем третьего пользователя (только читатель)
|
||
reader_user = Author(
|
||
slug="test-reader",
|
||
email="test_reader@discours.io",
|
||
password="hashed_password_789",
|
||
name="Test Reader",
|
||
bio="Test reader user for testing",
|
||
pic="https://example.com/avatar3.jpg",
|
||
oauth={}
|
||
)
|
||
db_session.add(reader_user)
|
||
|
||
db_session.commit()
|
||
|
||
return [admin_user, regular_user, reader_user]
|
||
|
||
|
||
@pytest.fixture
|
||
def test_community(db_session, test_users):
|
||
"""Создает тестовое сообщество для тестов"""
|
||
from orm.community import Community
|
||
|
||
community = Community(
|
||
name="Test Community",
|
||
slug="test-community",
|
||
desc="A test community for testing purposes",
|
||
created_by=test_users[0].id, # Администратор создает сообщество
|
||
settings={
|
||
"default_roles": ["reader", "author"],
|
||
"custom_setting": "custom_value"
|
||
}
|
||
)
|
||
db_session.add(community)
|
||
db_session.commit()
|
||
|
||
return community
|
||
|
||
|
||
@pytest.fixture
|
||
def community_with_creator(db_session, test_users):
|
||
"""Создает сообщество с создателем"""
|
||
from orm.community import Community
|
||
|
||
community = Community(
|
||
name="Community With Creator",
|
||
slug="community-with-creator",
|
||
desc="A test community with a creator",
|
||
created_by=test_users[0].id,
|
||
settings={"default_roles": ["reader", "author"]}
|
||
)
|
||
db_session.add(community)
|
||
db_session.commit()
|
||
|
||
return community
|
||
|
||
|
||
@pytest.fixture
|
||
def community_without_creator(db_session):
|
||
"""Создает сообщество без создателя"""
|
||
from orm.community import Community
|
||
|
||
community = Community(
|
||
name="Community Without Creator",
|
||
slug="community-without-creator",
|
||
desc="A test community without a creator",
|
||
created_by=None, # Без создателя
|
||
settings={"default_roles": ["reader"]}
|
||
)
|
||
db_session.add(community)
|
||
db_session.commit()
|
||
|
||
return community
|
||
|
||
|
||
@pytest.fixture
|
||
def admin_user_with_roles(db_session, test_users, test_community):
|
||
"""Создает администратора с ролями в сообществе"""
|
||
from orm.community import CommunityAuthor
|
||
|
||
ca = CommunityAuthor(
|
||
community_id=test_community.id,
|
||
author_id=test_users[0].id,
|
||
roles="admin,author,reader"
|
||
)
|
||
db_session.add(ca)
|
||
db_session.commit()
|
||
|
||
return test_users[0]
|
||
|
||
|
||
@pytest.fixture
|
||
def regular_user_with_roles(db_session, test_users, test_community):
|
||
"""Создает обычного пользователя с ролями в сообществе"""
|
||
from orm.community import CommunityAuthor
|
||
|
||
ca = CommunityAuthor(
|
||
community_id=test_community.id,
|
||
author_id=test_users[1].id,
|
||
roles="author,reader"
|
||
)
|
||
db_session.add(ca)
|
||
db_session.commit()
|
||
|
||
return test_users[1]
|
||
|
||
|
||
@pytest.fixture
|
||
def mock_verify(monkeypatch):
|
||
"""Мокает функцию верификации для тестов"""
|
||
from unittest.mock import AsyncMock
|
||
|
||
mock = AsyncMock()
|
||
# Здесь можно настроить возвращаемые значения по умолчанию
|
||
return mock
|
||
|
||
|
||
@pytest.fixture
|
||
def redis_client():
|
||
"""Создает Redis клиент для тестов токенов"""
|
||
from storage.redis import RedisService
|
||
|
||
redis_service = RedisService()
|
||
return redis_service._client
|
||
|
||
|
||
@pytest.fixture
|
||
def fake_redis():
|
||
"""Создает fakeredis экземпляр для тестов"""
|
||
try:
|
||
import fakeredis.aioredis
|
||
return fakeredis.aioredis.FakeRedis()
|
||
except ImportError:
|
||
pytest.skip("fakeredis не установлен - установите: pip install fakeredis[aioredis]")
|
||
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def ensure_rbac_initialized():
|
||
"""Обеспечивает инициализацию RBAC системы для каждого теста"""
|
||
import rbac
|
||
rbac.initialize_rbac()
|
||
yield
|
||
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def mock_redis_globally():
|
||
"""Глобально мокает Redis для всех тестов, включая e2e"""
|
||
try:
|
||
import fakeredis.aioredis
|
||
|
||
# Создаем fakeredis сервер
|
||
fake_redis = fakeredis.aioredis.FakeRedis()
|
||
|
||
# Патчим глобальный redis экземпляр
|
||
with patch('storage.redis.redis') as mock_redis:
|
||
# Эмулируем RedisService.execute метод
|
||
async def mock_execute(command: str, *args):
|
||
cmd_method = getattr(fake_redis, command.lower(), None)
|
||
if cmd_method is not None:
|
||
if hasattr(cmd_method, '__call__'):
|
||
return await cmd_method(*args)
|
||
else:
|
||
return cmd_method
|
||
return None
|
||
|
||
# Патчим все основные методы Redis
|
||
mock_redis.execute = mock_execute
|
||
mock_redis.get = fake_redis.get
|
||
mock_redis.set = fake_redis.set
|
||
mock_redis.delete = fake_redis.delete
|
||
mock_redis.exists = fake_redis.exists
|
||
mock_redis.ping = fake_redis.ping
|
||
mock_redis.hset = fake_redis.hset
|
||
mock_redis.hget = fake_redis.hget
|
||
mock_redis.hgetall = fake_redis.hgetall
|
||
mock_redis.hdel = fake_redis.hdel
|
||
mock_redis.expire = fake_redis.expire
|
||
mock_redis.ttl = fake_redis.ttl
|
||
mock_redis.keys = fake_redis.keys
|
||
mock_redis.scan = fake_redis.scan
|
||
mock_redis.is_connected = True
|
||
|
||
# Async методы для connect/disconnect
|
||
async def mock_connect():
|
||
return True
|
||
|
||
async def mock_disconnect():
|
||
pass
|
||
|
||
mock_redis.connect = mock_connect
|
||
mock_redis.disconnect = mock_disconnect
|
||
|
||
yield
|
||
|
||
except ImportError:
|
||
# Если fakeredis не доступен, используем базовый mock
|
||
with patch('storage.redis.redis') as mock_redis:
|
||
mock_redis.execute.return_value = None
|
||
mock_redis.get.return_value = None
|
||
mock_redis.set.return_value = True
|
||
mock_redis.delete.return_value = True
|
||
mock_redis.exists.return_value = False
|
||
mock_redis.ping.return_value = True
|
||
mock_redis.hset.return_value = True
|
||
mock_redis.hget.return_value = None
|
||
mock_redis.hgetall.return_value = {}
|
||
mock_redis.hdel.return_value = True
|
||
mock_redis.expire.return_value = True
|
||
mock_redis.ttl.return_value = -1
|
||
mock_redis.keys.return_value = []
|
||
mock_redis.scan.return_value = ([], 0)
|
||
mock_redis.is_connected = True
|
||
|
||
async def mock_connect():
|
||
return True
|
||
|
||
async def mock_disconnect():
|
||
pass
|
||
|
||
mock_redis.connect = mock_connect
|
||
mock_redis.disconnect = mock_disconnect
|
||
|
||
yield
|
||
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def mock_redis_service_globally():
|
||
"""Глобально мокает RedisService для всех тестов, включая e2e"""
|
||
try:
|
||
import fakeredis.aioredis
|
||
|
||
# Создаем fakeredis сервер
|
||
fake_redis = fakeredis.aioredis.FakeRedis()
|
||
|
||
# Патчим RedisService класс
|
||
with patch('storage.redis.RedisService') as mock_service_class:
|
||
# Создаем mock экземпляр
|
||
mock_service = mock_service_class.return_value
|
||
|
||
# Эмулируем RedisService.execute метод
|
||
async def mock_execute(command: str, *args):
|
||
cmd_method = getattr(fake_redis, command.lower(), None)
|
||
if cmd_method is not None:
|
||
if hasattr(cmd_method, '__call__'):
|
||
return await cmd_method(*args)
|
||
else:
|
||
return cmd_method
|
||
return None
|
||
|
||
# Патчим все основные методы
|
||
mock_service.execute = mock_execute
|
||
mock_service.get = fake_redis.get
|
||
mock_service.set = fake_redis.set
|
||
mock_service.delete = fake_redis.delete
|
||
mock_service.exists = fake_redis.exists
|
||
mock_service.ping = fake_redis.ping
|
||
mock_service.hset = fake_redis.hset
|
||
mock_service.hget = fake_redis.hget
|
||
mock_service.hgetall = fake_redis.hgetall
|
||
mock_service.hdel = fake_redis.hdel
|
||
mock_service.expire = fake_redis.expire
|
||
mock_service.ttl = fake_redis.ttl
|
||
mock_service.keys = fake_redis.keys
|
||
mock_service.scan = fake_redis.scan
|
||
mock_service._client = fake_redis
|
||
mock_service.is_connected = True
|
||
|
||
# Async методы для connect/disconnect
|
||
async def mock_connect():
|
||
return True
|
||
|
||
async def mock_disconnect():
|
||
pass
|
||
|
||
mock_service.connect = mock_connect
|
||
mock_service.disconnect = mock_disconnect
|
||
|
||
yield
|
||
|
||
except ImportError:
|
||
# Если fakeredis не доступен, используем базовый mock
|
||
with patch('storage.redis.RedisService') as mock_service_class:
|
||
mock_service = mock_service_class.return_value
|
||
|
||
mock_service.execute.return_value = None
|
||
mock_service.get.return_value = None
|
||
mock_service.set.return_value = True
|
||
mock_service.delete.return_value = True
|
||
mock_service.exists.return_value = False
|
||
mock_service.ping.return_value = True
|
||
mock_service.hset.return_value = True
|
||
mock_service.hget.return_value = None
|
||
mock_service.hgetall.return_value = {}
|
||
mock_service.hdel.return_value = True
|
||
mock_service.expire.return_value = True
|
||
mock_service.ttl.return_value = -1
|
||
mock_service.keys.return_value = []
|
||
mock_service.scan.return_value = ([], 0)
|
||
mock_service.is_connected = True
|
||
|
||
async def mock_connect():
|
||
return True
|
||
|
||
async def mock_disconnect():
|
||
pass
|
||
|
||
mock_service.connect = mock_connect
|
||
mock_service.disconnect = mock_disconnect
|
||
|
||
yield
|