Files
core/auth/__init__.py
Untone fb98a1c6c8
All checks were successful
Deploy on push / deploy (push) Successful in 4m32s
[0.9.28] - OAuth/Auth with httpOnly cookie
2025-09-28 12:22:37 +03:00

130 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from starlette.requests import Request
from starlette.responses import JSONResponse, RedirectResponse, Response
from auth.core import verify_internal_auth
from auth.tokens.storage import TokenStorage
from auth.utils import extract_token_from_request
from orm.author import Author
from settings import (
SESSION_COOKIE_HTTPONLY,
SESSION_COOKIE_MAX_AGE,
SESSION_COOKIE_NAME,
SESSION_COOKIE_SAMESITE,
SESSION_COOKIE_SECURE,
)
from storage.db import local_session
from utils.logger import root_logger as logger
async def logout(request: Request) -> Response:
"""
Выход из системы с удалением сессии и cookie.
Поддерживает получение токена из:
1. HTTP-only cookie
2. Заголовка Authorization
"""
token = await extract_token_from_request(request)
# Если токен найден, отзываем его
if token:
try:
# Декодируем токен для получения user_id
user_id, _, _ = await verify_internal_auth(token)
if user_id:
# Отзываем сессию
await TokenStorage.revoke_session(token)
logger.info(f"[auth] logout: Токен успешно отозван для пользователя {user_id}")
else:
logger.warning("[auth] logout: Не удалось получить user_id из токена")
except Exception as e:
logger.error(f"[auth] logout: Ошибка при отзыве токена: {e}")
else:
logger.warning("[auth] logout: Токен не найден в запросе")
# Создаем ответ с редиректом на страницу входа
response = RedirectResponse(url="/")
# Удаляем cookie с токеном
response.delete_cookie(
key=SESSION_COOKIE_NAME,
secure=SESSION_COOKIE_SECURE,
httponly=SESSION_COOKIE_HTTPONLY,
samesite=SESSION_COOKIE_SAMESITE if SESSION_COOKIE_SAMESITE in ["strict", "lax", "none"] else "none",
)
logger.info("[auth] logout: Cookie успешно удалена")
return response
async def refresh_token(request: Request) -> JSONResponse:
"""
Обновление токена аутентификации.
Поддерживает получение токена из:
1. HTTP-only cookie
2. Заголовка Authorization
Возвращает новый токен как в HTTP-only cookie, так и в теле ответа.
"""
token = await extract_token_from_request(request)
if not token:
logger.warning("[auth] refresh_token: Токен не найден в запросе")
return JSONResponse({"success": False, "error": "Токен не найден"}, status_code=401)
try:
# Получаем информацию о пользователе из токена
user_id, _, _ = await verify_internal_auth(token)
if not user_id:
logger.warning("[auth] refresh_token: Недействительный токен")
return JSONResponse({"success": False, "error": "Недействительный токен"}, status_code=401)
# Получаем пользователя из базы данных
with local_session() as session:
author = session.query(Author).where(Author.id == user_id).first()
if not author:
logger.warning(f"[auth] refresh_token: Пользователь с ID {user_id} не найден")
return JSONResponse({"success": False, "error": "Пользователь не найден"}, status_code=404)
# Обновляем сессию (создаем новую и отзываем старую)
device_info = {
"ip": request.client.host if request.client else "unknown",
"user_agent": request.headers.get("user-agent"),
}
new_token = await TokenStorage.refresh_session(user_id, token, device_info)
if not new_token:
logger.error(f"[auth] refresh_token: Не удалось обновить токен для пользователя {user_id}")
return JSONResponse({"success": False, "error": "Не удалось обновить токен"}, status_code=500)
source = "cookie" if token.startswith("Bearer ") else "header"
# Создаем ответ
response = JSONResponse(
{
"success": True,
# Возвращаем токен в теле ответа только если он был получен из заголовка
"token": new_token if source == "header" else None,
"author": {"id": author.id, "email": author.email, "name": author.name},
}
)
# Всегда устанавливаем cookie с новым токеном
response.set_cookie(
key=SESSION_COOKIE_NAME,
value=new_token,
httponly=SESSION_COOKIE_HTTPONLY,
secure=SESSION_COOKIE_SECURE,
samesite=SESSION_COOKIE_SAMESITE if SESSION_COOKIE_SAMESITE in ["strict", "lax", "none"] else "none",
max_age=SESSION_COOKIE_MAX_AGE,
)
logger.info(f"[auth] refresh_token: Токен успешно обновлен для пользователя {user_id}")
return response
except Exception as e:
logger.error(f"[auth] refresh_token: Ошибка при обновлении токена: {e}")
return JSONResponse({"success": False, "error": str(e)}, status_code=401)