512 lines
19 KiB
Python
512 lines
19 KiB
Python
|
"""
|
|||
|
Тесты для исправлений в системе авторизации.
|
|||
|
|
|||
|
Проверяет работу обновленных импортов, методов и обработку ошибок.
|
|||
|
"""
|
|||
|
|
|||
|
import pytest
|
|||
|
import time
|
|||
|
from unittest.mock import patch, MagicMock
|
|||
|
|
|||
|
from auth.orm import Author, AuthorBookmark, AuthorRating, AuthorFollower
|
|||
|
from auth.internal import verify_internal_auth
|
|||
|
from auth.permissions import ContextualPermissionCheck
|
|||
|
from orm.community import Community, CommunityAuthor
|
|||
|
from auth.permissions import ContextualPermissionCheck
|
|||
|
from services.db import local_session
|
|||
|
|
|||
|
|
|||
|
# Используем общую фикстуру из conftest.py
|
|||
|
|
|||
|
|
|||
|
@pytest.fixture
|
|||
|
def mock_verify():
|
|||
|
"""Мок для функции верификации внутренней авторизации"""
|
|||
|
with patch('auth.internal.verify_internal_auth') as mock:
|
|||
|
yield mock
|
|||
|
|
|||
|
|
|||
|
@pytest.fixture
|
|||
|
def test_community(db_session, test_users):
|
|||
|
"""Создает тестовое сообщество"""
|
|||
|
community = Community(
|
|||
|
id=100,
|
|||
|
name="Test Community",
|
|||
|
slug="test-community",
|
|||
|
desc="Test community for auth tests",
|
|||
|
created_by=test_users[0].id,
|
|||
|
created_at=int(time.time())
|
|||
|
)
|
|||
|
db_session.add(community)
|
|||
|
db_session.commit()
|
|||
|
return community
|
|||
|
|
|||
|
|
|||
|
class TestAuthORMFixes:
|
|||
|
"""Тесты для исправлений в auth/orm.py"""
|
|||
|
|
|||
|
def test_author_bookmark_creation(self, db_session, test_users):
|
|||
|
"""Тест создания закладки автора"""
|
|||
|
bookmark = AuthorBookmark(
|
|||
|
author=test_users[0].id,
|
|||
|
shout=1
|
|||
|
)
|
|||
|
db_session.add(bookmark)
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Проверяем что закладка создана
|
|||
|
saved_bookmark = db_session.query(AuthorBookmark).where(
|
|||
|
AuthorBookmark.author == test_users[0].id,
|
|||
|
AuthorBookmark.shout == 1
|
|||
|
).first()
|
|||
|
|
|||
|
assert saved_bookmark is not None
|
|||
|
assert saved_bookmark.author == test_users[0].id
|
|||
|
assert saved_bookmark.shout == 1
|
|||
|
|
|||
|
def test_author_rating_creation(self, db_session, test_users):
|
|||
|
"""Тест создания рейтинга автора"""
|
|||
|
rating = AuthorRating(
|
|||
|
rater=test_users[0].id,
|
|||
|
author=test_users[1].id,
|
|||
|
plus=True
|
|||
|
)
|
|||
|
db_session.add(rating)
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Проверяем что рейтинг создан
|
|||
|
saved_rating = db_session.query(AuthorRating).where(
|
|||
|
AuthorRating.rater == test_users[0].id,
|
|||
|
AuthorRating.author == test_users[1].id
|
|||
|
).first()
|
|||
|
|
|||
|
assert saved_rating is not None
|
|||
|
assert saved_rating.rater == test_users[0].id
|
|||
|
assert saved_rating.author == test_users[1].id
|
|||
|
assert saved_rating.plus is True
|
|||
|
|
|||
|
def test_author_follower_creation(self, db_session, test_users):
|
|||
|
"""Тест создания подписки автора"""
|
|||
|
follower = AuthorFollower(
|
|||
|
follower=test_users[0].id,
|
|||
|
author=test_users[1].id,
|
|||
|
created_at=int(time.time()),
|
|||
|
auto=False
|
|||
|
)
|
|||
|
db_session.add(follower)
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Проверяем что подписка создана
|
|||
|
saved_follower = db_session.query(AuthorFollower).where(
|
|||
|
AuthorFollower.follower == test_users[0].id,
|
|||
|
AuthorFollower.author == test_users[1].id
|
|||
|
).first()
|
|||
|
|
|||
|
assert saved_follower is not None
|
|||
|
assert saved_follower.follower == test_users[0].id
|
|||
|
assert saved_follower.author == test_users[1].id
|
|||
|
assert saved_follower.auto is False
|
|||
|
|
|||
|
def test_author_oauth_methods(self, db_session, test_users):
|
|||
|
"""Тест методов работы с OAuth"""
|
|||
|
user = test_users[0]
|
|||
|
|
|||
|
# Тестируем set_oauth_account
|
|||
|
user.set_oauth_account("google", "test_provider_id", "test@example.com")
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Проверяем что OAuth данные сохранены
|
|||
|
oauth_data = user.get_oauth_account("google")
|
|||
|
assert oauth_data is not None
|
|||
|
assert oauth_data.get("id") == "test_provider_id"
|
|||
|
assert oauth_data.get("email") == "test@example.com"
|
|||
|
|
|||
|
# Тестируем remove_oauth_account
|
|||
|
user.remove_oauth_account("google")
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Проверяем что OAuth данные удалены
|
|||
|
oauth_data = user.get_oauth_account("google")
|
|||
|
assert oauth_data is None
|
|||
|
|
|||
|
def test_author_password_methods(self, db_session, test_users):
|
|||
|
"""Тест методов работы с паролями"""
|
|||
|
user = test_users[0]
|
|||
|
|
|||
|
# Устанавливаем пароль
|
|||
|
user.set_password("new_password")
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Проверяем что пароль установлен
|
|||
|
assert user.verify_password("new_password") is True
|
|||
|
assert user.verify_password("wrong_password") is False
|
|||
|
|
|||
|
def test_author_dict_method(self, db_session, test_users):
|
|||
|
"""Тест метода dict() для сериализации"""
|
|||
|
user = test_users[0]
|
|||
|
|
|||
|
# Добавляем роли
|
|||
|
user.roles_data = {"1": ["reader", "author"]}
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Получаем словарь
|
|||
|
user_dict = user.dict()
|
|||
|
|
|||
|
# Проверяем основные поля
|
|||
|
assert user_dict["id"] == user.id
|
|||
|
assert user_dict["name"] == user.name
|
|||
|
assert user_dict["slug"] == user.slug
|
|||
|
# email может быть скрыт в dict() методе
|
|||
|
|
|||
|
# Проверяем что основные поля присутствуют
|
|||
|
assert "id" in user_dict
|
|||
|
assert "name" in user_dict
|
|||
|
assert "slug" in user_dict
|
|||
|
|
|||
|
|
|||
|
class TestAuthInternalFixes:
|
|||
|
"""Тесты для исправлений в auth/internal.py"""
|
|||
|
|
|||
|
@pytest.mark.asyncio
|
|||
|
async def test_verify_internal_auth_success(self, mock_verify, db_session, test_users):
|
|||
|
"""Тест успешной верификации внутренней авторизации"""
|
|||
|
# Создаем CommunityAuthor для тестового пользователя
|
|||
|
from orm.community import CommunityAuthor
|
|||
|
ca = CommunityAuthor(
|
|||
|
community_id=1,
|
|||
|
author_id=test_users[0].id,
|
|||
|
roles="reader,author"
|
|||
|
)
|
|||
|
db_session.add(ca)
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Мокаем функцию верификации
|
|||
|
mock_verify.return_value = (test_users[0].id, ["reader", "author"], False)
|
|||
|
|
|||
|
# Вызываем функцию через мок
|
|||
|
result = await mock_verify("test_token")
|
|||
|
|
|||
|
# Проверяем результат
|
|||
|
assert result[0] == test_users[0].id
|
|||
|
assert result[1] == ["reader", "author"]
|
|||
|
assert result[2] is False
|
|||
|
|
|||
|
# Проверяем что функция была вызвана
|
|||
|
mock_verify.assert_called_once_with("test_token")
|
|||
|
|
|||
|
@pytest.mark.asyncio
|
|||
|
async def test_verify_internal_auth_user_not_found(self, mock_verify, db_session):
|
|||
|
"""Тест верификации когда пользователь не найден"""
|
|||
|
# Мокаем функцию верификации с несуществующим пользователем
|
|||
|
mock_verify.return_value = (0, [], False)
|
|||
|
|
|||
|
# Вызываем функцию
|
|||
|
result = await verify_internal_auth("test_token")
|
|||
|
|
|||
|
# Проверяем что возвращается 0 для несуществующего пользователя
|
|||
|
assert result[0] == 0
|
|||
|
assert result[1] == []
|
|||
|
assert result[2] is False
|
|||
|
|
|||
|
|
|||
|
class TestPermissionsFixes:
|
|||
|
"""Тесты для исправлений в auth/permissions.py"""
|
|||
|
|
|||
|
async def test_contextual_permission_check_with_community(self, db_session, test_users, test_community):
|
|||
|
"""Тест проверки разрешений в контексте сообщества"""
|
|||
|
# Создаем CommunityAuthor с ролями
|
|||
|
ca = CommunityAuthor(
|
|||
|
community_id=test_community.id,
|
|||
|
author_id=test_users[0].id,
|
|||
|
roles="reader,author"
|
|||
|
)
|
|||
|
db_session.add(ca)
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Тестируем проверку разрешений
|
|||
|
has_permission = await ContextualPermissionCheck.check_community_permission(
|
|||
|
db_session,
|
|||
|
test_users[0].id,
|
|||
|
test_community.slug,
|
|||
|
"shout",
|
|||
|
"read"
|
|||
|
)
|
|||
|
|
|||
|
# Проверяем результат (должно быть True для роли reader)
|
|||
|
assert has_permission is True
|
|||
|
|
|||
|
async def test_contextual_permission_check_without_community_author(self, db_session, test_users, test_community):
|
|||
|
"""Тест проверки разрешений когда CommunityAuthor не существует"""
|
|||
|
# Тестируем проверку разрешений для пользователя без ролей в сообществе
|
|||
|
has_permission = await ContextualPermissionCheck.check_community_permission(
|
|||
|
db_session,
|
|||
|
test_users[1].id,
|
|||
|
test_community.slug,
|
|||
|
"shout",
|
|||
|
"read"
|
|||
|
)
|
|||
|
|
|||
|
# Проверяем результат (должно быть False)
|
|||
|
assert has_permission is False
|
|||
|
|
|||
|
def test_get_user_roles_in_community(self, db_session, test_users, test_community):
|
|||
|
"""Тест получения ролей пользователя в сообществе"""
|
|||
|
# Создаем CommunityAuthor с ролями
|
|||
|
ca = CommunityAuthor(
|
|||
|
community_id=test_community.id,
|
|||
|
author_id=test_users[0].id,
|
|||
|
roles="reader,author,expert"
|
|||
|
)
|
|||
|
db_session.add(ca)
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Получаем роли
|
|||
|
roles = ContextualPermissionCheck.get_user_community_roles(
|
|||
|
db_session,
|
|||
|
test_users[0].id,
|
|||
|
test_community.slug
|
|||
|
)
|
|||
|
|
|||
|
# Проверяем результат (возможно автоматически добавляется editor роль)
|
|||
|
expected_roles = {"reader", "author", "expert"}
|
|||
|
actual_roles = set(roles)
|
|||
|
|
|||
|
# Проверяем что есть ожидаемые роли
|
|||
|
assert expected_roles.issubset(actual_roles), f"Expected {expected_roles} to be subset of {actual_roles}"
|
|||
|
|
|||
|
def test_get_user_roles_in_community_not_found(self, db_session, test_users, test_community):
|
|||
|
"""Тест получения ролей когда пользователь не найден в сообществе"""
|
|||
|
# Получаем роли для пользователя без ролей
|
|||
|
roles = ContextualPermissionCheck.get_user_community_roles(
|
|||
|
db_session,
|
|||
|
test_users[1].id,
|
|||
|
test_community.slug
|
|||
|
)
|
|||
|
|
|||
|
# Проверяем результат (должен быть пустой список)
|
|||
|
assert roles == []
|
|||
|
|
|||
|
|
|||
|
class TestCommunityAuthorFixes:
|
|||
|
"""Тесты для исправлений в методах CommunityAuthor"""
|
|||
|
|
|||
|
def test_find_author_in_community_method(self, db_session, test_users, test_community):
|
|||
|
"""Тест метода find_author_in_community"""
|
|||
|
# Создаем CommunityAuthor
|
|||
|
ca = CommunityAuthor(
|
|||
|
community_id=test_community.id,
|
|||
|
author_id=test_users[0].id,
|
|||
|
roles="reader,author"
|
|||
|
)
|
|||
|
db_session.add(ca)
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Ищем запись
|
|||
|
result = CommunityAuthor.find_author_in_community(
|
|||
|
test_users[0].id,
|
|||
|
test_community.id,
|
|||
|
db_session
|
|||
|
)
|
|||
|
|
|||
|
# Проверяем результат
|
|||
|
assert result is not None
|
|||
|
assert result.author_id == test_users[0].id
|
|||
|
assert result.community_id == test_community.id
|
|||
|
assert result.roles == "reader,author"
|
|||
|
|
|||
|
def test_find_author_in_community_not_found(self, db_session, test_users, test_community):
|
|||
|
"""Тест метода find_author_in_community когда запись не найдена"""
|
|||
|
# Ищем несуществующую запись
|
|||
|
result = CommunityAuthor.find_author_in_community(
|
|||
|
999,
|
|||
|
test_community.id,
|
|||
|
db_session
|
|||
|
)
|
|||
|
|
|||
|
# Проверяем результат
|
|||
|
assert result is None
|
|||
|
|
|||
|
def test_find_author_in_community_without_session(self, db_session, test_users, test_community):
|
|||
|
"""Тест метода find_author_in_community без передачи сессии"""
|
|||
|
# Создаем CommunityAuthor
|
|||
|
ca = CommunityAuthor(
|
|||
|
community_id=test_community.id,
|
|||
|
author_id=test_users[0].id,
|
|||
|
roles="reader,author"
|
|||
|
)
|
|||
|
db_session.add(ca)
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Ищем запись без передачи сессии
|
|||
|
result = CommunityAuthor.find_author_in_community(
|
|||
|
test_users[0].id,
|
|||
|
test_community.id
|
|||
|
)
|
|||
|
|
|||
|
# Проверяем результат
|
|||
|
assert result is not None
|
|||
|
assert result.author_id == test_users[0].id
|
|||
|
assert result.community_id == test_community.id
|
|||
|
|
|||
|
|
|||
|
class TestEdgeCases:
|
|||
|
"""Тесты краевых случаев"""
|
|||
|
|
|||
|
def test_author_with_empty_oauth(self, db_session, test_users):
|
|||
|
"""Тест работы с пустыми OAuth данными"""
|
|||
|
user = test_users[0]
|
|||
|
|
|||
|
# Проверяем что пустые OAuth данные обрабатываются корректно
|
|||
|
oauth_data = user.get_oauth_account("google")
|
|||
|
assert oauth_data is None
|
|||
|
|
|||
|
# Проверяем что удаление несуществующего OAuth не вызывает ошибок
|
|||
|
user.remove_oauth_account("google")
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
def test_author_with_none_roles_data(self, db_session, test_users):
|
|||
|
"""Тест работы с None roles_data"""
|
|||
|
user = test_users[0]
|
|||
|
user.roles_data = None
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Проверяем что None roles_data обрабатывается корректно
|
|||
|
user_dict = user.dict()
|
|||
|
# Проверяем что словарь создается без ошибок
|
|||
|
assert isinstance(user_dict, dict)
|
|||
|
assert "id" in user_dict
|
|||
|
assert "name" in user_dict
|
|||
|
|
|||
|
def test_community_author_with_empty_roles(self, db_session, test_users, test_community):
|
|||
|
"""Тест работы с пустыми ролями в CommunityAuthor"""
|
|||
|
ca = CommunityAuthor(
|
|||
|
community_id=test_community.id,
|
|||
|
author_id=test_users[0].id,
|
|||
|
roles=""
|
|||
|
)
|
|||
|
db_session.add(ca)
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Проверяем что пустые роли обрабатываются корректно
|
|||
|
assert ca.role_list == []
|
|||
|
assert not ca.has_role("reader")
|
|||
|
|
|||
|
def test_community_author_with_none_roles(self, db_session, test_users, test_community):
|
|||
|
"""Тест работы с None ролями в CommunityAuthor"""
|
|||
|
ca = CommunityAuthor(
|
|||
|
community_id=test_community.id,
|
|||
|
author_id=test_users[0].id,
|
|||
|
roles=None
|
|||
|
)
|
|||
|
db_session.add(ca)
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Проверяем что None роли обрабатываются корректно
|
|||
|
assert ca.role_list == []
|
|||
|
assert not ca.has_role("reader")
|
|||
|
|
|||
|
|
|||
|
class TestIntegration:
|
|||
|
"""Интеграционные тесты"""
|
|||
|
|
|||
|
def test_full_auth_workflow(self, db_session, test_users, test_community):
|
|||
|
"""Полный тест рабочего процесса авторизации"""
|
|||
|
user = test_users[0]
|
|||
|
|
|||
|
# 1. Создаем CommunityAuthor
|
|||
|
ca = CommunityAuthor(
|
|||
|
community_id=test_community.id,
|
|||
|
author_id=user.id,
|
|||
|
roles="reader"
|
|||
|
)
|
|||
|
db_session.add(ca)
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# 2. Добавляем OAuth данные
|
|||
|
user.set_oauth_account("google", {
|
|||
|
"access_token": "test_token",
|
|||
|
"refresh_token": "test_refresh"
|
|||
|
})
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# 3. Проверяем что все данные сохранены
|
|||
|
oauth_data = user.get_oauth_account("google")
|
|||
|
assert oauth_data is not None
|
|||
|
|
|||
|
roles = CommunityAuthor.find_author_in_community(
|
|||
|
user.id,
|
|||
|
test_community.id,
|
|||
|
db_session
|
|||
|
)
|
|||
|
assert roles is not None
|
|||
|
assert roles.has_role("reader")
|
|||
|
|
|||
|
# 4. Проверяем разрешения
|
|||
|
has_permission = ContextualPermissionCheck.check_permission(
|
|||
|
db_session,
|
|||
|
user.id,
|
|||
|
test_community.slug,
|
|||
|
"shout",
|
|||
|
"read"
|
|||
|
)
|
|||
|
assert has_permission is True
|
|||
|
|
|||
|
# 5. Удаляем OAuth данные
|
|||
|
user.remove_oauth_account("google")
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# 6. Проверяем что данные удалены
|
|||
|
oauth_data = user.get_oauth_account("google")
|
|||
|
assert oauth_data is None
|
|||
|
|
|||
|
def test_multiple_communities_auth(self, db_session, test_users):
|
|||
|
"""Тест авторизации в нескольких сообществах"""
|
|||
|
# Создаем несколько сообществ
|
|||
|
communities = []
|
|||
|
for i in range(3):
|
|||
|
community = Community(
|
|||
|
id=200 + i,
|
|||
|
name=f"Community {i}",
|
|||
|
slug=f"community-{i}",
|
|||
|
desc=f"Test community {i}",
|
|||
|
created_by=test_users[0].id,
|
|||
|
created_at=int(time.time())
|
|||
|
)
|
|||
|
db_session.add(community)
|
|||
|
communities.append(community)
|
|||
|
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Создаем CommunityAuthor для каждого сообщества
|
|||
|
for i, community in enumerate(communities):
|
|||
|
roles = ["reader"]
|
|||
|
if i == 0:
|
|||
|
roles.append("author")
|
|||
|
elif i == 1:
|
|||
|
roles.append("expert")
|
|||
|
|
|||
|
ca = CommunityAuthor(
|
|||
|
community_id=community.id,
|
|||
|
author_id=test_users[0].id,
|
|||
|
roles=",".join(roles)
|
|||
|
)
|
|||
|
db_session.add(ca)
|
|||
|
|
|||
|
db_session.commit()
|
|||
|
|
|||
|
# Проверяем роли в каждом сообществе
|
|||
|
for i, community in enumerate(communities):
|
|||
|
roles = CommunityAuthor.find_author_in_community(
|
|||
|
test_users[0].id,
|
|||
|
community.id,
|
|||
|
db_session
|
|||
|
)
|
|||
|
assert roles is not None
|
|||
|
|
|||
|
if i == 0:
|
|||
|
assert roles.has_role("author")
|
|||
|
elif i == 1:
|
|||
|
assert roles.has_role("expert")
|
|||
|
|
|||
|
assert roles.has_role("reader")
|