core/tests/test_rbac_system.py
Untone 82111ed0f6
All checks were successful
Deploy on push / deploy (push) Successful in 7s
Squashed new RBAC
2025-07-02 22:30:21 +03:00

414 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Тесты для новой системы RBAC (Role-Based Access Control).
Проверяет работу системы ролей и разрешений на основе CSV хранения
в таблице CommunityAuthor.
"""
import pytest
from auth.orm import Author
from orm.community import Community, CommunityAuthor
from services.rbac import get_role_permissions_for_community, get_permissions_for_role
from orm.reaction import REACTION_KINDS
@pytest.fixture
def test_users(db_session):
"""Создает тестовых пользователей"""
users = []
# Создаем пользователей с ID 1-5
for i in range(1, 6):
user = db_session.query(Author).filter(Author.id == i).first()
if not user:
user = Author(id=i, email=f"user{i}@example.com", name=f"Test User {i}", slug=f"test-user-{i}")
user.set_password("password123")
db_session.add(user)
users.append(user)
db_session.commit()
return users
@pytest.fixture
def test_community(db_session, test_users):
"""Создает тестовое сообщество"""
community = db_session.query(Community).filter(Community.id == 1).first()
if not community:
community = Community(
id=1,
name="Test Community",
slug="test-community",
desc="Test community for RBAC tests",
created_by=test_users[0].id,
)
db_session.add(community)
db_session.commit()
return community
class TestCommunityAuthorRoles:
"""Тесты для управления ролями в CommunityAuthor"""
def test_role_list_property(self, db_session, test_users, test_community):
"""Тест свойства role_list для CSV ролей"""
# Очищаем существующие записи для этого пользователя
db_session.query(CommunityAuthor).filter(
CommunityAuthor.community_id == test_community.id, CommunityAuthor.author_id == test_users[0].id
).delete()
db_session.commit()
# Создаем запись с ролями
ca = CommunityAuthor(community_id=test_community.id, author_id=test_users[0].id, roles="reader,author,expert")
db_session.add(ca)
db_session.commit()
# Проверяем получение списка ролей
assert ca.role_list == ["reader", "author", "expert"]
# Проверяем установку списка ролей
ca.role_list = ["admin", "editor"]
assert ca.roles == "admin,editor"
# Проверяем пустые роли
ca.role_list = []
assert ca.roles is None
assert ca.role_list == []
def test_has_role(self, db_session, test_users, test_community):
"""Тест проверки наличия роли"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.community_id == test_community.id, CommunityAuthor.author_id == test_users[1].id
).delete()
db_session.commit()
ca = CommunityAuthor(community_id=test_community.id, author_id=test_users[1].id, roles="reader,author")
db_session.add(ca)
db_session.commit()
# Проверяем существующие роли
assert ca.has_role("reader") is True
assert ca.has_role("author") is True
# Проверяем несуществующие роли
assert ca.has_role("admin") is False
assert ca.has_role("editor") is False
def test_add_role(self, db_session, test_users, test_community):
"""Тест добавления роли"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.community_id == test_community.id, CommunityAuthor.author_id == test_users[2].id
).delete()
db_session.commit()
ca = CommunityAuthor(community_id=test_community.id, author_id=test_users[2].id, roles="reader")
db_session.add(ca)
db_session.commit()
# Добавляем новую роль
ca.add_role("author")
assert ca.role_list == ["reader", "author"]
# Попытка добавить существующую роль (не должна дублироваться)
ca.add_role("reader")
assert ca.role_list == ["reader", "author"]
# Добавляем ещё одну роль
ca.add_role("expert")
assert ca.role_list == ["reader", "author", "expert"]
def test_remove_role(self, db_session, test_users, test_community):
"""Тест удаления роли"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.community_id == test_community.id, CommunityAuthor.author_id == test_users[3].id
).delete()
db_session.commit()
ca = CommunityAuthor(community_id=test_community.id, author_id=test_users[3].id, roles="reader,author,expert")
db_session.add(ca)
db_session.commit()
# Удаляем роль
ca.remove_role("author")
assert ca.role_list == ["reader", "expert"]
# Попытка удалить несуществующую роль (не должна ломаться)
ca.remove_role("admin")
assert ca.role_list == ["reader", "expert"]
# Удаляем все роли
ca.remove_role("reader")
ca.remove_role("expert")
assert ca.role_list == []
def test_set_roles(self, db_session, test_users, test_community):
"""Тест установки полного списка ролей"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.community_id == test_community.id, CommunityAuthor.author_id == test_users[4].id
).delete()
db_session.commit()
ca = CommunityAuthor(community_id=test_community.id, author_id=test_users[4].id, roles="reader")
db_session.add(ca)
db_session.commit()
# Устанавливаем новый список ролей
ca.set_roles(["admin", "editor", "expert"])
assert ca.role_list == ["admin", "editor", "expert"]
# Очищаем роли
ca.set_roles([])
assert ca.role_list == []
class TestPermissionsSystem:
"""Тесты для системы разрешений"""
async def test_get_permissions_for_role(self):
"""Тест получения разрешений для роли"""
community_id = 1 # Используем основное сообщество
# Проверяем базовые роли
reader_perms = await get_permissions_for_role("reader", community_id)
assert "shout:read" in reader_perms
assert "shout:create" not in reader_perms
author_perms = await get_permissions_for_role("author", community_id)
assert "shout:create" in author_perms
assert "draft:create" in author_perms
assert "shout:delete_any" not in author_perms
admin_perms = await get_permissions_for_role("admin", community_id)
assert "author:delete_any" in admin_perms
assert "author:update_any" in admin_perms
# Проверяем несуществующую роль
unknown_perms = await get_permissions_for_role("unknown_role", community_id)
assert unknown_perms == []
async def test_reaction_permissions_generation(self):
"""Тест генерации разрешений для реакций"""
community_id = 1 # Используем основное сообщество
# Проверяем что система генерирует разрешения для реакций
admin_perms = await get_permissions_for_role("admin", community_id)
# Админ должен иметь все разрешения на реакции
assert len(admin_perms) > 0, "Admin should have some permissions"
# Проверяем что есть хотя бы базовые разрешения на реакции у читателей
reader_perms = await get_permissions_for_role("reader", community_id)
assert len(reader_perms) > 0, "Reader should have some permissions"
# Проверяем что у reader есть разрешения на чтение реакций
assert any("reaction:read:" in perm for perm in reader_perms), "Reader should have reaction read permissions"
async def test_community_author_get_permissions(self, db_session, test_users, test_community):
"""Тест получения разрешений через CommunityAuthor"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.community_id == test_community.id, CommunityAuthor.author_id == test_users[0].id
).delete()
db_session.commit()
ca = CommunityAuthor(community_id=test_community.id, author_id=test_users[0].id, roles="reader,author")
db_session.add(ca)
db_session.commit()
permissions = await ca.get_permissions()
# Должны быть разрешения от обеих ролей
assert "shout:read" in permissions # От reader
assert "shout:create" in permissions # От author
assert len(permissions) > 0 # Должны быть какие-то разрешения
async def test_community_author_has_permission(self, db_session, test_users, test_community):
"""Тест проверки разрешения через CommunityAuthor"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.community_id == test_community.id, CommunityAuthor.author_id == test_users[1].id
).delete()
db_session.commit()
ca = CommunityAuthor(community_id=test_community.id, author_id=test_users[1].id, roles="expert,editor")
db_session.add(ca)
db_session.commit()
# Проверяем разрешения
permissions = await ca.get_permissions()
# Expert имеет разрешения на реакции PROOF/DISPROOF
assert any("reaction:create:PROOF" in perm for perm in permissions)
# Editor имеет разрешения на удаление и обновление шаутов
assert "shout:delete_any" in permissions
assert "shout:update_any" in permissions
class TestClassMethods:
"""Тесты для классовых методов CommunityAuthor"""
async def test_find_by_user_and_community(self, db_session, test_users, test_community):
"""Тест поиска записи CommunityAuthor"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.community_id == test_community.id, CommunityAuthor.author_id == test_users[0].id
).delete()
db_session.commit()
# Создаем запись
ca = CommunityAuthor(community_id=test_community.id, author_id=test_users[0].id, roles="reader,author")
db_session.add(ca)
db_session.commit()
# Ищем существующую запись
found = CommunityAuthor.find_by_user_and_community(test_users[0].id, test_community.id, db_session)
assert found is not None
assert found.author_id == test_users[0].id
assert found.community_id == test_community.id
# Ищем несуществующую запись
not_found = CommunityAuthor.find_by_user_and_community(test_users[1].id, test_community.id, db_session)
assert not_found is None
async def test_get_users_with_role(self, db_session, test_users, test_community):
"""Тест получения пользователей с определенной ролью"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(CommunityAuthor.community_id == test_community.id).delete()
db_session.commit()
# Создаем пользователей с разными ролями
cas = [
CommunityAuthor(community_id=test_community.id, author_id=test_users[0].id, roles="reader,author"),
CommunityAuthor(community_id=test_community.id, author_id=test_users[1].id, roles="reader,expert"),
CommunityAuthor(community_id=test_community.id, author_id=test_users[2].id, roles="admin"),
]
for ca in cas:
db_session.add(ca)
db_session.commit()
# Ищем пользователей с ролью reader
readers = CommunityAuthor.get_users_with_role(test_community.id, "reader", db_session)
assert test_users[0].id in readers
assert test_users[1].id in readers
assert test_users[2].id not in readers
# Ищем пользователей с ролью admin
admins = CommunityAuthor.get_users_with_role(test_community.id, "admin", db_session)
assert test_users[2].id in admins
assert test_users[0].id not in admins
class TestEdgeCases:
"""Тесты для граничных случаев"""
async def test_empty_roles_handling(self, db_session, test_users, test_community):
"""Тест обработки пустых ролей"""
# Создаем запись с пустыми ролями
ca = CommunityAuthor(community_id=test_community.id, author_id=test_users[0].id, roles="")
db_session.add(ca)
db_session.commit()
assert ca.role_list == []
permissions = await ca.get_permissions()
assert permissions == []
async def test_none_roles_handling(self, db_session, test_users, test_community):
"""Тест обработки NULL ролей"""
ca = CommunityAuthor(community_id=test_community.id, author_id=test_users[0].id, roles=None)
db_session.add(ca)
db_session.commit()
assert ca.role_list == []
assert await ca.get_permissions() == []
async def test_whitespace_roles_handling(self, db_session, test_users, test_community):
"""Тест обработки ролей с пробелами"""
ca = CommunityAuthor(
community_id=test_community.id, author_id=test_users[0].id, roles=" reader , author , expert "
)
db_session.add(ca)
db_session.commit()
# Пробелы должны убираться
assert ca.role_list == ["reader", "author", "expert"]
async def test_duplicate_roles_handling(self, db_session, test_users, test_community):
"""Тест обработки дублирующихся ролей"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.community_id == test_community.id, CommunityAuthor.author_id == test_users[0].id
).delete()
db_session.commit()
ca = CommunityAuthor(
community_id=test_community.id, author_id=test_users[0].id, roles="reader,author,reader,expert,author"
)
db_session.add(ca)
db_session.commit()
# При установке через set_roles дубликаты должны убираться
unique_roles = set(["reader", "author", "reader", "expert"])
ca.set_roles(unique_roles)
roles = ca.role_list
# Проверяем что нет дубликатов
assert len(roles) == len(set(roles))
assert "reader" in roles
assert "author" in roles
assert "expert" in roles
async def test_invalid_role(self):
"""Тест получения разрешений для несуществующих ролей"""
community_id = 1 # Используем основное сообщество
# Проверяем что несуществующая роль не ломает систему
perms = await get_permissions_for_role("nonexistent_role", community_id)
assert perms == []
class TestPerformance:
"""Тесты производительности (базовые)"""
async def test_large_role_list_performance(self, db_session, test_users, test_community):
"""Тест производительности с большим количеством ролей"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.community_id == test_community.id, CommunityAuthor.author_id == test_users[0].id
).delete()
db_session.commit()
# Создаем запись с множеством ролей
many_roles = ",".join([f"role_{i}" for i in range(50)]) # Уменьшим количество
ca = CommunityAuthor(community_id=test_community.id, author_id=test_users[0].id, roles=many_roles)
db_session.add(ca)
db_session.commit()
# Операции должны работать быстро даже с множеством ролей
role_list = ca.role_list
assert len(role_list) == 50
assert all(role.startswith("role_") for role in role_list)
async def test_permissions_caching_behavior(self, db_session, test_users, test_community):
"""Тест поведения кеширования разрешений"""
# Очищаем существующие записи
db_session.query(CommunityAuthor).filter(
CommunityAuthor.community_id == test_community.id, CommunityAuthor.author_id == test_users[1].id
).delete()
db_session.commit()
ca = CommunityAuthor(community_id=test_community.id, author_id=test_users[1].id, roles="reader,author,expert")
db_session.add(ca)
db_session.commit()
# Многократный вызов get_permissions должен работать стабильно
perms1 = await ca.get_permissions()
perms2 = await ca.get_permissions()
perms3 = await ca.get_permissions()
assert perms1.sort() == perms2.sort() == perms3.sort()
assert len(perms1) > 0