2025-05-16 06:23:48 +00:00
|
|
|
|
"""
|
|
|
|
|
Модуль для проверки разрешений пользователей в контексте сообществ.
|
|
|
|
|
|
|
|
|
|
Позволяет проверять доступ пользователя к определенным операциям в сообществе
|
|
|
|
|
на основе его роли в этом сообществе.
|
|
|
|
|
"""
|
|
|
|
|
|
2025-06-01 23:56:11 +00:00
|
|
|
|
from typing import Union
|
2025-05-16 06:23:48 +00:00
|
|
|
|
|
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
|
2025-05-29 09:37:39 +00:00
|
|
|
|
from auth.orm import Author, Permission, Role, RolePermission
|
2025-05-16 06:23:48 +00:00
|
|
|
|
from orm.community import Community, CommunityFollower, CommunityRole
|
2025-05-29 09:37:39 +00:00
|
|
|
|
from settings import ADMIN_EMAILS as ADMIN_EMAILS_LIST
|
2025-05-16 06:23:48 +00:00
|
|
|
|
|
|
|
|
|
ADMIN_EMAILS = ADMIN_EMAILS_LIST.split(",")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ContextualPermissionCheck:
|
|
|
|
|
"""
|
|
|
|
|
Класс для проверки контекстно-зависимых разрешений.
|
|
|
|
|
|
|
|
|
|
Позволяет проверять разрешения пользователя в контексте сообщества,
|
|
|
|
|
учитывая как глобальные роли пользователя, так и его роли внутри сообщества.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
# Маппинг из ролей сообщества в системные роли RBAC
|
|
|
|
|
COMMUNITY_ROLE_MAP = {
|
|
|
|
|
CommunityRole.READER: "community_reader",
|
|
|
|
|
CommunityRole.AUTHOR: "community_author",
|
|
|
|
|
CommunityRole.EXPERT: "community_expert",
|
|
|
|
|
CommunityRole.EDITOR: "community_editor",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# Обратное отображение для отображения системных ролей в роли сообщества
|
|
|
|
|
RBAC_TO_COMMUNITY_ROLE = {v: k for k, v in COMMUNITY_ROLE_MAP.items()}
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def check_community_permission(
|
|
|
|
|
session: Session, author_id: int, community_slug: str, resource: str, operation: str
|
|
|
|
|
) -> bool:
|
|
|
|
|
"""
|
|
|
|
|
Проверяет наличие разрешения у пользователя в контексте сообщества.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
session: Сессия SQLAlchemy
|
|
|
|
|
author_id: ID автора/пользователя
|
|
|
|
|
community_slug: Slug сообщества
|
|
|
|
|
resource: Ресурс для доступа
|
|
|
|
|
operation: Операция над ресурсом
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: True, если пользователь имеет разрешение, иначе False
|
|
|
|
|
"""
|
|
|
|
|
# 1. Проверка глобальных разрешений (например, администратор)
|
|
|
|
|
author = session.query(Author).filter(Author.id == author_id).one_or_none()
|
|
|
|
|
if not author:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Если это администратор (по списку email) или у него есть глобальное разрешение
|
|
|
|
|
if author.has_permission(resource, operation) or author.email in ADMIN_EMAILS:
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
# 2. Проверка разрешений в контексте сообщества
|
|
|
|
|
# Получаем информацию о сообществе
|
|
|
|
|
community = session.query(Community).filter(Community.slug == community_slug).one_or_none()
|
|
|
|
|
if not community:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Если автор является создателем сообщества, то у него есть полные права
|
|
|
|
|
if community.created_by == author_id:
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
# Получаем роли пользователя в этом сообществе
|
|
|
|
|
community_follower = (
|
|
|
|
|
session.query(CommunityFollower)
|
|
|
|
|
.filter(CommunityFollower.author == author_id, CommunityFollower.community == community.id)
|
|
|
|
|
.one_or_none()
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if not community_follower or not community_follower.roles:
|
|
|
|
|
# Пользователь не является членом сообщества или у него нет ролей
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Преобразуем роли сообщества в RBAC роли
|
|
|
|
|
rbac_roles = []
|
|
|
|
|
community_roles = community_follower.get_roles()
|
|
|
|
|
|
|
|
|
|
for role in community_roles:
|
|
|
|
|
if role in ContextualPermissionCheck.COMMUNITY_ROLE_MAP:
|
|
|
|
|
rbac_role_id = ContextualPermissionCheck.COMMUNITY_ROLE_MAP[role]
|
|
|
|
|
rbac_roles.append(rbac_role_id)
|
|
|
|
|
|
|
|
|
|
if not rbac_roles:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Проверяем наличие разрешения для этих ролей
|
|
|
|
|
permission_id = f"{resource}:{operation}"
|
|
|
|
|
|
|
|
|
|
# Запрос на проверку разрешений для указанных ролей
|
2025-06-01 23:56:11 +00:00
|
|
|
|
return (
|
2025-05-16 06:23:48 +00:00
|
|
|
|
session.query(RolePermission)
|
|
|
|
|
.join(Role, Role.id == RolePermission.role)
|
|
|
|
|
.join(Permission, Permission.id == RolePermission.permission)
|
|
|
|
|
.filter(Role.id.in_(rbac_roles), Permission.id == permission_id)
|
|
|
|
|
.first()
|
|
|
|
|
is not None
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
2025-06-01 23:56:11 +00:00
|
|
|
|
def get_user_community_roles(session: Session, author_id: int, community_slug: str) -> list[CommunityRole]:
|
2025-05-16 06:23:48 +00:00
|
|
|
|
"""
|
|
|
|
|
Получает список ролей пользователя в сообществе.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
session: Сессия SQLAlchemy
|
|
|
|
|
author_id: ID автора/пользователя
|
|
|
|
|
community_slug: Slug сообщества
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
List[CommunityRole]: Список ролей пользователя в сообществе
|
|
|
|
|
"""
|
|
|
|
|
# Получаем информацию о сообществе
|
|
|
|
|
community = session.query(Community).filter(Community.slug == community_slug).one_or_none()
|
|
|
|
|
if not community:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
# Если автор является создателем сообщества, то у него есть роль владельца
|
|
|
|
|
if community.created_by == author_id:
|
|
|
|
|
return [CommunityRole.EDITOR] # Владелец имеет роль редактора по умолчанию
|
|
|
|
|
|
|
|
|
|
# Получаем роли пользователя в этом сообществе
|
|
|
|
|
community_follower = (
|
|
|
|
|
session.query(CommunityFollower)
|
|
|
|
|
.filter(CommunityFollower.author == author_id, CommunityFollower.community == community.id)
|
|
|
|
|
.one_or_none()
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if not community_follower or not community_follower.roles:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
return community_follower.get_roles()
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def assign_role_to_user(
|
|
|
|
|
session: Session, author_id: int, community_slug: str, role: Union[CommunityRole, str]
|
|
|
|
|
) -> bool:
|
|
|
|
|
"""
|
|
|
|
|
Назначает роль пользователю в сообществе.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
session: Сессия SQLAlchemy
|
|
|
|
|
author_id: ID автора/пользователя
|
|
|
|
|
community_slug: Slug сообщества
|
|
|
|
|
role: Роль для назначения (CommunityRole или строковое представление)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: True если роль успешно назначена, иначе False
|
|
|
|
|
"""
|
|
|
|
|
# Преобразуем строковую роль в CommunityRole если нужно
|
|
|
|
|
if isinstance(role, str):
|
|
|
|
|
try:
|
|
|
|
|
role = CommunityRole(role)
|
|
|
|
|
except ValueError:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Получаем информацию о сообществе
|
|
|
|
|
community = session.query(Community).filter(Community.slug == community_slug).one_or_none()
|
|
|
|
|
if not community:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Проверяем существование связи автор-сообщество
|
|
|
|
|
community_follower = (
|
|
|
|
|
session.query(CommunityFollower)
|
|
|
|
|
.filter(CommunityFollower.author == author_id, CommunityFollower.community == community.id)
|
|
|
|
|
.one_or_none()
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if not community_follower:
|
|
|
|
|
# Создаем новую запись CommunityFollower
|
2025-06-01 23:56:11 +00:00
|
|
|
|
community_follower = CommunityFollower(follower=author_id, community=community.id)
|
2025-05-16 06:23:48 +00:00
|
|
|
|
session.add(community_follower)
|
|
|
|
|
|
|
|
|
|
# Назначаем роль
|
|
|
|
|
current_roles = community_follower.get_roles() if community_follower.roles else []
|
|
|
|
|
if role not in current_roles:
|
|
|
|
|
current_roles.append(role)
|
|
|
|
|
community_follower.set_roles(current_roles)
|
|
|
|
|
session.commit()
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def revoke_role_from_user(
|
|
|
|
|
session: Session, author_id: int, community_slug: str, role: Union[CommunityRole, str]
|
|
|
|
|
) -> bool:
|
|
|
|
|
"""
|
|
|
|
|
Отзывает роль у пользователя в сообществе.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
session: Сессия SQLAlchemy
|
|
|
|
|
author_id: ID автора/пользователя
|
|
|
|
|
community_slug: Slug сообщества
|
|
|
|
|
role: Роль для отзыва (CommunityRole или строковое представление)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: True если роль успешно отозвана, иначе False
|
|
|
|
|
"""
|
|
|
|
|
# Преобразуем строковую роль в CommunityRole если нужно
|
|
|
|
|
if isinstance(role, str):
|
|
|
|
|
try:
|
|
|
|
|
role = CommunityRole(role)
|
|
|
|
|
except ValueError:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Получаем информацию о сообществе
|
|
|
|
|
community = session.query(Community).filter(Community.slug == community_slug).one_or_none()
|
|
|
|
|
if not community:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Проверяем существование связи автор-сообщество
|
|
|
|
|
community_follower = (
|
|
|
|
|
session.query(CommunityFollower)
|
|
|
|
|
.filter(CommunityFollower.author == author_id, CommunityFollower.community == community.id)
|
|
|
|
|
.one_or_none()
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if not community_follower or not community_follower.roles:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Отзываем роль
|
|
|
|
|
current_roles = community_follower.get_roles()
|
|
|
|
|
if role in current_roles:
|
|
|
|
|
current_roles.remove(role)
|
|
|
|
|
community_follower.set_roles(current_roles)
|
|
|
|
|
session.commit()
|
|
|
|
|
|
|
|
|
|
return True
|