core/services/auth.py
2024-12-24 14:22:49 +03:00

188 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from functools import wraps
from cache.cache import get_cached_author_by_user_id
from resolvers.stat import get_with_stat
from services.schema import request_graphql_data
from settings import ADMIN_SECRET, AUTH_URL
from utils.logger import root_logger as logger
# Список разрешенных заголовков
ALLOWED_HEADERS = [
'Authorization',
'Content-Type'
]
async def check_auth(req):
"""
Проверка авторизации пользователя.
Эта функция проверяет токен авторизации, переданный в заголовках запроса,
и возвращает идентификатор пользователя и его роли.
Параметры:
- req: Входящий GraphQL запрос, содержащий заголовок авторизации.
Возвращает:
- user_id: str - Идентификатор пользователя.
- user_roles: list[str] - Список ролей пользователя.
"""
token = req.headers.get("Authorization")
host = req.headers.get('host', '')
logger.debug(f"check_auth: host={host}")
auth_url = AUTH_URL
if '.dscrs.site' in host or 'localhost' in host:
auth_url = "https://auth.dscrs.site/graphql"
user_id = ""
user_roles = []
if token:
# Проверяем и очищаем токен от префикса Bearer если он есть
if token.startswith('Bearer '):
token = token.split('Bearer ')[-1].strip()
# Logging the authentication token
logger.debug(f"TOKEN: {token}")
query_name = "validate_jwt_token"
operation = "ValidateToken"
variables = {"params": {"token_type": "access_token", "token": token}}
# Только необходимые заголовки для GraphQL запроса
headers = {
'Content-Type': 'application/json'
}
gql = {
"query": f"query {operation}($params: ValidateJWTTokenInput!)"
+ "{"
+ f"{query_name}(params: $params) {{ is_valid claims }} "
+ "}",
"variables": variables,
"operationName": operation,
}
data = await request_graphql_data(gql, url=auth_url, headers=headers)
if data:
logger.debug(f"Auth response: {data}")
validation_result = data.get("data", {}).get(query_name, {})
logger.debug(f"Validation result: {validation_result}")
is_valid = validation_result.get("is_valid", False)
if not is_valid:
logger.error(f"Token validation failed: {validation_result}")
return "", []
user_data = validation_result.get("claims", {})
logger.debug(f"User claims: {user_data}")
user_id = user_data.get("sub", "")
user_roles = user_data.get("allowed_roles", [])
return user_id, user_roles
async def add_user_role(user_id):
"""
Добавление роли пользователя.
Эта функция добавляет роли "author" и "reader" для указанного пользователя
в системе авторизации.
Параметры:
- user_id: str - Идентификатор пользователя, которому нужно добавить роли.
Возвращает:
- user_id: str - Идентификатор пользователя, если операция прошла успешно.
"""
logger.info(f"add author role for user_id: {user_id}")
query_name = "_update_user"
operation = "UpdateUserRoles"
headers = {
"Content-Type": "application/json",
"x-authorizer-admin-secret": ADMIN_SECRET,
}
variables = {"params": {"roles": "author, reader", "id": user_id}}
gql = {
"query": f"mutation {operation}($params: UpdateUserInput!) {{ {query_name}(params: $params) {{ id roles }} }}",
"variables": variables,
"operationName": operation,
}
data = await request_graphql_data(gql, headers=headers)
if data:
user_id = data.get("data", {}).get(query_name, {}).get("id")
return user_id
def login_required(f):
"""
Декоратор для проверки авторизации пользователя.
Этот декоратор проверяет, авторизован ли пользователь, <20><> добавляет
информацию о пользователе в контекст функции.
Параметры:
- f: Функция, которую нужно декорировать.
Возвращает:
- Обернутую функцию с добавленной проверкой авторизации.
"""
@wraps(f)
async def decorated_function(*args, **kwargs):
info = args[1]
req = info.context.get("request")
user_id, user_roles = await check_auth(req)
if user_id and user_roles:
logger.info(f" got {user_id} roles: {user_roles}")
info.context["user_id"] = user_id.strip()
info.context["roles"] = user_roles
author = await get_cached_author_by_user_id(user_id, get_with_stat)
if not author:
logger.error(f"author profile not found for user {user_id}")
info.context["author"] = author
return await f(*args, **kwargs)
return decorated_function
def login_accepted(f):
"""
Декоратор для добавления данных авторизации в контекст.
Этот декоратор добавляет данные авторизации в контекст, если они доступны,
но не блокирует доступ для неавторизованных пользователей.
Параметры:
- f: Функция, которую нужно декорировать.
Возвращает:
- Обернутую функцию с добавленной проверкой авторизации.
"""
@wraps(f)
async def decorated_function(*args, **kwargs):
info = args[1]
req = info.context.get("request")
logger.debug("login_accepted: Проверка авторизации пользователя.")
user_id, user_roles = await check_auth(req)
logger.debug(f"login_accepted: user_id={user_id}, user_roles={user_roles}")
if user_id and user_roles:
logger.info(f"login_accepted: Пользователь авторизован: {user_id} с ролями {user_roles}")
info.context["user_id"] = user_id.strip()
info.context["roles"] = user_roles
# Пробуем получить профиль автора
author = await get_cached_author_by_user_id(user_id, get_with_stat)
if author:
logger.debug(f"login_accepted: Найден профиль автора: {author}")
# Предполагается, что `author` является объектом с атрибутом `id`
info.context["author"] = author.dict()
else:
logger.error(
f"login_accepted: Профиль автора не найден для пользователя {user_id}. Используем базовые данные."
) # Используем базовую информацию об автор
else:
logger.debug("login_accepted: Пользователь не авторизован. Очищаем контекст.")
info.context["user_id"] = None
info.context["roles"] = None
info.context["author"] = None
return await f(*args, **kwargs)
return decorated_function