Files
core/tests/test_rbac_integration.py

556 lines
23 KiB
Python
Raw Normal View History

2025-07-02 22:30:21 +03:00
"""
2025-07-03 00:20:10 +03:00
Упрощенные тесты интеграции RBAC системы с новой архитектурой сервисов.
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
Проверяет работу AdminService и AuthService с RBAC системой.
2025-07-02 22:30:21 +03:00
"""
2025-07-25 01:04:15 +03:00
import logging
2025-07-02 22:30:21 +03:00
import pytest
from auth.orm import Author
2025-07-03 00:20:10 +03:00
from orm.community import Community, CommunityAuthor
from services.admin import admin_service
from services.auth import auth_service
2025-07-02 22:30:21 +03:00
2025-07-25 01:04:15 +03:00
logger = logging.getLogger(__name__)
2025-07-02 22:30:21 +03:00
@pytest.fixture
2025-07-03 00:20:10 +03:00
def simple_user(db_session):
"""Создает простого тестового пользователя"""
# Очищаем любые существующие записи с этим ID/email
db_session.query(Author).filter(
(Author.id == 200) | (Author.email == "simple_user@example.com")
).delete()
2025-07-02 22:30:21 +03:00
db_session.commit()
2025-07-03 00:20:10 +03:00
user = Author(
id=200,
email="simple_user@example.com",
name="Simple User",
slug="simple-user",
)
user.set_password("password123")
db_session.add(user)
db_session.commit()
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
yield user
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
# Очистка после теста
2025-07-02 22:30:21 +03:00
try:
2025-07-03 00:20:10 +03:00
# Удаляем связанные записи CommunityAuthor
2025-07-25 01:04:15 +03:00
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user.id).delete(synchronize_session=False)
2025-07-03 00:20:10 +03:00
# Удаляем самого пользователя
db_session.query(Author).filter(Author.id == user.id).delete()
2025-07-02 22:30:21 +03:00
db_session.commit()
except Exception:
db_session.rollback()
2025-07-03 00:20:10 +03:00
@pytest.fixture
def simple_community(db_session, simple_user):
"""Создает простое тестовое сообщество"""
# Очищаем любые существующие записи с этим ID/slug
2025-07-25 01:04:15 +03:00
db_session.query(Community).filter(Community.slug == "simple-test-community").delete()
2025-07-03 00:20:10 +03:00
db_session.commit()
community = Community(
name="Simple Test Community",
slug="simple-test-community",
desc="Simple community for tests",
created_by=simple_user.id,
2025-07-25 01:04:15 +03:00
settings={
"default_roles": ["reader", "author"],
"available_roles": ["reader", "author", "editor"]
}
2025-07-03 00:20:10 +03:00
)
db_session.add(community)
db_session.commit()
yield community
2025-07-02 22:30:21 +03:00
# Очистка после теста
try:
2025-07-03 00:20:10 +03:00
# Удаляем связанные записи CommunityAuthor
db_session.query(CommunityAuthor).filter(CommunityAuthor.community_id == community.id).delete()
# Удаляем само сообщество
db_session.query(Community).filter(Community.id == community.id).delete()
2025-07-02 22:30:21 +03:00
db_session.commit()
except Exception:
db_session.rollback()
2025-07-25 01:04:15 +03:00
@pytest.fixture
def test_community(db_session, simple_user):
"""
Создает тестовое сообщество с ожидаемыми ролями по умолчанию
Args:
db_session: Сессия базы данных для теста
simple_user: Пользователь для создания сообщества
Returns:
Community: Созданное тестовое сообщество
"""
# Очищаем существующие записи
db_session.query(Community).filter(Community.slug == "test-rbac-community").delete()
db_session.commit()
community = Community(
name="Test RBAC Community",
slug="test-rbac-community",
desc="Community for RBAC tests",
created_by=simple_user.id,
settings={
"default_roles": ["reader", "author"],
"available_roles": ["reader", "author", "editor"]
}
)
db_session.add(community)
db_session.flush() # Получаем ID без коммита
logger.info(f"DEBUG: Создание Community с айди {community.id}")
db_session.commit()
yield community
# Очистка после теста
try:
# Удаляем связанные записи CommunityAuthor
db_session.query(CommunityAuthor).filter(CommunityAuthor.community_id == community.id).delete()
# Удаляем сообщество
db_session.query(Community).filter(Community.id == community.id).delete()
db_session.commit()
except Exception:
db_session.rollback()
2025-07-03 00:20:10 +03:00
@pytest.fixture(autouse=True)
def cleanup_test_users(db_session):
"""Автоматически очищает тестовые записи пользователей перед каждым тестом"""
# Очищаем тестовые email'ы перед тестом
test_emails = [
"test_create@example.com",
"test_community@example.com",
"simple_user@example.com",
"test_create_unique@example.com",
"test_community_unique@example.com"
]
# Очищаем также тестовые ID
test_ids = [200, 201, 202, 203, 204, 205]
for email in test_emails:
try:
existing_user = db_session.query(Author).filter(Author.email == email).first()
if existing_user:
# Удаляем связанные записи CommunityAuthor
2025-07-25 01:04:15 +03:00
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == existing_user.id).delete(synchronize_session=False)
2025-07-03 00:20:10 +03:00
# Удаляем пользователя
db_session.delete(existing_user)
db_session.commit()
except Exception:
db_session.rollback()
# Дополнительная очистка по ID
for user_id in test_ids:
try:
# Удаляем записи CommunityAuthor
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user_id).delete()
# Удаляем пользователя
db_session.query(Author).filter(Author.id == user_id).delete()
db_session.commit()
except Exception:
db_session.rollback()
yield # Тест выполняется
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
# Дополнительная очистка после теста
for email in test_emails:
try:
existing_user = db_session.query(Author).filter(Author.email == email).first()
if existing_user:
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == existing_user.id).delete()
db_session.delete(existing_user)
db_session.commit()
except Exception:
db_session.rollback()
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
for user_id in test_ids:
try:
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user_id).delete()
db_session.query(Author).filter(Author.id == user_id).delete()
db_session.commit()
except Exception:
db_session.rollback()
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
class TestSimpleAdminService:
"""Простые тесты для AdminService"""
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
def test_get_user_roles_empty(self, db_session, simple_user, simple_community):
"""Тест получения пустых ролей пользователя"""
# Очищаем любые существующие роли
db_session.query(CommunityAuthor).filter(
CommunityAuthor.author_id == simple_user.id,
CommunityAuthor.community_id == simple_community.id
).delete()
db_session.commit()
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
# Проверяем что ролей нет
roles = admin_service.get_user_roles(simple_user, simple_community.id)
assert isinstance(roles, list)
# Может быть пустой список или содержать системную роль админа
assert len(roles) >= 0
2025-07-02 22:30:21 +03:00
2025-07-25 01:04:15 +03:00
def test_get_user_roles_with_roles(self, db_session, simple_user, test_community):
2025-07-03 00:20:10 +03:00
"""Тест получения ролей пользователя"""
2025-07-25 01:04:15 +03:00
# Используем тестовое сообщество
community_id = test_community.id
2025-07-02 22:30:21 +03:00
2025-07-25 01:04:15 +03:00
# Отладочная информация о тестовом сообществе
logger.info(f"DEBUG: Тестовое сообщество ID: {community_id}")
logger.info(f"DEBUG: Тестовое сообщество slug: {test_community.slug}")
logger.info(f"DEBUG: Тестовое сообщество settings: {test_community.settings}")
# Полностью очищаем все существующие CommunityAuthor для пользователя
existing_community_authors = db_session.query(CommunityAuthor).filter(
CommunityAuthor.author_id == simple_user.id
).all()
# Отладочная информация
logger.info(f"DEBUG: Найдено существующих CommunityAuthor: {len(existing_community_authors)}")
for ca in existing_community_authors:
logger.info(f"DEBUG: Существующий CA - community_id: {ca.community_id}, roles: {ca.roles}")
db_session.delete(ca)
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
db_session.commit()
2025-07-02 22:30:21 +03:00
2025-07-25 01:04:15 +03:00
# Создаем CommunityAuthor с ролями в тестовом сообществе
2025-07-03 00:20:10 +03:00
ca = CommunityAuthor(
2025-07-25 01:04:15 +03:00
community_id=community_id,
2025-07-03 00:20:10 +03:00
author_id=simple_user.id,
)
2025-07-25 01:04:15 +03:00
# Расширенная отладка перед set_roles
logger.info(f"DEBUG: Перед set_roles")
logger.info(f"DEBUG: ca.roles до set_roles: {ca.roles}")
logger.info(f"DEBUG: ca.role_list до set_roles: {ca.role_list}")
2025-07-03 00:20:10 +03:00
ca.set_roles(["reader", "author"])
2025-07-25 01:04:15 +03:00
# Расширенная отладка после set_roles
logger.info(f"DEBUG: После set_roles")
logger.info(f"DEBUG: ca.roles после set_roles: {ca.roles}")
logger.info(f"DEBUG: ca.role_list после set_roles: {ca.role_list}")
2025-07-03 00:20:10 +03:00
db_session.add(ca)
db_session.commit()
2025-07-25 01:04:15 +03:00
# Явная проверка сохранения CommunityAuthor
check_ca = db_session.query(CommunityAuthor).filter(
2025-07-03 00:20:10 +03:00
CommunityAuthor.author_id == simple_user.id,
2025-07-25 01:04:15 +03:00
CommunityAuthor.community_id == community_id
2025-07-03 00:20:10 +03:00
).first()
2025-07-25 01:04:15 +03:00
logger.info(f"DEBUG: Проверка сохраненной записи CommunityAuthor")
logger.info(f"DEBUG: Найденная запись: {check_ca}")
logger.info(f"DEBUG: Роли в найденной записи: {check_ca.roles}")
logger.info(f"DEBUG: role_list найденной записи: {check_ca.role_list}")
assert check_ca is not None, "CommunityAuthor должен быть сохранен в базе данных"
assert check_ca.roles is not None, "Роли CommunityAuthor не должны быть None"
assert "reader" in check_ca.role_list, "Роль 'reader' должна быть в role_list"
assert "author" in check_ca.role_list, "Роль 'author' должна быть в role_list"
# Проверяем роли через AdminService
from services.admin import admin_service
from services.db import local_session
# Используем ту же сессию для проверки
2025-07-03 00:20:10 +03:00
fresh_user = db_session.query(Author).filter(Author.id == simple_user.id).first()
2025-07-25 01:04:15 +03:00
roles = admin_service.get_user_roles(fresh_user, community_id)
# Проверяем роли
assert isinstance(roles, list), "Роли должны быть списком"
assert "reader" in roles, "Роль 'reader' должна присутствовать"
assert "author" in roles, "Роль 'author' должна присутствовать"
assert len(roles) == 2, f"Должно быть 2 роли, а не {len(roles)}"
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
def test_update_user_success(self, db_session, simple_user):
"""Тест успешного обновления пользователя"""
2025-07-25 01:04:15 +03:00
from services.admin import admin_service
2025-07-02 22:30:21 +03:00
2025-07-25 01:04:15 +03:00
# Обновляем пользователя
result = admin_service.update_user({
2025-07-03 00:20:10 +03:00
"id": simple_user.id,
"name": "Updated Name",
2025-07-25 01:04:15 +03:00
"email": simple_user.email
})
2025-07-02 22:30:21 +03:00
2025-07-25 01:04:15 +03:00
# Проверяем обновленного пользователя
assert result is not None, "Пользователь должен быть обновлен"
assert result.get("name") == "Updated Name", "Имя пользователя должно быть обновлено"
2025-07-02 22:30:21 +03:00
2025-07-25 01:04:15 +03:00
# Восстанавливаем исходное имя
admin_service.update_user({
"id": simple_user.id,
"name": "Simple User",
"email": simple_user.email
})
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
class TestSimpleAuthService:
"""Простые тесты для AuthService"""
def test_create_user_basic(self, db_session):
"""Тест базового создания пользователя"""
test_email = "test_create_unique@example.com"
2025-07-25 01:04:15 +03:00
# Найдем существующих пользователей с таким email
existing_users = db_session.query(Author).filter(Author.email == test_email).all()
# Удаляем связанные записи CommunityAuthor для существующих пользователей
for user in existing_users:
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user.id).delete(synchronize_session=False)
db_session.delete(user)
db_session.commit()
2025-07-03 00:20:10 +03:00
user_dict = {
"email": test_email,
"name": "Test Create User",
"slug": "test-create-user-unique",
}
user = auth_service.create_user(user_dict)
assert user is not None
assert user.email == test_email
assert user.name == "Test Create User"
# Очистка
2025-07-25 01:04:15 +03:00
try:
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user.id).delete(synchronize_session=False)
db_session.delete(user)
db_session.commit()
except Exception as e:
# Если возникла ошибка при удалении, просто логируем ее
print(f"Ошибка при очистке: {e}")
db_session.rollback()
2025-07-02 22:30:21 +03:00
2025-07-25 01:04:15 +03:00
def test_create_user_with_community(self, db_session):
"""Проверяем создание пользователя в конкретном сообществе"""
from services.auth import auth_service
from services.rbac import initialize_community_permissions
from auth.orm import Author
import asyncio
import uuid
# Создаем тестового пользователя
system_author = db_session.query(Author).filter(Author.slug == "system").first()
if not system_author:
system_author = Author(
name="System",
slug="system",
email="system@test.local"
)
db_session.add(system_author)
db_session.flush()
# Создаем тестовое сообщество
unique_slug = f"simple-test-community-{uuid.uuid4()}"
community = Community(
name="Simple Test Community",
slug=unique_slug,
desc="Simple community for tests",
created_by=system_author.id,
settings={
"default_roles": ["reader", "author"],
"available_roles": ["reader", "author", "editor"]
}
)
db_session.add(community)
db_session.flush()
2025-07-02 22:30:21 +03:00
2025-07-25 01:04:15 +03:00
# Инициализируем права сообщества
async def init_community_permissions():
await initialize_community_permissions(community.id)
# Запускаем инициализацию в текущем event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(init_community_permissions())
# Генерируем уникальные данные для каждого теста
unique_email = f"test_community_unique_{uuid.uuid4()}@example.com"
unique_name = f"Test Community User {uuid.uuid4()}"
unique_slug = f"test-community-user-{uuid.uuid4()}"
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
user_dict = {
2025-07-25 01:04:15 +03:00
"name": unique_name,
"email": unique_email,
"slug": unique_slug
2025-07-03 00:20:10 +03:00
}
2025-07-25 01:04:15 +03:00
# Создаем пользователя в конкретном сообществе
user = auth_service.create_user(user_dict, community_id=community.id)
2025-07-02 22:30:21 +03:00
2025-07-25 01:04:15 +03:00
# Проверяем созданного пользователя
assert user is not None, "Пользователь должен быть создан"
assert user.email == unique_email.lower(), "Email должен быть в нижнем регистре"
assert user.name == unique_name, "Имя пользователя должно совпадать"
assert user.slug == unique_slug, "Slug пользователя должен совпадать"
2025-07-02 22:30:21 +03:00
2025-07-25 01:04:15 +03:00
# Проверяем роли
from orm.community import get_user_roles_in_community
# Получаем роли
roles = get_user_roles_in_community(user.id, community_id=community.id)
# Проверяем роли
assert "reader" in roles, f"У нового пользователя должна быть роль 'reader' в сообществе {community.id}. Текущие роли: {roles}"
assert "author" in roles, f"У нового пользователя должна быть роль 'author' в сообществе {community.id}. Текущие роли: {roles}"
# Коммитим изменения
2025-07-03 00:20:10 +03:00
db_session.commit()
2025-07-02 22:30:21 +03:00
2025-07-25 01:04:15 +03:00
# Очищаем созданные объекты
try:
# Удаляем связанные записи CommunityAuthor
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user.id).delete()
# Удаляем пользователя
db_session.query(Author).filter(Author.id == user.id).delete()
# Удаляем сообщество
db_session.query(Community).filter(Community.id == community.id).delete()
db_session.commit()
except Exception:
db_session.rollback()
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
class TestCommunityAuthorMethods:
"""Тесты методов CommunityAuthor"""
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
def test_set_get_roles(self, db_session, simple_user, simple_community):
"""Тест установки и получения ролей"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.author_id == simple_user.id,
CommunityAuthor.community_id == simple_community.id
).delete()
2025-07-02 22:30:21 +03:00
db_session.commit()
2025-07-03 00:20:10 +03:00
ca = CommunityAuthor(
community_id=simple_community.id,
author_id=simple_user.id,
)
# Тестируем установку ролей
ca.set_roles(["reader", "author"])
assert ca.role_list == ["reader", "author"]
# Тестируем пустые роли
ca.set_roles([])
2025-07-02 22:30:21 +03:00
assert ca.role_list == []
2025-07-03 00:20:10 +03:00
def test_has_role(self, db_session, simple_user, simple_community):
"""Тест проверки наличия роли"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.author_id == simple_user.id,
CommunityAuthor.community_id == simple_community.id
).delete()
2025-07-02 22:30:21 +03:00
db_session.commit()
2025-07-03 00:20:10 +03:00
ca = CommunityAuthor(
community_id=simple_community.id,
author_id=simple_user.id,
)
ca.set_roles(["reader", "author"])
db_session.add(ca)
db_session.commit()
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
assert ca.has_role("reader") is True
assert ca.has_role("author") is True
assert ca.has_role("admin") is False
def test_add_remove_role(self, db_session, simple_user, simple_community):
"""Тест добавления и удаления ролей"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.author_id == simple_user.id,
CommunityAuthor.community_id == simple_community.id
).delete()
db_session.commit()
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
ca = CommunityAuthor(
community_id=simple_community.id,
author_id=simple_user.id,
)
ca.set_roles(["reader"])
db_session.add(ca)
db_session.commit()
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
# Добавляем роль
ca.add_role("author")
assert ca.has_role("author") is True
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
# Удаляем роль
ca.remove_role("reader")
assert ca.has_role("reader") is False
assert ca.has_role("author") is True
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
class TestDataIntegrity:
"""Простые тесты целостности данных"""
def test_unique_community_author(self, db_session, simple_user, simple_community):
"""Тест уникальности записей CommunityAuthor"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.author_id == simple_user.id,
CommunityAuthor.community_id == simple_community.id
).delete()
db_session.commit()
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
# Создаем первую запись
ca1 = CommunityAuthor(
community_id=simple_community.id,
author_id=simple_user.id,
)
ca1.set_roles(["reader"])
db_session.add(ca1)
db_session.commit()
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
# Проверяем что запись создалась
found = db_session.query(CommunityAuthor).filter(
CommunityAuthor.community_id == simple_community.id,
CommunityAuthor.author_id == simple_user.id
).first()
assert found is not None
assert found.id == ca1.id
def test_roles_validation(self, db_session, simple_user, simple_community):
"""Тест валидации ролей"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.author_id == simple_user.id,
CommunityAuthor.community_id == simple_community.id
).delete()
db_session.commit()
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
ca = CommunityAuthor(
community_id=simple_community.id,
author_id=simple_user.id,
)
# Тестируем различные форматы
ca.set_roles(["reader", "author", "expert"])
assert set(ca.role_list) == {"reader", "author", "expert"}
ca.set_roles([])
assert ca.role_list == []
2025-07-02 22:30:21 +03:00
2025-07-03 00:20:10 +03:00
ca.set_roles(["admin"])
assert ca.role_list == ["admin"]