All checks were successful
Deploy on push / deploy (push) Successful in 5m47s
- **🔍 Comprehensive authentication documentation refactoring**: Полная переработка документации аутентификации
- Обновлена таблица содержания в README.md
- Исправлены архитектурные диаграммы - токены хранятся только в Redis
- Добавлены практические примеры кода для микросервисов
- Консолидирована OAuth документация
18 KiB
18 KiB
🔍 Аутентификация для микросервисов
🎯 Обзор
Руководство по интеграции системы аутентификации Discours Core с другими микросервисами через общий Redis connection pool.
🚀 Быстрый старт
Подключение к Redis
# Используйте тот же Redis connection pool
from storage.redis import redis
# Или создайте свой с теми же настройками
import aioredis
redis_client = aioredis.from_url(
"redis://localhost:6379/0",
max_connections=20,
retry_on_timeout=True,
socket_keepalive=True,
socket_keepalive_options={},
health_check_interval=30
)
Проверка токена сессии
from auth.tokens.sessions import SessionTokenManager
from auth.utils import extract_token_from_request
async def check_user_session(request) -> dict | None:
"""Проверка сессии пользователя в микросервисе"""
# 1. Извлекаем токен из запроса
token = await extract_token_from_request(request)
if not token:
return None
# 2. Проверяем сессию через SessionTokenManager
sessions = SessionTokenManager()
payload = await sessions.verify_session(token)
if payload:
return {
"authenticated": True,
"user_id": payload.get("user_id"),
"username": payload.get("username"),
"expires_at": payload.get("exp")
}
return {"authenticated": False, "error": "Invalid token"}
🔑 Redis ключи для поиска
Структура данных
# Сессии пользователей
session:{user_id}:{token} # Hash: {user_id, username, device_info, last_activity}
user_sessions:{user_id} # Set: {token1, token2, ...}
# OAuth токены
oauth_access:{user_id}:{provider} # JSON: {token, expires_in, scope}
oauth_refresh:{user_id}:{provider} # JSON: {token, provider_data}
# Токены подтверждения
verification_token:{token} # JSON: {user_id, type, data, created_at}
# OAuth состояние
oauth_state:{state} # JSON: {provider, redirect_uri, code_verifier}
Примеры поиска
from storage.redis import redis
# 1. Поиск всех сессий пользователя
async def get_user_sessions(user_id: int) -> list[str]:
"""Получить все активные токены пользователя"""
session_key = f"user_sessions:{user_id}"
tokens = await redis.smembers(session_key)
return [token.decode() for token in tokens] if tokens else []
# 2. Получение данных конкретной сессии
async def get_session_data(user_id: int, token: str) -> dict | None:
"""Получить данные сессии"""
session_key = f"session:{user_id}:{token}"
data = await redis.hgetall(session_key)
if data:
return {k.decode(): v.decode() for k, v in data.items()}
return None
# 3. Проверка существования токена
async def token_exists(user_id: int, token: str) -> bool:
"""Проверить существование токена"""
session_key = f"session:{user_id}:{token}"
return await redis.exists(session_key)
# 4. Получение TTL токена
async def get_token_ttl(user_id: int, token: str) -> int:
"""Получить время жизни токена в секундах"""
session_key = f"session:{user_id}:{token}"
return await redis.ttl(session_key)
🛠️ Методы интеграции
1. Прямая проверка токена
from auth.tokens.sessions import SessionTokenManager
async def authenticate_request(request) -> dict:
"""Аутентификация запроса в микросервисе"""
sessions = SessionTokenManager()
# Извлекаем токен
token = await extract_token_from_request(request)
if not token:
return {"authenticated": False, "error": "No token provided"}
try:
# Проверяем JWT и Redis сессию
payload = await sessions.verify_session(token)
if payload:
user_id = payload.get("user_id")
# Дополнительно получаем данные сессии из Redis
session_data = await sessions.get_session_data(token, user_id)
return {
"authenticated": True,
"user_id": user_id,
"username": payload.get("username"),
"session_data": session_data,
"expires_at": payload.get("exp")
}
else:
return {"authenticated": False, "error": "Invalid or expired token"}
except Exception as e:
return {"authenticated": False, "error": f"Authentication error: {str(e)}"}
2. Массовая проверка токенов
from auth.tokens.batch import BatchTokenOperations
async def validate_multiple_tokens(tokens: list[str]) -> dict[str, bool]:
"""Массовая проверка токенов для API gateway"""
batch = BatchTokenOperations()
return await batch.batch_validate_tokens(tokens)
# Использование
async def api_gateway_auth(request_tokens: list[str]):
"""Пример использования в API Gateway"""
results = await validate_multiple_tokens(request_tokens)
authenticated_requests = []
for token, is_valid in results.items():
if is_valid:
# Получаем данные пользователя для валидных токенов
sessions = SessionTokenManager()
payload = await sessions.verify_session(token)
if payload:
authenticated_requests.append({
"token": token,
"user_id": payload.get("user_id"),
"username": payload.get("username")
})
return authenticated_requests
3. Получение данных пользователя
from auth.utils import get_user_data_by_token
async def get_user_info(token: str) -> dict | None:
"""Получить информацию о пользователе по токену"""
try:
user_data = await get_user_data_by_token(token)
return user_data
except Exception as e:
print(f"Ошибка получения данных пользователя: {e}")
return None
# Использование
async def protected_endpoint(request):
"""Пример защищенного endpoint в микросервисе"""
token = await extract_token_from_request(request)
user_info = await get_user_info(token)
if not user_info:
return {"error": "Unauthorized", "status": 401}
return {
"message": f"Hello, {user_info.get('username')}!",
"user_id": user_info.get("user_id"),
"status": 200
}
🔧 HTTP заголовки и извлечение токенов
Поддерживаемые форматы
from auth.utils import extract_token_from_request, get_safe_headers
async def extract_auth_token(request) -> str | None:
"""Извлечение токена из различных источников"""
# 1. Автоматическое извлечение (рекомендуется)
token = await extract_token_from_request(request)
if token:
return token
# 2. Ручное извлечение из заголовков
headers = get_safe_headers(request)
# Bearer токен в Authorization
auth_header = headers.get("authorization", "")
if auth_header.startswith("Bearer "):
return auth_header[7:].strip()
# Кастомный заголовок X-Session-Token
session_token = headers.get("x-session-token")
if session_token:
return session_token.strip()
# Cookie (для веб-приложений)
if hasattr(request, "cookies"):
cookie_token = request.cookies.get("session_token")
if cookie_token:
return cookie_token
return None
Примеры HTTP запросов
# 1. Bearer токен в Authorization header
curl -H "Authorization: Bearer your_jwt_token_here" \
http://localhost:8000/api/protected
# 2. Кастомный заголовок
curl -H "X-Session-Token: your_jwt_token_here" \
http://localhost:8000/api/protected
# 3. Cookie (автоматически для веб-приложений)
curl -b "session_token=your_jwt_token_here" \
http://localhost:8000/api/protected
📊 Мониторинг и статистика
Health Check
from auth.tokens.monitoring import TokenMonitoring
async def auth_health_check() -> dict:
"""Health check системы аутентификации"""
monitoring = TokenMonitoring()
try:
# Проверяем состояние системы токенов
health = await monitoring.health_check()
# Получаем статистику
stats = await monitoring.get_token_statistics()
return {
"status": health.get("status", "unknown"),
"redis_connected": health.get("redis_connected", False),
"active_sessions": stats.get("session_tokens", 0),
"oauth_tokens": stats.get("oauth_access_tokens", 0) + stats.get("oauth_refresh_tokens", 0),
"memory_usage_mb": stats.get("memory_usage", 0) / 1024 / 1024,
"timestamp": int(time.time())
}
except Exception as e:
return {
"status": "error",
"error": str(e),
"timestamp": int(time.time())
}
# Использование в endpoint
async def health_endpoint():
"""Endpoint для мониторинга"""
health_data = await auth_health_check()
if health_data["status"] == "healthy":
return {"health": health_data, "status": 200}
else:
return {"health": health_data, "status": 503}
Статистика использования
async def get_auth_statistics() -> dict:
"""Получить статистику использования аутентификации"""
monitoring = TokenMonitoring()
stats = await monitoring.get_token_statistics()
return {
"sessions": {
"active": stats.get("session_tokens", 0),
"total_memory": stats.get("memory_usage", 0)
},
"oauth": {
"access_tokens": stats.get("oauth_access_tokens", 0),
"refresh_tokens": stats.get("oauth_refresh_tokens", 0)
},
"verification": {
"pending": stats.get("verification_tokens", 0)
},
"redis": {
"connected": stats.get("redis_connected", False),
"memory_usage_mb": stats.get("memory_usage", 0) / 1024 / 1024
}
}
🔒 Безопасность для микросервисов
Валидация токенов
async def secure_token_validation(token: str) -> dict:
"""Безопасная валидация токена с дополнительными проверками"""
if not token or len(token) < 10:
return {"valid": False, "error": "Invalid token format"}
try:
sessions = SessionTokenManager()
# 1. Проверяем JWT структуру и подпись
payload = await sessions.verify_session(token)
if not payload:
return {"valid": False, "error": "Invalid JWT token"}
user_id = payload.get("user_id")
if not user_id:
return {"valid": False, "error": "Missing user_id in token"}
# 2. Проверяем существование сессии в Redis
session_exists = await redis.exists(f"session:{user_id}:{token}")
if not session_exists:
return {"valid": False, "error": "Session not found in Redis"}
# 3. Проверяем TTL
ttl = await redis.ttl(f"session:{user_id}:{token}")
if ttl <= 0:
return {"valid": False, "error": "Session expired"}
# 4. Обновляем last_activity
await redis.hset(f"session:{user_id}:{token}", "last_activity", int(time.time()))
return {
"valid": True,
"user_id": user_id,
"username": payload.get("username"),
"expires_in": ttl,
"last_activity": int(time.time())
}
except Exception as e:
return {"valid": False, "error": f"Validation error: {str(e)}"}
Rate Limiting
from collections import defaultdict
import time
# Простой in-memory rate limiter (для production используйте Redis)
request_counts = defaultdict(list)
async def rate_limit_check(user_id: str, max_requests: int = 100, window_seconds: int = 60) -> bool:
"""Проверка rate limiting для пользователя"""
current_time = time.time()
user_requests = request_counts[user_id]
# Удаляем старые запросы
user_requests[:] = [req_time for req_time in user_requests if current_time - req_time < window_seconds]
# Проверяем лимит
if len(user_requests) >= max_requests:
return False
# Добавляем текущий запрос
user_requests.append(current_time)
return True
# Использование в middleware
async def auth_with_rate_limiting(request):
"""Аутентификация с rate limiting"""
auth_result = await authenticate_request(request)
if auth_result["authenticated"]:
user_id = str(auth_result["user_id"])
if not await rate_limit_check(user_id):
return {"error": "Rate limit exceeded", "status": 429}
return auth_result
🧪 Тестирование интеграции
Unit тесты
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_microservice_auth():
"""Тест аутентификации в микросервисе"""
# Mock request с токеном
mock_request = AsyncMock()
mock_request.headers = {"authorization": "Bearer valid_token"}
# Mock SessionTokenManager
with patch('auth.tokens.sessions.SessionTokenManager') as mock_sessions:
mock_sessions.return_value.verify_session.return_value = {
"user_id": "123",
"username": "testuser",
"exp": int(time.time()) + 3600
}
result = await authenticate_request(mock_request)
assert result["authenticated"] is True
assert result["user_id"] == "123"
assert result["username"] == "testuser"
@pytest.mark.asyncio
async def test_batch_token_validation():
"""Тест массовой валидации токенов"""
tokens = ["token1", "token2", "token3"]
with patch('auth.tokens.batch.BatchTokenOperations') as mock_batch:
mock_batch.return_value.batch_validate_tokens.return_value = {
"token1": True,
"token2": False,
"token3": True
}
results = await validate_multiple_tokens(tokens)
assert results["token1"] is True
assert results["token2"] is False
assert results["token3"] is True
Integration тесты
@pytest.mark.asyncio
async def test_redis_integration():
"""Тест интеграции с Redis"""
from storage.redis import redis
# Тестируем подключение
ping_result = await redis.ping()
assert ping_result is True
# Тестируем операции с сессиями
test_key = "session:test:token123"
test_data = {"user_id": "123", "username": "testuser"}
# Сохраняем данные
await redis.hset(test_key, mapping=test_data)
await redis.expire(test_key, 3600)
# Проверяем данные
stored_data = await redis.hgetall(test_key)
assert stored_data[b"user_id"].decode() == "123"
assert stored_data[b"username"].decode() == "testuser"
# Проверяем TTL
ttl = await redis.ttl(test_key)
assert ttl > 0
# Очищаем
await redis.delete(test_key)
📋 Checklist для интеграции
Подготовка
- Настроен Redis connection pool с теми же параметрами
- Установлены зависимости:
auth.tokens.*,auth.utils - Настроены environment variables (JWT_SECRET, REDIS_URL)
Реализация
- Реализована функция извлечения токенов из запросов
- Добавлена проверка сессий через SessionTokenManager
- Настроена обработка ошибок аутентификации
- Добавлен health check endpoint
Безопасность
- Валидация токенов включает проверку Redis сессий
- Настроен rate limiting (опционально)
- Логирование событий аутентификации
- Обработка истекших токенов
Мониторинг
- Health check интегрирован в систему мониторинга
- Метрики аутентификации собираются
- Алерты настроены для проблем с Redis/JWT
Тестирование
- Unit тесты для функций аутентификации
- Integration тесты с Redis
- E2E тесты с реальными токенами
- Load тесты для проверки производительности