core/auth/orm.py

330 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
from typing import Dict, Set
from sqlalchemy import JSON, Boolean, Column, ForeignKey, Index, Integer, String
from sqlalchemy.orm import relationship
from auth.identity import Password
from services.db import BaseModel as Base
# Общие table_args для всех моделей
DEFAULT_TABLE_ARGS = {"extend_existing": True}
"""
Модель закладок автора
"""
class AuthorBookmark(Base):
"""
Закладка автора на публикацию.
Attributes:
author (int): ID автора
shout (int): ID публикации
"""
__tablename__ = "author_bookmark"
__table_args__ = (
Index("idx_author_bookmark_author", "author"),
Index("idx_author_bookmark_shout", "shout"),
{"extend_existing": True},
)
id = None # type: ignore
author = Column(ForeignKey("author.id"), primary_key=True)
shout = Column(ForeignKey("shout.id"), primary_key=True)
class AuthorRating(Base):
"""
Рейтинг автора от другого автора.
Attributes:
rater (int): ID оценивающего автора
author (int): ID оцениваемого автора
plus (bool): Положительная/отрицательная оценка
"""
__tablename__ = "author_rating"
__table_args__ = (
Index("idx_author_rating_author", "author"),
Index("idx_author_rating_rater", "rater"),
{"extend_existing": True},
)
id = None # type: ignore
rater = Column(ForeignKey("author.id"), primary_key=True)
author = Column(ForeignKey("author.id"), primary_key=True)
plus = Column(Boolean)
class AuthorFollower(Base):
"""
Подписка одного автора на другого.
Attributes:
follower (int): ID подписчика
author (int): ID автора, на которого подписываются
created_at (int): Время создания подписки
auto (bool): Признак автоматической подписки
"""
__tablename__ = "author_follower"
__table_args__ = (
Index("idx_author_follower_author", "author"),
Index("idx_author_follower_follower", "follower"),
{"extend_existing": True},
)
id = None # type: ignore
follower = Column(ForeignKey("author.id"), primary_key=True)
author = Column(ForeignKey("author.id"), primary_key=True)
created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
auto = Column(Boolean, nullable=False, default=False)
class RolePermission(Base):
"""Связь роли с разрешениями"""
__tablename__ = "role_permission"
__table_args__ = {"extend_existing": True}
id = None # type: ignore
role = Column(ForeignKey("role.id"), primary_key=True, index=True)
permission = Column(ForeignKey("permission.id"), primary_key=True, index=True)
class Permission(Base):
"""Модель разрешения в системе RBAC"""
__tablename__ = "permission"
__table_args__ = {"extend_existing": True}
id = Column(String, primary_key=True, unique=True, nullable=False, default=None)
resource = Column(String, nullable=False)
operation = Column(String, nullable=False)
class Role(Base):
"""Модель роли в системе RBAC"""
__tablename__ = "role"
__table_args__ = {"extend_existing": True}
id = Column(String, primary_key=True, unique=True, nullable=False, default=None)
name = Column(String, nullable=False)
permissions = relationship(Permission, secondary="role_permission", lazy="joined")
class AuthorRole(Base):
"""Связь автора с ролями"""
__tablename__ = "author_role"
__table_args__ = {"extend_existing": True}
id = None # type: ignore
community = Column(ForeignKey("community.id"), primary_key=True, index=True, default=1)
author = Column(ForeignKey("author.id"), primary_key=True, index=True)
role = Column(ForeignKey("role.id"), primary_key=True, index=True)
class Author(Base):
"""
Расширенная модель автора с функциями аутентификации и авторизации
"""
__tablename__ = "author"
__table_args__ = (
Index("idx_author_slug", "slug"),
Index("idx_author_email", "email"),
Index("idx_author_phone", "phone"),
{"extend_existing": True},
)
# Базовые поля автора
id = Column(Integer, primary_key=True)
name = Column(String, nullable=True, comment="Display name")
slug = Column(String, unique=True, comment="Author's slug")
bio = Column(String, nullable=True, comment="Bio") # короткое описание
about = Column(String, nullable=True, comment="About") # длинное форматированное описание
pic = Column(String, nullable=True, comment="Picture")
links = Column(JSON, nullable=True, comment="Links")
# OAuth аккаунты - JSON с данными всех провайдеров
# Формат: {"google": {"id": "123", "email": "user@gmail.com"}, "github": {"id": "456"}}
oauth = Column(JSON, nullable=True, default=dict, comment="OAuth accounts data")
# Поля аутентификации
email = Column(String, unique=True, nullable=True, comment="Email")
phone = Column(String, unique=True, nullable=True, comment="Phone")
password = Column(String, nullable=True, comment="Password hash")
email_verified = Column(Boolean, default=False)
phone_verified = Column(Boolean, default=False)
failed_login_attempts = Column(Integer, default=0)
account_locked_until = Column(Integer, nullable=True)
# Временные метки
created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
updated_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
last_seen = Column(Integer, nullable=False, default=lambda: int(time.time()))
deleted_at = Column(Integer, nullable=True)
# Связи с ролями
roles = relationship(Role, secondary="author_role", lazy="joined")
# search_vector = Column(
# TSVectorType("name", "slug", "bio", "about", regconfig="pg_catalog.russian")
# )
# Список защищенных полей, которые видны только владельцу и администраторам
_protected_fields = ["email", "password", "provider_access_token", "provider_refresh_token"]
@property
def is_authenticated(self) -> bool:
"""Проверяет, аутентифицирован ли пользователь"""
return self.id is not None
def get_permissions(self) -> Dict[str, Set[str]]:
"""Получает все разрешения пользователя"""
permissions: Dict[str, Set[str]] = {}
for role in self.roles:
for permission in role.permissions:
if permission.resource not in permissions:
permissions[permission.resource] = set()
permissions[permission.resource].add(permission.operation)
return permissions
def has_permission(self, resource: str, operation: str) -> bool:
"""Проверяет наличие разрешения у пользователя"""
permissions = self.get_permissions()
return resource in permissions and operation in permissions[resource]
def verify_password(self, password: str) -> bool:
"""Проверяет пароль пользователя"""
return Password.verify(password, str(self.password)) if self.password else False
def set_password(self, password: str):
"""Устанавливает пароль пользователя"""
self.password = Password.encode(password) # type: ignore[assignment]
def increment_failed_login(self):
"""Увеличивает счетчик неудачных попыток входа"""
self.failed_login_attempts += 1 # type: ignore[assignment]
if self.failed_login_attempts >= 5:
self.account_locked_until = int(time.time()) + 300 # type: ignore[assignment] # 5 минут
def reset_failed_login(self):
"""Сбрасывает счетчик неудачных попыток входа"""
self.failed_login_attempts = 0 # type: ignore[assignment]
self.account_locked_until = None # type: ignore[assignment]
def is_locked(self) -> bool:
"""Проверяет, заблокирован ли аккаунт"""
if not self.account_locked_until:
return False
return bool(self.account_locked_until > int(time.time()))
@property
def username(self) -> str:
"""
Возвращает имя пользователя для использования в токенах.
Необходимо для совместимости с TokenStorage и JWTCodec.
Returns:
str: slug, email или phone пользователя
"""
return str(self.slug or self.email or self.phone or "")
def dict(self, access: bool = False) -> Dict:
"""
Сериализует объект Author в словарь с учетом прав доступа.
Args:
access (bool, optional): Флаг, указывающий, доступны ли защищенные поля
Returns:
dict: Словарь с атрибутами Author, отфильтрованный по правам доступа
"""
# Получаем все атрибуты объекта
result = {c.name: getattr(self, c.name) for c in self.__table__.columns}
# Добавляем роли как список идентификаторов и названий
if hasattr(self, "roles"):
result["roles"] = []
for role in self.roles:
if isinstance(role, dict):
result["roles"].append(role.get("id"))
# скрываем защищенные поля
if not access:
for field in self._protected_fields:
if field in result:
result[field] = None
return result
@classmethod
def find_by_oauth(cls, provider: str, provider_id: str, session):
"""
Находит автора по OAuth провайдеру и ID
Args:
provider (str): Имя OAuth провайдера (google, github и т.д.)
provider_id (str): ID пользователя у провайдера
session: Сессия базы данных
Returns:
Author или None: Найденный автор или None если не найден
"""
# Ищем авторов, у которых есть данный провайдер с данным ID
authors = session.query(cls).filter(cls.oauth.isnot(None)).all()
for author in authors:
if author.oauth and provider in author.oauth:
if author.oauth[provider].get("id") == provider_id:
return author
return None
def set_oauth_account(self, provider: str, provider_id: str, email: str = None):
"""
Устанавливает OAuth аккаунт для автора
Args:
provider (str): Имя OAuth провайдера (google, github и т.д.)
provider_id (str): ID пользователя у провайдера
email (str, optional): Email от провайдера
"""
if not self.oauth:
self.oauth = {} # type: ignore[assignment]
oauth_data = {"id": provider_id}
if email:
oauth_data["email"] = email
self.oauth[provider] = oauth_data # type: ignore[index]
def get_oauth_account(self, provider: str):
"""
Получает OAuth аккаунт провайдера
Args:
provider (str): Имя OAuth провайдера
Returns:
dict или None: Данные OAuth аккаунта или None если не найден
"""
if not self.oauth:
return None
return self.oauth.get(provider)
def remove_oauth_account(self, provider: str):
"""
Удаляет OAuth аккаунт провайдера
Args:
provider (str): Имя OAuth провайдера
"""
if self.oauth and provider in self.oauth:
del self.oauth[provider]