654 lines
22 KiB
Python
654 lines
22 KiB
Python
import pytest
|
||
import os
|
||
from settings import FRONTEND_URL
|
||
from sqlalchemy import create_engine
|
||
from sqlalchemy.orm import sessionmaker
|
||
from sqlalchemy.pool import StaticPool
|
||
import time
|
||
import requests
|
||
import subprocess
|
||
from typing import Optional
|
||
from unittest.mock import patch
|
||
|
||
from orm.base import BaseModel as Base
|
||
|
||
|
||
def get_test_client():
|
||
"""
|
||
Создает и возвращает тестовый клиент для интеграционных тестов.
|
||
|
||
Returns:
|
||
TestClient: Клиент для выполнения тестовых запросов
|
||
"""
|
||
from starlette.testclient import TestClient
|
||
|
||
# Отложенный импорт для предотвращения циклических зависимостей
|
||
def _import_app():
|
||
from main import app
|
||
return app
|
||
|
||
return TestClient(_import_app())
|
||
|
||
|
||
@pytest.fixture(autouse=True, scope="session")
|
||
def _set_requests_default_timeout():
|
||
"""Глобально задаем таймаут по умолчанию для requests в тестах, чтобы исключить зависания.
|
||
|
||
🪓 Упрощение: мокаем методы requests, добавляя timeout=10, если он не указан.
|
||
"""
|
||
original_request = requests.sessions.Session.request
|
||
|
||
def request_with_default_timeout(self, method, url, **kwargs): # type: ignore[override]
|
||
if "timeout" not in kwargs:
|
||
kwargs["timeout"] = 10
|
||
return original_request(self, method, url, **kwargs)
|
||
|
||
requests.sessions.Session.request = request_with_default_timeout # type: ignore[assignment]
|
||
yield
|
||
requests.sessions.Session.request = original_request # type: ignore[assignment]
|
||
|
||
|
||
@pytest.fixture(scope="session")
|
||
def test_engine():
|
||
"""
|
||
Создает тестовый engine для всей сессии тестирования.
|
||
Использует in-memory SQLite для быстрых тестов.
|
||
"""
|
||
# Импортируем все модели, чтобы они были зарегистрированы
|
||
from orm.base import BaseModel as Base
|
||
from orm.community import Community, CommunityAuthor
|
||
from orm.author import Author
|
||
from orm.draft import Draft, DraftAuthor, DraftTopic
|
||
from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
|
||
from orm.topic import Topic
|
||
from orm.reaction import Reaction
|
||
from orm.invite import Invite
|
||
from orm.notification import Notification
|
||
|
||
# Инициализируем RBAC систему
|
||
import rbac
|
||
rbac.initialize_rbac()
|
||
|
||
engine = create_engine(
|
||
"sqlite:///:memory:", echo=False, poolclass=StaticPool, connect_args={"check_same_thread": False}
|
||
)
|
||
|
||
# Принудительно удаляем все таблицы и создаем заново
|
||
Base.metadata.drop_all(engine)
|
||
Base.metadata.create_all(engine)
|
||
|
||
yield engine
|
||
|
||
# Cleanup после всех тестов
|
||
Base.metadata.drop_all(engine)
|
||
|
||
|
||
@pytest.fixture(scope="session")
|
||
def test_session_factory(test_engine):
|
||
"""
|
||
Создает фабрику сессий для тестирования.
|
||
"""
|
||
return sessionmaker(bind=test_engine, expire_on_commit=False)
|
||
|
||
|
||
@pytest.fixture
|
||
def db_session(test_session_factory, test_engine):
|
||
"""
|
||
Создает новую сессию БД для каждого теста.
|
||
Простая реализация без вложенных транзакций.
|
||
"""
|
||
# Принудительно пересоздаем таблицы для каждого теста
|
||
from orm.base import BaseModel as Base
|
||
from sqlalchemy import inspect
|
||
|
||
# Удаляем все таблицы
|
||
Base.metadata.drop_all(test_engine)
|
||
|
||
# Создаем таблицы заново
|
||
Base.metadata.create_all(test_engine)
|
||
|
||
# Проверяем что таблица draft создана с правильной схемой
|
||
inspector = inspect(test_engine)
|
||
draft_columns = [col['name'] for col in inspector.get_columns('draft')]
|
||
print(f"Draft table columns: {draft_columns}")
|
||
|
||
# Убеждаемся что колонка shout существует
|
||
if 'shout' not in draft_columns:
|
||
print("WARNING: Column 'shout' not found in draft table!")
|
||
|
||
session = test_session_factory()
|
||
|
||
# Создаем дефолтное сообщество для тестов
|
||
from orm.community import Community
|
||
from orm.author import Author
|
||
import time
|
||
|
||
# Создаем системного автора если его нет
|
||
system_author = session.query(Author).where(Author.slug == "system").first()
|
||
if not system_author:
|
||
system_author = Author(
|
||
name="System",
|
||
slug="system",
|
||
email="system@test.local",
|
||
created_at=int(time.time()),
|
||
updated_at=int(time.time()),
|
||
last_seen=int(time.time())
|
||
)
|
||
session.add(system_author)
|
||
session.flush()
|
||
|
||
# Создаем дефолтное сообщество если его нет
|
||
default_community = session.query(Community).where(Community.id == 1).first()
|
||
if not default_community:
|
||
default_community = Community(
|
||
id=1,
|
||
name="Главное сообщество",
|
||
slug="main",
|
||
desc="Основное сообщество для тестов",
|
||
pic="",
|
||
created_at=int(time.time()),
|
||
created_by=system_author.id,
|
||
settings={"default_roles": ["reader", "author"], "available_roles": ["reader", "author", "artist", "expert", "editor", "admin"]},
|
||
private=False
|
||
)
|
||
session.add(default_community)
|
||
session.commit()
|
||
|
||
yield session
|
||
|
||
# Очищаем все данные после теста
|
||
try:
|
||
for table in reversed(Base.metadata.sorted_tables):
|
||
session.execute(table.delete())
|
||
session.commit()
|
||
except Exception:
|
||
session.rollback()
|
||
finally:
|
||
session.close()
|
||
|
||
|
||
@pytest.fixture
|
||
def db_session_commit(test_session_factory):
|
||
"""
|
||
Создает сессию БД с реальными commit'ами для интеграционных тестов.
|
||
Используется когда нужно тестировать реальные транзакции.
|
||
"""
|
||
session = test_session_factory()
|
||
|
||
# Создаем дефолтное сообщество для тестов
|
||
from orm.community import Community
|
||
from orm.author import Author
|
||
|
||
# Создаем системного автора если его нет
|
||
system_author = session.query(Author).where(Author.slug == "system").first()
|
||
if not system_author:
|
||
system_author = Author(
|
||
name="System",
|
||
slug="system",
|
||
email="system@test.local",
|
||
created_at=int(time.time()),
|
||
updated_at=int(time.time()),
|
||
last_seen=int(time.time())
|
||
)
|
||
session.add(system_author)
|
||
session.commit()
|
||
|
||
# Создаем дефолтное сообщество если его нет
|
||
default_community = session.query(Community).where(Community.id == 1).first()
|
||
if not default_community:
|
||
default_community = Community(
|
||
id=1,
|
||
name="Главное сообщество",
|
||
slug="main",
|
||
desc="Основное сообщество для тестов",
|
||
pic="",
|
||
created_at=int(time.time()),
|
||
created_by=system_author.id,
|
||
settings={"default_roles": ["reader", "author"], "available_roles": ["reader", "author", "artist", "expert", "editor", "admin"]},
|
||
private=False
|
||
)
|
||
session.add(default_community)
|
||
session.commit()
|
||
|
||
yield session
|
||
|
||
# Очищаем все данные после теста
|
||
try:
|
||
for table in reversed(Base.metadata.sorted_tables):
|
||
session.execute(table.delete())
|
||
session.commit()
|
||
except Exception:
|
||
session.rollback()
|
||
finally:
|
||
session.close()
|
||
|
||
|
||
@pytest.fixture
|
||
def frontend_url():
|
||
"""
|
||
Возвращает URL фронтенда для тестов.
|
||
"""
|
||
return FRONTEND_URL or "http://localhost:3000"
|
||
|
||
|
||
@pytest.fixture
|
||
def backend_url():
|
||
"""
|
||
Возвращает URL бэкенда для тестов.
|
||
"""
|
||
return "http://localhost:8000"
|
||
|
||
|
||
@pytest.fixture(scope="session")
|
||
def backend_server():
|
||
"""
|
||
🚀 Фикстура для автоматического запуска/остановки бэкенд сервера.
|
||
Запускает сервер только если он не запущен.
|
||
"""
|
||
backend_process: Optional[subprocess.Popen] = None
|
||
backend_running = False
|
||
|
||
# Проверяем, не запущен ли уже сервер
|
||
try:
|
||
response = requests.get("http://localhost:8000/", timeout=2)
|
||
if response.status_code == 200:
|
||
print("✅ Бэкенд сервер уже запущен")
|
||
backend_running = True
|
||
else:
|
||
backend_running = False
|
||
except:
|
||
backend_running = False
|
||
|
||
if not backend_running:
|
||
print("🔄 Запускаем бэкенд сервер для тестов...")
|
||
try:
|
||
# Запускаем бэкенд сервер
|
||
backend_process = subprocess.Popen(
|
||
["uv", "run", "python", "dev.py"],
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||
)
|
||
|
||
# Ждем запуска бэкенда
|
||
print("⏳ Ждем запуска бэкенда...")
|
||
for i in range(30): # Ждем максимум 30 секунд
|
||
try:
|
||
response = requests.get("http://localhost:8000/", timeout=2)
|
||
if response.status_code == 200:
|
||
print("✅ Бэкенд сервер запущен")
|
||
backend_running = True
|
||
break
|
||
except:
|
||
pass
|
||
time.sleep(1)
|
||
else:
|
||
print("❌ Бэкенд сервер не запустился за 30 секунд")
|
||
if backend_process:
|
||
backend_process.terminate()
|
||
backend_process.wait()
|
||
raise Exception("Бэкенд сервер не запустился за 30 секунд")
|
||
|
||
except Exception as e:
|
||
print(f"❌ Ошибка запуска сервера: {e}")
|
||
if backend_process:
|
||
backend_process.terminate()
|
||
backend_process.wait()
|
||
raise Exception(f"Не удалось запустить бэкенд сервер: {e}")
|
||
|
||
yield backend_running
|
||
|
||
# Cleanup: останавливаем сервер только если мы его запускали
|
||
if backend_process and not backend_running:
|
||
print("🛑 Останавливаем бэкенд сервер...")
|
||
try:
|
||
backend_process.terminate()
|
||
backend_process.wait(timeout=10)
|
||
except subprocess.TimeoutExpired:
|
||
backend_process.kill()
|
||
backend_process.wait()
|
||
|
||
|
||
@pytest.fixture
|
||
def test_client(backend_server):
|
||
"""
|
||
🧪 Создает тестовый клиент для API тестов.
|
||
Требует запущенный бэкенд сервер.
|
||
"""
|
||
return get_test_client()
|
||
|
||
|
||
@pytest.fixture
|
||
async def browser_context():
|
||
"""
|
||
🌐 Создает контекст браузера для e2e тестов.
|
||
Автоматически управляет жизненным циклом браузера.
|
||
"""
|
||
try:
|
||
from playwright.async_api import async_playwright
|
||
except ImportError:
|
||
pytest.skip("Playwright не установлен")
|
||
|
||
async with async_playwright() as p:
|
||
# Определяем headless режим
|
||
headless = os.getenv("PLAYWRIGHT_HEADLESS", "true").lower() == "true"
|
||
|
||
browser = await p.chromium.launch(
|
||
headless=headless,
|
||
args=[
|
||
"--no-sandbox",
|
||
"--disable-dev-shm-usage",
|
||
"--disable-gpu",
|
||
"--disable-web-security",
|
||
"--disable-features=VizDisplayCompositor"
|
||
]
|
||
)
|
||
|
||
context = await browser.new_context(
|
||
viewport={"width": 1280, "height": 720},
|
||
ignore_https_errors=True,
|
||
java_script_enabled=True
|
||
)
|
||
|
||
yield context
|
||
|
||
await context.close()
|
||
await browser.close()
|
||
|
||
|
||
@pytest.fixture
|
||
async def page(browser_context):
|
||
"""
|
||
📄 Создает новую страницу для каждого теста.
|
||
"""
|
||
page = await browser_context.new_page()
|
||
|
||
# Устанавливаем таймауты
|
||
page.set_default_timeout(30000)
|
||
page.set_default_navigation_timeout(30000)
|
||
|
||
yield page
|
||
|
||
await page.close()
|
||
|
||
|
||
@pytest.fixture
|
||
def api_base_url(backend_server):
|
||
"""
|
||
🔗 Возвращает базовый URL для API тестов.
|
||
"""
|
||
return "http://localhost:8000/graphql"
|
||
|
||
|
||
@pytest.fixture
|
||
def test_user_credentials():
|
||
"""
|
||
👤 Возвращает тестовые учетные данные для авторизации.
|
||
"""
|
||
return {
|
||
"email": "test_admin@discours.io",
|
||
"password": "password123"
|
||
}
|
||
|
||
|
||
@pytest.fixture
|
||
def auth_headers(api_base_url, test_user_credentials):
|
||
"""
|
||
🔐 Создает заголовки авторизации для API тестов.
|
||
"""
|
||
def _get_auth_headers(token: Optional[str] = None):
|
||
headers = {"Content-Type": "application/json"}
|
||
if token:
|
||
headers["Authorization"] = f"Bearer {token}"
|
||
return headers
|
||
|
||
return _get_auth_headers
|
||
|
||
|
||
@pytest.fixture
|
||
def wait_for_server():
|
||
"""
|
||
⏳ Утилита для ожидания готовности сервера.
|
||
"""
|
||
def _wait_for_server(url: str, max_attempts: int = 30, delay: float = 1.0):
|
||
"""Ждет готовности сервера по указанному URL."""
|
||
for attempt in range(max_attempts):
|
||
try:
|
||
response = requests.get(url, timeout=2)
|
||
if response.status_code == 200:
|
||
return True
|
||
except:
|
||
pass
|
||
time.sleep(delay)
|
||
return False
|
||
|
||
return _wait_for_server
|
||
|
||
|
||
@pytest.fixture
|
||
def test_users(db_session):
|
||
"""Создает тестовых пользователей для тестов"""
|
||
from orm.author import Author
|
||
|
||
# Создаем первого пользователя (администратор)
|
||
admin_user = Author(
|
||
slug="test-admin",
|
||
email="test_admin@discours.io",
|
||
password="hashed_password_123",
|
||
name="Test Admin",
|
||
bio="Test admin user for testing",
|
||
pic="https://example.com/avatar1.jpg",
|
||
oauth={}
|
||
)
|
||
db_session.add(admin_user)
|
||
|
||
# Создаем второго пользователя (обычный пользователь)
|
||
regular_user = Author(
|
||
slug="test-user",
|
||
email="test_user@discours.io",
|
||
password="hashed_password_456",
|
||
name="Test User",
|
||
bio="Test regular user for testing",
|
||
pic="https://example.com/avatar2.jpg",
|
||
oauth={}
|
||
)
|
||
db_session.add(regular_user)
|
||
|
||
# Создаем третьего пользователя (только читатель)
|
||
reader_user = Author(
|
||
slug="test-reader",
|
||
email="test_reader@discours.io",
|
||
password="hashed_password_789",
|
||
name="Test Reader",
|
||
bio="Test reader user for testing",
|
||
pic="https://example.com/avatar3.jpg",
|
||
oauth={}
|
||
)
|
||
db_session.add(reader_user)
|
||
|
||
db_session.commit()
|
||
|
||
return [admin_user, regular_user, reader_user]
|
||
|
||
|
||
@pytest.fixture
|
||
def test_community(db_session, test_users):
|
||
"""Создает тестовое сообщество для тестов"""
|
||
from orm.community import Community
|
||
|
||
community = Community(
|
||
name="Test Community",
|
||
slug="test-community",
|
||
desc="A test community for testing purposes",
|
||
created_by=test_users[0].id, # Администратор создает сообщество
|
||
settings={
|
||
"default_roles": ["reader", "author"],
|
||
"custom_setting": "custom_value"
|
||
}
|
||
)
|
||
db_session.add(community)
|
||
db_session.commit()
|
||
|
||
return community
|
||
|
||
|
||
@pytest.fixture
|
||
def community_with_creator(db_session, test_users):
|
||
"""Создает сообщество с создателем"""
|
||
from orm.community import Community
|
||
|
||
community = Community(
|
||
name="Community With Creator",
|
||
slug="community-with-creator",
|
||
desc="A test community with a creator",
|
||
created_by=test_users[0].id,
|
||
settings={"default_roles": ["reader", "author"]}
|
||
)
|
||
db_session.add(community)
|
||
db_session.commit()
|
||
|
||
return community
|
||
|
||
|
||
@pytest.fixture
|
||
def community_without_creator(db_session):
|
||
"""Создает сообщество без создателя"""
|
||
from orm.community import Community
|
||
|
||
community = Community(
|
||
name="Community Without Creator",
|
||
slug="community-without-creator",
|
||
desc="A test community without a creator",
|
||
created_by=None, # Без создателя
|
||
settings={"default_roles": ["reader"]}
|
||
)
|
||
db_session.add(community)
|
||
db_session.commit()
|
||
|
||
return community
|
||
|
||
|
||
@pytest.fixture
|
||
def admin_user_with_roles(db_session, test_users, test_community):
|
||
"""Создает администратора с ролями в сообществе"""
|
||
from orm.community import CommunityAuthor
|
||
|
||
ca = CommunityAuthor(
|
||
community_id=test_community.id,
|
||
author_id=test_users[0].id,
|
||
roles="admin,author,reader"
|
||
)
|
||
db_session.add(ca)
|
||
db_session.commit()
|
||
|
||
return test_users[0]
|
||
|
||
|
||
@pytest.fixture
|
||
def regular_user_with_roles(db_session, test_users, test_community):
|
||
"""Создает обычного пользователя с ролями в сообществе"""
|
||
from orm.community import CommunityAuthor
|
||
|
||
ca = CommunityAuthor(
|
||
community_id=test_community.id,
|
||
author_id=test_users[1].id,
|
||
roles="author,reader"
|
||
)
|
||
db_session.add(ca)
|
||
db_session.commit()
|
||
|
||
return test_users[1]
|
||
|
||
|
||
@pytest.fixture
|
||
def mock_verify(monkeypatch):
|
||
"""Мокает функцию верификации для тестов"""
|
||
from unittest.mock import AsyncMock
|
||
|
||
mock = AsyncMock()
|
||
# Здесь можно настроить возвращаемые значения по умолчанию
|
||
return mock
|
||
|
||
|
||
@pytest.fixture
|
||
def redis_client():
|
||
"""Создает Redis клиент для тестов токенов"""
|
||
from storage.redis import RedisService
|
||
|
||
redis_service = RedisService()
|
||
return redis_service._client
|
||
|
||
|
||
# Mock для Redis если он недоступен
|
||
@pytest.fixture
|
||
def mock_redis_if_unavailable():
|
||
"""Мокает Redis если он недоступен - для тестов которые нуждаются в Redis"""
|
||
try:
|
||
import fakeredis.aioredis
|
||
# Используем fakeredis для тестов
|
||
with patch('storage.redis.redis') as mock_redis:
|
||
# Создаем fakeredis сервер
|
||
fake_redis = fakeredis.aioredis.FakeRedis()
|
||
|
||
# Создаем mock для execute метода, который эмулирует поведение RedisService.execute
|
||
async def mock_execute(command: str, *args):
|
||
cmd_method = getattr(fake_redis, command.lower(), None)
|
||
if cmd_method is not None:
|
||
if hasattr(cmd_method, '__call__'):
|
||
return await cmd_method(*args)
|
||
else:
|
||
return cmd_method
|
||
return None
|
||
|
||
# Патчим методы Redis
|
||
mock_redis.execute = mock_execute
|
||
mock_redis.get = fake_redis.get
|
||
mock_redis.set = fake_redis.set
|
||
mock_redis.delete = fake_redis.delete
|
||
mock_redis.exists = fake_redis.exists
|
||
mock_redis.ping = fake_redis.ping
|
||
mock_redis.is_connected = True
|
||
|
||
# Добавляем async методы для connect/disconnect
|
||
async def mock_connect():
|
||
return True
|
||
|
||
async def mock_disconnect():
|
||
pass
|
||
|
||
mock_redis.connect = mock_connect
|
||
mock_redis.disconnect = mock_disconnect
|
||
|
||
yield
|
||
except ImportError:
|
||
# fakeredis не установлен, используем базовый mock
|
||
with patch('storage.redis.redis') as mock_redis:
|
||
# Создаем базовый mock для Redis методов
|
||
mock_redis.execute.return_value = None
|
||
mock_redis.get.return_value = None
|
||
mock_redis.set.return_value = True
|
||
mock_redis.delete.return_value = True
|
||
mock_redis.exists.return_value = False
|
||
mock_redis.ping.return_value = True
|
||
mock_redis.is_connected = False
|
||
|
||
# Добавляем async методы для connect/disconnect
|
||
async def mock_connect():
|
||
return True
|
||
|
||
async def mock_disconnect():
|
||
pass
|
||
|
||
mock_redis.connect = mock_connect
|
||
mock_redis.disconnect = mock_disconnect
|
||
|
||
yield
|
||
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def ensure_rbac_initialized():
|
||
"""Обеспечивает инициализацию RBAC системы для каждого теста"""
|
||
import rbac
|
||
rbac.initialize_rbac()
|
||
yield
|