""" Базовые функции аутентификации и верификации Этот модуль содержит основные функции без циклических зависимостей """ import time from sqlalchemy.orm.exc import NoResultFound from auth.state import AuthState from auth.tokens.storage import TokenStorage as TokenManager from orm.author import Author from orm.community import CommunityAuthor from settings import ADMIN_EMAILS as ADMIN_EMAILS_LIST from storage.db import local_session from utils.logger import root_logger as logger ADMIN_EMAILS = ADMIN_EMAILS_LIST.split(",") async def verify_internal_auth(token: str) -> tuple[int, list, bool]: """ Проверяет локальную авторизацию. Возвращает user_id, список ролей и флаг администратора. Args: token: Токен авторизации (может быть как с Bearer, так и без) Returns: tuple: (user_id, roles, is_admin) """ logger.debug(f"[verify_internal_auth] Проверка токена: {token[:10]}...") # Обработка формата "Bearer " (если токен не был обработан ранее) if token and token.startswith("Bearer "): token = token.replace("Bearer ", "", 1).strip() # Проверяем сессию payload = await TokenManager.verify_session(token) if not payload: logger.warning("[verify_internal_auth] Недействительный токен: payload не получен") return 0, [], False # payload может быть словарем или объектом, обрабатываем оба случая user_id = payload.user_id if hasattr(payload, "user_id") else payload.get("user_id") if not user_id: logger.warning("[verify_internal_auth] user_id не найден в payload") return 0, [], False logger.debug(f"[verify_internal_auth] Токен действителен, user_id={user_id}") with local_session() as session: try: # Author уже импортирован в начале файла author = session.query(Author).where(Author.id == user_id).one() # Получаем роли ca = session.query(CommunityAuthor).filter_by(author_id=author.id, community_id=1).first() roles = ca.role_list if ca else [] logger.debug(f"[verify_internal_auth] Роли пользователя: {roles}") # Определяем, является ли пользователь администратором is_admin = any(role in ["admin", "super"] for role in roles) or author.email in ADMIN_EMAILS logger.debug( f"[verify_internal_auth] Пользователь {author.id} {'является' if is_admin else 'не является'} администратором" ) return int(author.id), roles, is_admin except NoResultFound: logger.warning(f"[verify_internal_auth] Пользователь с ID {user_id} не найден в БД или не активен") return 0, [], False async def create_internal_session(author, device_info: dict | None = None) -> str: """ Создает новую сессию для автора Args: author: Объект автора device_info: Информация об устройстве (опционально) Returns: str: Токен сессии """ # Сбрасываем счетчик неудачных попыток author.reset_failed_login() # Обновляем last_seen author.last_seen = int(time.time()) # Создаем сессию, используя token для идентификации return await TokenManager.create_session( user_id=str(author.id), username=str(author.slug or author.email or author.phone or ""), device_info=device_info, ) async def get_auth_token_from_request(request) -> str | None: """ Извлекает токен авторизации из запроса. Порядок проверки: 1. Проверяет auth из middleware 2. Проверяет auth из scope 3. Проверяет заголовок Authorization 4. Проверяет cookie с именем auth_token Args: request: Объект запроса Returns: Optional[str]: Токен авторизации или None """ # Отложенный импорт для избежания циклических зависимостей from auth.decorators import get_auth_token return await get_auth_token(request) async def authenticate(request) -> AuthState: """ Получает токен из запроса и проверяет авторизацию. Args: request: Объект запроса Returns: AuthState: Состояние аутентификации """ logger.debug("[authenticate] Начало аутентификации") # Получаем токен из запроса используя безопасный метод token = await get_auth_token_from_request(request) if not token: logger.info("[authenticate] Токен не найден в запросе") return AuthState() # Проверяем токен используя internal auth user_id, roles, is_admin = await verify_internal_auth(token) if not user_id: logger.warning("[authenticate] Недействительный токен") return AuthState() logger.debug(f"[authenticate] Аутентификация успешна: user_id={user_id}, roles={roles}, is_admin={is_admin}") auth_state = AuthState() auth_state.logged_in = True auth_state.author_id = str(user_id) auth_state.is_admin = is_admin return auth_state