core/auth/internal.py

142 lines
5.5 KiB
Python
Raw Normal View History

2025-05-30 11:05:50 +00:00
"""
Утилитные функции для внутренней аутентификации
Используются в GraphQL резолверах и декораторах
"""
2025-05-16 06:23:48 +00:00
import time
2025-06-30 20:10:48 +00:00
from typing import Optional
2025-05-16 06:23:48 +00:00
from sqlalchemy.orm import exc
from auth.orm import Author
2025-05-19 21:00:24 +00:00
from auth.state import AuthState
2025-06-02 18:50:58 +00:00
from auth.tokens.storage import TokenStorage as TokenManager
2025-05-29 09:37:39 +00:00
from services.db import local_session
from settings import ADMIN_EMAILS as ADMIN_EMAILS_LIST
from utils.logger import root_logger as logger
2025-05-19 21:00:24 +00:00
ADMIN_EMAILS = ADMIN_EMAILS_LIST.split(",")
2025-05-16 06:23:48 +00:00
async def verify_internal_auth(token: str) -> tuple[int, list, bool]:
2025-05-16 06:23:48 +00:00
"""
Проверяет локальную авторизацию.
2025-05-20 22:34:02 +00:00
Возвращает user_id, список ролей и флаг администратора.
2025-05-16 06:23:48 +00:00
Args:
token: Токен авторизации (может быть как с Bearer, так и без)
Returns:
2025-05-20 22:34:02 +00:00
tuple: (user_id, roles, is_admin)
2025-05-16 06:23:48 +00:00
"""
2025-05-21 15:29:32 +00:00
logger.debug(f"[verify_internal_auth] Проверка токена: {token[:10]}...")
2025-05-29 09:37:39 +00:00
2025-05-16 06:23:48 +00:00
# Обработка формата "Bearer <token>" (если токен не был обработан ранее)
2025-05-21 15:29:32 +00:00
if token and token.startswith("Bearer "):
2025-05-16 06:23:48 +00:00
token = token.replace("Bearer ", "", 1).strip()
# Проверяем сессию
2025-06-02 18:50:58 +00:00
payload = await TokenManager.verify_session(token)
2025-05-16 06:23:48 +00:00
if not payload:
2025-05-21 15:29:32 +00:00
logger.warning("[verify_internal_auth] Недействительный токен: payload не получен")
return 0, [], False
2025-05-16 06:23:48 +00:00
2025-05-21 15:29:32 +00:00
logger.debug(f"[verify_internal_auth] Токен действителен, user_id={payload.user_id}")
2025-05-16 06:23:48 +00:00
with local_session() as session:
try:
author = session.query(Author).filter(Author.id == payload.user_id).one()
2025-05-16 06:23:48 +00:00
# Получаем роли
roles = [role.id for role in author.roles]
2025-05-21 15:29:32 +00:00
logger.debug(f"[verify_internal_auth] Роли пользователя: {roles}")
2025-05-29 09:37:39 +00:00
2025-05-20 22:34:02 +00:00
# Определяем, является ли пользователь администратором
2025-05-29 09:37:39 +00:00
is_admin = any(role in ["admin", "super"] for role in roles) or author.email in ADMIN_EMAILS
logger.debug(
f"[verify_internal_auth] Пользователь {author.id} {'является' if is_admin else 'не является'} администратором"
)
return int(author.id), roles, is_admin
2025-05-16 06:23:48 +00:00
except exc.NoResultFound:
2025-05-21 15:29:32 +00:00
logger.warning(f"[verify_internal_auth] Пользователь с ID {payload.user_id} не найден в БД или не активен")
return 0, [], False
2025-05-16 06:23:48 +00:00
async def create_internal_session(author: Author, device_info: Optional[dict] = None) -> str:
"""
Создает новую сессию для автора
Args:
author: Объект автора
device_info: Информация об устройстве (опционально)
Returns:
str: Токен сессии
"""
# Сбрасываем счетчик неудачных попыток
author.reset_failed_login()
2025-05-20 22:34:02 +00:00
# Обновляем last_seen
author.last_seen = int(time.time()) # type: ignore[assignment]
2025-05-16 06:23:48 +00:00
# Создаем сессию, используя token для идентификации
2025-06-02 18:50:58 +00:00
return await TokenManager.create_session(
2025-05-16 06:23:48 +00:00
user_id=str(author.id),
username=str(author.slug or author.email or author.phone or ""),
2025-05-16 06:23:48 +00:00
device_info=device_info,
)
2025-05-19 21:00:24 +00:00
2025-06-30 20:10:48 +00:00
async def authenticate(request) -> AuthState:
2025-05-19 21:00:24 +00:00
"""
2025-06-30 20:10:48 +00:00
Аутентифицирует пользователя по токену из запроса.
2025-05-19 21:00:24 +00:00
Args:
2025-06-30 20:10:48 +00:00
request: Объект запроса
2025-05-19 21:00:24 +00:00
Returns:
2025-06-30 20:10:48 +00:00
AuthState: Состояние аутентификации
2025-05-19 21:00:24 +00:00
"""
2025-06-30 20:10:48 +00:00
from auth.decorators import get_auth_token
from utils.logger import root_logger as logger
2025-05-19 21:00:24 +00:00
2025-06-30 20:10:48 +00:00
logger.debug("[authenticate] Начало аутентификации")
2025-06-30 20:27:22 +00:00
# Создаем объект AuthState
auth_state = AuthState()
auth_state.logged_in = False
auth_state.author_id = None
auth_state.error = None
auth_state.token = None
2025-06-30 20:10:48 +00:00
# Получаем токен из запроса
token = get_auth_token(request)
if not token:
logger.warning("[authenticate] Токен не найден в запросе")
auth_state.error = "No authentication token provided"
return auth_state
logger.debug(f"[authenticate] Токен найден, длина: {len(token)}")
# Проверяем токен
try:
2025-06-30 20:27:22 +00:00
# Используем TokenManager вместо прямого создания SessionTokenManager
auth_result = await TokenManager.verify_session(token)
2025-06-30 20:10:48 +00:00
2025-06-30 20:27:22 +00:00
if auth_result and hasattr(auth_result, "user_id") and auth_result.user_id:
2025-06-30 20:10:48 +00:00
logger.debug(f"[authenticate] Успешная аутентификация, user_id: {auth_result.user_id}")
auth_state.logged_in = True
auth_state.author_id = auth_result.user_id
auth_state.token = token
return auth_state
2025-06-30 20:27:22 +00:00
2025-06-30 20:10:48 +00:00
error_msg = "Invalid or expired token"
logger.warning(f"[authenticate] Недействительный токен: {error_msg}")
auth_state.error = error_msg
return auth_state
except Exception as e:
logger.error(f"[authenticate] Ошибка при проверке токена: {e}")
auth_state.error = f"Authentication error: {e!s}"
return auth_state