Files
core/auth/identity.py
Untone 1b48675b92
Some checks failed
Deploy on push / deploy (push) Failing after 2m22s
[0.9.7] - 2025-08-18
### 🔄 Изменения
- **SQLAlchemy KeyError** - исправление ошибки `KeyError: Reaction` при инициализации
- **Исправлена ошибка SQLAlchemy**: Устранена проблема `InvalidRequestError: When initializing mapper Mapper[Shout(shout)], expression Reaction failed to locate a name (Reaction)`

### 🧪 Тестирование
- **Исправление тестов** - адаптация к новой структуре моделей
- **RBAC инициализация** - добавление `rbac.initialize_rbac()` в `conftest.py`
- **Создан тест для getSession**: Добавлен комплексный тест `test_getSession_cookies.py` с проверкой всех сценариев
- **Покрытие edge cases**: Тесты проверяют работу с валидными/невалидными токенами, отсутствующими пользователями
- **Мокирование зависимостей**: Использование unittest.mock для изоляции тестируемого кода

### 🔧 Рефакторинг
- **Упрощена архитектура**: Убраны сложные конструкции с отложенными импортами, заменены на чистую архитектуру
- **Перемещение моделей** - `Author` и связанные модели перенесены в `orm/author.py`: Вынесены базовые модели пользователей (`Author`, `AuthorFollower`, `AuthorBookmark`, `AuthorRating`) из `orm.author` в отдельный модуль
- **Устранены циклические импорты**: Разорван цикл между `auth.core` → `orm.community` → `orm.author` через реструктуризацию архитектуры
- **Создан модуль `utils/password.py`**: Класс `Password` вынесен в utils для избежания циклических зависимостей
- **Оптимизированы импорты моделей**: Убран прямой импорт `Shout` из `orm/community.py`, заменен на строковые ссылки

### 🔧 Авторизация с cookies
- **getSession теперь работает с cookies**: Мутация `getSession` теперь может получать токен из httpOnly cookies даже без заголовка Authorization
- **Убрано требование авторизации**: `getSession` больше не требует декоратор `@login_required`, работает автономно
- **Поддержка dual-авторизации**: Токен может быть получен как из заголовка Authorization, так и из cookie `session_token`
- **Автоматическая установка cookies**: Middleware автоматически устанавливает httpOnly cookies при успешном `getSession`
- **Обновлена GraphQL схема**: `SessionInfo` теперь содержит поля `success`, `error` и опциональные `token`, `author`
- **Единообразная обработка токенов**: Все модули теперь используют централизованные функции для работы с токенами
- **Улучшена обработка ошибок**: Добавлена детальная валидация токенов и пользователей в `getSession`
- **Логирование операций**: Добавлены подробные логи для отслеживания процесса авторизации

### 📝 Документация
- **Обновлена схема GraphQL**: `SessionInfo` тип теперь соответствует новому формату ответа
- Обновлена документация RBAC
- Обновлена документация авторизации с cookies
2025-08-18 14:25:25 +03:00

115 lines
4.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Any, TypeVar
from auth.exceptions import ExpiredTokenError, InvalidPasswordError, InvalidTokenError
from auth.jwtcodec import JWTCodec
from orm.author import Author
from storage.db import local_session
from storage.redis import redis
from utils.logger import root_logger as logger
from utils.password import Password
AuthorType = TypeVar("AuthorType", bound=Author)
class Identity:
@staticmethod
def password(orm_author: AuthorType, password: str) -> AuthorType:
"""
Проверяет пароль пользователя
Args:
orm_author (Author): Объект пользователя
password (str): Пароль пользователя
Returns:
Author: Объект автора при успешной проверке
Raises:
InvalidPasswordError: Если пароль не соответствует хешу или отсутствует
"""
# Проверим исходный пароль в orm_author
if not orm_author.password:
logger.warning(f"[auth.identity] Пароль в исходном объекте автора пуст: email={orm_author.email}")
msg = "Пароль не установлен для данного пользователя"
raise InvalidPasswordError(msg)
# Проверяем пароль напрямую, не используя dict()
password_hash = str(orm_author.password) if orm_author.password else ""
if not password_hash or not Password.verify(password, password_hash):
logger.warning(f"[auth.identity] Неверный пароль для {orm_author.email}")
msg = "Неверный пароль пользователя"
raise InvalidPasswordError(msg)
# Возвращаем исходный объект, чтобы сохранить все связи
return orm_author
@staticmethod
def oauth(inp: dict[str, Any]) -> Any:
"""
Создает нового пользователя OAuth, если он не существует
Args:
inp (dict): Данные OAuth пользователя
Returns:
Author: Объект пользователя
"""
# Author уже импортирован в начале файла
with local_session() as session:
author = session.query(Author).where(Author.email == inp["email"]).first()
if not author:
author = Author(**inp)
author.email_verified = True # type: ignore[assignment]
session.add(author)
session.commit()
return author
@staticmethod
async def onetime(token: str) -> Any:
"""
Проверяет одноразовый токен
Args:
token (str): Одноразовый токен
Returns:
Author: Объект пользователя
"""
try:
print("[auth.identity] using one time token")
payload = JWTCodec.decode(token)
if payload is None:
logger.warning("[Identity.token] Токен не валиден (payload is None)")
return {"error": "Invalid token"}
# Проверяем существование токена в хранилище
user_id = payload.get("user_id")
username = payload.get("username")
if not user_id or not username:
logger.warning("[Identity.token] Нет user_id или username в токене")
return {"error": "Invalid token"}
token_key = f"{user_id}-{username}-{token}"
if not await redis.exists(token_key):
logger.warning(f"[Identity.token] Токен не найден в хранилище: {token_key}")
return {"error": "Token not found"}
# Если все проверки пройдены, ищем автора в базе данных
# Author уже импортирован в начале файла
with local_session() as session:
author = session.query(Author).filter_by(id=user_id).first()
if not author:
logger.warning(f"[Identity.token] Автор с ID {user_id} не найден")
return {"error": "User not found"}
logger.info(f"[Identity.token] Токен валиден для автора {author.id}")
return author
except ExpiredTokenError:
# raise InvalidTokenError("Login token has expired, please try again")
return {"error": "Token has expired"}
except InvalidTokenError:
# raise InvalidTokenError("token format error") from e
return {"error": "Token format error"}