import pytest
import os
from settings import FRONTEND_URL
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
import time
import requests
import subprocess
from typing import Optional
from unittest.mock import patch, MagicMock
import importlib

# 🚨 CRITICAL: Patch Redis BEFORE any other imports to prevent connection attempts
try:
    import fakeredis.aioredis

    # Create a fake Redis instance
    fake_redis = fakeredis.aioredis.FakeRedis()

    # Add the execute method that our Redis service needs
    async def execute_method(command: str, *args):
        """Adds an execute() method for compatibility with RedisService"""
        cmd_method = getattr(fake_redis, command.lower(), None)
        if cmd_method is not None:
            if callable(cmd_method):
                return await cmd_method(*args)
            else:
                return cmd_method
        return None

    fake_redis.execute = execute_method

    # Patch Redis at module level
    import storage.redis

    # Mock the global redis instance
    storage.redis.redis = fake_redis

    print("✅ Redis patched with fakeredis at module level")

except ImportError:
    print("❌ fakeredis not available, tests may fail")

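# Example (illustrative sketch, not executed here): with the patch above, code that talks to
# the global redis instance can issue commands by name; execute_method() dispatches each
# command string to the matching fakeredis coroutine, assuming an async caller and a
# hypothetical key "test-key":
#
#     await storage.redis.redis.execute("set", "test-key", "value")
#     value = await storage.redis.redis.execute("get", "test-key")
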
from orm.base import BaseModel as Base


# 🚨 CRITICAL: Create all database tables at module level BEFORE any tests run
def ensure_all_tables_exist():
    """Creates all tables in the in-memory database for tests"""
    try:
        # Create a temporary engine for table creation
        temp_engine = create_engine(
            "sqlite:///:memory:",
            echo=False,
            poolclass=StaticPool,
            connect_args={"check_same_thread": False}
        )

        # Import all ORM modules to ensure they're registered
        import orm.base
        import orm.community
        import orm.author
        import orm.draft
        import orm.shout
        import orm.topic
        import orm.reaction
        import orm.invite
        import orm.notification
        import orm.collection
        import orm.rating

        # Force create all tables
        Base.metadata.create_all(temp_engine)

        # Verify tables were created
        from sqlalchemy import inspect
        inspector = inspect(temp_engine)
        created_tables = inspector.get_table_names()

        print(f"✅ Module-level table creation: {len(created_tables)} tables created")
        print(f"📋 Tables: {created_tables}")

        # Clean up
        temp_engine.dispose()

    except Exception as e:
        print(f"❌ Module-level table creation failed: {e}")
        # Don't fail on CI, just log the error
        print("⚠️ Continuing despite table creation failure (CI environment)")

# Execute table creation immediately
ensure_all_tables_exist()

# Additional check: make sure all models are imported
def ensure_all_models_imported():
    """Makes sure that all ORM models are imported and registered"""
    try:
        # Force-import all models
        import orm.base
        import orm.community
        import orm.author
        import orm.draft
        import orm.shout
        import orm.topic
        import orm.reaction
        import orm.invite
        import orm.notification
        import orm.collection
        import orm.rating

        # Check that all models are registered
        from orm.base import BaseModel as Base
        registered_tables = list(Base.metadata.tables.keys())
        print(f"🔍 ensure_all_models_imported: {len(registered_tables)} tables registered")
        print(f"📋 Registered tables: {registered_tables}")

        # Check that all critical tables are registered
        required_tables = [
            'author', 'community', 'community_author', 'community_follower',
            'draft', 'draft_author', 'draft_topic',
            'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
            'topic', 'topic_followers', 'reaction', 'invite', 'notification',
            'collection', 'author_follower', 'author_rating', 'author_bookmark'
        ]

        missing_tables = [table for table in required_tables if table not in registered_tables]

        if missing_tables:
            print(f"⚠️ ensure_all_models_imported: missing tables: {missing_tables}")
            print(f"Available tables: {registered_tables}")

            # Try to force-import the models
            try:
                print("🔄 ensure_all_models_imported: attempting explicit model imports...")
                # Explicitly import all models
                from orm.community import Community, CommunityAuthor, CommunityFollower
                from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark
                from orm.draft import Draft, DraftAuthor, DraftTopic
                from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
                from orm.topic import Topic, TopicFollower
                from orm.reaction import Reaction
                from orm.invite import Invite
                from orm.notification import Notification
                from orm.collection import Collection

                # Check again
                updated_tables = list(Base.metadata.tables.keys())
                still_missing = [table for table in required_tables if table not in updated_tables]
                if still_missing:
                    print(f"⚠️ ensure_all_models_imported: still missing tables: {still_missing}")
                else:
                    print("✅ ensure_all_models_imported: all tables registered after explicit import")

            except Exception as e:
                print(f"⚠️ ensure_all_models_imported: failed to import models explicitly: {e}")
        else:
            print("✅ ensure_all_models_imported: all required tables registered in metadata")

    except Exception as e:
        print(f"⚠️ ensure_all_models_imported: model import check failed: {e}")

# Verify model imports
ensure_all_models_imported()


def pytest_configure(config):
    """Pytest configuration hook - runs before any tests"""
    # Redis is already patched at module level, no need to do it again
    print("✅ Redis already patched at module level")


def force_create_all_tables(engine):
    """
    Forcibly creates all tables, reloading the models if necessary.
    This helps in CI environments where the metadata may be stale.
    """
    from sqlalchemy import inspect
    import orm

    # Reload all ORM modules to guarantee the metadata is up to date
    importlib.reload(orm.base)
    importlib.reload(orm.community)
    importlib.reload(orm.author)
    importlib.reload(orm.draft)
    importlib.reload(orm.shout)
    importlib.reload(orm.topic)
    importlib.reload(orm.reaction)
    importlib.reload(orm.invite)
    importlib.reload(orm.notification)
    importlib.reload(orm.collection)
    importlib.reload(orm.rating)

    # Get the refreshed Base
    from orm.base import BaseModel as Base

    # Create all tables
    Base.metadata.create_all(engine)

    # Check the result
    inspector = inspect(engine)
    created_tables = inspector.get_table_names()
    print(f"🔧 Force created tables: {created_tables}")

    # If the tables are still not created, try creating them one by one
    if not created_tables:
        print("🔄 No tables created, trying individual table creation...")
        try:
            # Import all models
            from orm.community import Community, CommunityAuthor, CommunityFollower
            from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark
            from orm.draft import Draft, DraftAuthor, DraftTopic
            from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
            from orm.topic import Topic, TopicFollower
            from orm.reaction import Reaction
            from orm.invite import Invite
            from orm.notification import Notification
            from orm.collection import Collection

            # Create the tables one by one
            tables_to_create = [
                (Community, 'community'),
                (CommunityAuthor, 'community_author'),
                (CommunityFollower, 'community_follower'),
                (Author, 'author'),
                (AuthorFollower, 'author_follower'),
                (AuthorRating, 'author_rating'),
                (AuthorBookmark, 'author_bookmark'),
                (Draft, 'draft'),
                (DraftAuthor, 'draft_author'),
                (DraftTopic, 'draft_topic'),
                (Shout, 'shout'),
                (ShoutAuthor, 'shout_author'),
                (ShoutTopic, 'shout_topic'),
                (ShoutReactionsFollower, 'shout_reactions_followers'),
                (Topic, 'topic'),
                (TopicFollower, 'topic_followers'),
                (Reaction, 'reaction'),
                (Invite, 'invite'),
                (Notification, 'notification'),
                (Collection, 'collection')
            ]

            for model, table_name in tables_to_create:
                try:
                    model.__table__.create(engine, checkfirst=True)
                    print(f"✅ Created table {table_name}")
                except Exception as e:
                    print(f"⚠️ Failed to create table {table_name}: {e}")

            # Check the result
            inspector = inspect(engine)
            created_tables = inspector.get_table_names()
            print(f"🔧 Individual table creation result: {created_tables}")

        except Exception as e:
            print(f"❌ Individual table creation failed: {e}")

    return created_tables


def get_test_client():
    """
    Creates and returns a test client for integration tests.

    Returns:
        TestClient: Client for issuing test requests
    """
    from starlette.testclient import TestClient

    # Deferred import to prevent circular dependencies
    def _import_app():
        from main import app
        return app

    return TestClient(_import_app())


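# Example usage (a minimal sketch, assuming the GraphQL endpoint at /graphql used elsewhere in
# this file; the query is hypothetical):
#
#     def test_graphql_endpoint_responds():
#         client = get_test_client()
#         response = client.post("/graphql", json={"query": "{ __typename }"})
#         assert response.status_code == 200
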
@pytest.fixture(autouse=True, scope="session")
def _set_requests_default_timeout():
    """Globally set a default timeout for requests in tests to rule out hangs.

    🪓 Simplification: wrap the requests method, adding timeout=10 when none is given.
    """
    original_request = requests.sessions.Session.request

    def request_with_default_timeout(self, method, url, **kwargs):  # type: ignore[override]
        if "timeout" not in kwargs:
            kwargs["timeout"] = 10
        return original_request(self, method, url, **kwargs)

    requests.sessions.Session.request = request_with_default_timeout  # type: ignore[assignment]
    yield
    requests.sessions.Session.request = original_request  # type: ignore[assignment]


@pytest.fixture(scope="session")
def test_engine():
    """
    Creates a test engine for the whole test session.
    Uses in-memory SQLite for fast tests.
    """
    # Force-import ALL models so they are registered in Base.metadata
    import orm.base
    import orm.community
    import orm.author
    import orm.draft
    import orm.shout
    import orm.topic
    import orm.reaction
    import orm.invite
    import orm.notification
    import orm.collection

    # Explicitly import the classes to guarantee registration
    from orm.base import BaseModel as Base
    from orm.community import Community, CommunityAuthor, CommunityFollower
    from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark
    from orm.draft import Draft, DraftAuthor, DraftTopic
    from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
    from orm.topic import Topic, TopicFollower
    from orm.reaction import Reaction
    from orm.invite import Invite
    from orm.notification import Notification
    from orm.collection import Collection

    # Initialize the RBAC system
    import rbac
    rbac.initialize_rbac()

    engine = create_engine(
        "sqlite:///:memory:", echo=False, poolclass=StaticPool, connect_args={"check_same_thread": False}
    )

    # Forcibly drop all tables and create them from scratch
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)

    # Debug: check which tables were created at session level
    from sqlalchemy import inspect
    inspector = inspect(engine)
    session_tables = inspector.get_table_names()
    print(f"🔍 Created tables in test_engine fixture: {session_tables}")

    # Check that all critical tables were created
    required_tables = [
        'author', 'community', 'community_author', 'community_follower',
        'draft', 'draft_author', 'draft_topic',
        'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
        'topic', 'topic_followers', 'reaction', 'invite', 'notification',
        'collection', 'author_follower', 'author_rating', 'author_bookmark'
    ]

    missing_tables = [table for table in required_tables if table not in session_tables]

    if missing_tables:
        print(f"❌ Missing tables in test_engine: {missing_tables}")
        print(f"Available tables: {session_tables}")

        # Fallback: try to create the missing tables explicitly
        print("🔄 Attempting to create missing tables explicitly in test_engine...")
        try:
            # Create all tables again with the models force-imported
            Base.metadata.create_all(engine)

            # Check again
            inspector = inspect(engine)
            updated_tables = inspector.get_table_names()
            still_missing = [table for table in required_tables if table not in updated_tables]

            if still_missing:
                print(f"❌ Still missing tables after explicit creation: {still_missing}")
                # Last attempt: create the tables one by one
                print("🔄 Last attempt: creating tables one by one...")
                table_models = {
                    'community': Community,
                    'community_author': CommunityAuthor,
                    'community_follower': CommunityFollower,
                    'author': Author,
                    'author_follower': AuthorFollower,
                    'author_rating': AuthorRating,
                    'author_bookmark': AuthorBookmark,
                    'draft': Draft,
                    'draft_author': DraftAuthor,
                    'draft_topic': DraftTopic,
                    'shout': Shout,
                    'shout_author': ShoutAuthor,
                    'shout_topic': ShoutTopic,
                    'shout_reactions_followers': ShoutReactionsFollower,
                    'topic': Topic,
                    'topic_followers': TopicFollower,
                    'reaction': Reaction,
                    'invite': Invite,
                    'notification': Notification,
                    'collection': Collection,
                }
                for table_name in still_missing:
                    try:
                        model = table_models.get(table_name)
                        if model is not None:
                            model.__table__.create(engine, checkfirst=True)
                        print(f"✅ Created table {table_name}")
                    except Exception as e:
                        print(f"❌ Failed to create table {table_name}: {e}")

                # Final check
                inspector = inspect(engine)
                final_tables = inspector.get_table_names()
                final_missing = [table for table in required_tables if table not in final_tables]
                if final_missing:
                    print(f"❌ Still missing tables after individual creation: {final_missing}")
                    print("🔄 Last resort: forcing table creation with module reload...")
                    try:
                        final_tables = force_create_all_tables(engine)
                        final_missing = [table for table in required_tables if table not in final_tables]
                        if final_missing:
                            raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}")
                        else:
                            print("✅ All missing tables created successfully with force creation")
                    except Exception as e:
                        print(f"❌ Force creation failed: {e}")
                        raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}")
                else:
                    print("✅ All missing tables created successfully in test_engine")
            else:
                print("✅ All missing tables created successfully in test_engine")
        except Exception as e:
            print(f"❌ Failed to create missing tables in test_engine: {e}")
            raise
    else:
        print("✅ All required tables created in test_engine")

    yield engine

    # Cleanup after all tests
    Base.metadata.drop_all(engine)


@pytest.fixture(scope="session")
def test_session_factory(test_engine):
    """
    Creates a session factory for testing.
    """
    return sessionmaker(bind=test_engine, expire_on_commit=False)


@pytest.fixture
def db_session(test_session_factory, test_engine):
    """
    Creates a new DB session for each test.
    Simple implementation without nested transactions.
    """
    # Forcibly recreate the tables for every test
    from orm.base import BaseModel as Base
    from sqlalchemy import inspect

    # Make sure all models are imported before creating the tables
    # Explicitly import all models so they are registered in Base.metadata
    from orm.community import Community, CommunityAuthor, CommunityFollower
    from orm.author import Author, AuthorFollower, AuthorRating, AuthorBookmark
    from orm.draft import Draft, DraftAuthor, DraftTopic
    from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
    from orm.topic import Topic, TopicFollower
    from orm.reaction import Reaction
    from orm.invite import Invite
    from orm.notification import Notification
    from orm.collection import Collection

    # Check that all models are registered
    print(f"🔍 Registered tables in Base.metadata: {list(Base.metadata.tables.keys())}")

    # Make sure all critical tables are registered in the metadata
    required_tables = [
        'author', 'community', 'community_author', 'community_follower',
        'draft', 'draft_author', 'draft_topic',
        'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
        'topic', 'topic_followers', 'reaction', 'invite', 'notification',
        'collection', 'author_follower', 'author_rating', 'author_bookmark'
    ]

    missing_metadata_tables = [table for table in required_tables if table not in Base.metadata.tables]

    if missing_metadata_tables:
        print(f"❌ Missing tables in Base.metadata: {missing_metadata_tables}")
        print("Available tables:", list(Base.metadata.tables.keys()))
        raise RuntimeError(f"Critical tables not registered in Base.metadata: {missing_metadata_tables}")
    else:
        print("✅ All required tables registered in Base.metadata")

    # Drop all tables
    Base.metadata.drop_all(test_engine)

    # Create the tables from scratch
    Base.metadata.create_all(test_engine)

    # If the tables were not created, try forcing it
    inspector = inspect(test_engine)
    created_tables = inspector.get_table_names()
    if not created_tables:
        print("🔄 No tables created with metadata.create_all, trying force creation...")
        try:
            force_create_all_tables(test_engine)
            inspector = inspect(test_engine)
            created_tables = inspector.get_table_names()
            print(f"🔧 Force creation result: {created_tables}")
        except Exception as e:
            print(f"❌ Force creation failed: {e}")
            # Last resort: create the tables one by one
            print("🔄 Last resort: creating tables one by one...")
            for model, table_name in [
                (Community, 'community'),
                (CommunityAuthor, 'community_author'),
                (CommunityFollower, 'community_follower'),
                (Author, 'author'),
                (AuthorFollower, 'author_follower'),
                (AuthorRating, 'author_rating'),
                (AuthorBookmark, 'author_bookmark'),
                (Draft, 'draft'),
                (DraftAuthor, 'draft_author'),
                (DraftTopic, 'draft_topic'),
                (Shout, 'shout'),
                (ShoutAuthor, 'shout_author'),
                (ShoutTopic, 'shout_topic'),
                (ShoutReactionsFollower, 'shout_reactions_followers'),
                (Topic, 'topic'),
                (TopicFollower, 'topic_followers'),
                (Reaction, 'reaction'),
                (Invite, 'invite'),
                (Notification, 'notification'),
                (Collection, 'collection')
            ]:
                try:
                    model.__table__.create(test_engine, checkfirst=True)
                    print(f"✅ Created table {table_name}")
                except Exception as e:
                    print(f"⚠️ Failed to create table {table_name}: {e}")

            # Final check
            inspector = inspect(test_engine)
            created_tables = inspector.get_table_names()
            print(f"🔧 Individual creation result: {created_tables}")

    # Debug: check which tables were created
    inspector = inspect(test_engine)
    created_tables = inspector.get_table_names()
    print(f"🔍 Created tables in db_session fixture: {created_tables}")

    # Check that all critical tables were created
    required_tables = [
        'author', 'community', 'community_author', 'community_follower',
        'draft', 'draft_author', 'draft_topic',
        'shout', 'shout_author', 'shout_topic', 'shout_reactions_followers',
        'topic', 'topic_followers', 'reaction', 'invite', 'notification',
        'collection', 'author_follower', 'author_rating', 'author_bookmark'
    ]

    missing_tables = [table for table in required_tables if table not in created_tables]

    if missing_tables:
        print(f"❌ Missing tables in db_session: {missing_tables}")
        print(f"Available tables: {created_tables}")

        # Fallback: try to create the missing tables explicitly
        print("🔄 Attempting to create missing tables explicitly in db_session...")
        try:
            # Create all tables again with the models force-imported
            Base.metadata.create_all(test_engine)

            # Check again
            inspector = inspect(test_engine)
            updated_tables = inspector.get_table_names()
            still_missing = [table for table in required_tables if table not in updated_tables]

            if still_missing:
                print(f"❌ Still missing tables after explicit creation: {still_missing}")
                # Last attempt: create the tables one by one
                print("🔄 Last attempt: creating tables one by one...")
                table_models = {
                    'community': Community,
                    'community_author': CommunityAuthor,
                    'community_follower': CommunityFollower,
                    'author': Author,
                    'author_follower': AuthorFollower,
                    'author_rating': AuthorRating,
                    'author_bookmark': AuthorBookmark,
                    'draft': Draft,
                    'draft_author': DraftAuthor,
                    'draft_topic': DraftTopic,
                    'shout': Shout,
                    'shout_author': ShoutAuthor,
                    'shout_topic': ShoutTopic,
                    'shout_reactions_followers': ShoutReactionsFollower,
                    'topic': Topic,
                    'topic_followers': TopicFollower,
                    'reaction': Reaction,
                    'invite': Invite,
                    'notification': Notification,
                    'collection': Collection,
                }
                for table_name in still_missing:
                    try:
                        model = table_models.get(table_name)
                        if model is not None:
                            model.__table__.create(test_engine, checkfirst=True)
                        print(f"✅ Created table {table_name}")
                    except Exception as e:
                        print(f"❌ Failed to create table {table_name}: {e}")

                # Final check
                inspector = inspect(test_engine)
                final_tables = inspector.get_table_names()
                final_missing = [table for table in required_tables if table not in final_tables]
                if final_missing:
                    print(f"❌ Still missing tables after individual creation: {final_missing}")
                    print("🔄 Last resort: forcing table creation with module reload...")
                    try:
                        final_tables = force_create_all_tables(test_engine)
                        final_missing = [table for table in required_tables if table not in final_tables]
                        if final_missing:
                            raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}")
                        else:
                            print("✅ All missing tables created successfully with force creation")
                    except Exception as e:
                        print(f"❌ Force creation failed: {e}")
                        raise RuntimeError(f"Failed to create required tables after all attempts: {final_missing}")
                else:
                    print("✅ All missing tables created successfully in db_session")
            else:
                print("✅ All missing tables created successfully in db_session")
        except Exception as e:
            print(f"❌ Failed to create missing tables in db_session: {e}")
            raise
    else:
        print("✅ All required tables created in db_session")

    # Create the session
    session = test_session_factory()

    # Create base data for the tests
    try:
        # Create the main community
        main_community = Community(
            id=1,
            name="Main Community",
            slug="main",
            desc="Main test community"
        )
        session.add(main_community)
        session.commit()

        print("✅ Base test data created successfully")
    except Exception as e:
        print(f"⚠️ Warning: Failed to create base test data: {e}")
        # Don't fail if the base data could not be created

    yield session

    # Cleanup
    session.close()


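# Example usage (illustrative sketch; assumes the Author model fields used in the fixtures below):
#
#     def test_author_roundtrip(db_session):
#         from orm.author import Author
#         author = Author(slug="example", email="example@test.local", name="Example", oauth={})
#         db_session.add(author)
#         db_session.commit()
#         assert db_session.query(Author).where(Author.slug == "example").first() is not None
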
@pytest.fixture
def db_session_commit(test_session_factory):
    """
    Creates a DB session with real commits for integration tests.
    Used when real transactions need to be tested.
    """
    session = test_session_factory()

    # Create the default community for the tests
    from orm.community import Community
    from orm.author import Author

    # Create the system author if it does not exist
    system_author = session.query(Author).where(Author.slug == "system").first()
    if not system_author:
        system_author = Author(
            name="System",
            slug="system",
            email="system@test.local",
            created_at=int(time.time()),
            updated_at=int(time.time()),
            last_seen=int(time.time())
        )
        session.add(system_author)
        session.commit()

    # Create the default community if it does not exist
    default_community = session.query(Community).where(Community.id == 1).first()
    if not default_community:
        default_community = Community(
            id=1,
            name="Главное сообщество",
            slug="main",
            desc="Основное сообщество для тестов",
            pic="",
            created_at=int(time.time()),
            created_by=system_author.id,
            settings={"default_roles": ["reader", "author"], "available_roles": ["reader", "author", "artist", "expert", "editor", "admin"]},
            private=False
        )
        session.add(default_community)
        session.commit()

    yield session

    # Clear all data after the test
    try:
        for table in reversed(Base.metadata.sorted_tables):
            session.execute(table.delete())
        session.commit()
    except Exception:
        session.rollback()
    finally:
        session.close()


@pytest.fixture
def frontend_url():
    """
    Returns the frontend URL for tests.
    """
    return FRONTEND_URL or "http://localhost:3000"


@pytest.fixture
def backend_url():
    """
    Returns the backend URL for tests.
    """
    return "http://localhost:8000"


@pytest.fixture(scope="session")
def backend_server():
    """
    🚀 Fixture that automatically starts/stops the backend server.
    Starts the server only if it is not already running.
    """
    backend_process: Optional[subprocess.Popen] = None
    backend_running = False

    # Check whether the server is already running
    try:
        response = requests.get("http://localhost:8000/", timeout=2)
        if response.status_code == 200:
            print("✅ Backend server is already running")
            backend_running = True
        else:
            backend_running = False
    except Exception:
        backend_running = False

    if not backend_running:
        print("🔄 Starting the backend server for tests...")
        try:
            # Start the backend server with the test database
            env = os.environ.copy()
            env["DATABASE_URL"] = "sqlite:///test_e2e.db"  # Use the test DB for e2e
            env["TESTING"] = "true"

            backend_process = subprocess.Popen(
                ["uv", "run", "python", "dev.py"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                env=env,
                cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            )

            # Wait for the backend to start
            print("⏳ Waiting for the backend to start...")
            for i in range(30):  # Wait at most 30 seconds
                try:
                    response = requests.get("http://localhost:8000/", timeout=2)
                    if response.status_code == 200:
                        print("✅ Backend server started")
                        backend_running = True
                        break
                except Exception:
                    pass
                time.sleep(1)
            else:
                print("❌ Backend server did not start within 30 seconds")
                if backend_process:
                    backend_process.terminate()
                    backend_process.wait()
                raise Exception("Backend server did not start within 30 seconds")

        except Exception as e:
            print(f"❌ Failed to start the server: {e}")
            if backend_process:
                backend_process.terminate()
                backend_process.wait()
            raise Exception(f"Could not start the backend server: {e}")

    yield backend_running

    # Cleanup: stop the server only if we started it
    if backend_process and not backend_running:
        print("🛑 Stopping the backend server...")
        try:
            backend_process.terminate()
            backend_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            backend_process.kill()
            backend_process.wait()


@pytest.fixture(scope="session")
def frontend_server():
    """
    🚀 Fixture that automatically starts/stops the frontend server.
    Starts the frontend only if it is not already running.
    """
    frontend_process: Optional[subprocess.Popen] = None
    frontend_running = False

    # Check whether the frontend is already running
    try:
        response = requests.get("http://localhost:3000/", timeout=2)
        if response.status_code == 200:
            print("✅ Frontend server is already running")
            frontend_running = True
        else:
            frontend_running = False
    except Exception:
        frontend_running = False

    if not frontend_running:
        print("🔄 Starting the frontend server for tests...")
        try:
            # Check that node_modules is present
            node_modules_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "node_modules")
            if not os.path.exists(node_modules_path):
                print("📦 Installing frontend dependencies...")
                subprocess.run(["npm", "install"], check=True,
                               cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

            # Start the frontend server
            env = os.environ.copy()
            env["NODE_ENV"] = "development"

            frontend_process = subprocess.Popen(
                ["npm", "run", "dev"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                env=env,
                cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            )

            # Wait for the frontend to start
            print("⏳ Waiting for the frontend to start...")
            for i in range(60):  # Wait at most 60 seconds
                try:
                    response = requests.get("http://localhost:3000/", timeout=2)
                    if response.status_code == 200:
                        print("✅ Frontend server started")
                        frontend_running = True
                        break
                except Exception:
                    pass
                time.sleep(1)
            else:
                print("❌ Frontend server did not start within 60 seconds")
                if frontend_process:
                    frontend_process.terminate()
                    frontend_process.wait()
                # Don't fail hard, just report False
                frontend_running = False

        except Exception as e:
            print(f"❌ Failed to start the frontend: {e}")
            if frontend_process:
                frontend_process.terminate()
                frontend_process.wait()
            # Don't fail hard, just report False
            frontend_running = False

    yield frontend_running

    # Cleanup: stop the frontend only if we started it
    if frontend_process:
        print("🛑 Stopping the frontend server...")
        try:
            frontend_process.terminate()
            frontend_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            frontend_process.kill()
            frontend_process.wait()


@pytest.fixture
def test_client(backend_server):
    """
    🧪 Creates a test client for API tests.
    Requires a running backend server.
    """
    return get_test_client()


@pytest.fixture
async def browser_context():
    """
    🌐 Creates a browser context for e2e tests.
    Automatically manages the browser lifecycle.
    """
    try:
        from playwright.async_api import async_playwright
    except ImportError:
        pytest.skip("Playwright is not installed")

    async with async_playwright() as p:
        # Determine headless mode
        headless = os.getenv("PLAYWRIGHT_HEADLESS", "true").lower() == "true"

        browser = await p.chromium.launch(
            headless=headless,
            args=[
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--disable-web-security",
                "--disable-features=VizDisplayCompositor"
            ]
        )

        context = await browser.new_context(
            viewport={"width": 1280, "height": 720},
            ignore_https_errors=True,
            java_script_enabled=True
        )

        yield context

        await context.close()
        await browser.close()


@pytest.fixture
async def page(browser_context):
    """
    📄 Creates a new page for each test.
    """
    page = await browser_context.new_page()

    # Set timeouts
    page.set_default_timeout(30000)
    page.set_default_navigation_timeout(30000)

    yield page

    await page.close()


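# Example usage (illustrative sketch; assumes an async test runner such as pytest-asyncio is
# configured and the frontend is reachable at localhost:3000):
#
#     @pytest.mark.asyncio
#     async def test_homepage_loads(frontend_server, page):
#         await page.goto("http://localhost:3000/")
#         assert await page.title() is not None
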
@pytest.fixture
def api_base_url(backend_server):
    """
    🔗 Returns the base URL for API tests.
    """
    return "http://localhost:8000/graphql"


@pytest.fixture
def test_user_credentials():
    """
    👤 Returns test credentials for authorization from environment variables.
    """
    import os

    # 🔍 Use the TEST_LOGIN and TEST_PASSWORD environment variables
    test_email = os.getenv("TEST_LOGIN", "test_admin@discours.io")
    test_password = os.getenv("TEST_PASSWORD", "password123")

    return {
        "email": test_email,
        "password": test_password
    }


@pytest.fixture
def create_test_users_in_backend_db():
    """
    👥 Creates a new test user in the backend database for E2E tests.
    🔍 Always creates a new user with a unique email based on TEST_LOGIN/TEST_PASSWORD
    🏘️ First creates the base community if it does not exist
    """
    import requests
    import time
    import os
    import uuid

    # 🔍 Use the TEST_LOGIN and TEST_PASSWORD environment variables
    base_email = os.getenv("TEST_LOGIN", "test_admin@discours.io")
    test_password = os.getenv("TEST_PASSWORD", "password123")

    # 🔍 Create a unique email for each test
    unique_suffix = str(uuid.uuid4())[:8]
    if "@" in base_email:
        email_parts = base_email.split("@")
        test_email = f"{email_parts[0]}+{unique_suffix}@{email_parts[1]}"
    else:
        test_email = f"{base_email}+{unique_suffix}@discours.io"

    # 🏘️ Create the base community in the E2E test DB through the ORM
    print("🏘️ Creating the base community in the E2E test DB...")
    try:
        from orm.community import Community
        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker
        import time

        # Use the same DB as the E2E backend server
        test_engine = create_engine("sqlite:///test_e2e.db", echo=False)
        TestSession = sessionmaker(bind=test_engine)

        with TestSession() as session:
            # Check whether a community with ID=1 exists
            existing_community = session.query(Community).where(Community.id == 1).first()
            if not existing_community:
                # Create the base community
                base_community = Community(
                    name="Test Community",
                    slug="test-community",
                    desc="Base community for E2E tests",
                    pic="",
                    created_at=int(time.time()),
                    created_by=None,  # A creator is not required
                    settings={},
                    private=False
                )
                session.add(base_community)
                session.commit()
                print("✅ Base community created in the E2E test DB")
            else:
                print("✅ Base community already exists in the E2E test DB")
    except Exception as e:
        print(f"⚠️ Failed to create the base community: {e}")
        # Continue anyway - the community may already exist

    # Create the user through the API
    register_user_mutation = """
    mutation RegisterUser($email: String!, $password: String!, $name: String) {
        registerUser(email: $email, password: $password, name: $name) {
            success
            author {
                id
                email
                name
            }
            error
        }
    }
    """

    # Create a new admin with a unique email
    admin_data = {
        "email": test_email,
        "password": test_password,
        "name": f"Test Admin {unique_suffix}"
    }

    print(f"🔍 Creating a new test user: {test_email}")

    try:
        response = requests.post(
            "http://localhost:8000/graphql",
            json={"query": register_user_mutation, "variables": admin_data},
            headers={"Content-Type": "application/json"},
            timeout=10
        )

        if response.status_code == 200:
            data = response.json()
            if data.get("data", {}).get("registerUser", {}).get("success"):
                print(f"✅ New test user created: {test_email}")
                return {"email": test_email, "password": test_password, "created": True}
            else:
                error = data.get("data", {}).get("registerUser", {}).get("error")
                print(f"⚠️ Failed to create the test user: {error}")
                return {"email": test_email, "password": test_password, "created": False, "error": error}
        else:
            print(f"⚠️ HTTP error while creating the test user: {response.status_code}")
            return {"email": test_email, "password": test_password, "created": False}

    except Exception as e:
        print(f"⚠️ Error while creating the test user: {e}")
        return {"email": test_email, "password": test_password, "created": False, "error": str(e)}

    # Wait a moment for the operation to finish
    time.sleep(1)


@pytest.fixture
def auth_headers(api_base_url, test_user_credentials):
    """
    🔐 Creates authorization headers for API tests.
    """
    def _get_auth_headers(token: Optional[str] = None):
        headers = {"Content-Type": "application/json"}
        if token:
            headers["Authorization"] = f"Bearer {token}"
        return headers

    return _get_auth_headers


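# Example usage (a minimal sketch; the token value and the query are hypothetical):
#
#     def test_authorized_request(api_base_url, auth_headers):
#         headers = auth_headers("some-session-token")
#         response = requests.post(api_base_url, json={"query": "{ __typename }"}, headers=headers)
#         assert response.status_code == 200
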
@pytest.fixture
def wait_for_server():
    """
    ⏳ Utility for waiting until a server is ready.
    """
    def _wait_for_server(url: str, max_attempts: int = 30, delay: float = 1.0):
        """Waits for the server at the given URL to become ready."""
        for attempt in range(max_attempts):
            try:
                response = requests.get(url, timeout=2)
                if response.status_code == 200:
                    return True
            except Exception:
                pass
            time.sleep(delay)
        return False

    return _wait_for_server


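# Example usage (a minimal sketch, assuming the backend root URL used elsewhere in this file):
#
#     def test_backend_is_up(backend_server, wait_for_server):
#         assert wait_for_server("http://localhost:8000/", max_attempts=5, delay=0.5)
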
@pytest.fixture
def test_users(db_session):
    """Creates test users for the tests"""
    import os
    from orm.author import Author

    # 🔍 Use the TEST_LOGIN and TEST_PASSWORD environment variables
    test_email = os.getenv("TEST_LOGIN", "test_admin@discours.io")
    test_password = os.getenv("TEST_PASSWORD", "password123")

    # Create the first user (administrator)
    # This email must be in ADMIN_EMAILS so the admin role is granted automatically
    admin_user = Author(
        slug="test-admin",
        email=test_email,
        name="Test Admin",
        bio="Test admin user for testing",
        pic="https://example.com/avatar1.jpg",
        oauth={}
    )
    admin_user.set_password(test_password)
    db_session.add(admin_user)

    # Create the second user (regular user)
    regular_user = Author(
        slug="test-user",
        email="test_user@discours.io",
        name="Test User",
        bio="Test regular user for testing",
        pic="https://example.com/avatar2.jpg",
        oauth={}
    )
    regular_user.set_password("password456")
    db_session.add(regular_user)

    # Create the third user (reader only)
    reader_user = Author(
        slug="test-reader",
        email="test_reader@discours.io",
        name="Test Reader",
        bio="Test reader user for testing",
        pic="https://example.com/avatar3.jpg",
        oauth={}
    )
    reader_user.set_password("password789")
    db_session.add(reader_user)

    # Save the changes together with the passwords
    db_session.commit()

    # Create the community with ID 1 and assign roles
    from orm.community import Community, CommunityAuthor

    # Check whether a community with ID 1 exists
    existing_community = db_session.query(Community).where(Community.id == 1).first()
    if existing_community:
        community = existing_community
    else:
        # Create the community with ID 1
        community = Community(
            id=1,
            name="Test Community",
            slug="test-community",
            desc="A test community for testing purposes",
            created_by=admin_user.id,
            settings={"default_roles": ["reader", "author"]}
        )
        db_session.add(community)
        db_session.commit()

    # Assign roles to the users (if they do not have them yet)
    # Do not assign the admin role to admin_user manually - it is derived automatically from the email
    existing_admin_ca = db_session.query(CommunityAuthor).where(
        CommunityAuthor.community_id == community.id,
        CommunityAuthor.author_id == admin_user.id
    ).first()
    if not existing_admin_ca:
        admin_ca = CommunityAuthor(
            community_id=community.id,
            author_id=admin_user.id,
            roles="author,reader"  # the admin role is added automatically based on the email
        )
        db_session.add(admin_ca)

    existing_regular_ca = db_session.query(CommunityAuthor).where(
        CommunityAuthor.community_id == community.id,
        CommunityAuthor.author_id == regular_user.id
    ).first()
    if not existing_regular_ca:
        regular_ca = CommunityAuthor(
            community_id=community.id,
            author_id=regular_user.id,
            roles="author,reader"
        )
        db_session.add(regular_ca)

    existing_reader_ca = db_session.query(CommunityAuthor).where(
        CommunityAuthor.community_id == community.id,
        CommunityAuthor.author_id == reader_user.id
    ).first()
    if not existing_reader_ca:
        reader_ca = CommunityAuthor(
            community_id=community.id,
            author_id=reader_user.id,
            roles="reader"
        )
        db_session.add(reader_ca)

    db_session.commit()

    return [admin_user, regular_user, reader_user]


@pytest.fixture
def test_community(db_session, test_users):
    """Creates a test community for the tests"""
    from orm.community import Community

    # Create a community with ID 2, since ID 1 is already taken by the main community
    community = Community(
        id=2,  # Use ID 2 to avoid conflicting with the main community
        name="Test Community Fixture",
        slug="test-community-fixture",  # Unique slug for this fixture
        desc="A test community for testing purposes",
        created_by=test_users[0].id,  # The administrator creates the community
        settings={
            "default_roles": ["reader", "author"],
            "custom_setting": "custom_value"
        }
    )
    db_session.add(community)
    db_session.commit()

    return community


@pytest.fixture
def community_with_creator(db_session, test_users):
    """Creates a community with a creator"""
    from orm.community import Community

    community = Community(
        name="Community With Creator",
        slug="community-with-creator",
        desc="A test community with a creator",
        created_by=test_users[0].id,
        settings={"default_roles": ["reader", "author"]}
    )
    db_session.add(community)
    db_session.commit()

    return community


@pytest.fixture
def community_without_creator(db_session):
    """Creates a community without a creator"""
    from orm.community import Community

    community = Community(
        name="Community Without Creator",
        slug="community-without-creator",
        desc="A test community without a creator",
        created_by=None,  # No creator
        settings={"default_roles": ["reader"]}
    )
    db_session.add(community)
    db_session.commit()

    return community


@pytest.fixture
def admin_user_with_roles(db_session, test_users, test_community):
    """Creates an administrator with roles in the community"""
    from orm.community import CommunityAuthor

    ca = CommunityAuthor(
        community_id=test_community.id,
        author_id=test_users[0].id,
        roles="admin,author,reader"
    )
    db_session.add(ca)
    db_session.commit()

    return test_users[0]


@pytest.fixture
def regular_user_with_roles(db_session, test_users, test_community):
    """Creates a regular user with roles in the community"""
    from orm.community import CommunityAuthor

    ca = CommunityAuthor(
        community_id=test_community.id,
        author_id=test_users[1].id,
        roles="author,reader"
    )
    db_session.add(ca)
    db_session.commit()

    return test_users[1]


@pytest.fixture
def mock_verify(monkeypatch):
    """Mocks the verification function for tests"""
    from unittest.mock import AsyncMock

    mock = AsyncMock()
    # Default return values can be configured here
    return mock


@pytest.fixture
def redis_client():
    """Creates a Redis client for token tests"""
    from storage.redis import RedisService

    redis_service = RedisService()
    return redis_service._client


@pytest.fixture
def fake_redis():
    """Creates a fakeredis instance for tests"""
    try:
        import fakeredis.aioredis
        return fakeredis.aioredis.FakeRedis()
    except ImportError:
        pytest.skip("fakeredis is not installed - install it with: pip install fakeredis[aioredis]")


@pytest.fixture(autouse=True)
def ensure_rbac_initialized():
    """Ensures the RBAC system is initialized for each test"""
    import rbac
    rbac.initialize_rbac()
    yield


@pytest.fixture(autouse=True)
def mock_redis_globally():
    """Globally mocks Redis for all tests, including e2e"""
    try:
        import fakeredis.aioredis

        # Create a fakeredis server
        fake_redis = fakeredis.aioredis.FakeRedis()

        # Patch the global redis instance
        with patch('storage.redis.redis') as mock_redis:
            # Emulate the RedisService.execute method
            async def mock_execute(command: str, *args):
                cmd_method = getattr(fake_redis, command.lower(), None)
                if cmd_method is not None:
                    if callable(cmd_method):
                        return await cmd_method(*args)
                    else:
                        return cmd_method
                return None

            # Patch all the main Redis methods
            mock_redis.execute = mock_execute
            mock_redis.get = fake_redis.get
            mock_redis.set = fake_redis.set
            mock_redis.delete = fake_redis.delete
            mock_redis.exists = fake_redis.exists
            mock_redis.ping = fake_redis.ping
            mock_redis.hset = fake_redis.hset
            mock_redis.hget = fake_redis.hget
            mock_redis.hgetall = fake_redis.hgetall
            mock_redis.hdel = fake_redis.hdel
            mock_redis.expire = fake_redis.expire
            mock_redis.ttl = fake_redis.ttl
            mock_redis.keys = fake_redis.keys
            mock_redis.scan = fake_redis.scan
            mock_redis.is_connected = True

            # Async methods for connect/disconnect
            async def mock_connect():
                return True

            async def mock_disconnect():
                pass

            mock_redis.connect = mock_connect
            mock_redis.disconnect = mock_disconnect

            yield

    except ImportError:
        # If fakeredis is not available, use a basic mock
        with patch('storage.redis.redis') as mock_redis:
            mock_redis.execute.return_value = None
            mock_redis.get.return_value = None
            mock_redis.set.return_value = True
            mock_redis.delete.return_value = True
            mock_redis.exists.return_value = False
            mock_redis.ping.return_value = True
            mock_redis.hset.return_value = True
            mock_redis.hget.return_value = None
            mock_redis.hgetall.return_value = {}
            mock_redis.hdel.return_value = True
            mock_redis.expire.return_value = True
            mock_redis.ttl.return_value = -1
            mock_redis.keys.return_value = []
            mock_redis.scan.return_value = ([], 0)
            mock_redis.is_connected = True

            async def mock_connect():
                return True

            async def mock_disconnect():
                pass

            mock_redis.connect = mock_connect
            mock_redis.disconnect = mock_disconnect

            yield


@pytest.fixture(autouse=True)
def mock_redis_service_globally():
    """Globally mocks RedisService for all tests, including e2e"""
    try:
        import fakeredis.aioredis

        # Create a fakeredis server
        fake_redis = fakeredis.aioredis.FakeRedis()

        # Patch the RedisService class
        with patch('storage.redis.RedisService') as mock_service_class:
            # Create the mock instance
            mock_service = mock_service_class.return_value

            # Emulate the RedisService.execute method
            async def mock_execute(command: str, *args):
                cmd_method = getattr(fake_redis, command.lower(), None)
                if cmd_method is not None:
                    if callable(cmd_method):
                        return await cmd_method(*args)
                    else:
                        return cmd_method
                return None

            # Patch all the main methods
            mock_service.execute = mock_execute
            mock_service.get = fake_redis.get
            mock_service.set = fake_redis.set
            mock_service.delete = fake_redis.delete
            mock_service.exists = fake_redis.exists
            mock_service.ping = fake_redis.ping
            mock_service.hset = fake_redis.hset
            mock_service.hget = fake_redis.hget
            mock_service.hgetall = fake_redis.hgetall
            mock_service.hdel = fake_redis.hdel
            mock_service.expire = fake_redis.expire
            mock_service.ttl = fake_redis.ttl
            mock_service.keys = fake_redis.keys
            mock_service.scan = fake_redis.scan
            mock_service._client = fake_redis
            mock_service.is_connected = True

            # Async methods for connect/disconnect
            async def mock_connect():
                return True

            async def mock_disconnect():
                pass

            mock_service.connect = mock_connect
            mock_service.disconnect = mock_disconnect

            yield

    except ImportError:
        # If fakeredis is not available, use a basic mock
        with patch('storage.redis.RedisService') as mock_service_class:
            mock_service = mock_service_class.return_value

            mock_service.execute.return_value = None
            mock_service.get.return_value = None
            mock_service.set.return_value = True
            mock_service.delete.return_value = True
            mock_service.exists.return_value = False
            mock_service.ping.return_value = True
            mock_service.hset.return_value = True
            mock_service.hget.return_value = None
            mock_service.hgetall.return_value = {}
            mock_service.hdel.return_value = True
            mock_service.expire.return_value = True
            mock_service.ttl.return_value = -1
            mock_service.keys.return_value = []
            mock_service.scan.return_value = ([], 0)
            mock_service.is_connected = True

            async def mock_connect():
                return True

            async def mock_disconnect():
                pass

            mock_service.connect = mock_connect
            mock_service.disconnect = mock_disconnect

            yield