Files
core/rbac/api.py
Untone 634cec657c
All checks were successful
Deploy on push / deploy (push) Successful in 8m16s
notifications-stats-todo
2025-09-16 12:52:14 +03:00

484 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
RBAC: динамическая система прав для ролей и сообществ.
- Каталог всех сущностей и действий хранится в permissions_catalog.json
- Дефолтные права ролей — в default_role_permissions.json
- Кастомные права ролей для каждого сообщества — в Redis (ключ community:roles:{community_id})
- При создании сообщества автоматически копируются дефолтные права
- Декораторы получают роли пользователя из CommunityAuthor для конкретного сообщества
"""
import asyncio
from functools import wraps
from typing import Any, Callable
from orm.author import Author
from rbac.interface import get_rbac_operations
from settings import ADMIN_EMAILS
from storage.db import local_session
from utils.logger import root_logger as logger
async def initialize_community_permissions(community_id: int) -> None:
"""
Инициализирует права для нового сообщества на основе дефолтных настроек с учетом иерархии.
Args:
community_id: ID сообщества
"""
rbac_ops = get_rbac_operations()
await rbac_ops.initialize_community_permissions(community_id)
async def get_permissions_for_role(role: str, community_id: int) -> list[str]:
"""
Получает список разрешений для конкретной роли в сообществе.
Иерархия уже применена при инициализации сообщества.
Args:
role: Название роли
community_id: ID сообщества
Returns:
Список разрешений для роли
"""
rbac_ops = get_rbac_operations()
return await rbac_ops.get_permissions_for_role(role, community_id)
async def get_role_permissions_for_community(community_id: int) -> dict:
"""
Получает все разрешения для всех ролей в сообществе.
Args:
community_id: ID сообщества
Returns:
Словарь {роль: [разрешения]} для всех ролей
"""
rbac_ops = get_rbac_operations()
return await rbac_ops.get_all_permissions_for_community(community_id)
async def update_all_communities_permissions() -> None:
"""
Обновляет права для всех существующих сообществ на основе актуальных дефолтных настроек.
Используется в админ-панели для применения изменений в правах на все сообщества.
"""
rbac_ops = get_rbac_operations()
# Поздний импорт для избежания циклических зависимостей
from orm.community import Community
try:
with local_session() as session:
# Получаем все сообщества
communities = session.query(Community).all()
for community in communities:
# Сбрасываем кеш прав для каждого сообщества
from storage.redis import redis
key = f"community:roles:{community.id}"
await redis.execute("DEL", key)
# Переинициализируем права с актуальными дефолтными настройками
await rbac_ops.initialize_community_permissions(community.id)
logger.info(f"Обновлены права для {len(communities)} сообществ")
except Exception as e:
logger.error(f"Ошибка при обновлении прав всех сообществ: {e}", exc_info=True)
raise
# --- Получение ролей пользователя ---
def get_user_roles_in_community(author_id: int, community_id: int = 1, session: Any = None) -> list[str]:
"""
Получает роли пользователя в сообществе через новую систему CommunityAuthor
"""
rbac_ops = get_rbac_operations()
return rbac_ops.get_user_roles_in_community(author_id, community_id, session)
def assign_role_to_user(author_id: int, role: str, community_id: int = 1, session: Any = None) -> bool:
"""
Назначает роль пользователю в сообществе
Args:
author_id: ID автора
role: Название роли
community_id: ID сообщества
session: Сессия БД (опционально)
Returns:
True если роль была добавлена, False если уже была
"""
rbac_ops = get_rbac_operations()
return rbac_ops.assign_role_to_user(author_id, role, community_id, session)
def remove_role_from_user(author_id: int, role: str, community_id: int = 1, session: Any = None) -> bool:
"""
Удаляет роль у пользователя в сообществе
Args:
author_id: ID автора
role: Название роли
community_id: ID сообщества
session: Сессия БД (опционально)
Returns:
True если роль была удалена, False если её не было
"""
rbac_ops = get_rbac_operations()
return rbac_ops.remove_role_from_user(author_id, role, community_id, session)
async def check_user_permission_in_community(
author_id: int, permission: str, community_id: int = 1, session: Any = None
) -> bool:
"""
Проверяет разрешение пользователя в сообществе
Args:
author_id: ID автора
permission: Разрешение для проверки
community_id: ID сообщества
session: Сессия БД (опционально)
Returns:
True если разрешение есть, False если нет
"""
rbac_ops = get_rbac_operations()
return await rbac_ops.user_has_permission(author_id, permission, community_id, session)
async def user_has_permission(author_id: int, permission: str, community_id: int, session: Any = None) -> bool:
"""
Проверяет, есть ли у пользователя конкретное разрешение в сообществе.
Args:
author_id: ID автора
permission: Разрешение для проверки
community_id: ID сообщества
session: Опциональная сессия БД (для тестов)
Returns:
True если разрешение есть, False если нет
"""
rbac_ops = get_rbac_operations()
return await rbac_ops.user_has_permission(author_id, permission, community_id, session)
# --- Проверка прав ---
async def roles_have_permission(role_slugs: list[str], permission: str, community_id: int) -> bool:
"""
Проверяет, есть ли у набора ролей конкретное разрешение в сообществе.
Args:
role_slugs: Список ролей для проверки
permission: Разрешение для проверки
community_id: ID сообщества
Returns:
True если хотя бы одна роль имеет разрешение
"""
rbac_ops = get_rbac_operations()
return await rbac_ops.roles_have_permission(role_slugs, permission, community_id)
# --- Декораторы ---
class RBACError(Exception):
"""Исключение для ошибок RBAC."""
def get_user_roles_from_context(info) -> tuple[list[str], int]:
"""
Получение ролей пользователя из GraphQL контекста с учетом сообщества.
Returns:
Кортеж (роли_пользователя, community_id)
"""
# Получаем ID автора из контекста
if isinstance(info.context, dict):
author_data = info.context.get("author", {})
else:
author_data = getattr(info.context, "author", {})
author_id = author_data.get("id") if isinstance(author_data, dict) else None
logger.debug(f"[get_user_roles_from_context] author_data: {author_data}, author_id: {author_id}")
# Если author_id не найден в context.author, пробуем получить из scope.auth
if not author_id and hasattr(info.context, "request"):
request = info.context.request
logger.debug(f"[get_user_roles_from_context] Проверяем request.scope: {hasattr(request, 'scope')}")
if hasattr(request, "scope") and "auth" in request.scope:
auth_credentials = request.scope["auth"]
logger.debug(f"[get_user_roles_from_context] Найден auth в scope: {type(auth_credentials)}")
if hasattr(auth_credentials, "author_id") and auth_credentials.author_id:
author_id = auth_credentials.author_id
logger.debug(f"[get_user_roles_from_context] Получен author_id из scope.auth: {author_id}")
elif isinstance(auth_credentials, dict) and "author_id" in auth_credentials:
author_id = auth_credentials["author_id"]
logger.debug(f"[get_user_roles_from_context] Получен author_id из scope.auth (dict): {author_id}")
else:
logger.debug("[get_user_roles_from_context] scope.auth не найден или пуст")
if hasattr(request, "scope"):
logger.debug(f"[get_user_roles_from_context] Ключи в scope: {list(request.scope.keys())}")
if not author_id:
logger.debug("[get_user_roles_from_context] author_id не найден ни в context.author, ни в scope.auth")
return [], 0
# Получаем community_id из аргументов мутации
community_id = get_community_id_from_context(info)
logger.debug(f"[get_user_roles_from_context] Получен community_id: {community_id}")
# Получаем роли пользователя в сообществе
try:
user_roles = get_user_roles_in_community(author_id, community_id)
logger.debug(
f"[get_user_roles_from_context] Роли пользователя {author_id} в сообществе {community_id}: {user_roles}"
)
# Проверяем, является ли пользователь системным администратором
try:
admin_emails = ADMIN_EMAILS.split(",") if ADMIN_EMAILS else []
with local_session() as session:
author = session.query(Author).where(Author.id == author_id).first()
if author and author.email and author.email in admin_emails and "admin" not in user_roles:
# Системный администратор автоматически получает роль admin в любом сообществе
user_roles = [*user_roles, "admin"]
logger.debug(
f"[get_user_roles_from_context] Добавлена роль admin для системного администратора {author.email}"
)
except Exception as e:
logger.error(f"[get_user_roles_from_context] Ошибка при проверке системного администратора: {e}")
return user_roles, community_id
except Exception as e:
logger.error(f"[get_user_roles_from_context] Ошибка при получении ролей: {e}")
return [], community_id
def get_community_id_from_context(info) -> int:
"""
Получение community_id из GraphQL контекста или аргументов.
"""
# Пробуем из контекста
if isinstance(info.context, dict):
community_id = info.context.get("community_id")
else:
community_id = getattr(info.context, "community_id", None)
if community_id:
return int(community_id)
# Пробуем из аргументов resolver'а
logger.debug(
f"[get_community_id_from_context] Проверяем info.variable_values: {getattr(info, 'variable_values', None)}"
)
# Пробуем получить переменные из разных источников
variables = {}
# Способ 1: info.variable_values
if hasattr(info, "variable_values") and info.variable_values:
variables.update(info.variable_values)
logger.debug(f"[get_community_id_from_context] Добавлены переменные из variable_values: {info.variable_values}")
# Способ 2: info.variable_values (альтернативный способ)
if hasattr(info, "variable_values"):
logger.debug(f"[get_community_id_from_context] variable_values тип: {type(info.variable_values)}")
logger.debug(f"[get_community_id_from_context] variable_values содержимое: {info.variable_values}")
# Способ 3: из kwargs (аргументы функции)
if hasattr(info, "context") and hasattr(info.context, "kwargs"):
variables.update(info.context.kwargs)
logger.debug(f"[get_community_id_from_context] Добавлены переменные из context.kwargs: {info.context.kwargs}")
logger.debug(f"[get_community_id_from_context] Итоговые переменные: {variables}")
if "community_id" in variables:
return int(variables["community_id"])
if "communityId" in variables:
return int(variables["communityId"])
# Для мутации delete_community получаем slug и находим community_id
if "slug" in variables:
slug = variables["slug"]
try:
from orm.community import Community # Поздний импорт
with local_session() as session:
community = session.query(Community).filter_by(slug=slug).first()
if community:
logger.debug(f"[get_community_id_from_context] Найден community_id {community.id} для slug {slug}")
return community.id
logger.warning(f"[get_community_id_from_context] Сообщество с slug {slug} не найдено")
except Exception as e:
logger.exception(f"[get_community_id_from_context] Ошибка при поиске community_id: {e}")
# Пробуем из прямых аргументов
if hasattr(info, "field_asts") and info.field_asts:
for field_ast in info.field_asts:
if hasattr(field_ast, "arguments"):
for arg in field_ast.arguments:
if arg.name.value in ["community_id", "communityId"]:
return int(arg.value.value)
# Fallback: основное сообщество
logger.debug("[get_community_id_from_context] Используем дефолтный community_id: 1")
return 1
def require_permission(permission: str) -> Callable:
"""
Декоратор для проверки конкретного разрешения у пользователя в сообществе.
Args:
permission: Требуемое разрешение (например, "shout:create")
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
info = args[1] if len(args) > 1 else None
if not info or not hasattr(info, "context"):
raise RBACError("GraphQL info context не найден")
logger.debug(f"[require_permission] Проверяем права: {permission}")
logger.debug(f"[require_permission] args: {args}")
logger.debug(f"[require_permission] kwargs: {kwargs}")
user_roles, community_id = get_user_roles_from_context(info)
logger.debug(f"[require_permission] user_roles: {user_roles}, community_id: {community_id}")
has_permission = await roles_have_permission(user_roles, permission, community_id)
logger.debug(f"[require_permission] has_permission: {has_permission}")
if not has_permission:
raise RBACError("Недостаточно прав. Требуется: ", permission)
return await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
return wrapper
return decorator
def require_role(role: str) -> Callable:
"""
Декоратор для проверки конкретной роли у пользователя в сообществе.
Args:
role: Требуемая роль (например, "admin", "editor")
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
info = args[1] if len(args) > 1 else None
if not info or not hasattr(info, "context"):
raise RBACError("GraphQL info context не найден")
user_roles, _community_id = get_user_roles_from_context(info)
if role not in user_roles:
raise RBACError("Требуется роль в сообществе", role)
return await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
return wrapper
return decorator
def require_any_permission(permissions: list[str]) -> Callable:
"""
Декоратор для проверки любого из списка разрешений.
Args:
permissions: Список разрешений, любое из которых подходит
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
info = args[1] if len(args) > 1 else None
if not info or not hasattr(info, "context"):
raise RBACError("GraphQL info context не найден")
user_roles, community_id = get_user_roles_from_context(info)
# Проверяем каждое разрешение отдельно
has_any = False
for perm in permissions:
if await roles_have_permission(user_roles, perm, community_id):
has_any = True
break
if not has_any:
raise RBACError("Недостаточно прав. Требуется любое из: ", permissions)
return await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
return wrapper
return decorator
def require_all_permissions(permissions: list[str]) -> Callable:
"""
Декоратор для проверки всех разрешений из списка.
Args:
permissions: Список разрешений, все из которых требуются
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
info = args[1] if len(args) > 1 else None
if not info or not hasattr(info, "context"):
raise RBACError("GraphQL info context не найден")
user_roles, community_id = get_user_roles_from_context(info)
# Проверяем каждое разрешение отдельно
missing_perms = []
for perm in permissions:
if not await roles_have_permission(user_roles, perm, community_id):
missing_perms.append(perm)
if missing_perms:
raise RBACError("Недостаточно прав. Отсутствуют: ", missing_perms)
return await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
return wrapper
return decorator
def admin_only(func: Callable) -> Callable:
"""
Декоратор для ограничения доступа только администраторам сообщества.
"""
@wraps(func)
async def wrapper(*args, **kwargs):
info = args[1] if len(args) > 1 else None
if not info or not hasattr(info, "context"):
raise RBACError("GraphQL info context не найден")
user_roles, community_id = get_user_roles_from_context(info)
if "admin" not in user_roles:
raise RBACError("Доступ только для администраторов сообщества", community_id)
return await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
return wrapper