Compare commits
38 Commits
03c2d286de
...
e17690f27b
Author | SHA1 | Date | |
---|---|---|---|
e17690f27b | |||
cb990b61a3 | |||
cc837288bb | |||
4a26e4f75b | |||
eee2c1a13d | |||
209d5c1a5e | |||
4f4affaca4 | |||
d59710309d | |||
88525276c2 | |||
1f4b3d3eee | |||
76a4c5fb53 | |||
8f6b96cb0f | |||
76a707c7fd | |||
ae584abb5b | |||
eff8278cc3 | |||
8432a00691 | |||
1ed185a701 | |||
562ce3296e | |||
ddc2d69e54 | |||
f6863b32e8 | |||
9bf9f3d384 | |||
998d01c751 | |||
57d04ddf1c | |||
0ba2d2ecee | |||
839cc84c26 | |||
c80c282118 | |||
5acae03c55 | |||
49be05d4db | |||
ae7580252b | |||
7c85f51436 | |||
83ec475cc8 | |||
c1c095a73c | |||
c4e84364c6 | |||
8287b82554 | |||
56fe8bebbe | |||
4fffd1025f | |||
576e1ea152 | |||
5e1021a18e |
229
cache/cache.py
vendored
229
cache/cache.py
vendored
@@ -19,45 +19,56 @@ DEFAULT_FOLLOWS = {
|
||||
"communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}],
|
||||
}
|
||||
|
||||
CACHE_TTL = 300 # 5 минут
|
||||
|
||||
CACHE_KEYS = {
|
||||
"TOPIC_ID": "topic:id:{}",
|
||||
"TOPIC_SLUG": "topic:slug:{}",
|
||||
"TOPIC_AUTHORS": "topic:authors:{}",
|
||||
"TOPIC_FOLLOWERS": "topic:followers:{}",
|
||||
"TOPIC_SHOUTS": "topic_shouts_{}",
|
||||
"AUTHOR_ID": "author:id:{}",
|
||||
"AUTHOR_USER": "author:user:{}",
|
||||
"SHOUTS": "shouts:{}",
|
||||
}
|
||||
|
||||
|
||||
# Cache topic data
|
||||
async def cache_topic(topic: dict):
|
||||
payload = json.dumps(topic, cls=CustomJSONEncoder)
|
||||
# Cache by id and slug for quick access
|
||||
await asyncio.gather(
|
||||
redis.execute("SET", f"topic:id:{topic['id']}", payload),
|
||||
redis.execute("SET", f"topic:slug:{topic['slug']}", payload),
|
||||
redis_operation("SET", f"topic:id:{topic['id']}", payload),
|
||||
redis_operation("SET", f"topic:slug:{topic['slug']}", payload),
|
||||
)
|
||||
|
||||
|
||||
# Cache author data
|
||||
async def cache_author(author: dict):
|
||||
payload = json.dumps(author, cls=CustomJSONEncoder)
|
||||
# Cache author data by user and id
|
||||
await asyncio.gather(
|
||||
redis.execute("SET", f"author:user:{author['user'].strip()}", str(author["id"])),
|
||||
redis.execute("SET", f"author:id:{author['id']}", payload),
|
||||
redis_operation("SET", f"author:user:{author['user'].strip()}", str(author["id"])),
|
||||
redis_operation("SET", f"author:id:{author['id']}", payload),
|
||||
)
|
||||
|
||||
|
||||
# Cache follows data
|
||||
async def cache_follows(follower_id: int, entity_type: str, entity_id: int, is_insert=True):
|
||||
key = f"author:follows-{entity_type}s:{follower_id}"
|
||||
follows_str = await redis.execute("get", key)
|
||||
follows_str = await redis_operation("GET", key)
|
||||
follows = json.loads(follows_str) if follows_str else DEFAULT_FOLLOWS[entity_type]
|
||||
if is_insert:
|
||||
if entity_id not in follows:
|
||||
follows.append(entity_id)
|
||||
else:
|
||||
follows = [eid for eid in follows if eid != entity_id]
|
||||
await redis.execute("set", key, json.dumps(follows, cls=CustomJSONEncoder))
|
||||
await redis_operation("SET", key, json.dumps(follows, cls=CustomJSONEncoder))
|
||||
await update_follower_stat(follower_id, entity_type, len(follows))
|
||||
|
||||
|
||||
# Update follower statistics
|
||||
async def update_follower_stat(follower_id, entity_type, count):
|
||||
follower_key = f"author:id:{follower_id}"
|
||||
follower_str = await redis.execute("get", follower_key)
|
||||
follower_str = await redis_operation("GET", follower_key)
|
||||
follower = json.loads(follower_str) if follower_str else None
|
||||
if follower:
|
||||
follower["stat"] = {f"{entity_type}s": count}
|
||||
@@ -67,7 +78,7 @@ async def update_follower_stat(follower_id, entity_type, count):
|
||||
# Get author from cache
|
||||
async def get_cached_author(author_id: int, get_with_stat):
|
||||
author_key = f"author:id:{author_id}"
|
||||
result = await redis.execute("get", author_key)
|
||||
result = await redis_operation("GET", author_key)
|
||||
if result:
|
||||
return json.loads(result)
|
||||
# Load from database if not found in cache
|
||||
@@ -92,7 +103,7 @@ async def get_cached_topic(topic_id: int):
|
||||
dict: Topic data or None if not found.
|
||||
"""
|
||||
topic_key = f"topic:id:{topic_id}"
|
||||
cached_topic = await redis.execute("get", topic_key)
|
||||
cached_topic = await redis_operation("GET", topic_key)
|
||||
if cached_topic:
|
||||
return json.loads(cached_topic)
|
||||
|
||||
@@ -101,7 +112,7 @@ async def get_cached_topic(topic_id: int):
|
||||
topic = session.execute(select(Topic).where(Topic.id == topic_id)).scalar_one_or_none()
|
||||
if topic:
|
||||
topic_dict = topic.dict()
|
||||
await redis.execute("set", topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder))
|
||||
await redis_operation("SET", topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder))
|
||||
return topic_dict
|
||||
|
||||
return None
|
||||
@@ -110,7 +121,7 @@ async def get_cached_topic(topic_id: int):
|
||||
# Get topic by slug from cache
|
||||
async def get_cached_topic_by_slug(slug: str, get_with_stat):
|
||||
topic_key = f"topic:slug:{slug}"
|
||||
result = await redis.execute("get", topic_key)
|
||||
result = await redis_operation("GET", topic_key)
|
||||
if result:
|
||||
return json.loads(result)
|
||||
# Load from database if not found in cache
|
||||
@@ -127,7 +138,7 @@ async def get_cached_topic_by_slug(slug: str, get_with_stat):
|
||||
async def get_cached_authors_by_ids(author_ids: List[int]) -> List[dict]:
|
||||
# Fetch all author data concurrently
|
||||
keys = [f"author:id:{author_id}" for author_id in author_ids]
|
||||
results = await asyncio.gather(*(redis.execute("get", key) for key in keys))
|
||||
results = await asyncio.gather(*(redis_operation("GET", key) for key in keys))
|
||||
authors = [json.loads(result) if result else None for result in results]
|
||||
# Load missing authors from database and cache
|
||||
missing_indices = [index for index, author in enumerate(authors) if author is None]
|
||||
@@ -152,7 +163,7 @@ async def get_cached_topic_followers(topic_id: int):
|
||||
"""
|
||||
try:
|
||||
# Попытка получить данные из кеша
|
||||
cached = await redis.get(f"topic:followers:{topic_id}")
|
||||
cached = await redis_operation("GET", f"topic:followers:{topic_id}")
|
||||
if cached:
|
||||
followers_ids = json.loads(cached)
|
||||
logger.debug(f"Cached {len(followers_ids)} followers for topic #{topic_id}")
|
||||
@@ -169,7 +180,7 @@ async def get_cached_topic_followers(topic_id: int):
|
||||
followers_ids = [f[0] for f in result.scalars().all()]
|
||||
|
||||
# Кеширование результатов
|
||||
await redis.set(f"topic:followers:{topic_id}", json.dumps(followers_ids))
|
||||
await redis_operation("SET", f"topic:followers:{topic_id}", json.dumps(followers_ids))
|
||||
|
||||
# Получение подробной информации о подписчиках по их ID
|
||||
followers = await get_cached_authors_by_ids(followers_ids)
|
||||
@@ -183,7 +194,7 @@ async def get_cached_topic_followers(topic_id: int):
|
||||
# Get cached author followers
|
||||
async def get_cached_author_followers(author_id: int):
|
||||
# Check cache for data
|
||||
cached = await redis.execute("get", f"author:followers:{author_id}")
|
||||
cached = await redis_operation("GET", f"author:followers:{author_id}")
|
||||
if cached:
|
||||
followers_ids = json.loads(cached)
|
||||
followers = await get_cached_authors_by_ids(followers_ids)
|
||||
@@ -199,7 +210,7 @@ async def get_cached_author_followers(author_id: int):
|
||||
.filter(AuthorFollower.author == author_id, Author.id != author_id)
|
||||
.all()
|
||||
]
|
||||
await redis.execute("SET", f"author:followers:{author_id}", json.dumps(followers_ids))
|
||||
await redis_operation("SET", f"author:followers:{author_id}", json.dumps(followers_ids))
|
||||
followers = await get_cached_authors_by_ids(followers_ids)
|
||||
return followers
|
||||
|
||||
@@ -207,7 +218,7 @@ async def get_cached_author_followers(author_id: int):
|
||||
# Get cached follower authors
|
||||
async def get_cached_follower_authors(author_id: int):
|
||||
# Attempt to retrieve authors from cache
|
||||
cached = await redis.execute("get", f"author:follows-authors:{author_id}")
|
||||
cached = await redis_operation("GET", f"author:follows-authors:{author_id}")
|
||||
if cached:
|
||||
authors_ids = json.loads(cached)
|
||||
else:
|
||||
@@ -221,7 +232,7 @@ async def get_cached_follower_authors(author_id: int):
|
||||
.where(AuthorFollower.follower == author_id)
|
||||
).all()
|
||||
]
|
||||
await redis.execute("SET", f"author:follows-authors:{author_id}", json.dumps(authors_ids))
|
||||
await redis_operation("SET", f"author:follows-authors:{author_id}", json.dumps(authors_ids))
|
||||
|
||||
authors = await get_cached_authors_by_ids(authors_ids)
|
||||
return authors
|
||||
@@ -230,7 +241,7 @@ async def get_cached_follower_authors(author_id: int):
|
||||
# Get cached follower topics
|
||||
async def get_cached_follower_topics(author_id: int):
|
||||
# Attempt to retrieve topics from cache
|
||||
cached = await redis.execute("get", f"author:follows-topics:{author_id}")
|
||||
cached = await redis_operation("GET", f"author:follows-topics:{author_id}")
|
||||
if cached:
|
||||
topics_ids = json.loads(cached)
|
||||
else:
|
||||
@@ -243,11 +254,11 @@ async def get_cached_follower_topics(author_id: int):
|
||||
.where(TopicFollower.follower == author_id)
|
||||
.all()
|
||||
]
|
||||
await redis.execute("SET", f"author:follows-topics:{author_id}", json.dumps(topics_ids))
|
||||
await redis_operation("SET", f"author:follows-topics:{author_id}", json.dumps(topics_ids))
|
||||
|
||||
topics = []
|
||||
for topic_id in topics_ids:
|
||||
topic_str = await redis.execute("get", f"topic:id:{topic_id}")
|
||||
topic_str = await redis_operation("GET", f"topic:id:{topic_id}")
|
||||
if topic_str:
|
||||
topic = json.loads(topic_str)
|
||||
if topic and topic not in topics:
|
||||
@@ -269,10 +280,10 @@ async def get_cached_author_by_user_id(user_id: str, get_with_stat):
|
||||
dict: Dictionary with author data or None if not found.
|
||||
"""
|
||||
# Attempt to find author ID by user_id in Redis cache
|
||||
author_id = await redis.execute("get", f"author:user:{user_id.strip()}")
|
||||
author_id = await redis_operation("GET", f"author:user:{user_id.strip()}")
|
||||
if author_id:
|
||||
# If ID is found, get full author data by ID
|
||||
author_data = await redis.execute("get", f"author:id:{author_id}")
|
||||
author_data = await redis_operation("GET", f"author:id:{author_id}")
|
||||
if author_data:
|
||||
return json.loads(author_data)
|
||||
|
||||
@@ -284,8 +295,8 @@ async def get_cached_author_by_user_id(user_id: str, get_with_stat):
|
||||
author = authors[0]
|
||||
author_dict = author.dict()
|
||||
await asyncio.gather(
|
||||
redis.execute("SET", f"author:user:{user_id.strip()}", str(author.id)),
|
||||
redis.execute("SET", f"author:id:{author.id}", json.dumps(author_dict)),
|
||||
redis_operation("SET", f"author:user:{user_id.strip()}", str(author.id)),
|
||||
redis_operation("SET", f"author:id:{author.id}", json.dumps(author_dict)),
|
||||
)
|
||||
return author_dict
|
||||
|
||||
@@ -306,7 +317,7 @@ async def get_cached_topic_authors(topic_id: int):
|
||||
"""
|
||||
# Attempt to get a list of author IDs from cache
|
||||
rkey = f"topic:authors:{topic_id}"
|
||||
cached_authors_ids = await redis.execute("get", rkey)
|
||||
cached_authors_ids = await redis_operation("GET", rkey)
|
||||
if cached_authors_ids:
|
||||
authors_ids = json.loads(cached_authors_ids)
|
||||
else:
|
||||
@@ -320,7 +331,7 @@ async def get_cached_topic_authors(topic_id: int):
|
||||
)
|
||||
authors_ids = [author_id for (author_id,) in session.execute(query).all()]
|
||||
# Cache the retrieved author IDs
|
||||
await redis.execute("set", rkey, json.dumps(authors_ids))
|
||||
await redis_operation("SET", rkey, json.dumps(authors_ids))
|
||||
|
||||
# Retrieve full author details from cached IDs
|
||||
if authors_ids:
|
||||
@@ -329,3 +340,163 @@ async def get_cached_topic_authors(topic_id: int):
|
||||
return authors
|
||||
|
||||
return []
|
||||
|
||||
|
||||
async def invalidate_shouts_cache(cache_keys: List[str]):
|
||||
"""
|
||||
Инвалидирует кэш выборок публикаций по переданным ключам.
|
||||
"""
|
||||
for key in cache_keys:
|
||||
try:
|
||||
# Формируем полный ключ кэша
|
||||
cache_key = f"shouts:{key}"
|
||||
|
||||
# Удаляем основной кэш
|
||||
await redis_operation("DEL", cache_key)
|
||||
logger.debug(f"Invalidated cache key: {cache_key}")
|
||||
|
||||
# Добавляем ключ в список инвалидированных с TTL
|
||||
await redis_operation("SETEX", f"{cache_key}:invalidated", value="1", ttl=CACHE_TTL)
|
||||
|
||||
# Если это кэш темы, инвалидируем также связанные ключи
|
||||
if key.startswith("topic_"):
|
||||
topic_id = key.split("_")[1]
|
||||
related_keys = [
|
||||
f"topic:id:{topic_id}",
|
||||
f"topic:authors:{topic_id}",
|
||||
f"topic:followers:{topic_id}",
|
||||
f"topic:stats:{topic_id}",
|
||||
]
|
||||
for related_key in related_keys:
|
||||
await redis_operation("DEL", related_key)
|
||||
logger.debug(f"Invalidated related key: {related_key}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error invalidating cache key {key}: {e}")
|
||||
|
||||
|
||||
async def cache_topic_shouts(topic_id: int, shouts: List[dict]):
|
||||
"""Кэширует список публикаций для темы"""
|
||||
key = f"topic_shouts_{topic_id}"
|
||||
payload = json.dumps(shouts, cls=CustomJSONEncoder)
|
||||
await redis_operation("SETEX", key, value=payload, ttl=CACHE_TTL)
|
||||
|
||||
|
||||
async def get_cached_topic_shouts(topic_id: int) -> List[dict]:
|
||||
"""Получает кэшированный список публикаций для темы"""
|
||||
key = f"topic_shouts_{topic_id}"
|
||||
cached = await redis_operation("GET", key)
|
||||
if cached:
|
||||
return json.loads(cached)
|
||||
return None
|
||||
|
||||
|
||||
async def cache_related_entities(shout: Shout):
|
||||
"""
|
||||
Кэширует все связанные с публикацией сущности (авторов и темы)
|
||||
"""
|
||||
tasks = []
|
||||
for author in shout.authors:
|
||||
tasks.append(cache_by_id(Author, author.id, cache_author))
|
||||
for topic in shout.topics:
|
||||
tasks.append(cache_by_id(Topic, topic.id, cache_topic))
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
async def invalidate_shout_related_cache(shout: Shout, author_id: int):
|
||||
"""
|
||||
Инвалидирует весь кэш, связанный с публикацией
|
||||
"""
|
||||
# Базовые ленты
|
||||
cache_keys = [
|
||||
"feed", # основная лента
|
||||
f"author_{author_id}", # публикации автора
|
||||
"random_top", # случайные топовые
|
||||
"unrated", # неоцененные
|
||||
"recent", # последние публикации
|
||||
"coauthored", # совместные публикации
|
||||
f"authored_{author_id}", # авторские публикации
|
||||
f"followed_{author_id}", # подписки автора
|
||||
]
|
||||
|
||||
# Добавляем ключи для всех авторов публикации
|
||||
for author in shout.authors:
|
||||
cache_keys.extend(
|
||||
[f"author_{author.id}", f"authored_{author.id}", f"followed_{author.id}", f"coauthored_{author.id}"]
|
||||
)
|
||||
|
||||
# Добавляем ключи для тем
|
||||
for topic in shout.topics:
|
||||
cache_keys.extend(
|
||||
[f"topic_{topic.id}", f"topic_shouts_{topic.id}", f"topic_recent_{topic.id}", f"topic_top_{topic.id}"]
|
||||
)
|
||||
|
||||
# Инвалидируем все ключи
|
||||
await invalidate_shouts_cache(cache_keys)
|
||||
|
||||
|
||||
async def redis_operation(operation: str, key: str, value=None, ttl=None):
|
||||
"""
|
||||
Унифицированная функция для работы с Redis
|
||||
|
||||
Args:
|
||||
operation: 'GET', 'SET', 'DEL', 'SETEX'
|
||||
key: ключ
|
||||
value: значение (для SET/SETEX)
|
||||
ttl: время жизни в секундах (для SETEX)
|
||||
"""
|
||||
try:
|
||||
if operation == "GET":
|
||||
return await redis.execute("GET", key)
|
||||
elif operation == "SET":
|
||||
await redis.execute("SET", key, value)
|
||||
elif operation == "SETEX":
|
||||
await redis.execute("SETEX", key, ttl or CACHE_TTL, value)
|
||||
elif operation == "DEL":
|
||||
await redis.execute("DEL", key)
|
||||
except Exception as e:
|
||||
logger.error(f"Redis {operation} error for key {key}: {e}")
|
||||
|
||||
|
||||
async def get_cached_entity(entity_type: str, entity_id: int, get_method, cache_method):
|
||||
"""
|
||||
Универсальная функция получения кэшированной сущности
|
||||
|
||||
Args:
|
||||
entity_type: 'author' или 'topic'
|
||||
entity_id: ID сущности
|
||||
get_method: метод получения из БД
|
||||
cache_method: метод кэширования
|
||||
"""
|
||||
key = f"{entity_type}:id:{entity_id}"
|
||||
cached = await redis_operation("GET", key)
|
||||
if cached:
|
||||
return json.loads(cached)
|
||||
|
||||
entity = await get_method(entity_id)
|
||||
if entity:
|
||||
await cache_method(entity)
|
||||
return entity
|
||||
return None
|
||||
|
||||
|
||||
async def cache_by_id(entity, entity_id: int, cache_method):
|
||||
"""
|
||||
Кэширует сущность по ID, используя указанный метод кэширования
|
||||
|
||||
Args:
|
||||
entity: класс сущности (Author/Topic)
|
||||
entity_id: ID сущности
|
||||
cache_method: функция кэширования
|
||||
"""
|
||||
from resolvers.stat import get_with_stat
|
||||
|
||||
caching_query = select(entity).filter(entity.id == entity_id)
|
||||
result = get_with_stat(caching_query)
|
||||
if not result or not result[0]:
|
||||
logger.warning(f"{entity.__name__} with id {entity_id} not found")
|
||||
return
|
||||
x = result[0]
|
||||
d = x.dict()
|
||||
await cache_method(d)
|
||||
return d
|
||||
|
@@ -6,7 +6,11 @@
|
||||
{{ $cors_headers_get := "if ($request_method = 'GET') { add_header 'Access-Control-Allow-Origin' '$allow_origin' always; add_header 'Access-Control-Allow-Methods' 'POST, GET, OPTIONS' always; add_header 'Access-Control-Allow-Headers' 'Content-Type, Authorization' always; add_header 'Access-Control-Allow-Credentials' 'true' always; }" }}
|
||||
|
||||
map $http_origin $allow_origin {
|
||||
~^https?:\/\/((.*\.)?localhost(:\d+)?|discoursio-webapp(-(.*))?\.vercel\.app|(.*\.)?discours\.io|(.*\.)?dscrs\.site)$ $http_origin;
|
||||
~^https?:\/\/(.*\.)?localhost(:\d+)?$ $http_origin;
|
||||
~^https?:\/\/(.*\.)?discours\.io$ $http_origin;
|
||||
~^https?:\/\/(.*\.)?dscrs\.site$ $http_origin;
|
||||
~^https?:\/\/testing\.(discours\.io|dscrs\.site)$ $http_origin;
|
||||
~^https?:\/\/discoursio-webapp(-.*)?\.vercel\.app$ $http_origin;
|
||||
default "";
|
||||
}
|
||||
|
||||
@@ -84,7 +88,7 @@ server {
|
||||
add_header Cache-Control "public, no-transform";
|
||||
}
|
||||
|
||||
location ~* \.(mp3)$ {
|
||||
location ~* \.(mp3|wav|ogg|flac|aac|aif|webm)$ {
|
||||
if ($request_method = 'GET') {
|
||||
add_header 'Access-Control-Allow-Origin' $allow_origin always;
|
||||
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS' always;
|
||||
|
46
orm/shout.py
46
orm/shout.py
@@ -41,34 +41,34 @@ class ShoutAuthor(Base):
|
||||
class Shout(Base):
|
||||
__tablename__ = "shout"
|
||||
|
||||
created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
|
||||
updated_at = Column(Integer, nullable=True, index=True)
|
||||
published_at = Column(Integer, nullable=True, index=True)
|
||||
featured_at = Column(Integer, nullable=True, index=True)
|
||||
deleted_at = Column(Integer, nullable=True, index=True)
|
||||
created_at: int = Column(Integer, nullable=False, default=lambda: int(time.time()))
|
||||
updated_at: int | None = Column(Integer, nullable=True, index=True)
|
||||
published_at: int | None = Column(Integer, nullable=True, index=True)
|
||||
featured_at: int | None = Column(Integer, nullable=True, index=True)
|
||||
deleted_at: int | None = Column(Integer, nullable=True, index=True)
|
||||
|
||||
created_by = Column(ForeignKey("author.id"), nullable=False)
|
||||
updated_by = Column(ForeignKey("author.id"), nullable=True)
|
||||
deleted_by = Column(ForeignKey("author.id"), nullable=True)
|
||||
community = Column(ForeignKey("community.id"), nullable=False)
|
||||
created_by: int = Column(ForeignKey("author.id"), nullable=False)
|
||||
updated_by: int | None = Column(ForeignKey("author.id"), nullable=True)
|
||||
deleted_by: int | None = Column(ForeignKey("author.id"), nullable=True)
|
||||
community: int = Column(ForeignKey("community.id"), nullable=False)
|
||||
|
||||
body = Column(String, nullable=False, comment="Body")
|
||||
slug = Column(String, unique=True)
|
||||
cover = Column(String, nullable=True, comment="Cover image url")
|
||||
cover_caption = Column(String, nullable=True, comment="Cover image alt caption")
|
||||
lead = Column(String, nullable=True)
|
||||
description = Column(String, nullable=True)
|
||||
title = Column(String, nullable=False)
|
||||
subtitle = Column(String, nullable=True)
|
||||
layout = Column(String, nullable=False, default="article")
|
||||
media = Column(JSON, nullable=True)
|
||||
body: str = Column(String, nullable=False, comment="Body")
|
||||
slug: str = Column(String, unique=True)
|
||||
cover: str | None = Column(String, nullable=True, comment="Cover image url")
|
||||
cover_caption: str | None = Column(String, nullable=True, comment="Cover image alt caption")
|
||||
lead: str | None = Column(String, nullable=True)
|
||||
description: str | None = Column(String, nullable=True)
|
||||
title: str = Column(String, nullable=False)
|
||||
subtitle: str | None = Column(String, nullable=True)
|
||||
layout: str = Column(String, nullable=False, default="article")
|
||||
media: dict | None = Column(JSON, nullable=True)
|
||||
|
||||
authors = relationship(Author, secondary="shout_author")
|
||||
topics = relationship(Topic, secondary="shout_topic")
|
||||
reactions = relationship(Reaction)
|
||||
|
||||
lang = Column(String, nullable=False, default="ru", comment="Language")
|
||||
version_of = Column(ForeignKey("shout.id"), nullable=True)
|
||||
oid = Column(String, nullable=True)
|
||||
lang: str = Column(String, nullable=False, default="ru", comment="Language")
|
||||
version_of: int | None = Column(ForeignKey("shout.id"), nullable=True)
|
||||
oid: str | None = Column(String, nullable=True)
|
||||
|
||||
seo = Column(String, nullable=True) # JSON
|
||||
seo: str | None = Column(String, nullable=True) # JSON
|
||||
|
@@ -4,7 +4,7 @@ from sqlalchemy import and_, desc, select
|
||||
from sqlalchemy.orm import joinedload
|
||||
from sqlalchemy.sql.functions import coalesce
|
||||
|
||||
from cache.cache import cache_author, cache_topic
|
||||
from cache.cache import cache_author, cache_topic, invalidate_shout_related_cache, invalidate_shouts_cache
|
||||
from orm.author import Author
|
||||
from orm.shout import Shout, ShoutAuthor, ShoutTopic
|
||||
from orm.topic import Topic
|
||||
@@ -94,67 +94,109 @@ async def get_shouts_drafts(_, info):
|
||||
@mutation.field("create_shout")
|
||||
@login_required
|
||||
async def create_shout(_, info, inp):
|
||||
logger.info(f"Starting create_shout with input: {inp}")
|
||||
user_id = info.context.get("user_id")
|
||||
author_dict = info.context.get("author")
|
||||
logger.debug(f"Context user_id: {user_id}, author: {author_dict}")
|
||||
|
||||
if not author_dict:
|
||||
logger.error("Author profile not found in context")
|
||||
return {"error": "author profile was not found"}
|
||||
|
||||
author_id = author_dict.get("id")
|
||||
if user_id and author_id:
|
||||
with local_session() as session:
|
||||
author_id = int(author_id)
|
||||
current_time = int(time.time())
|
||||
slug = inp.get("slug") or f"draft-{current_time}"
|
||||
shout_dict = {
|
||||
"title": inp.get("title", ""),
|
||||
"subtitle": inp.get("subtitle", ""),
|
||||
"lead": inp.get("lead", ""),
|
||||
"description": inp.get("description", ""),
|
||||
"body": inp.get("body", ""),
|
||||
"layout": inp.get("layout", "article"),
|
||||
"created_by": author_id,
|
||||
"authors": [],
|
||||
"slug": slug,
|
||||
"topics": inp.get("topics", []),
|
||||
"published_at": None,
|
||||
"community": 1,
|
||||
"created_at": current_time, # Set created_at as Unix timestamp
|
||||
}
|
||||
same_slug_shout = session.query(Shout).filter(Shout.slug == shout_dict.get("slug")).first()
|
||||
c = 1
|
||||
while same_slug_shout is not None:
|
||||
same_slug_shout = session.query(Shout).filter(Shout.slug == shout_dict.get("slug")).first()
|
||||
c += 1
|
||||
shout_dict["slug"] += f"-{c}"
|
||||
new_shout = Shout(**shout_dict)
|
||||
session.add(new_shout)
|
||||
session.commit()
|
||||
try:
|
||||
with local_session() as session:
|
||||
author_id = int(author_id)
|
||||
current_time = int(time.time())
|
||||
slug = inp.get("slug") or f"draft-{current_time}"
|
||||
|
||||
# NOTE: requesting new shout back
|
||||
shout = session.query(Shout).where(Shout.slug == slug).first()
|
||||
if shout:
|
||||
# Проверка на существование записи
|
||||
existing_sa = session.query(ShoutAuthor).filter_by(shout=shout.id, author=author_id).first()
|
||||
if not existing_sa:
|
||||
sa = ShoutAuthor(shout=shout.id, author=author_id)
|
||||
logger.info(f"Creating shout with input: {inp}")
|
||||
# Создаем публикацию без topics
|
||||
new_shout = Shout(
|
||||
slug=slug,
|
||||
body=inp.get("body", ""),
|
||||
layout=inp.get("layout", "article"),
|
||||
title=inp.get("title", ""),
|
||||
created_by=author_id,
|
||||
created_at=current_time,
|
||||
community=1,
|
||||
)
|
||||
|
||||
# Проверяем уникальность slug
|
||||
logger.debug(f"Checking for existing slug: {slug}")
|
||||
same_slug_shout = session.query(Shout).filter(Shout.slug == new_shout.slug).first()
|
||||
c = 1
|
||||
while same_slug_shout is not None:
|
||||
logger.debug(f"Found duplicate slug, trying iteration {c}")
|
||||
new_shout.slug = f"{slug}-{c}"
|
||||
same_slug_shout = session.query(Shout).filter(Shout.slug == new_shout.slug).first()
|
||||
c += 1
|
||||
|
||||
try:
|
||||
logger.info("Creating new shout object")
|
||||
session.add(new_shout)
|
||||
session.commit()
|
||||
logger.info(f"Created shout with ID: {new_shout.id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating shout object: {e}", exc_info=True)
|
||||
return {"error": f"Database error: {str(e)}"}
|
||||
|
||||
# Связываем с автором
|
||||
try:
|
||||
logger.debug(f"Linking author {author_id} to shout {new_shout.id}")
|
||||
sa = ShoutAuthor(shout=new_shout.id, author=author_id)
|
||||
session.add(sa)
|
||||
except Exception as e:
|
||||
logger.error(f"Error linking author: {e}", exc_info=True)
|
||||
return {"error": f"Error linking author: {str(e)}"}
|
||||
|
||||
topics = session.query(Topic).filter(Topic.slug.in_(inp.get("topics", []))).all()
|
||||
for topic in topics:
|
||||
existing_st = session.query(ShoutTopic).filter_by(shout=shout.id, author=topic.id).first()
|
||||
if not existing_st:
|
||||
t = ShoutTopic(topic=topic.id, shout=shout.id)
|
||||
session.add(t)
|
||||
# Связываем с темами
|
||||
|
||||
session.commit()
|
||||
input_topics = inp.get("topics", [])
|
||||
if input_topics:
|
||||
try:
|
||||
logger.debug(f"Linking topics: {[t.slug for t in input_topics]}")
|
||||
main_topic = inp.get("main_topic")
|
||||
for topic in input_topics:
|
||||
st = ShoutTopic(
|
||||
topic=topic.id,
|
||||
shout=new_shout.id,
|
||||
main=(topic.slug == main_topic) if main_topic else False,
|
||||
)
|
||||
session.add(st)
|
||||
logger.debug(f"Added topic {topic.slug} {'(main)' if st.main else ''}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error linking topics: {e}", exc_info=True)
|
||||
return {"error": f"Error linking topics: {str(e)}"}
|
||||
|
||||
follow(None, info, "shout", shout.slug)
|
||||
try:
|
||||
session.commit()
|
||||
logger.info("Final commit successful")
|
||||
except Exception as e:
|
||||
logger.error(f"Error in final commit: {e}", exc_info=True)
|
||||
return {"error": f"Error in final commit: {str(e)}"}
|
||||
|
||||
# notifier
|
||||
# await notify_shout(shout_dict, 'create')
|
||||
# Получаем созданную публикацию
|
||||
shout = session.query(Shout).filter(Shout.id == new_shout.id).first()
|
||||
|
||||
# Подписываем автора
|
||||
try:
|
||||
logger.debug("Following created shout")
|
||||
await follow(None, info, "shout", shout.slug)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error following shout: {e}", exc_info=True)
|
||||
|
||||
logger.info(f"Successfully created shout {shout.id}")
|
||||
return {"shout": shout}
|
||||
|
||||
return {"error": "cant create shout" if user_id else "unauthorized"}
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error in create_shout: {e}", exc_info=True)
|
||||
return {"error": f"Unexpected error: {str(e)}"}
|
||||
|
||||
error_msg = "cant create shout" if user_id else "unauthorized"
|
||||
logger.error(f"Create shout failed: {error_msg}")
|
||||
return {"error": error_msg}
|
||||
|
||||
|
||||
def patch_main_topic(session, main_topic, shout):
|
||||
@@ -268,13 +310,61 @@ async def update_shout(_, info, shout_id: int, shout_input=None, publish=False):
|
||||
patch_main_topic(session, main_topic, shout_by_id)
|
||||
|
||||
shout_input["updated_at"] = current_time
|
||||
shout_input["published_at"] = current_time if publish else None
|
||||
if publish:
|
||||
logger.info(f"publishing shout#{shout_id} with input: {shout_input}")
|
||||
shout_input["published_at"] = current_time
|
||||
# Проверяем наличие связи с автором
|
||||
logger.info(f"Checking author link for shout#{shout_id} and author#{author_id}")
|
||||
author_link = (
|
||||
session.query(ShoutAuthor)
|
||||
.filter(and_(ShoutAuthor.shout == shout_id, ShoutAuthor.author == author_id))
|
||||
.first()
|
||||
)
|
||||
|
||||
if not author_link:
|
||||
logger.info(f"Adding missing author link for shout#{shout_id}")
|
||||
sa = ShoutAuthor(shout=shout_id, author=author_id)
|
||||
session.add(sa)
|
||||
session.flush()
|
||||
logger.info("Author link added successfully")
|
||||
else:
|
||||
logger.info("Author link already exists")
|
||||
|
||||
Shout.update(shout_by_id, shout_input)
|
||||
session.add(shout_by_id)
|
||||
session.commit()
|
||||
|
||||
shout_dict = shout_by_id.dict()
|
||||
|
||||
# Инвалидация кэша после обновления
|
||||
try:
|
||||
logger.info("Invalidating cache after shout update")
|
||||
|
||||
cache_keys = [
|
||||
"feed", # лента
|
||||
f"author_{author_id}", # публикации автора
|
||||
"random_top", # случайные топовые
|
||||
"unrated", # неоцененные
|
||||
]
|
||||
|
||||
# Добавляем ключи для тем публикации
|
||||
for topic in shout_by_id.topics:
|
||||
cache_keys.append(f"topic_{topic.id}")
|
||||
cache_keys.append(f"topic_shouts_{topic.id}")
|
||||
|
||||
await invalidate_shouts_cache(cache_keys)
|
||||
await invalidate_shout_related_cache(shout_by_id, author_id)
|
||||
|
||||
# Обновляем кэш тем и авторов
|
||||
for topic in shout_by_id.topics:
|
||||
await cache_by_id(Topic, topic.id, cache_topic)
|
||||
for author in shout_by_id.authors:
|
||||
await cache_author(author.dict())
|
||||
|
||||
logger.info("Cache invalidated successfully")
|
||||
except Exception as cache_error:
|
||||
logger.warning(f"Cache invalidation error: {cache_error}", exc_info=True)
|
||||
|
||||
if not publish:
|
||||
await notify_shout(shout_dict, "update")
|
||||
else:
|
||||
@@ -286,7 +376,7 @@ async def update_shout(_, info, shout_id: int, shout_input=None, publish=False):
|
||||
logger.info(f"shout#{shout_id} updated")
|
||||
return {"shout": shout_dict, "error": None}
|
||||
else:
|
||||
logger.warning(f"shout#{shout_id} is not author or editor")
|
||||
logger.warning(f"updater for shout#{shout_id} is not author or editor")
|
||||
return {"error": "access denied", "shout": None}
|
||||
|
||||
except Exception as exc:
|
||||
|
@@ -1,10 +1,9 @@
|
||||
import json
|
||||
import time
|
||||
|
||||
from graphql import GraphQLResolveInfo
|
||||
from sqlalchemy import nulls_last, text
|
||||
from sqlalchemy import and_, nulls_last, text
|
||||
from sqlalchemy.orm import aliased
|
||||
from sqlalchemy.sql.expression import and_, asc, case, desc, func, select
|
||||
from sqlalchemy.sql.expression import asc, case, desc, func, select
|
||||
|
||||
from orm.author import Author
|
||||
from orm.reaction import Reaction, ReactionKind
|
||||
@@ -99,22 +98,6 @@ def query_with_stat(info):
|
||||
).label("main_topic")
|
||||
)
|
||||
|
||||
if has_field(info, "topics"):
|
||||
topics_subquery = (
|
||||
select(
|
||||
ShoutTopic.shout,
|
||||
json_array_builder(
|
||||
json_builder("id", Topic.id, "title", Topic.title, "slug", Topic.slug, "is_main", ShoutTopic.main)
|
||||
).label("topics"),
|
||||
)
|
||||
.outerjoin(Topic, ShoutTopic.topic == Topic.id)
|
||||
.where(ShoutTopic.shout == Shout.id)
|
||||
.group_by(ShoutTopic.shout)
|
||||
.subquery()
|
||||
)
|
||||
q = q.outerjoin(topics_subquery, topics_subquery.c.shout == Shout.id)
|
||||
q = q.add_columns(topics_subquery.c.topics)
|
||||
|
||||
if has_field(info, "authors"):
|
||||
authors_subquery = (
|
||||
select(
|
||||
@@ -144,8 +127,23 @@ def query_with_stat(info):
|
||||
q = q.outerjoin(authors_subquery, authors_subquery.c.shout == Shout.id)
|
||||
q = q.add_columns(authors_subquery.c.authors)
|
||||
|
||||
if has_field(info, "topics"):
|
||||
topics_subquery = (
|
||||
select(
|
||||
ShoutTopic.shout,
|
||||
json_array_builder(
|
||||
json_builder("id", Topic.id, "title", Topic.title, "slug", Topic.slug, "is_main", ShoutTopic.main)
|
||||
).label("topics"),
|
||||
)
|
||||
.outerjoin(Topic, ShoutTopic.topic == Topic.id)
|
||||
.where(ShoutTopic.shout == Shout.id)
|
||||
.group_by(ShoutTopic.shout)
|
||||
.subquery()
|
||||
)
|
||||
q = q.outerjoin(topics_subquery, topics_subquery.c.shout == Shout.id)
|
||||
q = q.add_columns(topics_subquery.c.topics)
|
||||
|
||||
if has_field(info, "stat"):
|
||||
# Подзапрос для статистики реакций
|
||||
stats_subquery = (
|
||||
select(
|
||||
Reaction.shout,
|
||||
@@ -190,17 +188,12 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
|
||||
"""
|
||||
shouts = []
|
||||
try:
|
||||
logger.info(f"Начало выполнения get_shouts_with_links с limit={limit}, offset={offset}")
|
||||
q = q.limit(limit).offset(offset)
|
||||
|
||||
with local_session() as session:
|
||||
logger.info("Выполнение основного запроса")
|
||||
t1 = time.time()
|
||||
shouts_result = session.execute(q).all()
|
||||
logger.info(f"Запрос выполнен, получено {len(shouts_result)} результатов за {time.time() - t1:.3f} секунд")
|
||||
|
||||
if not shouts_result:
|
||||
logger.warning("Нет найденных результатов")
|
||||
return []
|
||||
|
||||
for idx, row in enumerate(shouts_result):
|
||||
@@ -222,14 +215,12 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
|
||||
"pic": a.pic,
|
||||
}
|
||||
|
||||
if hasattr(row, "stat"):
|
||||
if has_field(info, "stat"):
|
||||
stat = {}
|
||||
if isinstance(row.stat, str):
|
||||
stat = json.loads(row.stat)
|
||||
elif isinstance(row.stat, dict):
|
||||
stat = row.stat
|
||||
else:
|
||||
logger.warning(f"Строка {idx} - неизвестный тип stat: {type(row.stat)}")
|
||||
viewed = ViewedStorage.get_shout(shout_id=shout_id) or 0
|
||||
shout_dict["stat"] = {**stat, "viewed": viewed, "commented": stat.get("comments_count", 0)}
|
||||
|
||||
@@ -244,6 +235,16 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
|
||||
if has_field(info, "topics") and hasattr(row, "topics"):
|
||||
shout_dict["topics"] = json.loads(row.topics) if isinstance(row.topics, str) else row.topics
|
||||
|
||||
if has_field(info, "media") and shout.media:
|
||||
# Обработка поля media
|
||||
media_data = shout.media
|
||||
if isinstance(media_data, str):
|
||||
try:
|
||||
media_data = json.loads(media_data)
|
||||
except json.JSONDecodeError:
|
||||
media_data = []
|
||||
shout_dict["media"] = [media_data] if isinstance(media_data, dict) else media_data
|
||||
|
||||
shouts.append(shout_dict)
|
||||
|
||||
except Exception as row_error:
|
||||
@@ -346,11 +347,13 @@ async def load_shouts_by(_, info: GraphQLResolveInfo, options):
|
||||
|
||||
:param _: Корневой объект запроса (не используется)
|
||||
:param info: Информация о контексте GraphQL
|
||||
:param options: Опции фильтрации и сортировки.
|
||||
:return: Список публикаций, удовлетворяющих критериям.
|
||||
:param options: Опции фильтрации и сортировки
|
||||
:return: Список публикаций, удовлетворяющих критериям
|
||||
"""
|
||||
# Базовый запрос: используем специальный запрос с статистикой
|
||||
# Базовый запрос со статистикой
|
||||
q = query_with_stat(info)
|
||||
|
||||
# Применяем остальные опции фильтрации
|
||||
q, limit, offset = apply_options(q, options)
|
||||
|
||||
# Передача сформированного запроса в метод получения публикаций с учетом сортировки и пагинации
|
||||
|
@@ -7,10 +7,8 @@ from settings import ADMIN_SECRET, AUTH_URL
|
||||
from utils.logger import root_logger as logger
|
||||
|
||||
# Список разрешенных заголовков
|
||||
ALLOWED_HEADERS = [
|
||||
'Authorization',
|
||||
'Content-Type'
|
||||
]
|
||||
ALLOWED_HEADERS = ["Authorization", "Content-Type"]
|
||||
|
||||
|
||||
async def check_auth(req):
|
||||
"""
|
||||
@@ -27,32 +25,26 @@ async def check_auth(req):
|
||||
- user_roles: list[str] - Список ролей пользователя.
|
||||
"""
|
||||
token = req.headers.get("Authorization")
|
||||
|
||||
host = req.headers.get('host', '')
|
||||
|
||||
host = req.headers.get("host", "")
|
||||
logger.debug(f"check_auth: host={host}")
|
||||
auth_url = AUTH_URL
|
||||
if '.dscrs.site' in host or 'localhost' in host:
|
||||
if ".dscrs.site" in host or "localhost" in host:
|
||||
auth_url = "https://auth.dscrs.site/graphql"
|
||||
user_id = ""
|
||||
user_roles = []
|
||||
if token:
|
||||
# Проверяем и очищаем токен от префикса Bearer если он есть
|
||||
if token.startswith('Bearer '):
|
||||
token = token.split('Bearer ')[-1].strip()
|
||||
if token.startswith("Bearer "):
|
||||
token = token.split("Bearer ")[-1].strip()
|
||||
# Logging the authentication token
|
||||
logger.debug(f"TOKEN: {token}")
|
||||
query_name = "validate_jwt_token"
|
||||
operation = "ValidateToken"
|
||||
variables = {"params": {"token_type": "access_token", "token": token}}
|
||||
|
||||
# Добавляем CORS заголовки
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
|
||||
'Access-Control-Allow-Headers': ', '.join(ALLOWED_HEADERS),
|
||||
'Access-Control-Allow-Credentials': 'true'
|
||||
}
|
||||
# Только необходимые заголовки для GraphQL запроса
|
||||
headers = {"Content-Type": "application/json"}
|
||||
|
||||
gql = {
|
||||
"query": f"query {operation}($params: ValidateJWTTokenInput!)"
|
||||
@@ -114,7 +106,7 @@ def login_required(f):
|
||||
"""
|
||||
Декоратор для проверки авторизации пользователя.
|
||||
|
||||
Этот декоратор проверяет, авторизован ли пользователь, и добавляет
|
||||
Этот декоратор проверяет, авторизован ли пользователь, <EFBFBD><EFBFBD> добавляет
|
||||
информацию о пользователе в контекст функции.
|
||||
|
||||
Параметры:
|
||||
|
@@ -11,12 +11,19 @@ resolvers = [query, mutation]
|
||||
|
||||
|
||||
async def request_graphql_data(gql, url=AUTH_URL, headers=None):
|
||||
"""
|
||||
Выполняет GraphQL запрос к указанному URL
|
||||
|
||||
:param gql: GraphQL запрос
|
||||
:param url: URL для запроса, по умолчанию AUTH_URL
|
||||
:param headers: Заголовки запроса
|
||||
:return: Результат запроса или None в случае ошибки
|
||||
"""
|
||||
if not url:
|
||||
return None
|
||||
if headers is None:
|
||||
headers = {"Content-Type": "application/json"}
|
||||
try:
|
||||
# logger.debug(f"{url}:\n{headers}\n{gql}")
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(url, json=gql, headers=headers)
|
||||
if response.status_code == 200:
|
||||
@@ -29,8 +36,6 @@ async def request_graphql_data(gql, url=AUTH_URL, headers=None):
|
||||
else:
|
||||
logger.error(f"{url}: {response.status_code} {response.text}")
|
||||
except Exception as _e:
|
||||
# Handling and logging exceptions during authentication check
|
||||
import traceback
|
||||
|
||||
logger.error(f"request_graphql_data error: {traceback.format_exc()}")
|
||||
return None
|
||||
|
@@ -20,7 +20,7 @@ from settings import ADMIN_SECRET, WEBHOOK_SECRET
|
||||
async def check_webhook_existence():
|
||||
"""
|
||||
Проверяет существование вебхука для user.login события
|
||||
|
||||
|
||||
Returns:
|
||||
tuple: (bool, str, str) - существует ли вебхук, его id и endpoint если существует
|
||||
"""
|
||||
@@ -28,11 +28,8 @@ async def check_webhook_existence():
|
||||
if not ADMIN_SECRET:
|
||||
logger.error("ADMIN_SECRET is not set")
|
||||
return False, None, None
|
||||
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Authorizer-Admin-Secret": ADMIN_SECRET
|
||||
}
|
||||
|
||||
headers = {"Content-Type": "application/json", "X-Authorizer-Admin-Secret": ADMIN_SECRET}
|
||||
|
||||
operation = "GetWebhooks"
|
||||
query_name = "_webhooks"
|
||||
@@ -63,17 +60,14 @@ async def create_webhook_endpoint():
|
||||
"""
|
||||
logger.info("create_webhook_endpoint called")
|
||||
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Authorizer-Admin-Secret": ADMIN_SECRET
|
||||
}
|
||||
headers = {"Content-Type": "application/json", "X-Authorizer-Admin-Secret": ADMIN_SECRET}
|
||||
|
||||
exists, webhook_id, current_endpoint = await check_webhook_existence()
|
||||
|
||||
|
||||
# Определяем endpoint в зависимости от окружения
|
||||
host = os.environ.get('HOST', 'core.dscrs.site')
|
||||
host = os.environ.get("HOST", "core.dscrs.site")
|
||||
endpoint = f"https://{host}/new-author"
|
||||
|
||||
|
||||
if exists:
|
||||
# Если вебхук существует, но с другим endpoint или с модифицированным именем
|
||||
if current_endpoint != endpoint or webhook_id:
|
||||
|
Reference in New Issue
Block a user