core/resolvers/editor.py

715 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
from typing import Any
import orjson
from graphql import GraphQLResolveInfo
from sqlalchemy import and_, desc, select
from sqlalchemy.orm import joinedload
from sqlalchemy.sql.functions import coalesce
from auth.orm import Author
from cache.cache import (
cache_author,
cache_topic,
invalidate_shout_related_cache,
invalidate_shouts_cache,
)
from orm.shout import Shout, ShoutAuthor, ShoutTopic
from orm.topic import Topic
from resolvers.follower import follow
from resolvers.stat import get_with_stat
from services.auth import login_required
from services.common_result import CommonResult
from services.db import local_session
from services.notify import notify_shout
from services.schema import mutation, query
from services.search import search_service
from utils.extract_text import extract_text
from utils.logger import root_logger as logger
async def cache_by_id(entity, entity_id: int, cache_method):
"""Cache an entity by its ID using the provided cache method.
Args:
entity: The SQLAlchemy model class to query
entity_id (int): The ID of the entity to cache
cache_method: The caching function to use
Returns:
dict: The cached entity data if successful, None if entity not found
Example:
>>> async def test_cache():
... author = await cache_by_id(Author, 1, cache_author)
... assert author['id'] == 1
... assert 'name' in author
... return author
"""
caching_query = select(entity).filter(entity.id == entity_id)
result = get_with_stat(caching_query)
if not result or not result[0]:
logger.warning(f"{entity.__name__} with id {entity_id} not found")
return None
x = result[0]
d = x.dict() # convert object to dictionary
cache_method(d)
return d
@query.field("get_my_shout")
@login_required
async def get_my_shout(_: None, info, shout_id: int):
"""Get a shout by ID if the requesting user has permission to view it.
DEPRECATED: use `load_drafts` instead
Args:
info: GraphQL resolver info containing context
shout_id (int): ID of the shout to retrieve
Returns:
dict: Contains either:
- error (str): Error message if retrieval failed
- shout (Shout): The requested shout if found and accessible
Permissions:
User must be:
- The shout creator
- Listed as an author
- Have editor role
Example:
>>> async def test_get_my_shout():
... context = {'user_id': '123', 'author': {'id': 1}, 'roles': []}
... info = type('Info', (), {'context': context})()
... result = await get_my_shout(None, info, 1)
... assert result['error'] is None
... assert result['shout'].id == 1
... return result
"""
author_dict = info.context.get("author", {})
author_id = author_dict.get("id")
roles = info.context.get("roles", [])
shout = None
if not author_id:
return {"error": "unauthorized", "shout": None}
with local_session() as session:
shout = (
session.query(Shout)
.filter(Shout.id == shout_id)
.options(joinedload(Shout.authors), joinedload(Shout.topics))
.filter(Shout.deleted_at.is_(None))
.first()
)
if not shout:
return {"error": "no shout found", "shout": None}
# Преобразуем media JSON в список объектов MediaItem
if hasattr(shout, "media") and shout.media:
if isinstance(shout.media, str):
try:
shout.media = orjson.loads(shout.media)
except Exception as e:
logger.error(f"Error parsing shout media: {e}")
shout.media = []
elif isinstance(shout.media, list):
shout.media = shout.media or []
else:
shout.media = [] # type: ignore[assignment]
logger.debug(f"got {len(shout.authors)} shout authors, created by {shout.created_by}")
is_editor = "editor" in roles
logger.debug(f"viewer is{'' if is_editor else ' not'} editor")
is_creator = author_id == shout.created_by
logger.debug(f"viewer is{'' if is_creator else ' not'} creator")
is_author = bool(list(filter(lambda x: x.id == int(author_id), list(shout.authors))))
logger.debug(f"viewer is{'' if is_creator else ' not'} author")
can_edit = is_editor or is_author or is_creator
if not can_edit:
return {"error": "forbidden", "shout": None}
logger.debug("got shout editor with data")
return {"error": None, "shout": shout}
@query.field("get_shouts_drafts")
@login_required
async def get_shouts_drafts(_: None, info: GraphQLResolveInfo) -> list[dict]:
author_dict = info.context.get("author") or {}
if not author_dict:
return [] # Return empty list instead of error dict
author_id = author_dict.get("id")
shouts = []
with local_session() as session:
if author_id:
q = (
select(Shout)
.options(joinedload(Shout.authors), joinedload(Shout.topics))
.filter(and_(Shout.deleted_at.is_(None), Shout.created_by == int(author_id)))
.filter(Shout.published_at.is_(None))
.order_by(desc(coalesce(Shout.updated_at, Shout.created_at)))
.group_by(Shout.id)
)
shouts = [shout.dict() for [shout] in session.execute(q).unique()]
return shouts
# @mutation.field("create_shout")
# @login_required
async def create_shout(_: None, info: GraphQLResolveInfo, inp: dict) -> dict:
logger.info(f"Starting create_shout with input: {inp}")
author_dict = info.context.get("author") or {}
logger.debug(f"Context author: {author_dict}")
if not author_dict:
logger.error("Author profile not found in context")
return {"error": "author profile was not found"}
author_id = author_dict.get("id")
if author_id:
try:
with local_session() as session:
author_id = int(author_id)
current_time = int(time.time())
slug = inp.get("slug") or f"draft-{current_time}"
logger.info(f"Creating shout with input: {inp}")
# Создаем публикацию без topics
body = inp.get("body", "")
lead = inp.get("lead", "")
body_text = extract_text(body)
lead_text = extract_text(lead)
seo_parts = lead_text.strip() or body_text.strip()[:300].split(". ")[:-1]
seo = inp.get("seo", ". ".join(seo_parts))
new_shout = Shout(
slug=slug,
body=body,
seo=seo,
lead=lead,
layout=inp.get("layout", "article"),
title=inp.get("title", ""),
created_by=author_id,
created_at=current_time,
community=1,
)
# Проверяем уникальность slug
logger.debug(f"Checking for existing slug: {slug}")
same_slug_shout = session.query(Shout).filter(Shout.slug == new_shout.slug).first()
c = 1
while same_slug_shout is not None:
logger.debug(f"Found duplicate slug, trying iteration {c}")
new_shout.slug = f"{slug}-{c}" # type: ignore[assignment]
same_slug_shout = session.query(Shout).filter(Shout.slug == new_shout.slug).first()
c += 1
try:
logger.info("Creating new shout object")
session.add(new_shout)
session.commit()
logger.info(f"Created shout with ID: {new_shout.id}")
except Exception as e:
logger.error(f"Error creating shout object: {e}", exc_info=True)
return {"error": f"Database error: {e!s}"}
# Связываем с автором
try:
logger.debug(f"Linking author {author_id} to shout {new_shout.id}")
sa = ShoutAuthor(shout=new_shout.id, author=author_id)
session.add(sa)
except Exception as e:
logger.error(f"Error linking author: {e}", exc_info=True)
return {"error": f"Error linking author: {e!s}"}
# Связываем с темами
input_topics = inp.get("topics", [])
if input_topics:
try:
logger.debug(f"Linking topics: {[t.slug for t in input_topics]}")
main_topic = inp.get("main_topic")
for topic in input_topics:
st = ShoutTopic(
topic=topic.id,
shout=new_shout.id,
main=(topic.slug == main_topic) if main_topic else False,
)
session.add(st)
logger.debug(f"Added topic {topic.slug} {'(main)' if st.main else ''}")
except Exception as e:
logger.error(f"Error linking topics: {e}", exc_info=True)
return {"error": f"Error linking topics: {e!s}"}
try:
session.commit()
logger.info("Final commit successful")
except Exception as e:
logger.error(f"Error in final commit: {e}", exc_info=True)
return {"error": f"Error in final commit: {e!s}"}
# Получаем созданную публикацию
shout = session.query(Shout).filter(Shout.id == new_shout.id).first()
if shout:
# Подписываем автора
try:
logger.debug("Following created shout")
await follow(None, info, "shout", shout.slug)
except Exception as e:
logger.warning(f"Error following shout: {e}", exc_info=True)
logger.info(f"Successfully created shout {shout.id}")
return {"shout": shout}
except Exception as e:
logger.error(f"Unexpected error in create_shout: {e}", exc_info=True)
return {"error": f"Unexpected error: {e!s}"}
error_msg = "cant create shout" if author_id else "unauthorized"
logger.error(f"Create shout failed: {error_msg}")
return {"error": error_msg}
def patch_main_topic(session: Any, main_topic_slug: str, shout: Any) -> None:
"""Update the main topic for a shout."""
logger.info(f"Starting patch_main_topic for shout#{shout.id} with slug '{main_topic_slug}'")
logger.debug(f"Current shout topics: {[(t.topic.slug, t.main) for t in shout.topics]}")
with session.begin():
# Получаем текущий главный топик
old_main = (
session.query(ShoutTopic).filter(and_(ShoutTopic.shout == shout.id, ShoutTopic.main.is_(True))).first()
)
if old_main:
logger.info(f"Found current main topic: {old_main.topic.slug}")
else:
logger.info("No current main topic found")
# Находим новый главный топик
main_topic = session.query(Topic).filter(Topic.slug == main_topic_slug).first()
if not main_topic:
logger.error(f"Main topic with slug '{main_topic_slug}' not found")
return
logger.info(f"Found new main topic: {main_topic.slug} (id={main_topic.id})")
# Находим связь с новым главным топиком
new_main = (
session.query(ShoutTopic)
.filter(and_(ShoutTopic.shout == shout.id, ShoutTopic.topic == main_topic.id))
.first()
)
logger.debug(f"Found new main topic relation: {new_main is not None}")
if old_main and new_main and old_main is not new_main:
logger.info(f"Updating main topic flags: {old_main.topic.slug} -> {new_main.topic.slug}")
old_main.main = False # type: ignore[assignment]
session.add(old_main)
new_main.main = True # type: ignore[assignment]
session.add(new_main)
session.flush()
logger.info(f"Main topic updated for shout#{shout.id}")
else:
logger.warning(f"No changes needed for main topic (old={old_main is not None}, new={new_main is not None})")
def patch_topics(session: Any, shout: Any, topics_input: list[Any]) -> None:
"""Update the topics associated with a shout.
Args:
session: SQLAlchemy session
shout (Shout): The shout to update
topics_input (list): List of topic dicts with fields:
- id (int): Topic ID (<0 for new topics)
- slug (str): Topic slug
- title (str): Topic title (for new topics)
Side Effects:
- Creates new topics if needed
- Updates shout-topic associations
- Refreshes shout object with new topics
Example:
>>> def test_patch_topics():
... topics = [
... {'id': -1, 'slug': 'new-topic', 'title': 'New Topic'},
... {'id': 1, 'slug': 'existing-topic'}
... ]
... with local_session() as session:
... shout = session.query(Shout).first()
... patch_topics(session, shout, topics)
... assert len(shout.topics) == 2
... assert any(t.slug == 'new-topic' for t in shout.topics)
... return shout.topics
"""
logger.info(f"Starting patch_topics for shout#{shout.id}")
logger.info(f"Received topics_input: {topics_input}")
# Создаем новые топики если есть
new_topics_to_link = [Topic(**new_topic) for new_topic in topics_input if new_topic["id"] < 0]
if new_topics_to_link:
logger.info(f"Creating new topics: {[t.dict() for t in new_topics_to_link]}")
session.add_all(new_topics_to_link)
session.flush()
# Получаем текущие связи
current_links = session.query(ShoutTopic).filter(ShoutTopic.shout == shout.id).all()
logger.info(f"Current topic links: {[{t.topic: t.main} for t in current_links]}")
# Удаляем старые связи
if current_links:
logger.info(f"Removing old topic links for shout#{shout.id}")
for link in current_links:
session.delete(link)
session.flush()
# Создаем новые связи
for topic_input in topics_input:
topic_id = topic_input["id"]
if topic_id < 0:
topic = next(t for t in new_topics_to_link if t.slug == topic_input["slug"])
topic_id = topic.id
logger.info(f"Creating new topic link: shout#{shout.id} -> topic#{topic_id}")
new_link = ShoutTopic(shout=shout.id, topic=topic_id, main=False)
session.add(new_link)
session.flush()
# Обновляем связи в объекте шаута
session.refresh(shout)
logger.info(f"Successfully updated topics for shout#{shout.id}")
logger.info(f"Final shout topics: {[t.dict() for t in shout.topics]}")
# @mutation.field("update_shout")
# @login_required
async def update_shout(
_: None, info: GraphQLResolveInfo, shout_id: int, shout_input: dict | None = None, *, publish: bool = False
) -> CommonResult:
"""Update an existing shout with optional publishing"""
logger.info(f"update_shout called with shout_id={shout_id}, publish={publish}")
author_dict = info.context.get("author", {})
author_id = author_dict.get("id")
if not author_id:
logger.error("Unauthorized update attempt")
return CommonResult(error="unauthorized", shout=None)
logger.info(f"Starting update_shout with id={shout_id}, publish={publish}")
logger.debug(f"Full shout_input: {shout_input}") # DraftInput
roles = info.context.get("roles", [])
current_time = int(time.time())
shout_input = shout_input or {}
shout_id = shout_id or shout_input.get("id", shout_id)
slug = shout_input.get("slug")
try:
with local_session() as session:
if author_id:
logger.info(f"Processing update for shout#{shout_id} by author #{author_id}")
shout_by_id = (
session.query(Shout)
.options(joinedload(Shout.topics).joinedload(ShoutTopic.topic), joinedload(Shout.authors))
.filter(Shout.id == shout_id)
.first()
)
if not shout_by_id:
logger.error(f"shout#{shout_id} not found")
return CommonResult(error="shout not found", shout=None)
logger.info(f"Found shout#{shout_id}")
# Логируем текущие топики
current_topics = (
[{"id": t.id, "slug": t.slug, "title": t.title} for t in shout_by_id.topics]
if shout_by_id.topics
else []
)
logger.info(f"Current topics for shout#{shout_id}: {current_topics}")
if slug != shout_by_id.slug:
same_slug_shout = session.query(Shout).filter(Shout.slug == slug).first()
c = 1
while same_slug_shout is not None:
c += 1
same_slug_shout.slug = f"{slug}-{c}" # type: ignore[assignment]
same_slug_shout = session.query(Shout).filter(Shout.slug == slug).first()
shout_input["slug"] = slug
logger.info(f"shout#{shout_id} slug patched")
if filter(lambda x: x.id == author_id, list(shout_by_id.authors)) or "editor" in roles:
logger.info(f"Author #{author_id} has permission to edit shout#{shout_id}")
# topics patch
topics_input = shout_input.get("topics")
if topics_input:
logger.info(f"Received topics_input for shout#{shout_id}: {topics_input}")
try:
patch_topics(session, shout_by_id, topics_input)
logger.info(f"Successfully patched topics for shout#{shout_id}")
# Обновляем связи в сессии после patch_topics
session.refresh(shout_by_id)
except Exception as e:
logger.error(f"Error patching topics: {e}", exc_info=True)
return CommonResult(error=f"Failed to update topics: {e!s}", shout=None)
del shout_input["topics"]
for tpc in topics_input:
await cache_by_id(Topic, tpc["id"], cache_topic)
else:
logger.warning(f"No topics_input received for shout#{shout_id}")
# main topic
main_topic = shout_input.get("main_topic")
if main_topic:
logger.info(f"Updating main topic for shout#{shout_id} to {main_topic}")
patch_main_topic(session, main_topic, shout_by_id)
shout_by_id.updated_at = current_time # type: ignore[assignment]
if publish:
logger.info(f"Publishing shout#{shout_id}")
shout_by_id.published_at = current_time # type: ignore[assignment]
# Проверяем наличие связи с автором
logger.info(f"Checking author link for shout#{shout_id} and author#{author_id}")
author_link = (
session.query(ShoutAuthor)
.filter(and_(ShoutAuthor.shout == shout_id, ShoutAuthor.author == author_id))
.first()
)
if not author_link:
logger.info(f"Adding missing author link for shout#{shout_id}")
sa = ShoutAuthor(shout=shout_id, author=author_id)
session.add(sa)
session.flush()
logger.info("Author link added successfully")
else:
logger.info("Author link already exists")
# Логируем финальное состояние перед сохранением
logger.info(f"Final shout_input for update: {shout_input}")
Shout.update(shout_by_id, shout_input)
session.add(shout_by_id)
try:
session.commit()
# Обновляем объект после коммита чтобы получить все связи
session.refresh(shout_by_id)
logger.info(f"Successfully committed updates for shout#{shout_id}")
except Exception as e:
logger.error(f"Commit failed: {e}", exc_info=True)
return CommonResult(error=f"Failed to save changes: {e!s}", shout=None)
# После обновления проверяем топики
updated_topics = (
[{"id": t.id, "slug": t.slug, "title": t.title} for t in shout_by_id.topics]
if shout_by_id.topics
else []
)
logger.info(f"Updated topics for shout#{shout_id}: {updated_topics}")
# Инвалидация кэша после обновления
try:
logger.info("Invalidating cache after shout update")
cache_keys = [
"feed", # лента
f"author_{author_id}", # публикации автора
"random_top", # случайные топовые
"unrated", # неоцененные
]
# Добавляем ключи для тем публикации
for topic in shout_by_id.topics:
cache_keys.append(f"topic_{topic.id}")
cache_keys.append(f"topic_shouts_{topic.id}")
await invalidate_shouts_cache(cache_keys)
await invalidate_shout_related_cache(shout_by_id, author_id)
# Обновляем кэш тем и авторов
for topic in shout_by_id.topics:
await cache_by_id(Topic, topic.id, cache_topic)
for author in shout_by_id.authors:
await cache_author(author.dict())
logger.info("Cache invalidated successfully")
except Exception as cache_error:
logger.warning(f"Cache invalidation error: {cache_error}", exc_info=True)
if not publish:
await notify_shout(shout_by_id.dict(), "update")
else:
await notify_shout(shout_by_id.dict(), "published")
# search service indexing
search_service.index(shout_by_id)
for a in shout_by_id.authors:
await cache_by_id(Author, a.id, cache_author)
logger.info(f"shout#{shout_id} updated")
# Return success with the updated shout
return CommonResult(error=None, shout=shout_by_id)
logger.warning(f"Access denied: author #{author_id} cannot edit shout#{shout_id}")
return CommonResult(error="access denied", shout=None)
return CommonResult(error="cant update shout", shout=None)
except Exception as e:
logger.error(f"Exception in update_shout: {e}", exc_info=True)
return CommonResult(error="cant update shout", shout=None)
# @mutation.field("delete_shout")
# @login_required
async def delete_shout(_: None, info: GraphQLResolveInfo, shout_id: int) -> CommonResult:
"""Delete a shout (mark as deleted)"""
author_dict = info.context.get("author", {})
if not author_dict:
return CommonResult(error="author profile was not found", shout=None)
author_id = author_dict.get("id")
roles = info.context.get("roles", [])
with local_session() as session:
if author_id:
if shout_id:
shout = session.query(Shout).filter(Shout.id == shout_id).first()
if shout:
# Check if user has permission to delete
if any(x.id == author_id for x in shout.authors) or "editor" in roles:
# Use setattr to avoid MyPy complaints about Column assignment
shout.deleted_at = int(time.time()) # type: ignore[assignment]
session.add(shout)
session.commit()
# Get shout data for notification
shout_dict = shout.dict()
# Invalidate cache
await invalidate_shout_related_cache(shout, author_id)
# Notify about deletion
await notify_shout(shout_dict, "delete")
return CommonResult(error=None, shout=shout)
return CommonResult(error="access denied", shout=None)
return CommonResult(error="shout not found", shout=None)
def get_main_topic(topics: list[Any]) -> dict[str, Any]:
"""Get the main topic from a list of ShoutTopic objects."""
logger.info(f"Starting get_main_topic with {len(topics) if topics else 0} topics")
logger.debug(f"Topics data: {[(t.slug, getattr(t, 'main', False)) for t in topics] if topics else []}")
if not topics:
logger.warning("No topics provided to get_main_topic")
return {"id": 0, "title": "no topic", "slug": "notopic", "is_main": True}
# Проверяем, является ли topics списком объектов ShoutTopic или Topic
if hasattr(topics[0], "topic") and topics[0].topic:
# Для ShoutTopic объектов (старый формат)
# Find first main topic in original order
main_topic_rel = next((st for st in topics if getattr(st, "main", False)), None)
logger.debug(
f"Found main topic relation: {main_topic_rel.topic.slug if main_topic_rel and main_topic_rel.topic else None}"
)
if main_topic_rel and main_topic_rel.topic:
result = {
"slug": main_topic_rel.topic.slug,
"title": main_topic_rel.topic.title,
"id": main_topic_rel.topic.id,
"is_main": True,
}
logger.info(f"Returning main topic: {result}")
return result
# If no main found but topics exist, return first
if topics and topics[0].topic:
logger.info(f"No main topic found, using first topic: {topics[0].topic.slug}")
return {
"slug": topics[0].topic.slug,
"title": topics[0].topic.title,
"id": topics[0].topic.id,
"is_main": True,
}
# Для Topic объектов (новый формат из selectinload)
# После смены на selectinload у нас просто список Topic объектов
elif topics:
logger.info(f"Using first topic as main: {topics[0].slug}")
return {
"slug": topics[0].slug,
"title": topics[0].title,
"id": topics[0].id,
"is_main": True,
}
logger.warning("No valid topics found, returning default")
return {"slug": "notopic", "title": "no topic", "id": 0, "is_main": True}
@mutation.field("unpublish_shout")
@login_required
async def unpublish_shout(_: None, info: GraphQLResolveInfo, shout_id: int) -> CommonResult:
"""
Unpublish a shout by setting published_at to NULL
"""
author_dict = info.context.get("author", {})
author_id = author_dict.get("id")
roles = info.context.get("roles", [])
if not author_id:
return CommonResult(error="Author ID is required", shout=None)
try:
with local_session() as session:
# Получаем шаут с авторами
shout = session.query(Shout).options(joinedload(Shout.authors)).filter(Shout.id == shout_id).first()
if not shout:
return CommonResult(error="Shout not found", shout=None)
# Проверяем права доступа
can_edit = any(author.id == author_id for author in shout.authors) or "editor" in roles
if can_edit:
shout.published_at = None # type: ignore[assignment]
shout.updated_at = int(time.time()) # type: ignore[assignment]
session.add(shout)
session.commit()
# Инвалидация кэша
cache_keys = [
"feed",
f"author_{author_id}",
"random_top",
"unrated",
]
# Добавляем ключи для тем публикации
for topic in shout.topics:
cache_keys.append(f"topic_{topic.id}")
cache_keys.append(f"topic_shouts_{topic.id}")
await invalidate_shouts_cache(cache_keys)
await invalidate_shout_related_cache(shout, author_id)
# Получаем обновленные данные шаута
session.refresh(shout)
shout_dict = shout.dict()
logger.info(f"Shout {shout_id} unpublished successfully")
return CommonResult(error=None, shout=shout)
return CommonResult(error="Access denied", shout=None)
except Exception as e:
logger.error(f"Error unpublishing shout {shout_id}: {e}", exc_info=True)
return CommonResult(error=f"Failed to unpublish shout: {e!s}", shout=None)