Files
core/orm/author.py
Untone 1b48675b92
Some checks failed
Deploy on push / deploy (push) Failing after 2m22s
[0.9.7] - 2025-08-18
### 🔄 Изменения
- **SQLAlchemy KeyError** - исправление ошибки `KeyError: Reaction` при инициализации
- **Исправлена ошибка SQLAlchemy**: Устранена проблема `InvalidRequestError: When initializing mapper Mapper[Shout(shout)], expression Reaction failed to locate a name (Reaction)`

### 🧪 Тестирование
- **Исправление тестов** - адаптация к новой структуре моделей
- **RBAC инициализация** - добавление `rbac.initialize_rbac()` в `conftest.py`
- **Создан тест для getSession**: Добавлен комплексный тест `test_getSession_cookies.py` с проверкой всех сценариев
- **Покрытие edge cases**: Тесты проверяют работу с валидными/невалидными токенами, отсутствующими пользователями
- **Мокирование зависимостей**: Использование unittest.mock для изоляции тестируемого кода

### 🔧 Рефакторинг
- **Упрощена архитектура**: Убраны сложные конструкции с отложенными импортами, заменены на чистую архитектуру
- **Перемещение моделей** - `Author` и связанные модели перенесены в `orm/author.py`: Вынесены базовые модели пользователей (`Author`, `AuthorFollower`, `AuthorBookmark`, `AuthorRating`) из `orm.author` в отдельный модуль
- **Устранены циклические импорты**: Разорван цикл между `auth.core` → `orm.community` → `orm.author` через реструктуризацию архитектуры
- **Создан модуль `utils/password.py`**: Класс `Password` вынесен в utils для избежания циклических зависимостей
- **Оптимизированы импорты моделей**: Убран прямой импорт `Shout` из `orm/community.py`, заменен на строковые ссылки

### 🔧 Авторизация с cookies
- **getSession теперь работает с cookies**: Мутация `getSession` теперь может получать токен из httpOnly cookies даже без заголовка Authorization
- **Убрано требование авторизации**: `getSession` больше не требует декоратор `@login_required`, работает автономно
- **Поддержка dual-авторизации**: Токен может быть получен как из заголовка Authorization, так и из cookie `session_token`
- **Автоматическая установка cookies**: Middleware автоматически устанавливает httpOnly cookies при успешном `getSession`
- **Обновлена GraphQL схема**: `SessionInfo` теперь содержит поля `success`, `error` и опциональные `token`, `author`
- **Единообразная обработка токенов**: Все модули теперь используют централизованные функции для работы с токенами
- **Улучшена обработка ошибок**: Добавлена детальная валидация токенов и пользователей в `getSession`
- **Логирование операций**: Добавлены подробные логи для отслеживания процесса авторизации

### 📝 Документация
- **Обновлена схема GraphQL**: `SessionInfo` тип теперь соответствует новому формату ответа
- Обновлена документация RBAC
- Обновлена документация авторизации с cookies
2025-08-18 14:25:25 +03:00

