topics caching upgrade

This commit is contained in:
Untone 2025-03-22 09:31:53 +03:00
parent 31c32143d0
commit 86ddb50cb8
4 changed files with 377 additions and 19 deletions

View File

@ -1,6 +1,6 @@
import time
from sqlalchemy import JSON, Boolean, Column, ForeignKey, Integer, String
from sqlalchemy import JSON, Boolean, Column, ForeignKey, Index, Integer, String
from sqlalchemy.orm import relationship
from orm.author import Author
@ -10,6 +10,15 @@ from services.db import Base
class ShoutTopic(Base):
"""
Связь между публикацией и темой.
Attributes:
shout (int): ID публикации
topic (int): ID темы
main (bool): Признак основной темы
"""
__tablename__ = "shout_topic"
id = None # type: ignore
@ -17,6 +26,12 @@ class ShoutTopic(Base):
topic = Column(ForeignKey("topic.id"), primary_key=True, index=True)
main = Column(Boolean, nullable=True)
# Определяем дополнительные индексы
__table_args__ = (
# Оптимизированный составной индекс для запросов, которые ищут публикации по теме
Index("idx_shout_topic_topic_shout", "topic", "shout"),
)
class ShoutReactionsFollower(Base):
__tablename__ = "shout_reactions_followers"
@ -30,6 +45,15 @@ class ShoutReactionsFollower(Base):
class ShoutAuthor(Base):
"""
Связь между публикацией и автором.
Attributes:
shout (int): ID публикации
author (int): ID автора
caption (str): Подпись автора
"""
__tablename__ = "shout_author"
id = None # type: ignore
@ -37,8 +61,18 @@ class ShoutAuthor(Base):
author = Column(ForeignKey("author.id"), primary_key=True, index=True)
caption = Column(String, nullable=True, default="")
# Определяем дополнительные индексы
__table_args__ = (
# Оптимизированный индекс для запросов, которые ищут публикации по автору
Index("idx_shout_author_author_shout", "author", "shout"),
)
class Shout(Base):
"""
Публикация в системе.
"""
__tablename__ = "shout"
created_at: int = Column(Integer, nullable=False, default=lambda: int(time.time()))
@ -74,3 +108,20 @@ class Shout(Base):
seo: str | None = Column(String, nullable=True) # JSON
draft: int | None = Column(ForeignKey("draft.id"), nullable=True)
# Определяем индексы
__table_args__ = (
# Индекс для быстрого поиска неудаленных публикаций
Index("idx_shout_deleted_at", "deleted_at", postgresql_where=deleted_at.is_(None)),
# Индекс для быстрой фильтрации по community
Index("idx_shout_community", "community"),
# Индекс для быстрого поиска по slug
Index("idx_shout_slug", "slug"),
# Составной индекс для фильтрации опубликованных неудаленных публикаций
Index(
"idx_shout_published_deleted",
"published_at",
"deleted_at",
postgresql_where=published_at.is_not(None) & deleted_at.is_(None),
),
)

View File

@ -1,11 +1,21 @@
import time
from sqlalchemy import JSON, Boolean, Column, ForeignKey, Integer, String
from sqlalchemy import JSON, Boolean, Column, ForeignKey, Index, Integer, String
from services.db import Base
class TopicFollower(Base):
"""
Связь между топиком и его подписчиком.
Attributes:
follower (int): ID подписчика
topic (int): ID топика
created_at (int): Время создания связи
auto (bool): Автоматическая подписка
"""
__tablename__ = "topic_followers"
id = None # type: ignore
@ -14,8 +24,29 @@ class TopicFollower(Base):
created_at = Column(Integer, nullable=False, default=int(time.time()))
auto = Column(Boolean, nullable=False, default=False)
# Определяем индексы
__table_args__ = (
# Индекс для быстрого поиска всех подписчиков топика
Index("idx_topic_followers_topic", "topic"),
# Индекс для быстрого поиска всех топиков, на которые подписан автор
Index("idx_topic_followers_follower", "follower"),
)
class Topic(Base):
"""
Модель топика (темы) публикаций.
Attributes:
slug (str): Уникальный строковый идентификатор темы
title (str): Название темы
body (str): Описание темы
pic (str): URL изображения темы
community (int): ID сообщества
oid (str): Старый ID
parent_ids (list): IDs родительских тем
"""
__tablename__ = "topic"
slug = Column(String, unique=True)
@ -24,5 +55,12 @@ class Topic(Base):
pic = Column(String, nullable=True, comment="Picture")
community = Column(ForeignKey("community.id"), default=1)
oid = Column(String, nullable=True, comment="Old ID")
parent_ids = Column(JSON, nullable=True, comment="Parent Topic IDs")
# Определяем индексы
__table_args__ = (
# Индекс для быстрого поиска по slug
Index("idx_topic_slug", "slug"),
# Индекс для быстрого поиска по сообществу
Index("idx_topic_community", "community"),
)

