diff --git a/orm/shout.py b/orm/shout.py index db352441..37734aca 100644 --- a/orm/shout.py +++ b/orm/shout.py @@ -1,6 +1,6 @@ import time -from sqlalchemy import JSON, Boolean, Column, ForeignKey, Integer, String +from sqlalchemy import JSON, Boolean, Column, ForeignKey, Index, Integer, String from sqlalchemy.orm import relationship from orm.author import Author @@ -10,6 +10,15 @@ from services.db import Base class ShoutTopic(Base): + """ + Связь между публикацией и темой. + + Attributes: + shout (int): ID публикации + topic (int): ID темы + main (bool): Признак основной темы + """ + __tablename__ = "shout_topic" id = None # type: ignore @@ -17,6 +26,12 @@ class ShoutTopic(Base): topic = Column(ForeignKey("topic.id"), primary_key=True, index=True) main = Column(Boolean, nullable=True) + # Определяем дополнительные индексы + __table_args__ = ( + # Оптимизированный составной индекс для запросов, которые ищут публикации по теме + Index("idx_shout_topic_topic_shout", "topic", "shout"), + ) + class ShoutReactionsFollower(Base): __tablename__ = "shout_reactions_followers" @@ -30,6 +45,15 @@ class ShoutReactionsFollower(Base): class ShoutAuthor(Base): + """ + Связь между публикацией и автором. + + Attributes: + shout (int): ID публикации + author (int): ID автора + caption (str): Подпись автора + """ + __tablename__ = "shout_author" id = None # type: ignore @@ -37,8 +61,18 @@ class ShoutAuthor(Base): author = Column(ForeignKey("author.id"), primary_key=True, index=True) caption = Column(String, nullable=True, default="") + # Определяем дополнительные индексы + __table_args__ = ( + # Оптимизированный индекс для запросов, которые ищут публикации по автору + Index("idx_shout_author_author_shout", "author", "shout"), + ) + class Shout(Base): + """ + Публикация в системе. + """ + __tablename__ = "shout" created_at: int = Column(Integer, nullable=False, default=lambda: int(time.time())) @@ -74,3 +108,20 @@ class Shout(Base): seo: str | None = Column(String, nullable=True) # JSON draft: int | None = Column(ForeignKey("draft.id"), nullable=True) + + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска неудаленных публикаций + Index("idx_shout_deleted_at", "deleted_at", postgresql_where=deleted_at.is_(None)), + # Индекс для быстрой фильтрации по community + Index("idx_shout_community", "community"), + # Индекс для быстрого поиска по slug + Index("idx_shout_slug", "slug"), + # Составной индекс для фильтрации опубликованных неудаленных публикаций + Index( + "idx_shout_published_deleted", + "published_at", + "deleted_at", + postgresql_where=published_at.is_not(None) & deleted_at.is_(None), + ), + ) diff --git a/orm/topic.py b/orm/topic.py index 61231fb3..4be1897d 100644 --- a/orm/topic.py +++ b/orm/topic.py @@ -1,11 +1,21 @@ import time -from sqlalchemy import JSON, Boolean, Column, ForeignKey, Integer, String +from sqlalchemy import JSON, Boolean, Column, ForeignKey, Index, Integer, String from services.db import Base class TopicFollower(Base): + """ + Связь между топиком и его подписчиком. + + Attributes: + follower (int): ID подписчика + topic (int): ID топика + created_at (int): Время создания связи + auto (bool): Автоматическая подписка + """ + __tablename__ = "topic_followers" id = None # type: ignore @@ -14,8 +24,29 @@ class TopicFollower(Base): created_at = Column(Integer, nullable=False, default=int(time.time())) auto = Column(Boolean, nullable=False, default=False) + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска всех подписчиков топика + Index("idx_topic_followers_topic", "topic"), + # Индекс для быстрого поиска всех топиков, на которые подписан автор + Index("idx_topic_followers_follower", "follower"), + ) + class Topic(Base): + """ + Модель топика (темы) публикаций. + + Attributes: + slug (str): Уникальный строковый идентификатор темы + title (str): Название темы + body (str): Описание темы + pic (str): URL изображения темы + community (int): ID сообщества + oid (str): Старый ID + parent_ids (list): IDs родительских тем + """ + __tablename__ = "topic" slug = Column(String, unique=True) @@ -24,5 +55,12 @@ class Topic(Base): pic = Column(String, nullable=True, comment="Picture") community = Column(ForeignKey("community.id"), default=1) oid = Column(String, nullable=True, comment="Old ID") - parent_ids = Column(JSON, nullable=True, comment="Parent Topic IDs") + + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска по slug + Index("idx_topic_slug", "slug"), + # Индекс для быстрого поиска по сообществу + Index("idx_topic_community", "community"), + ) diff --git a/resolvers/topic.py b/resolvers/topic.py index d7460c36..9dfc245b 100644 --- a/resolvers/topic.py +++ b/resolvers/topic.py @@ -1,44 +1,222 @@ -from sqlalchemy import select +import time + +from sqlalchemy import func, select, text from cache.cache import ( + cache_topic, get_cached_topic_authors, get_cached_topic_by_slug, get_cached_topic_followers, + redis_operation, ) from cache.memorycache import cache_region from orm.author import Author -from orm.topic import Topic +from orm.shout import Shout, ShoutTopic +from orm.topic import Topic, TopicFollower from resolvers.stat import get_with_stat from services.auth import login_required from services.db import local_session +from services.redis import redis from services.schema import mutation, query from utils.logger import root_logger as logger +# Вспомогательная функция для получения всех тем без статистики +async def get_all_topics(): + """ + Получает все темы без статистики. + Используется для случаев, когда нужен полный список тем без дополнительной информации. + + Returns: + list: Список всех тем без статистики + """ + # Пытаемся получить данные из кеша + cached_topics = await redis_operation("GET", "topics:all:basic") + + if cached_topics: + logger.debug("Используем кешированные базовые данные о темах из Redis") + try: + import json + + return json.loads(cached_topics) + except Exception as e: + logger.error(f"Ошибка при десериализации тем из Redis: {e}") + + # Если в кеше нет данных, выполняем запрос в БД + logger.debug("Получаем список всех тем из БД и кешируем результат") + + with local_session() as session: + # Запрос на получение базовой информации о темах + topics_query = select(Topic) + topics = session.execute(topics_query).scalars().all() + + # Преобразуем темы в словари + result = [topic.dict() for topic in topics] + + # Кешируем результат в Redis без TTL (будет обновляться только при изменениях) + try: + import json + + await redis_operation("SET", "topics:all:basic", json.dumps(result)) + except Exception as e: + logger.error(f"Ошибка при кешировании тем в Redis: {e}") + + return result + + +# Вспомогательная функция для получения тем со статистикой с пагинацией +async def get_topics_with_stats(limit=100, offset=0, community_id=None): + """ + Получает темы со статистикой с пагинацией. + + Args: + limit: Максимальное количество возвращаемых тем + offset: Смещение для пагинации + community_id: Опциональный ID сообщества для фильтрации + + Returns: + list: Список тем с их статистикой + """ + # Формируем ключ кеша с учетом параметров + cache_key = f"topics:stats:limit={limit}:offset={offset}" + if community_id: + cache_key += f":community={community_id}" + + # Пытаемся получить данные из кеша + cached_topics = await redis_operation("GET", cache_key) + + if cached_topics: + logger.debug(f"Используем кешированные данные о темах из Redis: {cache_key}") + try: + import json + + return json.loads(cached_topics) + except Exception as e: + logger.error(f"Ошибка при десериализации тем из Redis: {e}") + + # Если в кеше нет данных, выполняем оптимизированный запрос + logger.debug(f"Выполняем запрос на получение тем со статистикой: limit={limit}, offset={offset}") + + with local_session() as session: + # Базовый запрос для получения тем + base_query = select(Topic) + + # Добавляем фильтр по сообществу, если указан + if community_id: + base_query = base_query.where(Topic.community == community_id) + + # Применяем лимит и смещение + base_query = base_query.limit(limit).offset(offset) + + # Получаем темы + topics = session.execute(base_query).scalars().all() + topic_ids = [topic.id for topic in topics] + + if not topic_ids: + return [] + + # Запрос на получение статистики по публикациям для выбранных тем + shouts_stats_query = f""" + SELECT st.topic, COUNT(DISTINCT s.id) as shouts_count + FROM shout_topic st + JOIN shout s ON st.shout = s.id AND s.deleted_at IS NULL + WHERE st.topic IN ({",".join(map(str, topic_ids))}) + GROUP BY st.topic + """ + shouts_stats = {row[0]: row[1] for row in session.execute(text(shouts_stats_query))} + + # Запрос на получение статистики по подписчикам для выбранных тем + followers_stats_query = f""" + SELECT topic, COUNT(DISTINCT follower) as followers_count + FROM topic_followers + WHERE topic IN ({",".join(map(str, topic_ids))}) + GROUP BY topic + """ + followers_stats = {row[0]: row[1] for row in session.execute(text(followers_stats_query))} + + # Формируем результат с добавлением статистики + result = [] + for topic in topics: + topic_dict = topic.dict() + topic_dict["stat"] = { + "shouts": shouts_stats.get(topic.id, 0), + "followers": followers_stats.get(topic.id, 0), + } + result.append(topic_dict) + + # Кешируем каждую тему отдельно для использования в других функциях + await cache_topic(topic_dict) + + # Кешируем полный результат в Redis без TTL (будет обновляться только при изменениях) + try: + import json + + await redis_operation("SET", cache_key, json.dumps(result)) + except Exception as e: + logger.error(f"Ошибка при кешировании тем в Redis: {e}") + + return result + + +# Функция для инвалидации кеша тем +async def invalidate_topics_cache(): + """ + Инвалидирует все кеши тем при изменении данных. + """ + logger.debug("Инвалидация кеша тем") + + # Получаем все ключи, начинающиеся с "topics:" + topic_keys = await redis.execute("KEYS", "topics:*") + + if topic_keys: + # Удаляем все найденные ключи + await redis.execute("DEL", *topic_keys) + logger.debug(f"Удалено {len(topic_keys)} ключей кеша тем") + + # Запрос на получение всех тем @query.field("get_topics_all") -def get_topics_all(_, _info): - cache_key = "get_topics_all" # Ключ для кеша +async def get_topics_all(_, _info): + """ + Получает список всех тем без статистики. - @cache_region.cache_on_arguments(cache_key) - def _get_topics_all(): - topics_query = select(Topic) - return get_with_stat(topics_query) # Получение тем с учетом статистики + Returns: + list: Список всех тем + """ + return await get_all_topics() - return _get_topics_all() + +# Запрос на получение тем с пагинацией и статистикой +@query.field("get_topics_paginated") +async def get_topics_paginated(_, _info, limit=100, offset=0): + """ + Получает список тем с пагинацией и статистикой. + + Args: + limit: Максимальное количество возвращаемых тем + offset: Смещение для пагинации + + Returns: + list: Список тем с их статистикой + """ + return await get_topics_with_stats(limit, offset) # Запрос на получение тем по сообществу @query.field("get_topics_by_community") -def get_topics_by_community(_, _info, community_id: int): - cache_key = f"get_topics_by_community_{community_id}" # Ключ для кеша +async def get_topics_by_community(_, _info, community_id: int, limit=100, offset=0): + """ + Получает список тем, принадлежащих указанному сообществу с пагинацией и статистикой. - @cache_region.cache_on_arguments(cache_key) - def _get_topics_by_community(): - topics_by_community_query = select(Topic).where(Topic.community == community_id) - return get_with_stat(topics_by_community_query) + Args: + community_id: ID сообщества + limit: Максимальное количество возвращаемых тем + offset: Смещение для пагинации - return _get_topics_by_community() + Returns: + list: Список тем с их статистикой + """ + return await get_topics_with_stats(limit, offset, community_id) # Запрос на получение тем по автору @@ -74,6 +252,9 @@ async def create_topic(_, _info, topic_input): session.add(new_topic) session.commit() + # Инвалидируем кеш всех тем + await invalidate_topics_cache() + return {"topic": new_topic} @@ -91,6 +272,11 @@ async def update_topic(_, _info, topic_input): session.add(topic) session.commit() + # Инвалидируем кеш всех тем и конкретной темы + await invalidate_topics_cache() + await redis.execute("DEL", f"topic:slug:{slug}") + await redis.execute("DEL", f"topic:id:{topic.id}") + return {"topic": topic} @@ -111,6 +297,11 @@ async def delete_topic(_, info, slug: str): session.delete(t) session.commit() + # Инвалидируем кеш всех тем и конкретной темы + await invalidate_topics_cache() + await redis.execute("DEL", f"topic:slug:{slug}") + await redis.execute("DEL", f"topic:id:{t.id}") + return {} return {"error": "access denied"} diff --git a/services/db.py b/services/db.py index d598e7f2..b81873ba 100644 --- a/services/db.py +++ b/services/db.py @@ -10,12 +10,14 @@ from sqlalchemy import ( JSON, Column, Engine, + Index, Integer, create_engine, event, exc, func, inspect, + text ) from sqlalchemy.orm import Session, configure_mappers, declarative_base from sqlalchemy.sql.schema import Table @@ -56,6 +58,82 @@ def create_table_if_not_exists(engine, table): logger.info(f"Table '{table.__tablename__}' ok.") +def sync_indexes(): + """ + Синхронизирует индексы в БД с индексами, определенными в моделях SQLAlchemy. + Создает недостающие индексы, если они определены в моделях, но отсутствуют в БД. + + Использует pg_catalog для PostgreSQL для получения списка существующих индексов. + """ + if not DB_URL.startswith("postgres"): + logger.warning("Функция sync_indexes поддерживается только для PostgreSQL.") + return + + logger.info("Начинаем синхронизацию индексов в базе данных...") + + # Получаем все существующие индексы в БД + with local_session() as session: + existing_indexes_query = text(""" + SELECT + t.relname AS table_name, + i.relname AS index_name + FROM + pg_catalog.pg_class i + JOIN + pg_catalog.pg_index ix ON ix.indexrelid = i.oid + JOIN + pg_catalog.pg_class t ON t.oid = ix.indrelid + JOIN + pg_catalog.pg_namespace n ON n.oid = i.relnamespace + WHERE + i.relkind = 'i' + AND n.nspname = 'public' + AND t.relkind = 'r' + ORDER BY + t.relname, i.relname; + """) + + existing_indexes = {row[1].lower() for row in session.execute(existing_indexes_query)} + logger.debug(f"Найдено {len(existing_indexes)} существующих индексов в БД") + + # Проверяем каждую модель и её индексы + for _model_name, model_class in REGISTRY.items(): + if hasattr(model_class, "__table__") and hasattr(model_class, "__table_args__"): + table_args = model_class.__table_args__ + + # Если table_args - это кортеж, ищем в нём объекты Index + if isinstance(table_args, tuple): + for arg in table_args: + if isinstance(arg, Index): + index_name = arg.name.lower() + + # Проверяем, существует ли индекс в БД + if index_name not in existing_indexes: + logger.info( + f"Создаем отсутствующий индекс {index_name} для таблицы {model_class.__tablename__}" + ) + + # Создаем индекс если он отсутствует + try: + arg.create(engine) + logger.info(f"Индекс {index_name} успешно создан") + except Exception as e: + logger.error(f"Ошибка при создании индекса {index_name}: {e}") + else: + logger.debug(f"Индекс {index_name} уже существует") + + # Анализируем таблицы для оптимизации запросов + for model_name, model_class in REGISTRY.items(): + if hasattr(model_class, "__tablename__"): + try: + session.execute(text(f"ANALYZE {model_class.__tablename__}")) + logger.debug(f"Таблица {model_class.__tablename__} проанализирована") + except Exception as e: + logger.error(f"Ошибка при анализе таблицы {model_class.__tablename__}: {e}") + + logger.info("Синхронизация индексов завершена.") + + # noinspection PyUnusedLocal def local_session(src=""): return Session(bind=engine, expire_on_commit=False)