314 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
from typing import Any, Dict, Optional
from sqlalchemy import (
JSON,
Boolean,
ForeignKey,
Index,
Integer,
PrimaryKeyConstraint,
String,
)
from sqlalchemy.orm import Mapped, Session, mapped_column
from orm.base import BaseModel as Base
from utils.password import Password
# Общие table_args для всех моделей
DEFAULT_TABLE_ARGS = {"extend_existing": True}
PROTECTED_FIELDS = ["email", "password", "provider_access_token", "provider_refresh_token"]
class Author(Base):
"""
Расширенная модель автора с функциями аутентификации и авторизации
"""
__tablename__ = "author"
__table_args__ = (
Index("idx_author_slug", "slug"),
Index("idx_author_email", "email"),
Index("idx_author_phone", "phone"),
{"extend_existing": True},
)
# Базовые поля автора
id: Mapped[int] = mapped_column(Integer, primary_key=True)
name: Mapped[str | None] = mapped_column(String, nullable=True, comment="Display name")
slug: Mapped[str] = mapped_column(String, unique=True, comment="Author's slug")
bio: Mapped[str | None] = mapped_column(String, nullable=True, comment="Bio") # короткое описание
about: Mapped[str | None] = mapped_column(
String, nullable=True, comment="About"
) # длинное форматированное описание
pic: Mapped[str | None] = mapped_column(String, nullable=True, comment="Picture")
links: Mapped[dict[str, Any] | None] = mapped_column(JSON, nullable=True, comment="Links")
# OAuth аккаунты - JSON с данными всех провайдеров
# Формат: {"google": {"id": "123", "email": "user@gmail.com"}, "github": {"id": "456"}}
oauth: Mapped[dict[str, Any] | None] = mapped_column(
JSON, nullable=True, default=dict, comment="OAuth accounts data"
)
# Поля аутентификации
email: Mapped[str | None] = mapped_column(String, unique=True, nullable=True, comment="Email")
phone: Mapped[str | None] = mapped_column(String, nullable=True, comment="Phone")
password: Mapped[str | None] = mapped_column(String, nullable=True, comment="Password hash")
email_verified: Mapped[bool] = mapped_column(Boolean, default=False)
phone_verified: Mapped[bool] = mapped_column(Boolean, default=False)
failed_login_attempts: Mapped[int] = mapped_column(Integer, default=0)
account_locked_until: Mapped[int | None] = mapped_column(Integer, nullable=True)
# Временные метки
created_at: Mapped[int] = mapped_column(Integer, nullable=False, default=lambda: int(time.time()))
updated_at: Mapped[int] = mapped_column(Integer, nullable=False, default=lambda: int(time.time()))
last_seen: Mapped[int] = mapped_column(Integer, nullable=False, default=lambda: int(time.time()))
deleted_at: Mapped[int | None] = mapped_column(Integer, nullable=True)
oid: Mapped[str | None] = mapped_column(String, nullable=True)
@property
def protected_fields(self) -> list[str]:
return PROTECTED_FIELDS
@property
def is_authenticated(self) -> bool:
"""Проверяет, аутентифицирован ли пользователь"""
return self.id is not None
def verify_password(self, password: str) -> bool:
"""Проверяет пароль пользователя"""
return Password.verify(password, str(self.password)) if self.password else False
def set_password(self, password: str):
"""Устанавливает пароль пользователя"""
self.password = Password.encode(password) # type: ignore[assignment]
def increment_failed_login(self):
"""Увеличивает счетчик неудачных попыток входа"""
self.failed_login_attempts += 1 # type: ignore[assignment]
if self.failed_login_attempts >= 5:
self.account_locked_until = int(time.time()) + 300 # type: ignore[assignment] # 5 минут
def reset_failed_login(self):
"""Сбрасывает счетчик неудачных попыток входа"""
self.failed_login_attempts = 0 # type: ignore[assignment]
self.account_locked_until = None # type: ignore[assignment]
def is_locked(self) -> bool:
"""Проверяет, заблокирован ли аккаунт"""
if not self.account_locked_until:
return False
return int(time.time()) < self.account_locked_until
@property
def username(self) -> str:
"""
Возвращает имя пользователя для использования в токенах.
Необходимо для совместимости с TokenStorage и JWTCodec.
Returns:
str: slug, email или phone пользователя
"""
return str(self.slug or self.email or self.phone or "")
def dict(self, access: bool = False) -> Dict[str, Any]:
"""
Сериализует объект автора в словарь.
Args:
access: Если True, включает защищенные поля
Returns:
Dict: Словарь с данными автора
"""
result: Dict[str, Any] = {
"id": self.id,
"name": self.name,
"slug": self.slug,
"bio": self.bio,
"about": self.about,
"pic": self.pic,
"links": self.links,
"created_at": self.created_at,
"updated_at": self.updated_at,
"last_seen": self.last_seen,
"deleted_at": self.deleted_at,
"email_verified": self.email_verified,
}
# Добавляем защищенные поля только если запрошен полный доступ
if access:
result.update({"email": self.email, "phone": self.phone, "oauth": self.oauth})
return result
@classmethod
def find_by_oauth(cls, provider: str, provider_id: str, session: Session) -> Optional["Author"]:
"""
Находит автора по OAuth провайдеру и ID
Args:
provider (str): Имя OAuth провайдера (google, github и т.д.)
provider_id (str): ID пользователя у провайдера
session: Сессия базы данных
Returns:
Author или None: Найденный автор или None если не найден
"""
# Ищем авторов, у которых есть данный провайдер с данным ID
authors = session.query(cls).where(cls.oauth.isnot(None)).all()
for author in authors:
if author.oauth and provider in author.oauth:
oauth_data = author.oauth[provider] # type: ignore[index]
if isinstance(oauth_data, dict) and oauth_data.get("id") == provider_id:
return author
return None
def set_oauth_account(self, provider: str, provider_id: str, email: str | None = None) -> None:
"""
Устанавливает OAuth аккаунт для автора
Args:
provider (str): Имя OAuth провайдера (google, github и т.д.)
provider_id (str): ID пользователя у провайдера
email (Optional[str]): Email от провайдера
"""
if not self.oauth:
self.oauth = {} # type: ignore[assignment]
oauth_data: Dict[str, str] = {"id": provider_id}
if email:
oauth_data["email"] = email
self.oauth[provider] = oauth_data # type: ignore[index]
def get_oauth_account(self, provider: str) -> Dict[str, Any] | None:
"""
Получает OAuth аккаунт провайдера
Args:
provider (str): Имя OAuth провайдера
Returns:
dict или None: Данные OAuth аккаунта или None если не найден
"""
oauth_data = getattr(self, "oauth", None)
if not oauth_data:
return None
if isinstance(oauth_data, dict):
return oauth_data.get(provider)
return None
def remove_oauth_account(self, provider: str):
"""
Удаляет OAuth аккаунт провайдера
Args:
provider (str): Имя OAuth провайдера
"""
if self.oauth and provider in self.oauth:
del self.oauth[provider]
def to_dict(self, include_protected: bool = False) -> Dict[str, Any]:
"""Конвертирует модель в словарь"""
result = {
"id": self.id,
"name": self.name,
"slug": self.slug,
"bio": self.bio,
"about": self.about,
"pic": self.pic,
"links": self.links,
"oauth": self.oauth,
"email_verified": self.email_verified,
"phone_verified": self.phone_verified,
"created_at": self.created_at,
"updated_at": self.updated_at,
"last_seen": self.last_seen,
"deleted_at": self.deleted_at,
"oid": self.oid,
}
if include_protected:
result.update(
{
"email": self.email,
"phone": self.phone,
"failed_login_attempts": self.failed_login_attempts,
"account_locked_until": self.account_locked_until,
}
)
return result
def __repr__(self) -> str:
return f"<Author(id={self.id}, slug='{self.slug}', email='{self.email}')>"
class AuthorFollower(Base):
"""
Связь подписки между авторами.
"""
__tablename__ = "author_follower"
__table_args__ = (
PrimaryKeyConstraint("follower", "following"),
Index("idx_author_follower_follower", "follower"),
Index("idx_author_follower_following", "following"),
{"extend_existing": True},
)
follower: Mapped[int] = mapped_column(Integer, ForeignKey("author.id"), nullable=False)
following: Mapped[int] = mapped_column(Integer, ForeignKey("author.id"), nullable=False)
created_at: Mapped[int] = mapped_column(Integer, nullable=False, default=lambda: int(time.time()))
def __repr__(self) -> str:
return f"<AuthorFollower(follower={self.follower}, following={self.following})>"
class AuthorBookmark(Base):
"""
Закладки автора.
"""
__tablename__ = "author_bookmark"
__table_args__ = (
PrimaryKeyConstraint("author", "shout"),
Index("idx_author_bookmark_author", "author"),
Index("idx_author_bookmark_shout", "shout"),
{"extend_existing": True},
)
author: Mapped[int] = mapped_column(Integer, ForeignKey("author.id"), nullable=False)
shout: Mapped[int] = mapped_column(Integer, ForeignKey("shout.id"), nullable=False)
created_at: Mapped[int] = mapped_column(Integer, nullable=False, default=lambda: int(time.time()))
def __repr__(self) -> str:
return f"<AuthorBookmark(author={self.author}, shout={self.shout})>"
class AuthorRating(Base):
"""
Рейтинг автора.
"""
__tablename__ = "author_rating"
__table_args__ = (
PrimaryKeyConstraint("author", "rater"),
Index("idx_author_rating_author", "author"),
Index("idx_author_rating_rater", "rater"),
{"extend_existing": True},
)
author: Mapped[int] = mapped_column(Integer, ForeignKey("author.id"), nullable=False)
rater: Mapped[int] = mapped_column(Integer, ForeignKey("author.id"), nullable=False)
plus: Mapped[bool] = mapped_column(Boolean, nullable=True)
rating: Mapped[int] = mapped_column(Integer, nullable=False, comment="Rating value")
created_at: Mapped[int] = mapped_column(Integer, nullable=False, default=lambda: int(time.time()))
updated_at: Mapped[int | None] = mapped_column(Integer, nullable=True)
def __repr__(self) -> str:
return f"<AuthorRating(author={self.author}, rater={self.rater}, rating={self.rating})>"