View File

@ -1,44 +1,222 @@
from sqlalchemy import select
import time
from sqlalchemy import func, select, text
from cache.cache import (
cache_topic,
get_cached_topic_authors,
get_cached_topic_by_slug,
get_cached_topic_followers,
redis_operation,
)
from cache.memorycache import cache_region
from orm.author import Author
from orm.topic import Topic
from orm.shout import Shout, ShoutTopic
from orm.topic import Topic, TopicFollower
from resolvers.stat import get_with_stat
from services.auth import login_required
from services.db import local_session
from services.redis import redis
from services.schema import mutation, query
from utils.logger import root_logger as logger
# Вспомогательная функция для получения всех тем без статистики
async def get_all_topics():
"""
Получает все темы без статистики.
Используется для случаев, когда нужен полный список тем без дополнительной информации.
Returns:
list: Список всех тем без статистики
"""
# Пытаемся получить данные из кеша
cached_topics = await redis_operation("GET", "topics:all:basic")
if cached_topics:
logger.debug("Используем кешированные базовые данные о темах из Redis")
try:
import json
return json.loads(cached_topics)
except Exception as e:
logger.error(f"Ошибка при десериализации тем из Redis: {e}")
# Если в кеше нет данных, выполняем запрос в БД
logger.debug("Получаем список всех тем из БД и кешируем результат")
with local_session() as session:
# Запрос на получение базовой информации о темах
topics_query = select(Topic)
topics = session.execute(topics_query).scalars().all()
# Преобразуем темы в словари
result = [topic.dict() for topic in topics]
# Кешируем результат в Redis без TTL (будет обновляться только при изменениях)
try:
import json
await redis_operation("SET", "topics:all:basic", json.dumps(result))
except Exception as e:
logger.error(f"Ошибка при кешировании тем в Redis: {e}")
return result
# Вспомогательная функция для получения тем со статистикой с пагинацией
async def get_topics_with_stats(limit=100, offset=0, community_id=None):
"""
Получает темы со статистикой с пагинацией.
Args:
limit: Максимальное количество возвращаемых тем
offset: Смещение для пагинации
community_id: Опциональный ID сообщества для фильтрации
Returns:
list: Список тем с их статистикой
"""
# Формируем ключ кеша с учетом параметров
cache_key = f"topics:stats:limit={limit}:offset={offset}"
if community_id:
cache_key += f":community={community_id}"
# Пытаемся получить данные из кеша
cached_topics = await redis_operation("GET", cache_key)
if cached_topics:
logger.debug(f"Используем кешированные данные о темах из Redis: {cache_key}")
try:
import json
return json.loads(cached_topics)
except Exception as e:
logger.error(f"Ошибка при десериализации тем из Redis: {e}")
# Если в кеше нет данных, выполняем оптимизированный запрос
logger.debug(f"Выполняем запрос на получение тем со статистикой: limit={limit}, offset={offset}")
with local_session() as session:
# Базовый запрос для получения тем
base_query = select(Topic)
# Добавляем фильтр по сообществу, если указан
if community_id:
base_query = base_query.where(Topic.community == community_id)
# Применяем лимит и смещение
base_query = base_query.limit(limit).offset(offset)
# Получаем темы
topics = session.execute(base_query).scalars().all()
topic_ids = [topic.id for topic in topics]
if not topic_ids:
return []
# Запрос на получение статистики по публикациям для выбранных тем
shouts_stats_query = f"""
SELECT st.topic, COUNT(DISTINCT s.id) as shouts_count
FROM shout_topic st
JOIN shout s ON st.shout = s.id AND s.deleted_at IS NULL
WHERE st.topic IN ({",".join(map(str, topic_ids))})
GROUP BY st.topic
"""
shouts_stats = {row[0]: row[1] for row in session.execute(text(shouts_stats_query))}
# Запрос на получение статистики по подписчикам для выбранных тем
followers_stats_query = f"""
SELECT topic, COUNT(DISTINCT follower) as followers_count
FROM topic_followers
WHERE topic IN ({",".join(map(str, topic_ids))})
GROUP BY topic
"""
followers_stats = {row[0]: row[1] for row in session.execute(text(followers_stats_query))}
# Формируем результат с добавлением статистики
result = []
for topic in topics:
topic_dict = topic.dict()
topic_dict["stat"] = {
"shouts": shouts_stats.get(topic.id, 0),
"followers": followers_stats.get(topic.id, 0),
}
result.append(topic_dict)
# Кешируем каждую тему отдельно для использования в других функциях
await cache_topic(topic_dict)
# Кешируем полный результат в Redis без TTL (будет обновляться только при изменениях)
try:
import json
await redis_operation("SET", cache_key, json.dumps(result))
except Exception as e:
logger.error(f"Ошибка при кешировании тем в Redis: {e}")
return result
# Функция для инвалидации кеша тем
async def invalidate_topics_cache():
"""
Инвалидирует все кеши тем при изменении данных.
"""
logger.debug("Инвалидация кеша тем")
# Получаем все ключи, начинающиеся с "topics:"
topic_keys = await redis.execute("KEYS", "topics:*")
if topic_keys:
# Удаляем все найденные ключи
await redis.execute("DEL", *topic_keys)
logger.debug(f"Удалено {len(topic_keys)} ключей кеша тем")
# Запрос на получение всех тем
@query.field("get_topics_all")
def get_topics_all(_, _info):
cache_key = "get_topics_all" # Ключ для кеша
async def get_topics_all(_, _info):
"""
Получает список всех тем без статистики.
@cache_region.cache_on_arguments(cache_key)
def _get_topics_all():
topics_query = select(Topic)
return get_with_stat(topics_query) # Получение тем с учетом статистики
Returns:
list: Список всех тем
"""
return await get_all_topics()
return _get_topics_all()
# Запрос на получение тем с пагинацией и статистикой
@query.field("get_topics_paginated")
async def get_topics_paginated(_, _info, limit=100, offset=0):
"""
Получает список тем с пагинацией и статистикой.
Args:
limit: Максимальное количество возвращаемых тем
offset: Смещение для пагинации
Returns:
list: Список тем с их статистикой
"""
return await get_topics_with_stats(limit, offset)
# Запрос на получение тем по сообществу
@query.field("get_topics_by_community")
def get_topics_by_community(_, _info, community_id: int):
cache_key = f"get_topics_by_community_{community_id}" # Ключ для кеша
async def get_topics_by_community(_, _info, community_id: int, limit=100, offset=0):
"""
Получает список тем, принадлежащих указанному сообществу с пагинацией и статистикой.
@cache_region.cache_on_arguments(cache_key)
def _get_topics_by_community():
topics_by_community_query = select(Topic).where(Topic.community == community_id)
return get_with_stat(topics_by_community_query)
Args:
community_id: ID сообщества
limit: Максимальное количество возвращаемых тем
offset: Смещение для пагинации
return _get_topics_by_community()
Returns:
list: Список тем с их статистикой
"""
return await get_topics_with_stats(limit, offset, community_id)
# Запрос на получение тем по автору
@ -74,6 +252,9 @@ async def create_topic(_, _info, topic_input):
session.add(new_topic)
session.commit()
# Инвалидируем кеш всех тем
await invalidate_topics_cache()
return {"topic": new_topic}
@ -91,6 +272,11 @@ async def update_topic(_, _info, topic_input):
session.add(topic)
session.commit()
# Инвалидируем кеш всех тем и конкретной темы
await invalidate_topics_cache()
await redis.execute("DEL", f"topic:slug:{slug}")
await redis.execute("DEL", f"topic:id:{topic.id}")
return {"topic": topic}
@ -111,6 +297,11 @@ async def delete_topic(_, info, slug: str):
session.delete(t)
session.commit()
# Инвалидируем кеш всех тем и конкретной темы
await invalidate_topics_cache()
await redis.execute("DEL", f"topic:slug:{slug}")
await redis.execute("DEL", f"topic:id:{t.id}")
return {}
return {"error": "access denied"}

