Files
core/tests/conftest.py
Untone a37d9c6364
Some checks failed
Deploy on push / deploy (push) Failing after 2m46s
minor-fix
2025-08-19 12:34:24 +03:00

657 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pytest
import os
from settings import FRONTEND_URL
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
import time
import requests
import subprocess
from typing import Optional
from unittest.mock import patch
from orm.base import BaseModel as Base
def get_test_client():
"""
Создает и возвращает тестовый клиент для интеграционных тестов.
Returns:
TestClient: Клиент для выполнения тестовых запросов
"""
from starlette.testclient import TestClient
# Отложенный импорт для предотвращения циклических зависимостей
def _import_app():
from main import app
return app
return TestClient(_import_app())
@pytest.fixture(autouse=True, scope="session")
def _set_requests_default_timeout():
"""Глобально задаем таймаут по умолчанию для requests в тестах, чтобы исключить зависания.
🪓 Упрощение: мокаем методы requests, добавляя timeout=10, если он не указан.
"""
original_request = requests.sessions.Session.request
def request_with_default_timeout(self, method, url, **kwargs): # type: ignore[override]
if "timeout" not in kwargs:
kwargs["timeout"] = 10
return original_request(self, method, url, **kwargs)
requests.sessions.Session.request = request_with_default_timeout # type: ignore[assignment]
yield
requests.sessions.Session.request = original_request # type: ignore[assignment]
@pytest.fixture(scope="session")
def test_engine():
"""
Создает тестовый engine для всей сессии тестирования.
Использует in-memory SQLite для быстрых тестов.
"""
# Импортируем все модели, чтобы они были зарегистрированы
from orm.base import BaseModel as Base
from orm.community import Community, CommunityAuthor
from orm.author import Author
from orm.draft import Draft, DraftAuthor, DraftTopic
from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
from orm.topic import Topic, TopicFollower
from orm.reaction import Reaction
from orm.invite import Invite
from orm.notification import Notification
from orm.collection import Collection
from orm.shout import ShoutReactionsFollower
from orm.author import AuthorFollower, AuthorRating
# Инициализируем RBAC систему
import rbac
rbac.initialize_rbac()
engine = create_engine(
"sqlite:///:memory:", echo=False, poolclass=StaticPool, connect_args={"check_same_thread": False}
)
# Принудительно удаляем все таблицы и создаем заново
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
yield engine
# Cleanup после всех тестов
Base.metadata.drop_all(engine)
@pytest.fixture(scope="session")
def test_session_factory(test_engine):
"""
Создает фабрику сессий для тестирования.
"""
return sessionmaker(bind=test_engine, expire_on_commit=False)
@pytest.fixture
def db_session(test_session_factory, test_engine):
"""
Создает новую сессию БД для каждого теста.
Простая реализация без вложенных транзакций.
"""
# Принудительно пересоздаем таблицы для каждого теста
from orm.base import BaseModel as Base
from sqlalchemy import inspect
# Удаляем все таблицы
Base.metadata.drop_all(test_engine)
# Создаем таблицы заново
Base.metadata.create_all(test_engine)
# Проверяем что таблица draft создана с правильной схемой
inspector = inspect(test_engine)
draft_columns = [col['name'] for col in inspector.get_columns('draft')]
print(f"Draft table columns: {draft_columns}")
# Убеждаемся что колонка shout существует
if 'shout' not in draft_columns:
print("WARNING: Column 'shout' not found in draft table!")
session = test_session_factory()
# Создаем дефолтное сообщество для тестов
from orm.community import Community
from orm.author import Author
import time
# Создаем системного автора если его нет
system_author = session.query(Author).where(Author.slug == "system").first()
if not system_author:
system_author = Author(
name="System",
slug="system",
email="system@test.local",
created_at=int(time.time()),
updated_at=int(time.time()),
last_seen=int(time.time())
)
session.add(system_author)
session.flush()
# Создаем дефолтное сообщество если его нет
default_community = session.query(Community).where(Community.id == 1).first()
if not default_community:
default_community = Community(
id=1,
name="Главное сообщество",
slug="main",
desc="Основное сообщество для тестов",
pic="",
created_at=int(time.time()),
created_by=system_author.id,
settings={"default_roles": ["reader", "author"], "available_roles": ["reader", "author", "artist", "expert", "editor", "admin"]},
private=False
)
session.add(default_community)
session.commit()
yield session
# Очищаем все данные после теста
try:
for table in reversed(Base.metadata.sorted_tables):
session.execute(table.delete())
session.commit()
except Exception:
session.rollback()
finally:
session.close()
@pytest.fixture
def db_session_commit(test_session_factory):
"""
Создает сессию БД с реальными commit'ами для интеграционных тестов.
Используется когда нужно тестировать реальные транзакции.
"""
session = test_session_factory()
# Создаем дефолтное сообщество для тестов
from orm.community import Community
from orm.author import Author
# Создаем системного автора если его нет
system_author = session.query(Author).where(Author.slug == "system").first()
if not system_author:
system_author = Author(
name="System",
slug="system",
email="system@test.local",
created_at=int(time.time()),
updated_at=int(time.time()),
last_seen=int(time.time())
)
session.add(system_author)
session.commit()
# Создаем дефолтное сообщество если его нет
default_community = session.query(Community).where(Community.id == 1).first()
if not default_community:
default_community = Community(
id=1,
name="Главное сообщество",
slug="main",
desc="Основное сообщество для тестов",
pic="",
created_at=int(time.time()),
created_by=system_author.id,
settings={"default_roles": ["reader", "author"], "available_roles": ["reader", "author", "artist", "expert", "editor", "admin"]},
private=False
)
session.add(default_community)
session.commit()
yield session
# Очищаем все данные после теста
try:
for table in reversed(Base.metadata.sorted_tables):
session.execute(table.delete())
session.commit()
except Exception:
session.rollback()
finally:
session.close()
@pytest.fixture
def frontend_url():
"""
Возвращает URL фронтенда для тестов.
"""
return FRONTEND_URL or "http://localhost:3000"
@pytest.fixture
def backend_url():
"""
Возвращает URL бэкенда для тестов.
"""
return "http://localhost:8000"
@pytest.fixture(scope="session")
def backend_server():
"""
🚀 Фикстура для автоматического запуска/остановки бэкенд сервера.
Запускает сервер только если он не запущен.
"""
backend_process: Optional[subprocess.Popen] = None
backend_running = False
# Проверяем, не запущен ли уже сервер
try:
response = requests.get("http://localhost:8000/", timeout=2)
if response.status_code == 200:
print("✅ Бэкенд сервер уже запущен")
backend_running = True
else:
backend_running = False
except:
backend_running = False
if not backend_running:
print("🔄 Запускаем бэкенд сервер для тестов...")
try:
# Запускаем бэкенд сервер
backend_process = subprocess.Popen(
["uv", "run", "python", "dev.py"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
# Ждем запуска бэкенда
print("⏳ Ждем запуска бэкенда...")
for i in range(30): # Ждем максимум 30 секунд
try:
response = requests.get("http://localhost:8000/", timeout=2)
if response.status_code == 200:
print("✅ Бэкенд сервер запущен")
backend_running = True
break
except:
pass
time.sleep(1)
else:
print("❌ Бэкенд сервер не запустился за 30 секунд")
if backend_process:
backend_process.terminate()
backend_process.wait()
raise Exception("Бэкенд сервер не запустился за 30 секунд")
except Exception as e:
print(f"❌ Ошибка запуска сервера: {e}")
if backend_process:
backend_process.terminate()
backend_process.wait()
raise Exception(f"Не удалось запустить бэкенд сервер: {e}")
yield backend_running
# Cleanup: останавливаем сервер только если мы его запускали
if backend_process and not backend_running:
print("🛑 Останавливаем бэкенд сервер...")
try:
backend_process.terminate()
backend_process.wait(timeout=10)
except subprocess.TimeoutExpired:
backend_process.kill()
backend_process.wait()
@pytest.fixture
def test_client(backend_server):
"""
🧪 Создает тестовый клиент для API тестов.
Требует запущенный бэкенд сервер.
"""
return get_test_client()
@pytest.fixture
async def browser_context():
"""
🌐 Создает контекст браузера для e2e тестов.
Автоматически управляет жизненным циклом браузера.
"""
try:
from playwright.async_api import async_playwright
except ImportError:
pytest.skip("Playwright не установлен")
async with async_playwright() as p:
# Определяем headless режим
headless = os.getenv("PLAYWRIGHT_HEADLESS", "true").lower() == "true"
browser = await p.chromium.launch(
headless=headless,
args=[
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--disable-web-security",
"--disable-features=VizDisplayCompositor"
]
)
context = await browser.new_context(
viewport={"width": 1280, "height": 720},
ignore_https_errors=True,
java_script_enabled=True
)
yield context
await context.close()
await browser.close()
@pytest.fixture
async def page(browser_context):
"""
📄 Создает новую страницу для каждого теста.
"""
page = await browser_context.new_page()
# Устанавливаем таймауты
page.set_default_timeout(30000)
page.set_default_navigation_timeout(30000)
yield page
await page.close()
@pytest.fixture
def api_base_url(backend_server):
"""
🔗 Возвращает базовый URL для API тестов.
"""
return "http://localhost:8000/graphql"
@pytest.fixture
def test_user_credentials():
"""
👤 Возвращает тестовые учетные данные для авторизации.
"""
return {
"email": "test_admin@discours.io",
"password": "password123"
}
@pytest.fixture
def auth_headers(api_base_url, test_user_credentials):
"""
🔐 Создает заголовки авторизации для API тестов.
"""
def _get_auth_headers(token: Optional[str] = None):
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
return _get_auth_headers
@pytest.fixture
def wait_for_server():
"""
⏳ Утилита для ожидания готовности сервера.
"""
def _wait_for_server(url: str, max_attempts: int = 30, delay: float = 1.0):
"""Ждет готовности сервера по указанному URL."""
for attempt in range(max_attempts):
try:
response = requests.get(url, timeout=2)
if response.status_code == 200:
return True
except:
pass
time.sleep(delay)
return False
return _wait_for_server
@pytest.fixture
def test_users(db_session):
"""Создает тестовых пользователей для тестов"""
from orm.author import Author
# Создаем первого пользователя (администратор)
admin_user = Author(
slug="test-admin",
email="test_admin@discours.io",
password="hashed_password_123",
name="Test Admin",
bio="Test admin user for testing",
pic="https://example.com/avatar1.jpg",
oauth={}
)
db_session.add(admin_user)
# Создаем второго пользователя (обычный пользователь)
regular_user = Author(
slug="test-user",
email="test_user@discours.io",
password="hashed_password_456",
name="Test User",
bio="Test regular user for testing",
pic="https://example.com/avatar2.jpg",
oauth={}
)
db_session.add(regular_user)
# Создаем третьего пользователя (только читатель)
reader_user = Author(
slug="test-reader",
email="test_reader@discours.io",
password="hashed_password_789",
name="Test Reader",
bio="Test reader user for testing",
pic="https://example.com/avatar3.jpg",
oauth={}
)
db_session.add(reader_user)
db_session.commit()
return [admin_user, regular_user, reader_user]
@pytest.fixture
def test_community(db_session, test_users):
"""Создает тестовое сообщество для тестов"""
from orm.community import Community
community = Community(
name="Test Community",
slug="test-community",
desc="A test community for testing purposes",
created_by=test_users[0].id, # Администратор создает сообщество
settings={
"default_roles": ["reader", "author"],
"custom_setting": "custom_value"
}
)
db_session.add(community)
db_session.commit()
return community
@pytest.fixture
def community_with_creator(db_session, test_users):
"""Создает сообщество с создателем"""
from orm.community import Community
community = Community(
name="Community With Creator",
slug="community-with-creator",
desc="A test community with a creator",
created_by=test_users[0].id,
settings={"default_roles": ["reader", "author"]}
)
db_session.add(community)
db_session.commit()
return community
@pytest.fixture
def community_without_creator(db_session):
"""Создает сообщество без создателя"""
from orm.community import Community
community = Community(
name="Community Without Creator",
slug="community-without-creator",
desc="A test community without a creator",
created_by=None, # Без создателя
settings={"default_roles": ["reader"]}
)
db_session.add(community)
db_session.commit()
return community
@pytest.fixture
def admin_user_with_roles(db_session, test_users, test_community):
"""Создает администратора с ролями в сообществе"""
from orm.community import CommunityAuthor
ca = CommunityAuthor(
community_id=test_community.id,
author_id=test_users[0].id,
roles="admin,author,reader"
)
db_session.add(ca)
db_session.commit()
return test_users[0]
@pytest.fixture
def regular_user_with_roles(db_session, test_users, test_community):
"""Создает обычного пользователя с ролями в сообществе"""
from orm.community import CommunityAuthor
ca = CommunityAuthor(
community_id=test_community.id,
author_id=test_users[1].id,
roles="author,reader"
)
db_session.add(ca)
db_session.commit()
return test_users[1]
@pytest.fixture
def mock_verify(monkeypatch):
"""Мокает функцию верификации для тестов"""
from unittest.mock import AsyncMock
mock = AsyncMock()
# Здесь можно настроить возвращаемые значения по умолчанию
return mock
@pytest.fixture
def redis_client():
"""Создает Redis клиент для тестов токенов"""
from storage.redis import RedisService
redis_service = RedisService()
return redis_service._client
# Mock для Redis если он недоступен
@pytest.fixture
def mock_redis_if_unavailable():
"""Мокает Redis если он недоступен - для тестов которые нуждаются в Redis"""
try:
import fakeredis.aioredis
# Используем fakeredis для тестов
with patch('storage.redis.redis') as mock_redis:
# Создаем fakeredis сервер
fake_redis = fakeredis.aioredis.FakeRedis()
# Создаем mock для execute метода, который эмулирует поведение RedisService.execute
async def mock_execute(command: str, *args):
cmd_method = getattr(fake_redis, command.lower(), None)
if cmd_method is not None:
if hasattr(cmd_method, '__call__'):
return await cmd_method(*args)
else:
return cmd_method
return None
# Патчим методы Redis
mock_redis.execute = mock_execute
mock_redis.get = fake_redis.get
mock_redis.set = fake_redis.set
mock_redis.delete = fake_redis.delete
mock_redis.exists = fake_redis.exists
mock_redis.ping = fake_redis.ping
mock_redis.is_connected = True
# Добавляем async методы для connect/disconnect
async def mock_connect():
return True
async def mock_disconnect():
pass
mock_redis.connect = mock_connect
mock_redis.disconnect = mock_disconnect
yield
except ImportError:
# fakeredis не установлен, используем базовый mock
with patch('storage.redis.redis') as mock_redis:
# Создаем базовый mock для Redis методов
mock_redis.execute.return_value = None
mock_redis.get.return_value = None
mock_redis.set.return_value = True
mock_redis.delete.return_value = True
mock_redis.exists.return_value = False
mock_redis.ping.return_value = True
mock_redis.is_connected = False
# Добавляем async методы для connect/disconnect
async def mock_connect():
return True
async def mock_disconnect():
pass
mock_redis.connect = mock_connect
mock_redis.disconnect = mock_disconnect
yield
@pytest.fixture(autouse=True)
def ensure_rbac_initialized():
"""Обеспечивает инициализацию RBAC системы для каждого теста"""
import rbac
rbac.initialize_rbac()
yield