""" Реализация RBAC операций для использования через интерфейс. Этот модуль предоставляет конкретную реализацию RBAC операций, не импортирует ORM модели напрямую, используя dependency injection. """ import json from pathlib import Path from typing import Any from rbac.interface import CommunityAuthorQueries, RBACOperations, get_community_queries from storage.db import local_session from storage.redis import redis from utils.logger import root_logger as logger # --- Загрузка каталога сущностей и дефолтных прав --- with Path("rbac/permissions_catalog.json").open() as f: PERMISSIONS_CATALOG = json.load(f) with Path("rbac/default_role_permissions.json").open() as f: DEFAULT_ROLE_PERMISSIONS = json.load(f) role_names = list(DEFAULT_ROLE_PERMISSIONS.keys()) class RBACOperationsImpl(RBACOperations): """Конкретная реализация RBAC операций""" async def get_permissions_for_role(self, role: str, community_id: int) -> list[str]: """ Получает список разрешений для конкретной роли в сообществе. Иерархия уже применена при инициализации сообщества. Args: role: Название роли community_id: ID сообщества Returns: Список разрешений для роли """ role_perms = await self.get_role_permissions_for_community(community_id, role) return role_perms.get(role, []) async def initialize_community_permissions(self, community_id: int) -> None: """ Инициализирует права для нового сообщества на основе дефолтных настроек с учетом иерархии. Args: community_id: ID сообщества """ key = f"community:roles:{community_id}" # Проверяем, не инициализировано ли уже existing = await redis.execute("GET", key) if existing: logger.debug(f"Права для сообщества {community_id} уже инициализированы") return # Создаем полные списки разрешений с учетом иерархии expanded_permissions = {} def get_role_permissions(role: str, processed_roles: set[str] | None = None) -> set[str]: """ Рекурсивно получает все разрешения для роли, включая наследованные Args: role: Название роли processed_roles: Список уже обработанных ролей для предотвращения зацикливания Returns: Множество разрешений """ if processed_roles is None: processed_roles = set() if role in processed_roles: return set() processed_roles.add(role) # Получаем прямые разрешения роли direct_permissions = set(DEFAULT_ROLE_PERMISSIONS.get(role, [])) # Проверяем, есть ли наследование роли for perm in list(direct_permissions): if perm in role_names: # Если пермишен - это название роли, добавляем все её разрешения direct_permissions.remove(perm) direct_permissions.update(get_role_permissions(perm, processed_roles)) return direct_permissions # Формируем расширенные разрешения для каждой роли for role in role_names: expanded_permissions[role] = list(get_role_permissions(role)) # Сохраняем в Redis уже развернутые списки с учетом иерархии await redis.execute("SET", key, json.dumps(expanded_permissions)) logger.info(f"Инициализированы права с иерархией для сообщества {community_id}") async def user_has_permission( self, author_id: int, permission: str, community_id: int, session: Any = None ) -> bool: """ Проверяет, есть ли у пользователя конкретное разрешение в сообществе. Args: author_id: ID автора permission: Разрешение для проверки community_id: ID сообщества session: Опциональная сессия БД (для тестов) Returns: True если разрешение есть, False если нет """ community_queries = get_community_queries() user_roles = community_queries.get_user_roles_in_community(author_id, community_id, session) return await self.roles_have_permission(user_roles, permission, community_id) async def get_role_permissions_for_community(self, community_id: int, role: str) -> dict: """ Получает права для конкретной роли в сообществе, включая все наследованные разрешения. Если права не настроены, автоматически инициализирует их дефолтными. Args: community_id: ID сообщества role: Название роли для получения разрешений Returns: Словарь {роль: [разрешения]} для указанной роли с учетом наследования """ key = f"community:roles:{community_id}" data = await redis.execute("GET", key) if data: role_permissions = json.loads(data) if role in role_permissions: return {role: role_permissions[role]} # Если роль не найдена в кеше, используем рекурсивный расчет # Автоматически инициализируем, если не найдено await self.initialize_community_permissions(community_id) # Получаем инициализированные разрешения data = await redis.execute("GET", key) if data: role_permissions = json.loads(data) if role in role_permissions: return {role: role_permissions[role]} # Fallback: рекурсивно вычисляем разрешения для роли return {role: list(self._get_role_permissions_recursive(role))} async def get_all_permissions_for_community(self, community_id: int) -> dict: """ Получает все права ролей для конкретного сообщества. Если права не настроены, автоматически инициализирует их дефолтными. Args: community_id: ID сообщества Returns: Словарь {роль: [разрешения]} для всех ролей в сообществе """ key = f"community:roles:{community_id}" data = await redis.execute("GET", key) if data: return json.loads(data) # Автоматически инициализируем, если не найдено await self.initialize_community_permissions(community_id) # Получаем инициализированные разрешения data = await redis.execute("GET", key) if data: return json.loads(data) # Fallback на дефолтные разрешения если что-то пошло не так return DEFAULT_ROLE_PERMISSIONS def _get_role_permissions_recursive(self, role: str, processed_roles: set[str] | None = None) -> set[str]: """ Рекурсивно получает все разрешения для роли, включая наследованные. Вспомогательный метод для вычисления разрешений без обращения к Redis. Args: role: Название роли processed_roles: Множество уже обработанных ролей для предотвращения зацикливания Returns: Множество всех разрешений роли (прямых и наследованных) """ if processed_roles is None: processed_roles = set() if role in processed_roles: return set() processed_roles.add(role) # Получаем прямые разрешения роли direct_permissions = set(DEFAULT_ROLE_PERMISSIONS.get(role, [])) # Проверяем, есть ли наследование роли inherited_permissions = set() for perm in list(direct_permissions): if perm in role_names: # Если пермишен - это название роли, добавляем все её разрешения direct_permissions.remove(perm) inherited_permissions.update(self._get_role_permissions_recursive(perm, processed_roles)) # Объединяем прямые и наследованные разрешения return direct_permissions | inherited_permissions async def roles_have_permission(self, role_slugs: list[str], permission: str, community_id: int) -> bool: """ Проверяет, есть ли у набора ролей конкретное разрешение в сообществе. Args: role_slugs: Список ролей для проверки permission: Разрешение для проверки community_id: ID сообщества Returns: True если хотя бы одна роль имеет разрешение """ # Получаем разрешения для каждой роли с учетом наследования for role in role_slugs: role_perms = await self.get_role_permissions_for_community(community_id, role) if permission in role_perms.get(role, []): return True return False def assign_role_to_user(self, author_id: int, role: str, community_id: int, session: Any = None) -> bool: """ Назначает роль пользователю в сообществе Args: author_id: ID автора role: Название роли community_id: ID сообщества session: Сессия БД (опционально) Returns: True если роль была добавлена, False если уже была """ try: # Поздний импорт для избежания циклических зависимостей from orm.community import CommunityAuthor if session: ca = CommunityAuthor.find_author_in_community(author_id, community_id, session) if ca: if ca.has_role(role): return False # Роль уже есть ca.add_role(role) else: # Создаем новую запись ca = CommunityAuthor(community_id=community_id, author_id=author_id, roles=role) session.add(ca) session.commit() return True # Используем local_session для продакшена with local_session() as db_session: ca = CommunityAuthor.find_author_in_community(author_id, community_id, db_session) if ca: if ca.has_role(role): return False # Роль уже есть ca.add_role(role) else: # Создаем новую запись ca = CommunityAuthor(community_id=community_id, author_id=author_id, roles=role) db_session.add(ca) db_session.commit() return True except Exception as e: logger.error(f"[assign_role_to_user] Ошибка при назначении роли {role} пользователю {author_id}: {e}") return False def get_user_roles_in_community(self, author_id: int, community_id: int, session: Any = None) -> list[str]: """ Получает роли пользователя в сообществе Args: author_id: ID автора community_id: ID сообщества session: Сессия БД (опционально) Returns: Список ролей пользователя """ try: # Поздний импорт для избежания циклических зависимостей from orm.community import CommunityAuthor if session: ca = CommunityAuthor.find_author_in_community(author_id, community_id, session) return ca.role_list if ca else [] # Используем local_session для продакшена with local_session() as db_session: ca = CommunityAuthor.find_author_in_community(author_id, community_id, db_session) return ca.role_list if ca else [] except Exception as e: logger.error(f"[get_user_roles_in_community] Ошибка при получении ролей: {e}") return [] def remove_role_from_user(self, author_id: int, role: str, community_id: int, session: Any = None) -> bool: """ Удаляет роль у пользователя в сообществе Args: author_id: ID автора role: Название роли community_id: ID сообщества session: Сессия БД (опционально) Returns: True если роль была удалена, False если её не было """ try: # Поздний импорт для избежания циклических зависимостей from orm.community import CommunityAuthor if session: ca = CommunityAuthor.find_author_in_community(author_id, community_id, session) if ca and ca.has_role(role): ca.remove_role(role) # Если ролей не осталось, удаляем запись if not ca.role_list: session.delete(ca) session.commit() return True return False # Используем local_session для продакшена with local_session() as db_session: ca = CommunityAuthor.find_author_in_community(author_id, community_id, db_session) if ca and ca.has_role(role): ca.remove_role(role) # Если ролей не осталось, удаляем запись if not ca.role_list: db_session.delete(ca) db_session.commit() return True return False except Exception as e: logger.error(f"[remove_role_from_user] Ошибка при удалении роли {role} у пользователя {author_id}: {e}") return False class CommunityAuthorQueriesImpl(CommunityAuthorQueries): """Конкретная реализация запросов CommunityAuthor через поздний импорт""" def get_user_roles_in_community(self, author_id: int, community_id: int = 1, session: Any = None) -> list[str]: """ Получает роли пользователя в сообществе через новую систему CommunityAuthor """ # Поздний импорт для избежания циклических зависимостей from orm.community import CommunityAuthor try: if session: ca = ( session.query(CommunityAuthor) .where(CommunityAuthor.author_id == author_id, CommunityAuthor.community_id == community_id) .first() ) return ca.role_list if ca else [] # Используем local_session для продакшена with local_session() as db_session: ca = ( db_session.query(CommunityAuthor) .where(CommunityAuthor.author_id == author_id, CommunityAuthor.community_id == community_id) .first() ) return ca.role_list if ca else [] except Exception as e: logger.error(f"[get_user_roles_in_community] Ошибка при получении ролей: {e}") return [] # Создаем экземпляры реализаций rbac_operations = RBACOperationsImpl() community_queries = CommunityAuthorQueriesImpl()