Files
core/resolvers/auth.py
Untone 3ae675c52c
All checks were successful
Deploy on push / deploy (push) Successful in 5m44s
auth-fix
2025-09-30 19:20:41 +03:00

350 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Auth резолверы - тонкие GraphQL обёртки над AuthService
"""
from typing import Any
from graphql import GraphQLResolveInfo
from starlette.responses import JSONResponse
from auth.utils import extract_token_from_request, get_auth_token_from_context, get_user_data_by_token
from services.auth import auth_service
from settings import (
SESSION_COOKIE_DOMAIN,
SESSION_COOKIE_HTTPONLY,
SESSION_COOKIE_MAX_AGE,
SESSION_COOKIE_NAME,
SESSION_COOKIE_SAMESITE,
SESSION_COOKIE_SECURE,
)
from storage.schema import mutation, query, type_author
from utils.logger import root_logger as logger
# === РЕЗОЛВЕР ДЛЯ ТИПА AUTHOR ===
@type_author.field("roles")
def resolve_roles(obj: dict | Any, info: GraphQLResolveInfo) -> list[str]:
"""Резолвер для поля roles автора"""
try:
# Если это ORM объект с методом get_roles
if hasattr(obj, "get_roles"):
return obj.get_roles()
# Если это словарь
if isinstance(obj, dict):
roles_data = obj.get("roles_data")
if roles_data is None:
return []
if isinstance(roles_data, list):
return roles_data
if isinstance(roles_data, dict):
return roles_data.get("1", [])
return []
except Exception as e:
logger.error(f"Ошибка получения ролей: {e}")
return []
# === МУТАЦИИ АУТЕНТИФИКАЦИИ ===
@mutation.field("registerUser")
async def register_user(
_: None, _info: GraphQLResolveInfo, email: str, password: str = "", name: str = ""
) -> dict[str, Any]:
"""Регистрирует нового пользователя"""
try:
return await auth_service.register_user(email, password, name)
except Exception as e:
logger.error(f"Ошибка регистрации: {e}")
return {"success": False, "token": None, "author": None, "error": str(e)}
@mutation.field("sendLink")
async def send_link(
_: None, _info: GraphQLResolveInfo, email: str, lang: str = "ru", template: str = "confirm"
) -> bool:
"""Отправляет ссылку подтверждения"""
try:
return bool(await auth_service.send_verification_link(email, lang, template))
except Exception as e:
logger.error(f"Ошибка отправки ссылки подтверждения: {e}")
return False
@mutation.field("confirmEmail")
@auth_service.login_required
async def confirm_email(_: None, _info: GraphQLResolveInfo, token: str) -> dict[str, Any]:
"""Подтверждает email по токену"""
try:
return await auth_service.confirm_email(token)
except Exception as e:
logger.error(f"Ошибка подтверждения email: {e}")
return {"success": False, "token": None, "author": None, "error": str(e)}
@mutation.field("login")
async def login(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Авторизация пользователя"""
try:
email = kwargs.get("email", "")
password = kwargs.get("password", "")
request = info.context.get("request")
result = await auth_service.login(email, password, request)
# 🎯 Проверяем откуда пришел запрос - админка или основной сайт
request = info.context.get("request")
is_admin_request = False
if request:
# Проверяем путь запроса или Referer header
referer = request.headers.get("referer", "")
origin = request.headers.get("origin", "")
is_admin_request = "/panel" in referer or "/panel" in origin or "admin" in referer
# Устанавливаем httpOnly cookie только для админки
if result.get("success") and result.get("token") and is_admin_request:
try:
response = info.context.get("response")
if not response:
response = JSONResponse({})
info.context["response"] = response
response.set_cookie(
key=SESSION_COOKIE_NAME,
value=result["token"],
httponly=SESSION_COOKIE_HTTPONLY,
secure=SESSION_COOKIE_SECURE,
samesite=SESSION_COOKIE_SAMESITE
if SESSION_COOKIE_SAMESITE in ["strict", "lax", "none"]
else "none",
max_age=SESSION_COOKIE_MAX_AGE,
path="/",
domain=SESSION_COOKIE_DOMAIN,
)
author_id = (
result.get("author", {}).get("id")
if isinstance(result.get("author"), dict)
else getattr(result.get("author"), "id", "unknown")
)
logger.info(f"✅ Admin login: httpOnly cookie установлен для пользователя {author_id}")
# Для админки НЕ возвращаем токен клиенту - он в httpOnly cookie
result_without_token = result.copy()
result_without_token["token"] = None
return result_without_token
except Exception as cookie_error:
logger.warning(f"Не удалось установить cookie: {cookie_error}")
# Для основного сайта возвращаем токен как обычно (Bearer в localStorage)
if not is_admin_request:
author_id = (
result.get("author", {}).get("id")
if isinstance(result.get("author"), dict)
else getattr(result.get("author"), "id", "unknown")
)
logger.info(f"✅ Main site login: токен возвращен для localStorage пользователя {author_id}")
return result
except Exception as e:
logger.warning(f"Ошибка входа: {e}")
return {"success": False, "token": None, "author": None, "error": str(e)}
@mutation.field("logout")
@auth_service.login_required
async def logout(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Выход из системы"""
try:
author = info.context.get("author")
if not author:
return {"success": False, "message": "Пользователь не найден в контексте"}
user_id = str(author.get("id"))
request = info.context.get("request")
# Получаем токен
token = None
if request:
token = await extract_token_from_request(request)
result = await auth_service.logout(user_id, token)
# Удаляем cookie
if request and hasattr(info.context, "response"):
try:
info.context["response"].delete_cookie(
key=SESSION_COOKIE_NAME,
path="/",
domain=SESSION_COOKIE_DOMAIN, # ✅ КРИТИЧНО: тот же domain что при установке
)
except Exception as e:
logger.warning(f"Не удалось удалить cookie: {e}")
return result
except Exception as e:
logger.warning(f"Ошибка выхода: {e}")
return {"success": False}
@mutation.field("refreshToken")
@auth_service.login_required
async def refresh_token(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Обновление токена"""
try:
author = info.context.get("author")
if not author:
return {"success": False, "token": None, "author": None, "error": "Пользователь не найден"}
user_id = str(author.get("id"))
request = info.context.get("request")
if not request:
return {"success": False, "token": None, "author": None, "error": "Запрос не найден"}
# Получаем токен
token = await extract_token_from_request(request)
if not token:
return {"success": False, "token": None, "author": None, "error": "Токен не найден"}
device_info = {
"ip": request.client.host if request.client else "unknown",
"user_agent": request.headers.get("user-agent"),
}
result = await auth_service.refresh_token(user_id, token, device_info)
# Устанавливаем новый cookie
if result.get("success") and result.get("token"):
try:
if hasattr(info.context, "response"):
info.context["response"].set_cookie(
key=SESSION_COOKIE_NAME,
value=result["token"],
httponly=SESSION_COOKIE_HTTPONLY,
secure=SESSION_COOKIE_SECURE,
samesite=SESSION_COOKIE_SAMESITE
if SESSION_COOKIE_SAMESITE in ["strict", "lax", "none"]
else "none",
max_age=SESSION_COOKIE_MAX_AGE,
path="/",
domain=SESSION_COOKIE_DOMAIN, # ✅ КРИТИЧНО для поддоменов
)
except Exception as e:
logger.warning(f"Не удалось обновить cookie: {e}")
return result
except Exception as e:
logger.warning(f"Ошибка обновления токена: {e}")
return {"success": False, "token": None, "author": None, "error": str(e)}
@mutation.field("requestPasswordReset")
async def request_password_reset(_: None, _info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Запрос сброса пароля"""
try:
email = kwargs.get("email", "")
lang = kwargs.get("lang", "ru")
return await auth_service.request_password_reset(email, lang)
except Exception as e:
logger.error(f"Ошибка запроса сброса пароля: {e}")
return {"success": False}
@mutation.field("updateSecurity")
@auth_service.login_required
async def update_security(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Обновление пароля и email"""
try:
author = info.context.get("author")
if not author:
return {"success": False, "error": "NOT_AUTHENTICATED", "author": None}
user_id = author.get("id")
old_password = kwargs.get("oldPassword", "")
new_password = kwargs.get("newPassword")
email = kwargs.get("email")
return await auth_service.update_security(user_id, old_password, new_password, email)
except Exception as e:
logger.error(f"Ошибка обновления безопасности: {e}")
return {"success": False, "error": str(e), "author": None}
@mutation.field("confirmEmailChange")
@auth_service.login_required
async def confirm_email_change(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Подтверждение смены email по токену"""
try:
author = info.context.get("author")
if not author:
return {"success": False, "error": "NOT_AUTHENTICATED", "author": None}
user_id = author.get("id")
token = kwargs.get("token", "")
return await auth_service.confirm_email_change(user_id, token)
except Exception as e:
logger.error(f"Ошибка подтверждения смены email: {e}")
return {"success": False, "error": str(e), "author": None}
@mutation.field("cancelEmailChange")
@auth_service.login_required
async def cancel_email_change(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Отмена смены email"""
try:
author = info.context.get("author")
if not author:
return {"success": False, "error": "NOT_AUTHENTICATED", "author": None}
user_id = author.get("id")
return await auth_service.cancel_email_change(user_id)
except Exception as e:
logger.error(f"Ошибка отмены смены email: {e}")
return {"success": False, "error": str(e), "author": None}
@mutation.field("getSession")
async def get_session(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Получает информацию о текущей сессии"""
try:
token = await get_auth_token_from_context(info)
if not token:
logger.debug("[getSession] Токен не найден")
return {"success": False, "token": None, "author": None, "error": "Сессия не найдена"}
# Используем DRY функцию для получения данных пользователя
success, user_data, error_message = await get_user_data_by_token(token)
if success and user_data:
user_id = user_data.get("id", "NO_ID")
logger.debug(f"[getSession] Сессия валидна для пользователя {user_id}")
return {"success": True, "token": token, "author": user_data, "error": None}
logger.warning(f"[getSession] Ошибка валидации токена: {error_message}")
return {"success": False, "token": None, "author": None, "error": error_message}
except Exception as e:
logger.warning(f"Ошибка получения сессии: {e}")
return {"success": False, "token": None, "author": None, "error": str(e)}
# === ЗАПРОСЫ ===
@query.field("isEmailUsed")
async def is_email_used(_: None, _info: GraphQLResolveInfo, email: str) -> bool:
"""Проверяет, используется ли email"""
try:
return auth_service.is_email_used(email)
except Exception as e:
logger.error(f"Ошибка проверки email: {e}")
return False