- **🔍 Comprehensive authentication documentation refactoring**: Полная переработка документации аутентификации
- Обновлена таблица содержания в README.md
- Исправлены архитектурные диаграммы - токены хранятся только в Redis
- Добавлены практические примеры кода для микросервисов
- Консолидирована OAuth документация
16 KiB
🔧 Auth API Reference
🎯 Обзор
Полный справочник по API системы аутентификации с примерами кода и использования.
📚 Token Managers
SessionTokenManager
from auth.tokens.sessions import SessionTokenManager
sessions = SessionTokenManager()
Методы
create_session(user_id, auth_data=None, username=None, device_info=None)
Создает новую сессию для пользователя.
Параметры:
user_id(str): ID пользователяauth_data(dict, optional): Данные аутентификацииusername(str, optional): Имя пользователяdevice_info(dict, optional): Информация об устройстве
Возвращает: str - JWT токен
Пример:
token = await sessions.create_session(
user_id="123",
username="john_doe",
device_info={"ip": "192.168.1.1", "user_agent": "Mozilla/5.0"}
)
verify_session(token)
Проверяет валидность JWT токена и Redis сессии.
Параметры:
token(str): JWT токен
Возвращает: dict | None - Payload токена или None
Пример:
payload = await sessions.verify_session(token)
if payload:
user_id = payload.get("user_id")
username = payload.get("username")
validate_session_token(token)
Валидирует токен сессии с дополнительными проверками.
Параметры:
token(str): JWT токен
Возвращает: tuple[bool, dict] - (валидность, данные)
Пример:
valid, data = await sessions.validate_session_token(token)
if valid:
print(f"Session valid for user: {data.get('user_id')}")
get_session_data(token, user_id)
Получает данные сессии из Redis.
Параметры:
token(str): JWT токенuser_id(str): ID пользователя
Возвращает: dict | None - Данные сессии
Пример:
session_data = await sessions.get_session_data(token, user_id)
if session_data:
last_activity = session_data.get("last_activity")
refresh_session(user_id, old_token, device_info=None)
Обновляет сессию пользователя.
Параметры:
user_id(str): ID пользователяold_token(str): Старый JWT токенdevice_info(dict, optional): Информация об устройстве
Возвращает: str - Новый JWT токен
Пример:
new_token = await sessions.refresh_session(
user_id="123",
old_token=old_token,
device_info={"ip": "192.168.1.1"}
)
revoke_session_token(token)
Отзывает конкретный токен сессии.
Параметры:
token(str): JWT токен
Возвращает: bool - Успешность операции
Пример:
revoked = await sessions.revoke_session_token(token)
if revoked:
print("Session revoked successfully")
get_user_sessions(user_id)
Получает все активные сессии пользователя.
Параметры:
user_id(str): ID пользователя
Возвращает: list[dict] - Список сессий
Пример:
user_sessions = await sessions.get_user_sessions("123")
for session in user_sessions:
print(f"Token: {session['token'][:20]}...")
print(f"Last activity: {session['last_activity']}")
revoke_user_sessions(user_id)
Отзывает все сессии пользователя.
Параметры:
user_id(str): ID пользователя
Возвращает: int - Количество отозванных сессий
Пример:
revoked_count = await sessions.revoke_user_sessions("123")
print(f"Revoked {revoked_count} sessions")
OAuthTokenManager
from auth.tokens.oauth import OAuthTokenManager
oauth = OAuthTokenManager()
Методы
store_oauth_tokens(user_id, provider, access_token, refresh_token=None, expires_in=3600, additional_data=None)
Сохраняет OAuth токены в Redis.
Параметры:
user_id(str): ID пользователяprovider(str): OAuth провайдер (google, github, etc.)access_token(str): Access токенrefresh_token(str, optional): Refresh токенexpires_in(int): Время жизни в секундахadditional_data(dict, optional): Дополнительные данные
Пример:
await oauth.store_oauth_tokens(
user_id="123",
provider="google",
access_token="ya29.a0AfH6SM...",
refresh_token="1//04...",
expires_in=3600,
additional_data={"scope": "read write"}
)
get_token(user_id, provider, token_type)
Получает OAuth токен.
Параметры:
user_id(str): ID пользователяprovider(str): OAuth провайдерtoken_type(str): Тип токена ("oauth_access" или "oauth_refresh")
Возвращает: dict | None - Данные токена
Пример:
access_data = await oauth.get_token("123", "google", "oauth_access")
if access_data:
token = access_data["token"]
expires_in = access_data.get("expires_in")
revoke_oauth_tokens(user_id, provider)
Отзывает OAuth токены провайдера.
Параметры:
user_id(str): ID пользователяprovider(str): OAuth провайдер
Возвращает: bool - Успешность операции
Пример:
revoked = await oauth.revoke_oauth_tokens("123", "google")
if revoked:
print("OAuth tokens revoked")
BatchTokenOperations
from auth.tokens.batch import BatchTokenOperations
batch = BatchTokenOperations()
Методы
batch_validate_tokens(tokens)
Массовая валидация токенов.
Параметры:
tokens(list[str]): Список JWT токенов
Возвращает: dict[str, bool] - Результаты валидации
Пример:
tokens = ["token1", "token2", "token3"]
results = await batch.batch_validate_tokens(tokens)
# {"token1": True, "token2": False, "token3": True}
for token, is_valid in results.items():
print(f"Token {token[:10]}... is {'valid' if is_valid else 'invalid'}")
batch_revoke_tokens(tokens)
Массовый отзыв токенов.
Параметры:
tokens(list[str]): Список JWT токенов
Возвращает: int - Количество отозванных токенов
Пример:
revoked_count = await batch.batch_revoke_tokens(tokens)
print(f"Revoked {revoked_count} tokens")
cleanup_expired_tokens()
Очистка истекших токенов.
Возвращает: int - Количество очищенных токенов
Пример:
cleaned_count = await batch.cleanup_expired_tokens()
print(f"Cleaned {cleaned_count} expired tokens")
TokenMonitoring
from auth.tokens.monitoring import TokenMonitoring
monitoring = TokenMonitoring()
Методы
get_token_statistics()
Получает статистику токенов.
Возвращает: dict - Статистика системы
Пример:
stats = await monitoring.get_token_statistics()
print(f"Active sessions: {stats['session_tokens']}")
print(f"OAuth tokens: {stats['oauth_access_tokens']}")
print(f"Memory usage: {stats['memory_usage'] / 1024 / 1024:.2f} MB")
health_check()
Проверка здоровья системы токенов.
Возвращает: dict - Статус системы
Пример:
health = await monitoring.health_check()
if health["status"] == "healthy":
print("Token system is healthy")
print(f"Redis connected: {health['redis_connected']}")
else:
print(f"System unhealthy: {health.get('error')}")
optimize_memory_usage()
Оптимизация использования памяти.
Возвращает: dict - Результаты оптимизации
Пример:
results = await monitoring.optimize_memory_usage()
print(f"Cleaned expired: {results['cleaned_expired']}")
print(f"Memory freed: {results['memory_freed']} bytes")
🛠️ Utility Functions
Auth Utils
from auth.utils import (
extract_token_from_request,
get_auth_token,
get_auth_token_from_context,
get_safe_headers,
get_user_data_by_token
)
extract_token_from_request(request)
Извлекает токен из HTTP запроса.
Параметры:
request: HTTP запрос (FastAPI, Starlette, etc.)
Возвращает: str | None - JWT токен или None
Пример:
token = await extract_token_from_request(request)
if token:
print(f"Found token: {token[:20]}...")
get_auth_token(request)
Расширенное извлечение токена с логированием.
Параметры:
request: HTTP запрос
Возвращает: str | None - JWT токен или None
Пример:
token = await get_auth_token(request)
if token:
# Токен найден и залогирован
pass
get_auth_token_from_context(info)
Извлечение токена из GraphQL контекста.
Параметры:
info: GraphQL Info объект
Возвращает: str | None - JWT токен или None
Пример:
@auth_required
async def protected_resolver(info, **kwargs):
token = await get_auth_token_from_context(info)
# Используем токен для дополнительных проверок
get_safe_headers(request)
Безопасное получение заголовков запроса.
Параметры:
request: HTTP запрос
Возвращает: dict[str, str] - Словарь заголовков
Пример:
headers = get_safe_headers(request)
auth_header = headers.get("authorization", "")
user_agent = headers.get("user-agent", "")
get_user_data_by_token(token)
Получение данных пользователя по токену.
Параметры:
token(str): JWT токен
Возвращает: dict | None - Данные пользователя
Пример:
user_data = await get_user_data_by_token(token)
if user_data:
print(f"User: {user_data['username']}")
print(f"ID: {user_data['user_id']}")
🎭 Decorators
GraphQL Decorators
from auth.decorators import auth_required, permission_required
@auth_required
Требует авторизации для выполнения resolver'а.
Пример:
@auth_required
async def get_user_profile(info, **kwargs):
"""Получение профиля пользователя"""
user = info.context.get('user')
return {
"id": user.id,
"username": user.username,
"email": user.email
}
@permission_required(permission)
Требует конкретного разрешения.
Параметры:
permission(str): Название разрешения
Пример:
@auth_required
@permission_required("shout:create")
async def create_shout(info, input_data):
"""Создание публикации"""
user = info.context.get('user')
shout = Shout(
title=input_data['title'],
content=input_data['content'],
author_id=user.id
)
return shout
🔧 Middleware
AuthMiddleware
from auth.middleware import AuthMiddleware
middleware = AuthMiddleware()
Методы
authenticate_user(request)
Аутентификация пользователя из запроса.
Параметры:
request: HTTP запрос
Возвращает: dict | None - Данные пользователя
Пример:
user_data = await middleware.authenticate_user(request)
if user_data:
request.user = user_data
set_cookie(response, token)
Установка httpOnly cookie с токеном.
Параметры:
response: HTTP ответtoken(str): JWT токен
Пример:
await middleware.set_cookie(response, token)
delete_cookie(response)
Удаление cookie с токеном.
Параметры:
response: HTTP ответ
Пример:
await middleware.delete_cookie(response)
🔒 Error Handling
Исключения
from auth.exceptions import (
AuthenticationError,
InvalidTokenError,
TokenExpiredError,
OAuthError
)
AuthenticationError
Базовое исключение аутентификации.
Пример:
try:
payload = await sessions.verify_session(token)
if not payload:
raise AuthenticationError("Invalid session token")
except AuthenticationError as e:
return {"error": str(e), "status": 401}
InvalidTokenError
Невалидный токен.
Пример:
try:
valid, data = await sessions.validate_session_token(token)
if not valid:
raise InvalidTokenError("Token validation failed")
except InvalidTokenError as e:
return {"error": str(e), "status": 401}
TokenExpiredError
Истекший токен.
Пример:
try:
# Проверка токена
pass
except TokenExpiredError as e:
return {"error": "Token expired", "status": 401}
📊 Response Formats
Успешные ответы
# Успешная аутентификация
{
"authenticated": True,
"user_id": "123",
"username": "john_doe",
"expires_at": 1640995200
}
# Статистика токенов
{
"session_tokens": 150,
"oauth_access_tokens": 25,
"oauth_refresh_tokens": 25,
"verification_tokens": 5,
"memory_usage": 1048576
}
# Health check
{
"status": "healthy",
"redis_connected": True,
"token_count": 205,
"timestamp": 1640995200
}
Ошибки
# Ошибка аутентификации
{
"authenticated": False,
"error": "Invalid or expired token",
"status": 401
}
# Ошибка системы
{
"status": "error",
"error": "Redis connection failed",
"timestamp": 1640995200
}
🧪 Testing Helpers
Mock Utilities
from unittest.mock import AsyncMock, patch
# Mock SessionTokenManager
@patch('auth.tokens.sessions.SessionTokenManager')
async def test_auth(mock_sessions):
mock_sessions.return_value.verify_session.return_value = {
"user_id": "123",
"username": "testuser"
}
# Ваш тест здесь
pass
# Mock Redis
@patch('storage.redis.redis')
async def test_redis_operations(mock_redis):
mock_redis.get.return_value = b'{"user_id": "123"}'
mock_redis.exists.return_value = True
# Ваш тест здесь
pass
Test Fixtures
import pytest
@pytest.fixture
async def auth_token():
"""Фикстура для создания тестового токена"""
sessions = SessionTokenManager()
return await sessions.create_session(
user_id="test_user",
username="testuser"
)
@pytest.fixture
async def authenticated_request(auth_token):
"""Фикстура для аутентифицированного запроса"""
mock_request = AsyncMock()
mock_request.headers = {"authorization": f"Bearer {auth_token}"}
return mock_request