View File

@ -10,12 +10,14 @@ from sqlalchemy import (
JSON,
Column,
Engine,
Index,
Integer,
create_engine,
event,
exc,
func,
inspect,
text
)
from sqlalchemy.orm import Session, configure_mappers, declarative_base
from sqlalchemy.sql.schema import Table
@ -56,6 +58,82 @@ def create_table_if_not_exists(engine, table):
logger.info(f"Table '{table.__tablename__}' ok.")
def sync_indexes():
"""
Синхронизирует индексы в БД с индексами, определенными в моделях SQLAlchemy.
Создает недостающие индексы, если они определены в моделях, но отсутствуют в БД.
Использует pg_catalog для PostgreSQL для получения списка существующих индексов.
"""
if not DB_URL.startswith("postgres"):
logger.warning("Функция sync_indexes поддерживается только для PostgreSQL.")
return
logger.info("Начинаем синхронизацию индексов в базе данных...")
# Получаем все существующие индексы в БД
with local_session() as session:
existing_indexes_query = text("""
SELECT
t.relname AS table_name,
i.relname AS index_name
FROM
pg_catalog.pg_class i
JOIN
pg_catalog.pg_index ix ON ix.indexrelid = i.oid
JOIN
pg_catalog.pg_class t ON t.oid = ix.indrelid
JOIN
pg_catalog.pg_namespace n ON n.oid = i.relnamespace
WHERE
i.relkind = 'i'
AND n.nspname = 'public'
AND t.relkind = 'r'
ORDER BY
t.relname, i.relname;
""")
existing_indexes = {row[1].lower() for row in session.execute(existing_indexes_query)}
logger.debug(f"Найдено {len(existing_indexes)} существующих индексов в БД")
# Проверяем каждую модель и её индексы
for _model_name, model_class in REGISTRY.items():
if hasattr(model_class, "__table__") and hasattr(model_class, "__table_args__"):
table_args = model_class.__table_args__
# Если table_args - это кортеж, ищем в нём объекты Index
if isinstance(table_args, tuple):
for arg in table_args:
if isinstance(arg, Index):
index_name = arg.name.lower()
# Проверяем, существует ли индекс в БД
if index_name not in existing_indexes:
logger.info(
f"Создаем отсутствующий индекс {index_name} для таблицы {model_class.__tablename__}"
)
# Создаем индекс если он отсутствует
try:
arg.create(engine)
logger.info(f"Индекс {index_name} успешно создан")
except Exception as e:
logger.error(f"Ошибка при создании индекса {index_name}: {e}")
else:
logger.debug(f"Индекс {index_name} уже существует")
# Анализируем таблицы для оптимизации запросов
for model_name, model_class in REGISTRY.items():
if hasattr(model_class, "__tablename__"):
try:
session.execute(text(f"ANALYZE {model_class.__tablename__}"))
logger.debug(f"Таблица {model_class.__tablename__} проанализирована")
except Exception as e:
logger.error(f"Ошибка при анализе таблицы {model_class.__tablename__}: {e}")
logger.info("Синхронизация индексов завершена.")
# noinspection PyUnusedLocal
def local_session(src=""):
return Session(bind=engine, expire_on_commit=False)