Some checks failed
Deploy on push / deploy (push) Failing after 39s
### 🚨 CRITICAL Security Fixes - **🔒 Open Redirect Protection**: Добавлена строгая валидация redirect_uri против whitelist доменов - **🔒 Rate Limiting**: Защита OAuth endpoints от брутфорса (10 попыток за 5 минут на IP) - **🔒 Logout Endpoint**: Критически важный endpoint для безопасного отзыва httpOnly cookies - **🔒 Provider Validation**: Усиленная валидация OAuth провайдеров с логированием атак - **🚨 GlitchTip Alerts**: Автоматические алерты безопасности в GlitchTip при критических событиях ### 🛡️ Security Modules - **auth/oauth_security.py**: Модуль безопасности OAuth с валидацией и rate limiting + GlitchTip алерты - **auth/logout.py**: Безопасный logout с поддержкой JSON API и browser redirect - **tests/test_oauth_security.py**: Комплексные тесты безопасности (11 тестов) - **tests/test_oauth_glitchtip_alerts.py**: Тесты интеграции с GlitchTip (8 тестов) ### 🔧 OAuth Improvements - **Minimal Flow**: Упрощен до минимума - только httpOnly cookie, нет JWT в URL - **Simple Logic**: Нет error параметра = успех, максимальная простота - **DRY Refactoring**: Устранено дублирование кода в logout и валидации ### 🎯 OAuth Endpoints - **Старт**: `v3.dscrs.site/oauth/{provider}` - с rate limiting и валидацией - **Callback**: `v3.dscrs.site/oauth/{provider}/callback` - безопасный redirect_uri - **Logout**: `v3.dscrs.site/auth/logout` - отзыв httpOnly cookies - **Финализация**: `testing.discours.io/oauth?redirect_url=...` - минимальная схема ### 📊 Security Test Coverage - ✅ Open redirect attack prevention - ✅ Rate limiting protection - ✅ Provider validation - ✅ Safe fallback mechanisms - ✅ Cookie security (httpOnly + Secure + SameSite) - ✅ GlitchTip integration (8 тестов алертов) ### 📝 Documentation - Создан `docs/oauth-minimal-flow.md` - полное описание минимального flow - Обновлена документация OAuth в `docs/auth/oauth.md` - Добавлены security best practices
177 lines
7.1 KiB
Python
177 lines
7.1 KiB
Python
"""
|
||
Простой тест кеша подписок без GraphQL и авторизации
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import time
|
||
import pytest
|
||
|
||
from cache.cache import get_cached_follower_authors
|
||
from orm.author import Author, AuthorFollower
|
||
from storage.db import local_session
|
||
from storage.redis import redis
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.timeout(60) # 🚨 Таймаут для предотвращения зависания
|
||
async def test_cache_invalidation_logic():
|
||
"""
|
||
Тест логики инвалидации кеша при прямых операциях с БД
|
||
"""
|
||
# Создаем тестовых пользователей
|
||
with local_session() as session:
|
||
follower = Author(
|
||
name="Cache Test Follower",
|
||
slug=f"cache-test-follower-{int(time.time())}",
|
||
email=f"cache-follower-{int(time.time())}@test.com"
|
||
)
|
||
session.add(follower)
|
||
|
||
target_author = Author(
|
||
name="Cache Test Target",
|
||
slug=f"cache-test-target-{int(time.time())}",
|
||
email=f"cache-target-{int(time.time())}@test.com"
|
||
)
|
||
session.add(target_author)
|
||
session.commit()
|
||
|
||
follower_id = follower.id
|
||
target_author_id = target_author.id
|
||
|
||
try:
|
||
# 1. Очищаем кеш
|
||
cache_key = f"author:follows-authors:{follower_id}"
|
||
await redis.execute("DEL", cache_key)
|
||
|
||
# 2. Проверяем начальное состояние (пустой список)
|
||
initial_follows = await get_cached_follower_authors(follower_id)
|
||
assert len(initial_follows) == 0, "Изначально подписок быть не должно"
|
||
|
||
# 3. Создаем подписку в БД напрямую
|
||
with local_session() as session:
|
||
subscription = AuthorFollower(
|
||
follower=follower_id,
|
||
following=target_author_id
|
||
)
|
||
session.add(subscription)
|
||
session.commit()
|
||
|
||
# 4. Инвалидируем кеш (имитируя логику follow)
|
||
await redis.execute("DEL", cache_key)
|
||
|
||
# 5. Проверяем, что кеш обновился
|
||
after_follow_follows = await get_cached_follower_authors(follower_id)
|
||
assert len(after_follow_follows) == 1, "После подписки должна быть 1 запись"
|
||
assert after_follow_follows[0]["id"] == target_author_id, "ID должен совпадать"
|
||
|
||
# 6. Проверяем, что второй запрос берется из кеша
|
||
cached_follows = await get_cached_follower_authors(follower_id)
|
||
assert len(cached_follows) == 1, "Кеш должен содержать 1 запись"
|
||
assert cached_follows[0]["id"] == target_author_id, "ID в кеше должен совпадать"
|
||
|
||
# 7. Удаляем подписку из БД
|
||
with local_session() as session:
|
||
session.query(AuthorFollower).filter(
|
||
AuthorFollower.follower == follower_id,
|
||
AuthorFollower.following == target_author_id
|
||
).delete()
|
||
session.commit()
|
||
|
||
# 8. Инвалидируем кеш (имитируя логику unfollow)
|
||
await redis.execute("DEL", cache_key)
|
||
|
||
# 9. Проверяем, что кеш снова пустой
|
||
after_unfollow_follows = await get_cached_follower_authors(follower_id)
|
||
assert len(after_unfollow_follows) == 0, "После отписки кеш должен быть пустым"
|
||
|
||
print("✅ Тест логики кеширования прошел успешно!")
|
||
|
||
finally:
|
||
# Очистка
|
||
with local_session() as session:
|
||
session.query(AuthorFollower).filter(
|
||
AuthorFollower.follower == follower_id
|
||
).delete()
|
||
session.query(Author).filter(Author.id.in_([follower_id, target_author_id])).delete()
|
||
session.commit()
|
||
|
||
await redis.execute("DEL", cache_key)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.timeout(60) # 🚨 Таймаут для предотвращения зависания
|
||
async def test_cache_miss_behavior():
|
||
"""
|
||
Тест поведения при промахе кеша - данные должны браться из БД
|
||
"""
|
||
with local_session() as session:
|
||
follower = Author(
|
||
name="Cache Miss Test",
|
||
slug=f"cache-miss-{int(time.time())}",
|
||
email=f"cache-miss-{int(time.time())}@test.com"
|
||
)
|
||
session.add(follower)
|
||
|
||
target1 = Author(
|
||
name="Target 1",
|
||
slug=f"target-1-{int(time.time())}",
|
||
email=f"target-1-{int(time.time())}@test.com"
|
||
)
|
||
session.add(target1)
|
||
|
||
target2 = Author(
|
||
name="Target 2",
|
||
slug=f"target-2-{int(time.time())}",
|
||
email=f"target-2-{int(time.time())}@test.com"
|
||
)
|
||
session.add(target2)
|
||
session.commit()
|
||
|
||
follower_id = follower.id
|
||
target1_id = target1.id
|
||
target2_id = target2.id
|
||
|
||
# Создаем подписки в БД
|
||
sub1 = AuthorFollower(follower=follower_id, following=target1_id)
|
||
sub2 = AuthorFollower(follower=follower_id, following=target2_id)
|
||
session.add_all([sub1, sub2])
|
||
session.commit()
|
||
|
||
try:
|
||
cache_key = f"author:follows-authors:{follower_id}"
|
||
|
||
# Убеждаемся, что кеша нет
|
||
await redis.execute("DEL", cache_key)
|
||
|
||
# Запрашиваем данные (должно произойти cache miss и загрузка из БД)
|
||
follows = await get_cached_follower_authors(follower_id)
|
||
|
||
assert len(follows) == 2, "Должно быть 2 подписки"
|
||
follow_ids = {f["id"] for f in follows}
|
||
assert target1_id in follow_ids, "Должна быть подписка на target1"
|
||
assert target2_id in follow_ids, "Должна быть подписка на target2"
|
||
|
||
# Второй запрос должен брать из кеша
|
||
cached_follows = await get_cached_follower_authors(follower_id)
|
||
assert len(cached_follows) == 2, "Кеш должен содержать 2 записи"
|
||
|
||
print("✅ Тест cache miss поведения прошел успешно!")
|
||
|
||
finally:
|
||
# Очистка
|
||
with local_session() as session:
|
||
session.query(AuthorFollower).filter(
|
||
AuthorFollower.follower == follower_id
|
||
).delete()
|
||
session.query(Author).filter(Author.id.in_([follower_id, target1_id, target2_id])).delete()
|
||
session.commit()
|
||
|
||
await redis.execute("DEL", cache_key)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(test_cache_invalidation_logic())
|
||
asyncio.run(test_cache_miss_behavior())
|
||
print("🎯 Все тесты кеша прошли успешно!")
|