auth-fix
All checks were successful
Deploy on push / deploy (push) Successful in 13s

This commit is contained in:
2025-07-23 13:29:49 +03:00
parent 3826797317
commit 867232e48f
2 changed files with 504 additions and 353 deletions

View File

@@ -9,7 +9,6 @@ import time
from functools import wraps
from typing import Any, Callable, Optional
from sqlalchemy import exc
from starlette.requests import Request
from auth.email import send_auth_email
@@ -80,24 +79,39 @@ class AuthService:
f"[check_auth] Результат verify_internal_auth: user_id={user_id}, roles={user_roles}, is_admin={is_admin}"
)
# Если в ролях нет админа, но есть ID - проверяем в БД
# Если в ролях нет админа, но есть ID - проверяем через новую систему RBAC
if user_id and not is_admin:
try:
with local_session() as session:
# Преобразуем user_id в число
try:
if isinstance(user_id, str):
user_id_int = int(user_id.strip())
else:
user_id_int = int(user_id)
except (ValueError, TypeError):
logger.error(f"Невозможно преобразовать user_id {user_id} в число")
# Преобразуем user_id в число
try:
if isinstance(user_id, str):
user_id_int = int(user_id.strip())
else:
# Проверяем наличие админских прав через новую RBAC систему
from orm.community import get_user_roles_in_community
user_id_int = int(user_id)
except (ValueError, TypeError):
logger.error(f"Невозможно преобразовать user_id {user_id} в число")
return 0, [], False
# Получаем роли через новую систему CommunityAuthor
from orm.community import get_user_roles_in_community
user_roles_in_community = get_user_roles_in_community(user_id_int, community_id=1)
logger.debug(f"[check_auth] Роли из CommunityAuthor: {user_roles_in_community}")
# Обновляем роли из новой системы
user_roles = user_roles_in_community
is_admin = any(role in ["admin", "super"] for role in user_roles_in_community)
# Проверяем админские права через email если нет роли админа
if not is_admin:
with local_session() as session:
author = session.query(Author).filter(Author.id == user_id_int).first()
if author and author.email in ADMIN_EMAILS.split(","):
is_admin = True
logger.debug(
f"[check_auth] Пользователь {author.email} определен как админ через ADMIN_EMAILS"
)
user_roles_in_community = get_user_roles_in_community(user_id_int, community_id=1)
is_admin = any(role in ["admin", "super"] for role in user_roles_in_community)
except Exception as e:
logger.error(f"Ошибка при проверке прав администратора: {e}")
@@ -112,26 +126,30 @@ class AuthService:
logger.info(f"Adding roles {roles} to user {user_id}")
from orm.community import assign_role_to_user
try:
user_id_int = int(user_id)
except (ValueError, TypeError):
logger.error(f"Невозможно преобразовать user_id {user_id} в число")
return None
logger.debug("Using local authentication with new RBAC system")
with local_session() as session:
try:
author = session.query(Author).filter(Author.id == user_id).one()
from orm.community import assign_role_to_user, get_user_roles_in_community
# Добавляем роли через новую систему RBAC в дефолтное сообщество (ID=1)
for role_name in roles:
success = assign_role_to_user(int(user_id), role_name, community_id=1)
if success:
logger.debug(f"Роль {role_name} добавлена пользователю {user_id}")
else:
logger.warning(f"Не удалось добавить роль {role_name} пользователю {user_id}")
# Проверяем существующие роли
existing_roles = get_user_roles_in_community(user_id_int, community_id=1)
logger.debug(f"Существующие роли пользователя {user_id}: {existing_roles}")
return user_id
# Добавляем новые роли через новую систему RBAC
for role_name in roles:
if role_name not in existing_roles:
success = assign_role_to_user(user_id_int, role_name, community_id=1)
if success:
logger.debug(f"Роль {role_name} добавлена пользователю {user_id}")
else:
logger.warning(f"Не удалось добавить роль {role_name} пользователю {user_id}")
else:
logger.debug(f"Роль {role_name} уже есть у пользователя {user_id}")
except exc.NoResultFound:
logger.error(f"Author {user_id} not found")
return None
return user_id
def create_user(self, user_dict: dict[str, Any], community_id: int | None = None) -> Author:
"""Создает нового пользователя с дефолтными ролями"""
@@ -332,17 +350,22 @@ class AuthService:
logger.warning(f"Пользователь {email} не найден")
return {"success": False, "token": None, "author": None, "error": "Пользователь не найден"}
# Проверяем роли (упрощенная версия)
has_reader_role = False
if hasattr(author, "roles") and author.roles:
for role in author.roles:
if role.id == "reader":
has_reader_role = True
break
# Проверяем роли через новую систему CommunityAuthor
from orm.community import get_user_roles_in_community
user_roles = get_user_roles_in_community(author.id, community_id=1)
has_reader_role = "reader" in user_roles
logger.debug(f"Роли пользователя {email}: {user_roles}")
if not has_reader_role and author.email not in ADMIN_EMAILS.split(","):
logger.warning(f"У пользователя {email} нет роли 'reader'")
return {"success": False, "token": None, "author": None, "error": "Нет прав для входа"}
logger.warning(f"У пользователя {email} нет роли 'reader'. Текущие роли: {user_roles}")
return {
"success": False,
"token": None,
"author": None,
"error": "Нет прав для входа. Требуется роль 'reader'.",
}
# Проверяем пароль
try:
@@ -610,6 +633,60 @@ class AuthService:
logger.error(f"Ошибка отмены смены email: {e}")
return {"success": False, "error": str(e), "author": None}
async def ensure_user_has_reader_role(self, user_id: int) -> bool:
"""
Убеждается, что у пользователя есть роль 'reader'.
Если её нет - добавляет автоматически.
Args:
user_id: ID пользователя
Returns:
True если роль была добавлена или уже существует
"""
from orm.community import assign_role_to_user, get_user_roles_in_community
existing_roles = get_user_roles_in_community(user_id, community_id=1)
if "reader" not in existing_roles:
logger.warning(f"У пользователя {user_id} нет роли 'reader'. Добавляем автоматически.")
success = assign_role_to_user(user_id, "reader", community_id=1)
if success:
logger.info(f"Роль 'reader' добавлена пользователю {user_id}")
return True
logger.error(f"Не удалось добавить роль 'reader' пользователю {user_id}")
return False
return True
async def fix_all_users_reader_role(self) -> dict[str, int]:
"""
Проверяет всех пользователей и добавляет роль 'reader' тем, у кого её нет.
Returns:
Статистика операции: {"checked": int, "fixed": int, "errors": int}
"""
stats = {"checked": 0, "fixed": 0, "errors": 0}
with local_session() as session:
# Получаем всех пользователей
all_authors = session.query(Author).all()
for author in all_authors:
stats["checked"] += 1
try:
had_reader = await self.ensure_user_has_reader_role(author.id)
if not had_reader:
stats["fixed"] += 1
except Exception as e:
logger.error(f"Ошибка при исправлении ролей для пользователя {author.id}: {e}")
stats["errors"] += 1
logger.info(f"Исправление ролей завершено: {stats}")
return stats
def login_required(self, f: Callable) -> Callable:
"""Декоратор для проверки авторизации пользователя. Требуется наличие роли 'reader'."""