core/services/db.py

290 lines
11 KiB
Python
Raw Permalink Normal View History

2024-10-14 09:19:30 +00:00
import math
2024-02-29 12:15:04 +00:00
import time
2024-04-08 07:38:58 +00:00
import traceback
import warnings
2023-11-22 16:38:39 +00:00
from typing import Any, Callable, Dict, TypeVar
2024-10-14 09:19:30 +00:00
2025-03-20 08:55:21 +00:00
import orjson
2024-11-01 17:24:09 +00:00
import sqlalchemy
2025-02-11 09:00:35 +00:00
from sqlalchemy import (
JSON,
Column,
Engine,
2025-03-22 06:31:53 +00:00
Index,
2025-02-11 09:00:35 +00:00
Integer,
create_engine,
event,
exc,
func,
inspect,
2025-03-22 08:47:19 +00:00
text,
2025-02-11 09:00:35 +00:00
)
from sqlalchemy.orm import Session, configure_mappers, declarative_base, joinedload
from sqlalchemy.sql.schema import Table
2024-04-26 22:41:47 +00:00
2024-10-14 09:19:30 +00:00
from settings import DB_URL
from utils.logger import root_logger as logger
2024-04-26 22:41:47 +00:00
2024-10-13 23:05:20 +00:00
if DB_URL.startswith("postgres"):
engine = create_engine(
DB_URL,
echo=False,
pool_size=10,
max_overflow=20,
pool_timeout=30, # Время ожидания свободного соединения
pool_recycle=1800, # Время жизни соединения
2024-11-01 21:26:57 +00:00
pool_pre_ping=True, # Добавить проверку соединений
connect_args={
"sslmode": "disable",
2024-11-02 09:09:24 +00:00
"connect_timeout": 40, # Добавить таймаут подключения
},
2024-10-13 23:05:20 +00:00
)
else:
2024-10-14 09:19:30 +00:00
engine = create_engine(DB_URL, echo=False, connect_args={"check_same_thread": False})
2024-10-14 06:12:20 +00:00
2024-02-25 13:43:04 +00:00
inspector = inspect(engine)
2024-02-25 15:08:02 +00:00
configure_mappers()
2024-04-17 15:32:23 +00:00
T = TypeVar("T")
2024-02-19 10:16:44 +00:00
REGISTRY: Dict[str, type] = {}
2024-04-17 15:32:23 +00:00
FILTERED_FIELDS = ["_sa_instance_state", "search_vector"]
2023-12-24 14:25:57 +00:00
2024-02-21 15:07:02 +00:00
2024-10-13 23:05:20 +00:00
def create_table_if_not_exists(engine, table):
inspector = inspect(engine)
2024-10-14 06:12:20 +00:00
if table and not inspector.has_table(table.__tablename__):
2024-10-13 23:05:20 +00:00
table.__table__.create(engine)
logger.info(f"Table '{table.__tablename__}' created.")
else:
2024-10-15 08:12:09 +00:00
logger.info(f"Table '{table.__tablename__}' ok.")
2024-10-13 23:05:20 +00:00
2025-03-22 06:31:53 +00:00
def sync_indexes():
"""
Синхронизирует индексы в БД с индексами, определенными в моделях SQLAlchemy.
Создает недостающие индексы, если они определены в моделях, но отсутствуют в БД.
Использует pg_catalog для PostgreSQL для получения списка существующих индексов.
"""
if not DB_URL.startswith("postgres"):
logger.warning("Функция sync_indexes поддерживается только для PostgreSQL.")
return
logger.info("Начинаем синхронизацию индексов в базе данных...")
# Получаем все существующие индексы в БД
with local_session() as session:
existing_indexes_query = text("""
SELECT
t.relname AS table_name,
i.relname AS index_name
FROM
pg_catalog.pg_class i
JOIN
pg_catalog.pg_index ix ON ix.indexrelid = i.oid
JOIN
pg_catalog.pg_class t ON t.oid = ix.indrelid
JOIN
pg_catalog.pg_namespace n ON n.oid = i.relnamespace
WHERE
i.relkind = 'i'
AND n.nspname = 'public'
AND t.relkind = 'r'
ORDER BY
t.relname, i.relname;
""")
existing_indexes = {row[1].lower() for row in session.execute(existing_indexes_query)}
logger.debug(f"Найдено {len(existing_indexes)} существующих индексов в БД")
# Проверяем каждую модель и её индексы
for _model_name, model_class in REGISTRY.items():
if hasattr(model_class, "__table__") and hasattr(model_class, "__table_args__"):
table_args = model_class.__table_args__
# Если table_args - это кортеж, ищем в нём объекты Index
if isinstance(table_args, tuple):
for arg in table_args:
if isinstance(arg, Index):
index_name = arg.name.lower()
# Проверяем, существует ли индекс в БД
if index_name not in existing_indexes:
logger.info(
f"Создаем отсутствующий индекс {index_name} для таблицы {model_class.__tablename__}"
)
# Создаем индекс если он отсутствует
try:
arg.create(engine)
logger.info(f"Индекс {index_name} успешно создан")
except Exception as e:
logger.error(f"Ошибка при создании индекса {index_name}: {e}")
else:
logger.debug(f"Индекс {index_name} уже существует")
# Анализируем таблицы для оптимизации запросов
for model_name, model_class in REGISTRY.items():
if hasattr(model_class, "__tablename__"):
try:
session.execute(text(f"ANALYZE {model_class.__tablename__}"))
logger.debug(f"Таблица {model_class.__tablename__} проанализирована")
except Exception as e:
logger.error(f"Ошибка при анализе таблицы {model_class.__tablename__}: {e}")
logger.info("Синхронизация индексов завершена.")
2024-02-24 10:22:35 +00:00
# noinspection PyUnusedLocal
2024-04-17 15:32:23 +00:00
def local_session(src=""):
2023-11-22 16:38:39 +00:00
return Session(bind=engine, expire_on_commit=False)
2024-02-21 07:27:16 +00:00
class Base(declarative_base()):
2022-09-03 10:50:14 +00:00
__table__: Table
__tablename__: str
__new__: Callable
__init__: Callable
2023-01-31 07:36:54 +00:00
__allow_unmapped__ = True
2023-01-31 07:44:06 +00:00
__abstract__ = True
2024-04-17 15:32:23 +00:00
__table_args__ = {"extend_existing": True}
2023-01-31 07:44:06 +00:00
id = Column(Integer, primary_key=True)
2022-09-03 10:50:14 +00:00
def __init_subclass__(cls, **kwargs):
REGISTRY[cls.__name__] = cls
def dict(self) -> Dict[str, Any]:
2024-05-30 04:12:00 +00:00
column_names = filter(lambda x: x not in FILTERED_FIELDS, self.__table__.columns.keys())
2024-08-07 07:30:51 +00:00
data = {}
2023-11-03 10:10:22 +00:00
try:
2024-08-07 07:30:51 +00:00
for column_name in column_names:
value = getattr(self, column_name)
# Check if the value is JSON and decode it if necessary
if isinstance(value, (str, bytes)) and isinstance(self.__table__.columns[column_name].type, JSON):
try:
2025-03-20 08:55:21 +00:00
data[column_name] = orjson.loads(value)
except (TypeError, orjson.JSONDecodeError) as e:
2024-08-07 07:30:51 +00:00
logger.error(f"Error decoding JSON for column '{column_name}': {e}")
data[column_name] = value
2024-02-26 15:00:55 +00:00
else:
2024-08-07 07:30:51 +00:00
data[column_name] = value
# Add synthetic field .stat if it exists
2024-04-17 15:32:23 +00:00
if hasattr(self, "stat"):
data["stat"] = self.stat
2023-11-03 10:10:22 +00:00
except Exception as e:
2024-04-17 15:32:23 +00:00
logger.error(f"Error occurred while converting object to dictionary: {e}")
2024-08-07 07:30:51 +00:00
return data
2023-11-22 16:38:39 +00:00
def update(self, values: Dict[str, Any]) -> None:
for key, value in values.items():
if hasattr(self, key):
setattr(self, key, value)
2024-02-19 10:16:44 +00:00
2024-02-21 07:27:16 +00:00
2024-04-26 22:41:47 +00:00
# make_searchable(Base.metadata)
2024-10-13 23:05:20 +00:00
# Base.metadata.create_all(bind=engine)
2024-02-25 17:58:48 +00:00
# Функция для вывода полного трейсбека при предупреждениях
2024-05-30 04:12:00 +00:00
def warning_with_traceback(message: Warning | str, category, filename: str, lineno: int, file=None, line=None):
2024-02-25 17:58:48 +00:00
tb = traceback.format_stack()
2024-04-17 15:32:23 +00:00
tb_str = "".join(tb)
return f"{message} ({filename}, {lineno}): {category.__name__}\n{tb_str}"
2024-02-25 17:58:48 +00:00
# Установка функции вывода трейсбека для предупреждений SQLAlchemy
2024-02-29 12:15:04 +00:00
warnings.showwarning = warning_with_traceback
2024-04-17 15:32:23 +00:00
warnings.simplefilter("always", exc.SAWarning)
2024-02-25 17:58:48 +00:00
2024-08-12 08:00:01 +00:00
# Функция для извлечения SQL-запроса из контекста
def get_statement_from_context(context):
2024-10-14 09:19:30 +00:00
query = ""
2024-10-13 23:05:20 +00:00
compiled = context.compiled
2024-10-14 06:12:20 +00:00
if compiled:
compiled_statement = compiled.string
compiled_parameters = compiled.params
if compiled_statement:
if compiled_parameters:
try:
# Безопасное форматирование параметров
query = compiled_statement % compiled_parameters
except Exception as e:
logger.error(f"Error formatting query: {e}")
else:
query = compiled_statement
if query:
query = query.replace("\n", " ").replace(" ", " ").replace(" ", " ").strip()
return query
2024-08-12 08:00:01 +00:00
# Обработчик события перед выполнением запроса
2024-04-17 15:32:23 +00:00
@event.listens_for(Engine, "before_cursor_execute")
2024-02-29 12:15:04 +00:00
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
conn.query_start_time = time.time()
2024-08-12 08:00:01 +00:00
conn.cursor_id = id(cursor) # Отслеживание конкретного курсора
2024-02-25 17:58:48 +00:00
2024-03-28 16:08:55 +00:00
2024-08-12 08:00:01 +00:00
# Обработчик события после выполнения запроса
2024-04-17 15:32:23 +00:00
@event.listens_for(Engine, "after_cursor_execute")
2024-02-29 12:15:04 +00:00
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
2024-10-14 06:12:20 +00:00
if hasattr(conn, "cursor_id") and conn.cursor_id == id(cursor):
query = get_statement_from_context(context)
if query:
elapsed = time.time() - conn.query_start_time
if elapsed > 1:
query_end = query[-16:]
query = query.split(query_end)[0] + query_end
logger.debug(query)
elapsed_n = math.floor(elapsed)
2024-10-14 09:19:30 +00:00
logger.debug("*" * (elapsed_n))
2024-10-14 06:12:20 +00:00
logger.debug(f"{elapsed:.3f} s")
del conn.cursor_id # Удаление идентификатора курсора после выполнения
2024-11-01 17:11:58 +00:00
def get_json_builder():
"""
Возвращает подходящие функции для построения JSON объектов в зависимости от драйвера БД
"""
dialect = engine.dialect.name
2024-11-01 17:24:09 +00:00
json_cast = lambda x: x # noqa: E731
if dialect.startswith("postgres"):
json_cast = lambda x: func.cast(x, sqlalchemy.Text) # noqa: E731
return func.json_build_object, func.json_agg, json_cast
elif dialect.startswith("sqlite") or dialect.startswith("mysql"):
return func.json_object, func.json_group_array, json_cast
2024-11-01 17:11:58 +00:00
else:
raise NotImplementedError(f"JSON builder not implemented for dialect {dialect}")
2024-11-01 17:24:09 +00:00
2024-11-01 17:11:58 +00:00
# Используем их в коде
2024-11-01 17:24:09 +00:00
json_builder, json_array_builder, json_cast = get_json_builder()
# Fetch all shouts, with authors preloaded
# This function is used for search indexing
async def fetch_all_shouts(session=None):
"""Fetch all published shouts for search indexing with authors preloaded"""
from orm.shout import Shout
close_session = False
if session is None:
session = local_session()
close_session = True
try:
# Fetch only published and non-deleted shouts with authors preloaded
query = session.query(Shout).options(
joinedload(Shout.authors)
).filter(
Shout.published_at.is_not(None),
Shout.deleted_at.is_(None)
)
shouts = query.all()
return shouts
except Exception as e:
logger.error(f"Error fetching shouts for search indexing: {e}")
return []
finally:
if close_session:
session.close()