import time from typing import Dict, Set from sqlalchemy import JSON, Boolean, Column, ForeignKey, Index, Integer, String from sqlalchemy.orm import relationship from auth.identity import Password from services.db import Base from settings import ADMIN_EMAILS # from sqlalchemy_utils import TSVectorType # Общие table_args для всех моделей DEFAULT_TABLE_ARGS = {"extend_existing": True} """ Модель закладок автора """ class AuthorBookmark(Base): """ Закладка автора на публикацию. Attributes: author (int): ID автора shout (int): ID публикации """ __tablename__ = "author_bookmark" __table_args__ = ( Index("idx_author_bookmark_author", "author"), Index("idx_author_bookmark_shout", "shout"), {"extend_existing": True}, ) id = None # type: ignore author = Column(ForeignKey("author.id"), primary_key=True) shout = Column(ForeignKey("shout.id"), primary_key=True) class AuthorRating(Base): """ Рейтинг автора от другого автора. Attributes: rater (int): ID оценивающего автора author (int): ID оцениваемого автора plus (bool): Положительная/отрицательная оценка """ __tablename__ = "author_rating" __table_args__ = ( Index("idx_author_rating_author", "author"), Index("idx_author_rating_rater", "rater"), {"extend_existing": True}, ) id = None # type: ignore rater = Column(ForeignKey("author.id"), primary_key=True) author = Column(ForeignKey("author.id"), primary_key=True) plus = Column(Boolean) class AuthorFollower(Base): """ Подписка одного автора на другого. Attributes: follower (int): ID подписчика author (int): ID автора, на которого подписываются created_at (int): Время создания подписки auto (bool): Признак автоматической подписки """ __tablename__ = "author_follower" __table_args__ = ( Index("idx_author_follower_author", "author"), Index("idx_author_follower_follower", "follower"), {"extend_existing": True}, ) id = None # type: ignore follower = Column(ForeignKey("author.id"), primary_key=True) author = Column(ForeignKey("author.id"), primary_key=True) created_at = Column(Integer, nullable=False, default=lambda: int(time.time())) auto = Column(Boolean, nullable=False, default=False) class RolePermission(Base): """Связь роли с разрешениями""" __tablename__ = "role_permission" __table_args__ = {"extend_existing": True} id = None role = Column(ForeignKey("role.id"), primary_key=True, index=True) permission = Column(ForeignKey("permission.id"), primary_key=True, index=True) class Permission(Base): """Модель разрешения в системе RBAC""" __tablename__ = "permission" __table_args__ = {"extend_existing": True} id = Column(String, primary_key=True, unique=True, nullable=False, default=None) resource = Column(String, nullable=False) operation = Column(String, nullable=False) class Role(Base): """Модель роли в системе RBAC""" __tablename__ = "role" __table_args__ = {"extend_existing": True} id = Column(String, primary_key=True, unique=True, nullable=False, default=None) name = Column(String, nullable=False) permissions = relationship(Permission, secondary="role_permission", lazy="joined") class AuthorRole(Base): """Связь автора с ролями""" __tablename__ = "author_role" __table_args__ = {"extend_existing": True} id = None community = Column(ForeignKey("community.id"), primary_key=True, index=True, default=1) author = Column(ForeignKey("author.id"), primary_key=True, index=True) role = Column(ForeignKey("role.id"), primary_key=True, index=True) class Author(Base): """ Расширенная модель автора с функциями аутентификации и авторизации """ __tablename__ = "author" __table_args__ = ( Index("idx_author_slug", "slug"), Index("idx_author_email", "email"), Index("idx_author_phone", "phone"), {"extend_existing": True}, ) # Базовые поля автора id = Column(Integer, primary_key=True) name = Column(String, nullable=True, comment="Display name") slug = Column(String, unique=True, comment="Author's slug") bio = Column(String, nullable=True, comment="Bio") # короткое описание about = Column(String, nullable=True, comment="About") # длинное форматированное описание pic = Column(String, nullable=True, comment="Picture") links = Column(JSON, nullable=True, comment="Links") # Дополнительные поля из User oauth = Column(String, nullable=True, comment="OAuth provider") oid = Column(String, nullable=True, comment="OAuth ID") muted = Column(Boolean, default=False, comment="Is author muted") # Поля аутентификации email = Column(String, unique=True, nullable=True, comment="Email") phone = Column(String, unique=True, nullable=True, comment="Phone") password = Column(String, nullable=True, comment="Password hash") is_active = Column(Boolean, default=True, nullable=False) email_verified = Column(Boolean, default=False) phone_verified = Column(Boolean, default=False) failed_login_attempts = Column(Integer, default=0) account_locked_until = Column(Integer, nullable=True) # Временные метки created_at = Column(Integer, nullable=False, default=lambda: int(time.time())) updated_at = Column(Integer, nullable=False, default=lambda: int(time.time())) last_seen = Column(Integer, nullable=False, default=lambda: int(time.time())) deleted_at = Column(Integer, nullable=True) # Связи с ролями roles = relationship(Role, secondary="author_role", lazy="joined") # search_vector = Column( # TSVectorType("name", "slug", "bio", "about", regconfig="pg_catalog.russian") # ) # Список защищенных полей, которые видны только владельцу и администраторам _protected_fields = ['email', 'password', 'provider_access_token', 'provider_refresh_token'] @property def is_authenticated(self) -> bool: """Проверяет, аутентифицирован ли пользователь""" return self.id is not None def get_permissions(self) -> Dict[str, Set[str]]: """Получает все разрешения пользователя""" permissions: Dict[str, Set[str]] = {} for role in self.roles: for permission in role.permissions: if permission.resource not in permissions: permissions[permission.resource] = set() permissions[permission.resource].add(permission.operation) return permissions def has_permission(self, resource: str, operation: str) -> bool: """Проверяет наличие разрешения у пользователя""" permissions = self.get_permissions() return resource in permissions and operation in permissions[resource] def verify_password(self, password: str) -> bool: """Проверяет пароль пользователя""" return Password.verify(password, self.password) if self.password else False def set_password(self, password: str): """Устанавливает пароль пользователя""" self.password = Password.encode(password) def increment_failed_login(self): """Увеличивает счетчик неудачных попыток входа""" self.failed_login_attempts += 1 if self.failed_login_attempts >= 5: self.account_locked_until = int(time.time()) + 300 # 5 минут def reset_failed_login(self): """Сбрасывает счетчик неудачных попыток входа""" self.failed_login_attempts = 0 self.account_locked_until = None def is_locked(self) -> bool: """Проверяет, заблокирован ли аккаунт""" if not self.account_locked_until: return False return self.account_locked_until > int(time.time()) @property def username(self) -> str: """ Возвращает имя пользователя для использования в токенах. Необходимо для совместимости с TokenStorage и JWTCodec. Returns: str: slug, email или phone пользователя """ return self.slug or self.email or self.phone or "" def dict(self, access=False) -> Dict: """ Сериализует объект Author в словарь с учетом прав доступа. Args: access (bool, optional): Флаг, указывающий, доступны ли защищенные поля Returns: dict: Словарь с атрибутами Author, отфильтрованный по правам доступа """ # Получаем все атрибуты объекта result = {c.name: getattr(self, c.name) for c in self.__table__.columns} # Добавляем роли как список идентификаторов и названий if hasattr(self, 'roles'): result['roles'] = [] for role in self.roles: if isinstance(role, dict): result['roles'].append(role.get('id')) # скрываем защищенные поля if not access: for field in self._protected_fields: if field in result: result[field] = None return result