Files
core/tests/conftest.py
Untone ac0111cdb9
All checks were successful
Deploy on push / deploy (push) Successful in 57m1s
tests-upgrade
2025-09-25 09:40:12 +03:00

1564 lines
65 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pytest
import os
from settings import FRONTEND_URL
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
import time
import requests
import subprocess
from typing import Optional
from unittest.mock import patch, MagicMock
import importlib
# 🚨 CRITICAL: Patch Redis BEFORE any other imports to prevent connection attempts
try:
import fakeredis.aioredis
# Create a fake Redis instance
fake_redis = fakeredis.aioredis.FakeRedis()
# Add the execute method that our Redis service needs
async def execute_method(command: str, *args):
"""Добавляем метод execute для совместимости с RedisService"""
cmd_method = getattr(fake_redis, command.lower(), None)
if cmd_method is not None:
if hasattr(cmd_method, '__call__'):
return await cmd_method(*args)
else:
return cmd_method
return None
fake_redis.execute = execute_method
# Patch Redis at module level
import storage.redis
# Mock the global redis instance
storage.redis.redis = fake_redis
print("✅ Redis patched with fakeredis at module level")
except ImportError:
print("❌ fakeredis not available, tests may fail")
from orm.base import BaseModel as Base
# 🚨 CRITICAL: Create all database tables at module level BEFORE any tests run
def ensure_all_tables_exist():
"""Создает все таблицы в in-memory базе для тестов"""
try:
# Create a temporary engine for table creation
temp_engine = create_engine(
"sqlite:///:memory:",
echo=False,
poolclass=StaticPool,
connect_args={"check_same_thread": False}
)
# Import all ORM modules to ensure they're registered
import orm.base
import orm.community
import orm.author
import orm.draft
import orm.shout
import orm.topic
import orm.reaction
import orm.invite
import orm.notification
import orm.collection
import orm.rating
# Force create all tables
Base.metadata.create_all(temp_engine)
# Verify tables were created
from sqlalchemy import inspect
inspector = inspect(temp_engine)
created_tables = inspector.get_table_names()
print(f"✅ Module-level table creation: {len(created_tables)} tables created")
print(f"📋 Tables: {created_tables}")
# Clean up
temp_engine.dispose()
except Exception as e:
print(f"❌ Module-level table creation failed: {e}")
# На CI не падаем, просто логируем ошибку
print(f"⚠️ Continuing despite table creation failure (CI environment)")
# Execute table creation immediately
ensure_all_tables_exist()
# Дополнительная проверка: убеждаемся что все модели импортированы
def ensure_all_models_imported():
"""Убеждается что все модели ORM импортированы и зарегистрированы"""
try:
# Принудительно импортируем все модели
import orm.base
import orm.community
import orm.author
import orm.draft
import orm.shout
import orm.topic
import orm.reaction
import orm.invite
import orm.notification
import orm.collection
import orm.rating
# Проверяем что все модели зарегистрированы
from orm.base import BaseModel as Base
registered_tables = list(Base.metadata.tables.keys())
print(f"🔍 ensure_all_models_imported: {len(registered_tables)} tables registered")
print(f"📋 Registered tables: {registered_tables}")
# Проверяем что все критические таблицы зарегистрированы
required_tables = [
'author', 'community', 'community_author', 'community_follower',
'draft', 'draft_author', 'draft_topic',
'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
'topic', 'topic_followers', 'reaction', 'invite', 'notification',
'collection', 'author_follower', 'author_rating', 'author_bookmark'
]
missing_tables = [table for table in required_tables if table not in registered_tables]
if missing_tables:
print(f"⚠️ ensure_all_models_imported: missing tables: {missing_tables}")
print(f"Available tables: {registered_tables}")
# Пробуем принудительно импортировать модели
try:
print("🔄 ensure_all_models_imported: attempting explicit model imports...")
# Явно импортируем все модели
from orm.community import Community, CommunityAuthor, CommunityFollower
from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark
from orm.draft import Draft, DraftAuthor, DraftTopic
from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
from orm.topic import Topic, TopicFollower
from orm.reaction import Reaction
from orm.invite import Invite
from orm.notification import Notification
from orm.collection import Collection
# Проверяем снова
updated_tables = list(Base.metadata.tables.keys())
still_missing = [table for table in required_tables if table not in updated_tables]
if still_missing:
print(f"⚠️ ensure_all_models_imported: still missing tables: {still_missing}")
else:
print("✅ ensure_all_models_imported: all tables registered after explicit import")
except Exception as e:
print(f"⚠️ ensure_all_models_imported: failed to import models explicitly: {e}")
else:
print("✅ ensure_all_models_imported: all required tables registered in metadata")
except Exception as e:
print(f"⚠️ ensure_all_models_imported: model import check failed: {e}")
# Проверяем импорт моделей
ensure_all_models_imported()
def pytest_configure(config):
"""Pytest configuration hook - runs before any tests"""
# Redis is already patched at module level, no need to do it again
print("✅ Redis already patched at module level")
def force_create_all_tables(engine):
"""
Принудительно создает все таблицы, перезагружая модели если нужно.
Это помогает в CI среде где могут быть проблемы с метаданными.
"""
from sqlalchemy import inspect
import orm
# Перезагружаем все модули ORM для гарантии актуальности метаданных
importlib.reload(orm.base)
importlib.reload(orm.community)
importlib.reload(orm.author)
importlib.reload(orm.draft)
importlib.reload(orm.shout)
importlib.reload(orm.topic)
importlib.reload(orm.reaction)
importlib.reload(orm.invite)
importlib.reload(orm.notification)
importlib.reload(orm.collection)
importlib.reload(orm.rating)
# Получаем обновленную Base
from orm.base import BaseModel as Base
# Создаем все таблицы
Base.metadata.create_all(engine)
# Проверяем результат
inspector = inspect(engine)
created_tables = inspector.get_table_names()
print(f"🔧 Force created tables: {created_tables}")
# Если таблицы все еще не созданы, пробуем создать их по одной
if not created_tables:
print("🔄 No tables created, trying individual table creation...")
try:
# Импортируем все модели
from orm.community import Community, CommunityAuthor, CommunityFollower
from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark
from orm.draft import Draft, DraftAuthor, DraftTopic
from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
from orm.topic import Topic, TopicFollower
from orm.reaction import Reaction
from orm.invite import Invite
from orm.notification import Notification
from orm.collection import Collection
# Создаем таблицы по одной
tables_to_create = [
(Community, 'community'),
(CommunityAuthor, 'community_author'),
(CommunityFollower, 'community_follower'),
(Author, 'author'),
(AuthorFollower, 'author_follower'),
(AuthorRating, 'author_rating'),
(AuthorBookmark, 'author_bookmark'),
(Draft, 'draft'),
(DraftAuthor, 'draft_author'),
(DraftTopic, 'draft_topic'),
(Shout, 'shout'),
(ShoutAuthor, 'shout_author'),
(ShoutTopic, 'shout_topic'),
(ShoutReactionsFollower, 'shout_reactions_followers'),
(Topic, 'topic'),
(TopicFollower, 'topic_followers'),
(Reaction, 'reaction'),
(Invite, 'invite'),
(Notification, 'notification'),
(Collection, 'collection')
]
for model, table_name in tables_to_create:
try:
model.__table__.create(engine, checkfirst=True)
print(f"✅ Created table {table_name}")
except Exception as e:
print(f"⚠️ Failed to create table {table_name}: {e}")
# Проверяем результат
inspector = inspect(engine)
created_tables = inspector.get_table_names()
print(f"🔧 Individual table creation result: {created_tables}")
except Exception as e:
print(f"❌ Individual table creation failed: {e}")
return created_tables
def get_test_client():
"""
Создает и возвращает тестовый клиент для интеграционных тестов.
Returns:
TestClient: Клиент для выполнения тестовых запросов
"""
from starlette.testclient import TestClient
# Отложенный импорт для предотвращения циклических зависимостей
def _import_app():
from main import app
return app
return TestClient(_import_app())
@pytest.fixture(autouse=True, scope="session")
def _set_requests_default_timeout():
"""Глобально задаем таймаут по умолчанию для requests в тестах, чтобы исключить зависания.
🪓 Упрощение: мокаем методы requests, добавляя timeout=10, если он не указан.
"""
original_request = requests.sessions.Session.request
def request_with_default_timeout(self, method, url, **kwargs): # type: ignore[override]
if "timeout" not in kwargs:
kwargs["timeout"] = 10
return original_request(self, method, url, **kwargs)
requests.sessions.Session.request = request_with_default_timeout # type: ignore[assignment]
yield
requests.sessions.Session.request = original_request # type: ignore[assignment]
@pytest.fixture(scope="session")
def test_engine():
"""
Создает тестовый engine для всей сессии тестирования.
Использует in-memory SQLite для быстрых тестов.
"""
# Принудительно импортируем ВСЕ модели чтобы они были зарегистрированы в Base.metadata
import orm.base
import orm.community
import orm.author
import orm.draft
import orm.shout
import orm.topic
import orm.reaction
import orm.invite
import orm.notification
import orm.collection
# Явно импортируем классы для гарантии регистрации
from orm.base import BaseModel as Base
from orm.community import Community, CommunityAuthor, CommunityFollower
from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark
from orm.draft import Draft, DraftAuthor, DraftTopic
from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
from orm.topic import Topic, TopicFollower
from orm.reaction import Reaction
from orm.invite import Invite
from orm.notification import Notification
from orm.collection import Collection
# Инициализируем RBAC систему
import rbac
rbac.initialize_rbac()
engine = create_engine(
"sqlite:///:memory:", echo=False, poolclass=StaticPool, connect_args={"check_same_thread": False}
)
# Принудительно удаляем все таблицы и создаем заново
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
# Debug: проверяем какие таблицы созданы на уровне сессии
from sqlalchemy import inspect
inspector = inspect(engine)
session_tables = inspector.get_table_names()
print(f"🔍 Created tables in test_engine fixture: {session_tables}")
# Проверяем что все критические таблицы созданы
required_tables = [
'author', 'community', 'community_author', 'community_follower',
'draft', 'draft_author', 'draft_topic',
'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
'topic', 'topic_followers', 'reaction', 'invite', 'notification',
'collection', 'author_follower', 'author_rating', 'author_bookmark'
]
missing_tables = [table for table in required_tables if table not in session_tables]
if missing_tables:
print(f"❌ Missing tables in test_engine: {missing_tables}")
print(f"Available tables: {session_tables}")
# Fallback: попробуем создать отсутствующие таблицы явно
print("🔄 Attempting to create missing tables explicitly in test_engine...")
try:
# Создаем все таблицы снова с принудительным импортом моделей
Base.metadata.create_all(engine)
# Проверяем снова
inspector = inspect(engine)
updated_tables = inspector.get_table_names()
still_missing = [table for table in required_tables if table not in updated_tables]
if still_missing:
print(f"❌ Still missing tables after explicit creation: {still_missing}")
# Последняя попытка: создаем таблицы по одной
print("🔄 Last attempt: creating tables one by one...")
for table_name in still_missing:
try:
if table_name == 'community_author':
CommunityAuthor.__table__.create(engine, checkfirst=True)
elif table_name == 'community_follower':
CommunityFollower.__table__.create(engine, checkfirst=True)
elif table_name == 'author_follower':
AuthorFollower.__table__.create(engine, checkfirst=True)
elif table_name == 'author_rating':
AuthorRating.__table__.create(engine, checkfirst=True)
elif table_name == 'author_bookmark':
AuthorBookmark.__table__.create(engine, checkfirst=True)
elif table_name == 'draft_author':
DraftAuthor.__table__.create(engine, checkfirst=True)
elif table_name == 'draft_topic':
DraftTopic.__table__.create(engine, checkfirst=True)
elif table_name == 'shout_author':
ShoutAuthor.__table__.create(engine, checkfirst=True)
elif table_name == 'shout_topic':
ShoutTopic.__table__.create(engine, checkfirst=True)
elif table_name == 'shout_reactions_followers':
ShoutReactionsFollower.__table__.create(engine, checkfirst=True)
elif table_name == 'collection':
Collection.__table__.create(engine, checkfirst=True)
elif table_name == 'topic_followers':
TopicFollower.__table__.create(engine, checkfirst=True)
elif table_name == 'author':
Author.__table__.create(engine, checkfirst=True)
elif table_name == 'shout':
Shout.__table__.create(engine, checkfirst=True)
elif table_name == 'draft':
Draft.__table__.create(engine, checkfirst=True)
elif table_name == 'topic':
Topic.__table__.create(engine, checkfirst=True)
elif table_name == 'reaction':
Reaction.__table__.create(engine, checkfirst=True)
elif table_name == 'invite':
Invite.__table__.create(engine, checkfirst=True)
elif table_name == 'notification':
Notification.__table__.create(engine, checkfirst=True)
elif table_name == 'community':
Community.__table__.create(engine, checkfirst=True)
print(f"✅ Created table {table_name}")
except Exception as e:
print(f"❌ Failed to create table {table_name}: {e}")
# Финальная проверка
inspector = inspect(engine)
final_tables = inspector.get_table_names()
final_missing = [table for table in required_tables if table not in final_tables]
if final_missing:
print(f"❌ Still missing tables after individual creation: {final_missing}")
print("🔄 Last resort: forcing table creation with module reload...")
try:
final_tables = force_create_all_tables(engine)
final_missing = [table for table in required_tables if table not in final_tables]
if final_missing:
raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}")
else:
print("✅ All missing tables created successfully with force creation")
except Exception as e:
print(f"❌ Force creation failed: {e}")
raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}")
else:
print("✅ All missing tables created successfully in test_engine")
else:
print("✅ All missing tables created successfully in test_engine")
except Exception as e:
print(f"❌ Failed to create missing tables in test_engine: {e}")
raise
else:
print("✅ All required tables created in test_engine")
yield engine
# Cleanup после всех тестов
Base.metadata.drop_all(engine)
@pytest.fixture(scope="session")
def test_session_factory(test_engine):
"""
Создает фабрику сессий для тестирования.
"""
return sessionmaker(bind=test_engine, expire_on_commit=False)
@pytest.fixture
def db_session(test_session_factory, test_engine):
"""
Создает новую сессию БД для каждого теста.
Простая реализация без вложенных транзакций.
"""
# Принудительно пересоздаем таблицы для каждого теста
from orm.base import BaseModel as Base
from sqlalchemy import inspect
# Убеждаемся что все модели импортированы перед созданием таблиц
# Явно импортируем все модели чтобы они были зарегистрированы в Base.metadata
from orm.community import Community, CommunityAuthor, CommunityFollower
from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark
from orm.draft import Draft, DraftAuthor, DraftTopic
from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
from orm.topic import Topic, TopicFollower
from orm.reaction import Reaction
from orm.invite import Invite
from orm.notification import Notification
from orm.collection import Collection
# Проверяем что все модели зарегистрированы
print(f"🔍 Registered tables in Base.metadata: {list(Base.metadata.tables.keys())}")
# Убеждаемся что все критические таблицы зарегистрированы в metadata
required_tables = [
'author', 'community', 'community_author', 'community_follower',
'draft', 'draft_author', 'draft_topic',
'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
'topic', 'topic_followers', 'reaction', 'invite', 'notification',
'collection', 'author_follower', 'author_rating', 'author_bookmark'
]
missing_metadata_tables = [table for table in required_tables if table not in Base.metadata.tables]
if missing_metadata_tables:
print(f"❌ Missing tables in Base.metadata: {missing_metadata_tables}")
print("Available tables:", list(Base.metadata.tables.keys()))
raise RuntimeError(f"Critical tables not registered in Base.metadata: {missing_metadata_tables}")
else:
print("✅ All required tables registered in Base.metadata")
# Удаляем все таблицы
Base.metadata.drop_all(test_engine)
# Создаем таблицы заново
Base.metadata.create_all(test_engine)
# Если таблицы не создались, пробуем принудительно
inspector = inspect(test_engine)
created_tables = inspector.get_table_names()
if not created_tables:
print("🔄 No tables created with metadata.create_all, trying force creation...")
try:
force_create_all_tables(test_engine)
inspector = inspect(test_engine)
created_tables = inspector.get_table_names()
print(f"🔧 Force creation result: {created_tables}")
except Exception as e:
print(f"❌ Force creation failed: {e}")
# Последняя попытка: создаем таблицы по одной
print("🔄 Last resort: creating tables one by one...")
for model, table_name in [
(Community, 'community'),
(CommunityAuthor, 'community_author'),
(CommunityFollower, 'community_follower'),
(Author, 'author'),
(AuthorFollower, 'author_follower'),
(AuthorRating, 'author_rating'),
(AuthorBookmark, 'author_bookmark'),
(Draft, 'draft'),
(DraftAuthor, 'draft_author'),
(DraftTopic, 'draft_topic'),
(Shout, 'shout'),
(ShoutAuthor, 'shout_author'),
(ShoutTopic, 'shout_topic'),
(ShoutReactionsFollower, 'shout_reactions_followers'),
(Topic, 'topic'),
(TopicFollower, 'topic_followers'),
(Reaction, 'reaction'),
(Invite, 'invite'),
(Notification, 'notification'),
(Collection, 'collection')
]:
try:
model.__table__.create(test_engine, checkfirst=True)
print(f"✅ Created table {table_name}")
except Exception as e:
print(f"⚠️ Failed to create table {table_name}: {e}")
# Финальная проверка
inspector = inspect(test_engine)
created_tables = inspector.get_table_names()
print(f"🔧 Individual creation result: {created_tables}")
# Debug: проверяем какие таблицы созданы
inspector = inspect(test_engine)
created_tables = inspector.get_table_names()
print(f"🔍 Created tables in db_session fixture: {created_tables}")
# Проверяем что все критические таблицы созданы
required_tables = [
'author', 'community', 'community_author', 'community_follower',
'draft', 'draft_author', 'draft_topic',
'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
'topic', 'topic_followers', 'reaction', 'invite', 'notification',
'collection', 'author_follower', 'author_rating', 'author_bookmark'
]
missing_tables = [table for table in required_tables if table not in created_tables]
if missing_tables:
print(f"❌ Missing tables in db_session: {missing_tables}")
print(f"Available tables: {created_tables}")
# Fallback: попробуем создать отсутствующие таблицы явно
print("🔄 Attempting to create missing tables explicitly in db_session...")
try:
# Создаем все таблицы снова с принудительным импортом моделей
Base.metadata.create_all(test_engine)
# Проверяем снова
inspector = inspect(test_engine)
updated_tables = inspector.get_table_names()
still_missing = [table for table in required_tables if table not in updated_tables]
if still_missing:
print(f"❌ Still missing tables after explicit creation: {still_missing}")
# Последняя попытка: создаем таблицы по одной
print("🔄 Last attempt: creating tables one by one...")
for table_name in still_missing:
try:
if table_name == 'community_author':
CommunityAuthor.__table__.create(test_engine, checkfirst=True)
elif table_name == 'community_follower':
CommunityFollower.__table__.create(test_engine, checkfirst=True)
elif table_name == 'author_follower':
AuthorFollower.__table__.create(test_engine, checkfirst=True)
elif table_name == 'author_rating':
AuthorRating.__table__.create(test_engine, checkfirst=True)
elif table_name == 'author_bookmark':
AuthorBookmark.__table__.create(test_engine, checkfirst=True)
elif table_name == 'draft_author':
DraftAuthor.__table__.create(test_engine, checkfirst=True)
elif table_name == 'draft_topic':
DraftTopic.__table__.create(test_engine, checkfirst=True)
elif table_name == 'shout_author':
ShoutAuthor.__table__.create(test_engine, checkfirst=True)
elif table_name == 'shout_topic':
ShoutTopic.__table__.create(test_engine, checkfirst=True)
elif table_name == 'shout_reactions_followers':
ShoutReactionsFollower.__table__.create(test_engine, checkfirst=True)
elif table_name == 'collection':
Collection.__table__.create(test_engine, checkfirst=True)
elif table_name == 'topic_followers':
TopicFollower.__table__.create(test_engine, checkfirst=True)
elif table_name == 'author':
Author.__table__.create(test_engine, checkfirst=True)
elif table_name == 'shout':
Shout.__table__.create(test_engine, checkfirst=True)
elif table_name == 'draft':
Draft.__table__.create(test_engine, checkfirst=True)
elif table_name == 'topic':
Topic.__table__.create(test_engine, checkfirst=True)
elif table_name == 'reaction':
Reaction.__table__.create(test_engine, checkfirst=True)
elif table_name == 'invite':
Invite.__table__.create(test_engine, checkfirst=True)
elif table_name == 'notification':
Notification.__table__.create(test_engine, checkfirst=True)
elif table_name == 'community':
Community.__table__.create(test_engine, checkfirst=True)
print(f"✅ Created table {table_name}")
except Exception as e:
print(f"❌ Failed to create table {table_name}: {e}")
# Финальная проверка
inspector = inspect(test_engine)
final_tables = inspector.get_table_names()
final_missing = [table for table in required_tables if table not in final_tables]
if final_missing:
print(f"❌ Still missing tables after individual creation: {final_missing}")
print("🔄 Last resort: forcing table creation with module reload...")
try:
final_tables = force_create_all_tables(test_engine)
final_missing = [table for table in required_tables if table not in final_tables]
if final_missing:
raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}")
else:
print("✅ All missing tables created successfully with force creation")
except Exception as e:
print(f"❌ Force creation failed: {e}")
raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}")
else:
print("✅ All missing tables created successfully in db_session")
else:
print("✅ All missing tables created successfully in db_session")
except Exception as e:
print(f"❌ Failed to create missing tables in db_session: {e}")
raise
else:
print("✅ All required tables created in db_session")
# Создаем сессию
session = test_session_factory()
# Создаем базовые данные для тестов
try:
# Создаем главное сообщество
main_community = Community(
id=1,
name="Main Community",
slug="main",
description="Main test community"
)
session.add(main_community)
session.commit()
print("✅ Base test data created successfully")
except Exception as e:
print(f"⚠️ Warning: Failed to create base test data: {e}")
# Не падаем, если не удалось создать базовые данные
yield session
# Cleanup
session.close()
@pytest.fixture
def db_session_commit(test_session_factory):
"""
Создает сессию БД с реальными commit'ами для интеграционных тестов.
Используется когда нужно тестировать реальные транзакции.
"""
session = test_session_factory()
# Создаем дефолтное сообщество для тестов
from orm.community import Community
from orm.author import Author
# Создаем системного автора если его нет
system_author = session.query(Author).where(Author.slug == "system").first()
if not system_author:
system_author = Author(
name="System",
slug="system",
email="system@test.local",
created_at=int(time.time()),
updated_at=int(time.time()),
last_seen=int(time.time())
)
session.add(system_author)
session.commit()
# Создаем дефолтное сообщество если его нет
default_community = session.query(Community).where(Community.id == 1).first()
if not default_community:
default_community = Community(
id=1,
name="Главное сообщество",
slug="main",
desc="Основное сообщество для тестов",
pic="",
created_at=int(time.time()),
created_by=system_author.id,
settings={"default_roles": ["reader", "author"], "available_roles": ["reader", "author", "artist", "expert", "editor", "admin"]},
private=False
)
session.add(default_community)
session.commit()
yield session
# Очищаем все данные после теста
try:
for table in reversed(Base.metadata.sorted_tables):
session.execute(table.delete())
session.commit()
except Exception:
session.rollback()
finally:
session.close()
@pytest.fixture
def frontend_url():
"""
Возвращает URL фронтенда для тестов.
"""
return FRONTEND_URL or "http://localhost:3000"
@pytest.fixture
def backend_url():
"""
Возвращает URL бэкенда для тестов.
"""
return "http://localhost:8000"
@pytest.fixture(scope="session")
def backend_server():
"""
🚀 Фикстура для автоматического запуска/остановки бэкенд сервера.
Запускает сервер только если он не запущен.
"""
backend_process: Optional[subprocess.Popen] = None
backend_running = False
# Проверяем, не запущен ли уже сервер
try:
response = requests.get("http://localhost:8000/", timeout=2)
if response.status_code == 200:
print("✅ Бэкенд сервер уже запущен")
backend_running = True
else:
backend_running = False
except:
backend_running = False
if not backend_running:
print("🔄 Запускаем бэкенд сервер для тестов...")
try:
# Запускаем бэкенд сервер с тестовой базой данных
env = os.environ.copy()
env["DATABASE_URL"] = "sqlite:///test_e2e.db" # Используем тестовую БД для e2e
env["TESTING"] = "true"
backend_process = subprocess.Popen(
["uv", "run", "python", "dev.py"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env=env,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
# Ждем запуска бэкенда
print("⏳ Ждем запуска бэкенда...")
for i in range(30): # Ждем максимум 30 секунд
try:
response = requests.get("http://localhost:8000/", timeout=2)
if response.status_code == 200:
print("✅ Бэкенд сервер запущен")
backend_running = True
break
except:
pass
time.sleep(1)
else:
print("❌ Бэкенд сервер не запустился за 30 секунд")
if backend_process:
backend_process.terminate()
backend_process.wait()
raise Exception("Бэкенд сервер не запустился за 30 секунд")
except Exception as e:
print(f"❌ Ошибка запуска сервера: {e}")
if backend_process:
backend_process.terminate()
backend_process.wait()
raise Exception(f"Не удалось запустить бэкенд сервер: {e}")
yield backend_running
# Cleanup: останавливаем сервер только если мы его запускали
if backend_process and not backend_running:
print("🛑 Останавливаем бэкенд сервер...")
try:
backend_process.terminate()
backend_process.wait(timeout=10)
except subprocess.TimeoutExpired:
backend_process.kill()
backend_process.wait()
@pytest.fixture(scope="session")
def frontend_server():
"""
🚀 Фикстура для автоматического запуска/остановки фронтенд сервера.
Запускает фронтенд только если он не запущен.
"""
frontend_process: Optional[subprocess.Popen] = None
frontend_running = False
# Проверяем, не запущен ли уже фронтенд
try:
response = requests.get("http://localhost:3000/", timeout=2)
if response.status_code == 200:
print("✅ Фронтенд сервер уже запущен")
frontend_running = True
else:
frontend_running = False
except:
frontend_running = False
if not frontend_running:
print("🔄 Запускаем фронтенд сервер для тестов...")
try:
# Проверяем наличие node_modules
node_modules_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "node_modules")
if not os.path.exists(node_modules_path):
print("📦 Устанавливаем зависимости фронтенда...")
subprocess.run(["npm", "install"], check=True,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Запускаем фронтенд сервер
env = os.environ.copy()
env["NODE_ENV"] = "development"
frontend_process = subprocess.Popen(
["npm", "run", "dev"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env=env,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
# Ждем запуска фронтенда
print("⏳ Ждем запуска фронтенда...")
for i in range(60): # Ждем максимум 60 секунд
try:
response = requests.get("http://localhost:3000/", timeout=2)
if response.status_code == 200:
print("✅ Фронтенд сервер запущен")
frontend_running = True
break
except:
pass
time.sleep(1)
else:
print("❌ Фронтенд сервер не запустился за 60 секунд")
if frontend_process:
frontend_process.terminate()
frontend_process.wait()
# Не падаем жестко, а возвращаем False
frontend_running = False
except Exception as e:
print(f"❌ Ошибка запуска фронтенда: {e}")
if frontend_process:
frontend_process.terminate()
frontend_process.wait()
# Не падаем жестко, а возвращаем False
frontend_running = False
yield frontend_running
# Cleanup: останавливаем фронтенд только если мы его запускали
if frontend_process:
print("🛑 Останавливаем фронтенд сервер...")
try:
frontend_process.terminate()
frontend_process.wait(timeout=10)
except subprocess.TimeoutExpired:
frontend_process.kill()
frontend_process.wait()
@pytest.fixture
def test_client(backend_server):
"""
🧪 Создает тестовый клиент для API тестов.
Требует запущенный бэкенд сервер.
"""
return get_test_client()
@pytest.fixture
async def browser_context():
"""
🌐 Создает контекст браузера для e2e тестов.
Автоматически управляет жизненным циклом браузера.
"""
try:
from playwright.async_api import async_playwright
except ImportError:
pytest.skip("Playwright не установлен")
async with async_playwright() as p:
# Определяем headless режим
headless = os.getenv("PLAYWRIGHT_HEADLESS", "true").lower() == "true"
browser = await p.chromium.launch(
headless=headless,
args=[
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--disable-web-security",
"--disable-features=VizDisplayCompositor"
]
)
context = await browser.new_context(
viewport={"width": 1280, "height": 720},
ignore_https_errors=True,
java_script_enabled=True
)
yield context
await context.close()
await browser.close()
@pytest.fixture
async def page(browser_context):
"""
📄 Создает новую страницу для каждого теста.
"""
page = await browser_context.new_page()
# Устанавливаем таймауты
page.set_default_timeout(30000)
page.set_default_navigation_timeout(30000)
yield page
await page.close()
@pytest.fixture
def api_base_url(backend_server):
"""
🔗 Возвращает базовый URL для API тестов.
"""
return "http://localhost:8000/graphql"
@pytest.fixture
def test_user_credentials():
"""
👤 Возвращает тестовые учетные данные для авторизации из переменных окружения.
"""
import os
# 🔍 Используем переменные окружения TEST_LOGIN и TEST_PASSWORD
test_email = os.getenv("TEST_LOGIN", "test_admin@discours.io")
test_password = os.getenv("TEST_PASSWORD", "password123")
return {
"email": test_email,
"password": test_password
}
@pytest.fixture
def create_test_users_in_backend_db():
"""
👥 Создает нового тестового пользователя в базе данных бэкенда для E2E тестов.
🔍 Всегда создает нового пользователя с уникальным email на основе TEST_LOGIN/TEST_PASSWORD
🏘️ Сначала создает базовое сообщество, если его нет
"""
import requests
import time
import os
import uuid
# 🔍 Используем переменные окружения TEST_LOGIN и TEST_PASSWORD
base_email = os.getenv("TEST_LOGIN", "test_admin@discours.io")
test_password = os.getenv("TEST_PASSWORD", "password123")
# 🔍 Создаем уникальный email для каждого теста
unique_suffix = str(uuid.uuid4())[:8]
if "@" in base_email:
email_parts = base_email.split("@")
test_email = f"{email_parts[0]}+{unique_suffix}@{email_parts[1]}"
else:
test_email = f"{base_email}+{unique_suffix}@discours.io"
# 🏘️ Создаем базовое сообщество в тестовой БД E2E через ORM
print("🏘️ Создаем базовое сообщество в тестовой БД E2E...")
try:
from orm.community import Community
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import time
# Используем ту же БД, что и E2E бэкенд сервер
test_engine = create_engine("sqlite:///test_e2e.db", echo=False)
TestSession = sessionmaker(bind=test_engine)
with TestSession() as session:
# Проверяем, есть ли сообщество с ID=1
existing_community = session.query(Community).where(Community.id == 1).first()
if not existing_community:
# Создаем базовое сообщество
base_community = Community(
name="Test Community",
slug="test-community",
desc="Base community for E2E tests",
pic="",
created_at=int(time.time()),
created_by=None, # Создатель не обязателен
settings={},
private=False
)
session.add(base_community)
session.commit()
print("✅ Базовое сообщество создано в тестовой БД E2E")
else:
print("✅ Базовое сообщество уже существует в тестовой БД E2E")
except Exception as e:
print(f"⚠️ Ошибка создания базового сообщества: {e}")
# Продолжаем выполнение - возможно, сообщество уже есть
# Создаем пользователя через API
register_user_mutation = """
mutation RegisterUser($email: String!, $password: String!, $name: String) {
registerUser(email: $email, password: $password, name: $name) {
success
author {
id
email
name
}
error
}
}
"""
# Создаем нового админа с уникальным email
admin_data = {
"email": test_email,
"password": test_password,
"name": f"Test Admin {unique_suffix}"
}
print(f"🔍 Создаем нового тестового пользователя: {test_email}")
try:
response = requests.post(
"http://localhost:8000/graphql",
json={"query": register_user_mutation, "variables": admin_data},
headers={"Content-Type": "application/json"},
timeout=10
)
if response.status_code == 200:
data = response.json()
if data.get("data", {}).get("registerUser", {}).get("success"):
print(f"✅ Новый тестовый пользователь создан: {test_email}")
return {"email": test_email, "password": test_password, "created": True}
else:
error = data.get("data", {}).get("registerUser", {}).get("error")
print(f"⚠️ Ошибка создания тестового пользователя: {error}")
return {"email": test_email, "password": test_password, "created": False, "error": error}
else:
print(f"⚠️ HTTP ошибка при создании тестового пользователя: {response.status_code}")
return {"email": test_email, "password": test_password, "created": False}
except Exception as e:
print(f"⚠️ Ошибка при создании тестового пользователя: {e}")
return {"email": test_email, "password": test_password, "created": False, "error": str(e)}
# Ждем немного для завершения операции
time.sleep(1)
@pytest.fixture
def auth_headers(api_base_url, test_user_credentials):
"""
🔐 Создает заголовки авторизации для API тестов.
"""
def _get_auth_headers(token: Optional[str] = None):
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
return _get_auth_headers
@pytest.fixture
def wait_for_server():
"""
⏳ Утилита для ожидания готовности сервера.
"""
def _wait_for_server(url: str, max_attempts: int = 30, delay: float = 1.0):
"""Ждет готовности сервера по указанному URL."""
for attempt in range(max_attempts):
try:
response = requests.get(url, timeout=2)
if response.status_code == 200:
return True
except:
pass
time.sleep(delay)
return False
return _wait_for_server
@pytest.fixture
def test_users(db_session):
"""Создает тестовых пользователей для тестов"""
import os
from orm.author import Author
# 🔍 Используем переменные окружения TEST_LOGIN и TEST_PASSWORD
test_email = os.getenv("TEST_LOGIN", "test_admin@discours.io")
test_password = os.getenv("TEST_PASSWORD", "password123")
# Создаем первого пользователя (администратор)
# Этот email должен быть в ADMIN_EMAILS для автоматического получения роли admin
admin_user = Author(
slug="test-admin",
email=test_email,
name="Test Admin",
bio="Test admin user for testing",
pic="https://example.com/avatar1.jpg",
oauth={}
)
admin_user.set_password(test_password)
db_session.add(admin_user)
# Создаем второго пользователя (обычный пользователь)
regular_user = Author(
slug="test-user",
email="test_user@discours.io",
name="Test User",
bio="Test regular user for testing",
pic="https://example.com/avatar2.jpg",
oauth={}
)
regular_user.set_password("password456")
db_session.add(regular_user)
# Создаем третьего пользователя (только читатель)
reader_user = Author(
slug="test-reader",
email="test_reader@discours.io",
name="Test Reader",
bio="Test reader user for testing",
pic="https://example.com/avatar3.jpg",
oauth={}
)
reader_user.set_password("password789")
db_session.add(reader_user)
# Сохраняем изменения с паролями
db_session.commit()
# Создаем сообщество с ID 1 и назначаем роли
from orm.community import Community, CommunityAuthor
# Проверяем, существует ли сообщество с ID 1
existing_community = db_session.query(Community).where(Community.id == 1).first()
if existing_community:
community = existing_community
else:
# Создаем сообщество с ID 1
community = Community(
id=1,
name="Test Community",
slug="test-community",
desc="A test community for testing purposes",
created_by=admin_user.id,
settings={"default_roles": ["reader", "author"]}
)
db_session.add(community)
db_session.commit()
# Назначаем роли пользователям (если их еще нет)
# Для admin_user не назначаем роль admin вручную - она определяется автоматически по email
existing_admin_ca = db_session.query(CommunityAuthor).where(
CommunityAuthor.community_id == community.id,
CommunityAuthor.author_id == admin_user.id
).first()
if not existing_admin_ca:
admin_ca = CommunityAuthor(
community_id=community.id,
author_id=admin_user.id,
roles="author,reader" # admin роль добавляется автоматически по email
)
db_session.add(admin_ca)
existing_regular_ca = db_session.query(CommunityAuthor).where(
CommunityAuthor.community_id == community.id,
CommunityAuthor.author_id == regular_user.id
).first()
if not existing_regular_ca:
regular_ca = CommunityAuthor(
community_id=community.id,
author_id=regular_user.id,
roles="author,reader"
)
db_session.add(regular_ca)
existing_reader_ca = db_session.query(CommunityAuthor).where(
CommunityAuthor.community_id == community.id,
CommunityAuthor.author_id == reader_user.id
).first()
if not existing_reader_ca:
reader_ca = CommunityAuthor(
community_id=community.id,
author_id=reader_user.id,
roles="reader"
)
db_session.add(reader_ca)
db_session.commit()
return [admin_user, regular_user, reader_user]
@pytest.fixture
def test_community(db_session, test_users):
"""Создает тестовое сообщество для тестов"""
from orm.community import Community
# Создаем сообщество с ID 2, так как ID 1 уже занят основным сообществом
community = Community(
id=2, # Используем ID 2, чтобы не конфликтовать с основным сообществом
name="Test Community Fixture",
slug="test-community-fixture", # Уникальный slug для этой фикстуры
desc="A test community for testing purposes",
created_by=test_users[0].id, # Администратор создает сообщество
settings={
"default_roles": ["reader", "author"],
"custom_setting": "custom_value"
}
)
db_session.add(community)
db_session.commit()
return community
@pytest.fixture
def community_with_creator(db_session, test_users):
"""Создает сообщество с создателем"""
from orm.community import Community
community = Community(
name="Community With Creator",
slug="community-with-creator",
desc="A test community with a creator",
created_by=test_users[0].id,
settings={"default_roles": ["reader", "author"]}
)
db_session.add(community)
db_session.commit()
return community
@pytest.fixture
def community_without_creator(db_session):
"""Создает сообщество без создателя"""
from orm.community import Community
community = Community(
name="Community Without Creator",
slug="community-without-creator",
desc="A test community without a creator",
created_by=None, # Без создателя
settings={"default_roles": ["reader"]}
)
db_session.add(community)
db_session.commit()
return community
@pytest.fixture
def admin_user_with_roles(db_session, test_users, test_community):
"""Создает администратора с ролями в сообществе"""
from orm.community import CommunityAuthor
ca = CommunityAuthor(
community_id=test_community.id,
author_id=test_users[0].id,
roles="admin,author,reader"
)
db_session.add(ca)
db_session.commit()
return test_users[0]
@pytest.fixture
def regular_user_with_roles(db_session, test_users, test_community):
"""Создает обычного пользователя с ролями в сообществе"""
from orm.community import CommunityAuthor
ca = CommunityAuthor(
community_id=test_community.id,
author_id=test_users[1].id,
roles="author,reader"
)
db_session.add(ca)
db_session.commit()
return test_users[1]
@pytest.fixture
def mock_verify(monkeypatch):
"""Мокает функцию верификации для тестов"""
from unittest.mock import AsyncMock
mock = AsyncMock()
# Здесь можно настроить возвращаемые значения по умолчанию
return mock
@pytest.fixture
def redis_client():
"""Создает Redis клиент для тестов токенов"""
from storage.redis import RedisService
redis_service = RedisService()
return redis_service._client
@pytest.fixture
def fake_redis():
"""Создает fakeredis экземпляр для тестов"""
try:
import fakeredis.aioredis
return fakeredis.aioredis.FakeRedis()
except ImportError:
pytest.skip("fakeredis не установлен - установите: pip install fakeredis[aioredis]")
@pytest.fixture(autouse=True)
def ensure_rbac_initialized():
"""Обеспечивает инициализацию RBAC системы для каждого теста"""
import rbac
rbac.initialize_rbac()
yield
@pytest.fixture(autouse=True)
def mock_redis_globally():
"""Глобально мокает Redis для всех тестов, включая e2e"""
try:
import fakeredis.aioredis
# Создаем fakeredis сервер
fake_redis = fakeredis.aioredis.FakeRedis()
# Патчим глобальный redis экземпляр
with patch('storage.redis.redis') as mock_redis:
# Эмулируем RedisService.execute метод
async def mock_execute(command: str, *args):
cmd_method = getattr(fake_redis, command.lower(), None)
if cmd_method is not None:
if hasattr(cmd_method, '__call__'):
return await cmd_method(*args)
else:
return cmd_method
return None
# Патчим все основные методы Redis
mock_redis.execute = mock_execute
mock_redis.get = fake_redis.get
mock_redis.set = fake_redis.set
mock_redis.delete = fake_redis.delete
mock_redis.exists = fake_redis.exists
mock_redis.ping = fake_redis.ping
mock_redis.hset = fake_redis.hset
mock_redis.hget = fake_redis.hget
mock_redis.hgetall = fake_redis.hgetall
mock_redis.hdel = fake_redis.hdel
mock_redis.expire = fake_redis.expire
mock_redis.ttl = fake_redis.ttl
mock_redis.keys = fake_redis.keys
mock_redis.scan = fake_redis.scan
mock_redis.is_connected = True
# Async методы для connect/disconnect
async def mock_connect():
return True
async def mock_disconnect():
pass
mock_redis.connect = mock_connect
mock_redis.disconnect = mock_disconnect
yield
except ImportError:
# Если fakeredis не доступен, используем базовый mock
with patch('storage.redis.redis') as mock_redis:
mock_redis.execute.return_value = None
mock_redis.get.return_value = None
mock_redis.set.return_value = True
mock_redis.delete.return_value = True
mock_redis.exists.return_value = False
mock_redis.ping.return_value = True
mock_redis.hset.return_value = True
mock_redis.hget.return_value = None
mock_redis.hgetall.return_value = {}
mock_redis.hdel.return_value = True
mock_redis.expire.return_value = True
mock_redis.ttl.return_value = -1
mock_redis.keys.return_value = []
mock_redis.scan.return_value = ([], 0)
mock_redis.is_connected = True
async def mock_connect():
return True
async def mock_disconnect():
pass
mock_redis.connect = mock_connect
mock_redis.disconnect = mock_disconnect
yield
@pytest.fixture(autouse=True)
def mock_redis_service_globally():
"""Глобально мокает RedisService для всех тестов, включая e2e"""
try:
import fakeredis.aioredis
# Создаем fakeredis сервер
fake_redis = fakeredis.aioredis.FakeRedis()
# Патчим RedisService класс
with patch('storage.redis.RedisService') as mock_service_class:
# Создаем mock экземпляр
mock_service = mock_service_class.return_value
# Эмулируем RedisService.execute метод
async def mock_execute(command: str, *args):
cmd_method = getattr(fake_redis, command.lower(), None)
if cmd_method is not None:
if hasattr(cmd_method, '__call__'):
return await cmd_method(*args)
else:
return cmd_method
return None
# Патчим все основные методы
mock_service.execute = mock_execute
mock_service.get = fake_redis.get
mock_service.set = fake_redis.set
mock_service.delete = fake_redis.delete
mock_service.exists = fake_redis.exists
mock_service.ping = fake_redis.ping
mock_service.hset = fake_redis.hset
mock_service.hget = fake_redis.hget
mock_service.hgetall = fake_redis.hgetall
mock_service.hdel = fake_redis.hdel
mock_service.expire = fake_redis.expire
mock_service.ttl = fake_redis.ttl
mock_service.keys = fake_redis.keys
mock_service.scan = fake_redis.scan
mock_service._client = fake_redis
mock_service.is_connected = True
# Async методы для connect/disconnect
async def mock_connect():
return True
async def mock_disconnect():
pass
mock_service.connect = mock_connect
mock_service.disconnect = mock_disconnect
yield
except ImportError:
# Если fakeredis не доступен, используем базовый mock
with patch('storage.redis.RedisService') as mock_service_class:
mock_service = mock_service_class.return_value
mock_service.execute.return_value = None
mock_service.get.return_value = None
mock_service.set.return_value = True
mock_service.delete.return_value = True
mock_service.exists.return_value = False
mock_service.ping.return_value = True
mock_service.hset.return_value = True
mock_service.hget.return_value = None
mock_service.hgetall.return_value = {}
mock_service.hdel.return_value = True
mock_service.expire.return_value = True
mock_service.ttl.return_value = -1
mock_service.keys.return_value = []
mock_service.scan.return_value = ([], 0)
mock_service.is_connected = True
async def mock_connect():
return True
async def mock_disconnect():
pass
mock_service.connect = mock_connect
mock_service.disconnect = mock_disconnect
yield