Files
core/resolvers/auth.py
Untone fb98a1c6c8
All checks were successful
Deploy on push / deploy (push) Successful in 4m32s
[0.9.28] - OAuth/Auth with httpOnly cookie
2025-09-28 12:22:37 +03:00

324 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Auth резолверы - тонкие GraphQL обёртки над AuthService
"""
from typing import Any
from graphql import GraphQLResolveInfo
from starlette.responses import JSONResponse
from auth.utils import extract_token_from_request, get_auth_token_from_context, get_user_data_by_token
from services.auth import auth_service
from settings import (
SESSION_COOKIE_DOMAIN,
SESSION_COOKIE_HTTPONLY,
SESSION_COOKIE_MAX_AGE,
SESSION_COOKIE_NAME,
SESSION_COOKIE_SAMESITE,
SESSION_COOKIE_SECURE,
)
from storage.schema import mutation, query, type_author
from utils.logger import root_logger as logger
# === РЕЗОЛВЕР ДЛЯ ТИПА AUTHOR ===
@type_author.field("roles")
def resolve_roles(obj: dict | Any, info: GraphQLResolveInfo) -> list[str]:
"""Резолвер для поля roles автора"""
try:
if hasattr(obj, "get_roles"):
return obj.get_roles()
if isinstance(obj, dict):
roles_data = obj.get("roles_data", {})
if isinstance(roles_data, list):
return roles_data
if isinstance(roles_data, dict):
return roles_data.get("1", [])
return []
except Exception as e:
logger.error(f"Ошибка получения ролей: {e}")
return []
# === МУТАЦИИ АУТЕНТИФИКАЦИИ ===
@mutation.field("registerUser")
async def register_user(
_: None, _info: GraphQLResolveInfo, email: str, password: str = "", name: str = ""
) -> dict[str, Any]:
"""Регистрирует нового пользователя"""
try:
return await auth_service.register_user(email, password, name)
except Exception as e:
logger.error(f"Ошибка регистрации: {e}")
return {"success": False, "token": None, "author": None, "error": str(e)}
@mutation.field("sendLink")
async def send_link(
_: None, _info: GraphQLResolveInfo, email: str, lang: str = "ru", template: str = "confirm"
) -> bool:
"""Отправляет ссылку подтверждения"""
try:
return bool(await auth_service.send_verification_link(email, lang, template))
except Exception as e:
logger.error(f"Ошибка отправки ссылки подтверждения: {e}")
return False
@mutation.field("confirmEmail")
@auth_service.login_required
async def confirm_email(_: None, _info: GraphQLResolveInfo, token: str) -> dict[str, Any]:
"""Подтверждает email по токену"""
try:
return await auth_service.confirm_email(token)
except Exception as e:
logger.error(f"Ошибка подтверждения email: {e}")
return {"success": False, "token": None, "author": None, "error": str(e)}
@mutation.field("login")
async def login(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Авторизация пользователя"""
try:
email = kwargs.get("email", "")
password = kwargs.get("password", "")
request = info.context.get("request")
result = await auth_service.login(email, password, request)
# Устанавливаем httpOnly cookie если есть токен
if result.get("success") and result.get("token"):
try:
response = info.context.get("response")
if not response:
response = JSONResponse({})
info.context["response"] = response
response.set_cookie(
key=SESSION_COOKIE_NAME,
value=result["token"],
httponly=SESSION_COOKIE_HTTPONLY,
secure=SESSION_COOKIE_SECURE,
samesite=SESSION_COOKIE_SAMESITE
if SESSION_COOKIE_SAMESITE in ["strict", "lax", "none"]
else "none",
max_age=SESSION_COOKIE_MAX_AGE,
path="/",
domain=SESSION_COOKIE_DOMAIN, # ✅ КРИТИЧНО для поддоменов
)
logger.info(
f"✅ Email/Password: httpOnly cookie установлен для пользователя {result.get('author', {}).get('id')}"
)
# 💋 НЕ возвращаем токен клиенту - он в httpOnly cookie
result_without_token = result.copy()
result_without_token["token"] = None # Скрываем токен от JavaScript
return result_without_token
except Exception as cookie_error:
logger.warning(f"Не удалось установить cookie: {cookie_error}")
return result
except Exception as e:
logger.warning(f"Ошибка входа: {e}")
return {"success": False, "token": None, "author": None, "error": str(e)}
@mutation.field("logout")
@auth_service.login_required
async def logout(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Выход из системы"""
try:
author = info.context.get("author")
if not author:
return {"success": False, "message": "Пользователь не найден в контексте"}
user_id = str(author.get("id"))
request = info.context.get("request")
# Получаем токен
token = None
if request:
token = await extract_token_from_request(request)
result = await auth_service.logout(user_id, token)
# Удаляем cookie
if request and hasattr(info.context, "response"):
try:
info.context["response"].delete_cookie(
key=SESSION_COOKIE_NAME,
path="/",
domain=SESSION_COOKIE_DOMAIN, # ✅ КРИТИЧНО: тот же domain что при установке
)
except Exception as e:
logger.warning(f"Не удалось удалить cookie: {e}")
return result
except Exception as e:
logger.warning(f"Ошибка выхода: {e}")
return {"success": False}
@mutation.field("refreshToken")
@auth_service.login_required
async def refresh_token(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Обновление токена"""
try:
author = info.context.get("author")
if not author:
return {"success": False, "token": None, "author": None, "error": "Пользователь не найден"}
user_id = str(author.get("id"))
request = info.context.get("request")
if not request:
return {"success": False, "token": None, "author": None, "error": "Запрос не найден"}
# Получаем токен
token = await extract_token_from_request(request)
if not token:
return {"success": False, "token": None, "author": None, "error": "Токен не найден"}
device_info = {
"ip": request.client.host if request.client else "unknown",
"user_agent": request.headers.get("user-agent"),
}
result = await auth_service.refresh_token(user_id, token, device_info)
# Устанавливаем новый cookie
if result.get("success") and result.get("token"):
try:
if hasattr(info.context, "response"):
info.context["response"].set_cookie(
key=SESSION_COOKIE_NAME,
value=result["token"],
httponly=SESSION_COOKIE_HTTPONLY,
secure=SESSION_COOKIE_SECURE,
samesite=SESSION_COOKIE_SAMESITE
if SESSION_COOKIE_SAMESITE in ["strict", "lax", "none"]
else "none",
max_age=SESSION_COOKIE_MAX_AGE,
path="/",
domain=SESSION_COOKIE_DOMAIN, # ✅ КРИТИЧНО для поддоменов
)
except Exception as e:
logger.warning(f"Не удалось обновить cookie: {e}")
return result
except Exception as e:
logger.warning(f"Ошибка обновления токена: {e}")
return {"success": False, "token": None, "author": None, "error": str(e)}
@mutation.field("requestPasswordReset")
async def request_password_reset(_: None, _info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Запрос сброса пароля"""
try:
email = kwargs.get("email", "")
lang = kwargs.get("lang", "ru")
return await auth_service.request_password_reset(email, lang)
except Exception as e:
logger.error(f"Ошибка запроса сброса пароля: {e}")
return {"success": False}
@mutation.field("updateSecurity")
@auth_service.login_required
async def update_security(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Обновление пароля и email"""
try:
author = info.context.get("author")
if not author:
return {"success": False, "error": "NOT_AUTHENTICATED", "author": None}
user_id = author.get("id")
old_password = kwargs.get("oldPassword", "")
new_password = kwargs.get("newPassword")
email = kwargs.get("email")
return await auth_service.update_security(user_id, old_password, new_password, email)
except Exception as e:
logger.error(f"Ошибка обновления безопасности: {e}")
return {"success": False, "error": str(e), "author": None}
@mutation.field("confirmEmailChange")
@auth_service.login_required
async def confirm_email_change(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Подтверждение смены email по токену"""
try:
author = info.context.get("author")
if not author:
return {"success": False, "error": "NOT_AUTHENTICATED", "author": None}
user_id = author.get("id")
token = kwargs.get("token", "")
return await auth_service.confirm_email_change(user_id, token)
except Exception as e:
logger.error(f"Ошибка подтверждения смены email: {e}")
return {"success": False, "error": str(e), "author": None}
@mutation.field("cancelEmailChange")
@auth_service.login_required
async def cancel_email_change(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Отмена смены email"""
try:
author = info.context.get("author")
if not author:
return {"success": False, "error": "NOT_AUTHENTICATED", "author": None}
user_id = author.get("id")
return await auth_service.cancel_email_change(user_id)
except Exception as e:
logger.error(f"Ошибка отмены смены email: {e}")
return {"success": False, "error": str(e), "author": None}
@mutation.field("getSession")
async def get_session(_: None, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]:
"""Получает информацию о текущей сессии"""
try:
token = await get_auth_token_from_context(info)
if not token:
logger.debug("[getSession] Токен не найден")
return {"success": False, "token": None, "author": None, "error": "Сессия не найдена"}
# Используем DRY функцию для получения данных пользователя
success, user_data, error_message = await get_user_data_by_token(token)
if success and user_data:
user_id = user_data.get("id", "NO_ID")
logger.debug(f"[getSession] Сессия валидна для пользователя {user_id}")
return {"success": True, "token": token, "author": user_data, "error": None}
logger.warning(f"[getSession] Ошибка валидации токена: {error_message}")
return {"success": False, "token": None, "author": None, "error": error_message}
except Exception as e:
logger.warning(f"Ошибка получения сессии: {e}")
return {"success": False, "token": None, "author": None, "error": str(e)}
# === ЗАПРОСЫ ===
@query.field("isEmailUsed")
async def is_email_used(_: None, _info: GraphQLResolveInfo, email: str) -> bool:
"""Проверяет, используется ли email"""
try:
return auth_service.is_email_used(email)
except Exception as e:
logger.error(f"Ошибка проверки email: {e}")
return False