tests-passed

This commit is contained in:
2025-07-31 18:55:59 +03:00
parent b7abb8d8a1
commit e7230ba63c
126 changed files with 8326 additions and 3207 deletions

View File

@@ -1,123 +1,97 @@
from datetime import datetime, timedelta, timezone
from typing import Any, Optional, Union
import datetime
import logging
from typing import Any, Dict, Optional
import jwt
from pydantic import BaseModel
from settings import JWT_ALGORITHM, JWT_SECRET_KEY
from utils.logger import root_logger as logger
class TokenPayload(BaseModel):
user_id: str
username: str
exp: Optional[datetime] = None
iat: datetime
iss: str
from settings import JWT_ALGORITHM, JWT_ISSUER, JWT_REFRESH_TOKEN_EXPIRE_DAYS, JWT_SECRET_KEY
class JWTCodec:
"""
Кодировщик и декодировщик JWT токенов.
"""
@staticmethod
def encode(user: Union[dict[str, Any], Any], exp: Optional[datetime] = None) -> str:
# Поддержка как объектов, так и словарей
if isinstance(user, dict):
# В TokenStorage.create_session передается словарь {"user_id": user_id, "username": username}
user_id = str(user.get("user_id", "") or user.get("id", ""))
username = user.get("username", "") or user.get("email", "")
else:
# Для объектов с атрибутами
user_id = str(getattr(user, "id", ""))
username = getattr(user, "slug", "") or getattr(user, "email", "") or getattr(user, "phone", "") or ""
def encode(
payload: Dict[str, Any],
secret_key: Optional[str] = None,
algorithm: Optional[str] = None,
expiration: Optional[datetime.datetime] = None,
) -> str | bytes:
"""
Кодирует payload в JWT токен.
logger.debug(f"[JWTCodec.encode] Кодирование токена для user_id={user_id}, username={username}")
Args:
payload (Dict[str, Any]): Полезная нагрузка для кодирования
secret_key (Optional[str]): Секретный ключ. По умолчанию используется JWT_SECRET_KEY
algorithm (Optional[str]): Алгоритм шифрования. По умолчанию используется JWT_ALGORITHM
expiration (Optional[datetime.datetime]): Время истечения токена
# Если время истечения не указано, установим срок годности на 30 дней
if exp is None:
exp = datetime.now(tz=timezone.utc) + timedelta(days=30)
logger.debug(f"[JWTCodec.encode] Время истечения не указано, устанавливаем срок: {exp}")
Returns:
str: Закодированный JWT токен
"""
logger = logging.getLogger("root")
logger.debug(f"[JWTCodec.encode] Кодирование токена для payload: {payload}")
# Важно: убедимся, что exp всегда является либо datetime, либо целым числом от timestamp
if isinstance(exp, datetime):
# Преобразуем datetime в timestamp чтобы гарантировать правильный формат
exp_timestamp = int(exp.timestamp())
else:
# Если передано что-то другое, установим значение по умолчанию
logger.warning(f"[JWTCodec.encode] Некорректный формат exp: {exp}, используем значение по умолчанию")
exp_timestamp = int((datetime.now(tz=timezone.utc) + timedelta(days=30)).timestamp())
# Используем переданные или дефолтные значения
secret_key = secret_key or JWT_SECRET_KEY
algorithm = algorithm or JWT_ALGORITHM
payload = {
"user_id": user_id,
"username": username,
"exp": exp_timestamp, # Используем timestamp вместо datetime
"iat": datetime.now(tz=timezone.utc),
"iss": "discours",
}
# Если время истечения не указано, устанавливаем дефолтное
if not expiration:
expiration = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
days=JWT_REFRESH_TOKEN_EXPIRE_DAYS
)
logger.debug(f"[JWTCodec.encode] Время истечения не указано, устанавливаем срок: {expiration}")
# Формируем payload с временными метками
payload.update(
{"exp": int(expiration.timestamp()), "iat": datetime.datetime.now(datetime.timezone.utc), "iss": JWT_ISSUER}
)
logger.debug(f"[JWTCodec.encode] Сформирован payload: {payload}")
try:
token = jwt.encode(payload, JWT_SECRET_KEY, JWT_ALGORITHM)
logger.debug(f"[JWTCodec.encode] Токен успешно создан, длина: {len(token) if token else 0}")
# Ensure we always return str, not bytes
if isinstance(token, bytes):
return token.decode("utf-8")
return str(token)
# Используем PyJWT для кодирования
encoded = jwt.encode(payload, secret_key, algorithm=algorithm)
token_str = encoded.decode("utf-8") if isinstance(encoded, bytes) else encoded
return token_str
except Exception as e:
logger.error(f"[JWTCodec.encode] Ошибка при кодировании JWT: {e}")
logger.warning(f"[JWTCodec.encode] Ошибка при кодировании JWT: {e}")
raise
@staticmethod
def decode(token: str, verify_exp: bool = True) -> Optional[TokenPayload]:
logger.debug(f"[JWTCodec.decode] Начало декодирования токена длиной {len(token) if token else 0}")
def decode(
token: str,
secret_key: Optional[str] = None,
algorithms: Optional[list] = None,
) -> Dict[str, Any]:
"""
Декодирует JWT токен.
if not token:
logger.error("[JWTCodec.decode] Пустой токен")
return None
Args:
token (str): JWT токен
secret_key (Optional[str]): Секретный ключ. По умолчанию используется JWT_SECRET_KEY
algorithms (Optional[list]): Список алгоритмов. По умолчанию используется [JWT_ALGORITHM]
Returns:
Dict[str, Any]: Декодированный payload
"""
logger = logging.getLogger("root")
logger.debug("[JWTCodec.decode] Декодирование токена")
# Используем переданные или дефолтные значения
secret_key = secret_key or JWT_SECRET_KEY
algorithms = algorithms or [JWT_ALGORITHM]
try:
payload = jwt.decode(
token,
key=JWT_SECRET_KEY,
options={
"verify_exp": verify_exp,
# "verify_signature": False
},
algorithms=[JWT_ALGORITHM],
issuer="discours",
)
logger.debug(f"[JWTCodec.decode] Декодирован payload: {payload}")
# Убедимся, что exp существует (добавим обработку если exp отсутствует)
if "exp" not in payload:
logger.warning("[JWTCodec.decode] В токене отсутствует поле exp")
# Добавим exp по умолчанию, чтобы избежать ошибки при создании TokenPayload
payload["exp"] = int((datetime.now(tz=timezone.utc) + timedelta(days=30)).timestamp())
try:
r = TokenPayload(**payload)
logger.debug(
f"[JWTCodec.decode] Создан объект TokenPayload: user_id={r.user_id}, username={r.username}"
)
return r
except Exception as e:
logger.error(f"[JWTCodec.decode] Ошибка при создании TokenPayload: {e}")
return None
except jwt.InvalidIssuedAtError:
logger.error("[JWTCodec.decode] Недействительное время выпуска токена")
return None
# Используем PyJWT для декодирования
decoded = jwt.decode(token, secret_key, algorithms=algorithms)
return decoded
except jwt.ExpiredSignatureError:
logger.error("[JWTCodec.decode] Истек срок действия токена")
return None
except jwt.InvalidSignatureError:
logger.error("[JWTCodec.decode] Недействительная подпись токена")
return None
except jwt.InvalidTokenError:
logger.error("[JWTCodec.decode] Недействительный токен")
return None
except jwt.InvalidKeyError:
logger.error("[JWTCodec.decode] Недействительный ключ")
return None
except Exception as e:
logger.error(f"[JWTCodec.decode] Неожиданная ошибка при декодировании: {e}")
return None
logger.warning("[JWTCodec.decode] Токен просрочен")
raise
except jwt.InvalidTokenError as e:
logger.warning(f"[JWTCodec.decode] Ошибка при декодировании JWT: {e}")
raise