core/auth/jwtcodec.py
Untone 1d64811880
All checks were successful
Deploy on push / deploy (push) Successful in 6s
userlist-demo-ready
2025-05-20 00:00:24 +03:00

112 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, timezone, timedelta
import jwt
from pydantic import BaseModel
from typing import Optional
from utils.logger import root_logger as logger
from auth.exceptions import ExpiredToken, InvalidToken
from settings import JWT_ALGORITHM, JWT_SECRET_KEY
class TokenPayload(BaseModel):
user_id: str
username: str
exp: Optional[datetime] = None
iat: datetime
iss: str
class JWTCodec:
@staticmethod
def encode(user, exp: Optional[datetime] = None) -> str:
# Поддержка как объектов, так и словарей
if isinstance(user, dict):
# В SessionManager.create_session передается словарь {"id": user_id, "email": username}
user_id = str(user.get("id", ""))
username = user.get("email", "") or user.get("username", "")
else:
# Для объектов с атрибутами
user_id = str(getattr(user, "id", ""))
username = getattr(user, "slug", "") or getattr(user, "email", "") or getattr(user, "phone", "") or ""
logger.debug(f"[JWTCodec.encode] Кодирование токена для user_id={user_id}, username={username}")
# Если время истечения не указано, установим срок годности на 30 дней
if exp is None:
exp = datetime.now(tz=timezone.utc) + timedelta(days=30)
logger.debug(f"[JWTCodec.encode] Время истечения не указано, устанавливаем срок: {exp}")
# Важно: убедимся, что exp всегда является либо datetime, либо целым числом от timestamp
if isinstance(exp, datetime):
# Преобразуем datetime в timestamp чтобы гарантировать правильный формат
exp_timestamp = int(exp.timestamp())
else:
# Если передано что-то другое, установим значение по умолчанию
logger.warning(f"[JWTCodec.encode] Некорректный формат exp: {exp}, используем значение по умолчанию")
exp_timestamp = int((datetime.now(tz=timezone.utc) + timedelta(days=30)).timestamp())
payload = {
"user_id": user_id,
"username": username,
"exp": exp_timestamp, # Используем timestamp вместо datetime
"iat": datetime.now(tz=timezone.utc),
"iss": "discours",
}
logger.debug(f"[JWTCodec.encode] Сформирован payload: {payload}")
try:
token = jwt.encode(payload, JWT_SECRET_KEY, JWT_ALGORITHM)
logger.debug(f"[JWTCodec.encode] Токен успешно создан, длина: {len(token) if token else 0}")
return token
except Exception as e:
logger.error(f"[JWTCodec.encode] Ошибка при кодировании JWT: {e}")
raise
@staticmethod
def decode(token: str, verify_exp: bool = True):
logger.debug(f"[JWTCodec.decode] Начало декодирования токена длиной {len(token) if token else 0}")
r = None
payload = None
try:
payload = jwt.decode(
token,
key=JWT_SECRET_KEY,
options={
"verify_exp": verify_exp,
# "verify_signature": False
},
algorithms=[JWT_ALGORITHM],
issuer="discours",
)
logger.debug(f"[JWTCodec.decode] Декодирован payload: {payload}")
# Убедимся, что exp существует (добавим обработку если exp отсутствует)
if "exp" not in payload:
logger.warning(f"[JWTCodec.decode] В токене отсутствует поле exp")
# Добавим exp по умолчанию, чтобы избежать ошибки при создании TokenPayload
payload["exp"] = int((datetime.now(tz=timezone.utc) + timedelta(days=30)).timestamp())
r = TokenPayload(**payload)
logger.debug(f"[JWTCodec.decode] Создан объект TokenPayload: user_id={r.user_id}, username={r.username}")
return r
except jwt.InvalidIssuedAtError:
logger.error(f"[JWTCodec.decode] Недействительное время выпуска токена: {payload}")
raise ExpiredToken("jwt check token issued time")
except jwt.ExpiredSignatureError:
logger.error(f"[JWTCodec.decode] Истек срок действия токена: {payload}")
raise ExpiredToken("jwt check token lifetime")
except jwt.InvalidSignatureError:
logger.error("[JWTCodec.decode] Недействительная подпись токена")
raise InvalidToken("jwt check signature is not valid")
except jwt.InvalidTokenError:
logger.error("[JWTCodec.decode] Недействительный токен")
raise InvalidToken("jwt check token is not valid")
except jwt.InvalidKeyError:
logger.error("[JWTCodec.decode] Недействительный ключ")
raise InvalidToken("jwt check key is not valid")
except Exception as e:
logger.error(f"[JWTCodec.decode] Неожиданная ошибка при декодировании: {e}")
raise InvalidToken(f"Ошибка декодирования: {str(e)}")