From 247fc98760c2cebba20fe7b5047582946ff5c935 Mon Sep 17 00:00:00 2001 From: Untone Date: Thu, 20 Mar 2025 11:55:21 +0300 Subject: [PATCH] cachedep-fix+orjson+fmt --- CHANGELOG.md | 4 +- cache/cache.py | 54 ++++++------- cache/memorycache.py | 176 +++++++++++++++++++++++++++++++++++++++--- cache/precache.py | 14 ++-- docs/features.md | 10 +++ requirements.txt | 3 +- resolvers/editor.py | 4 +- resolvers/notifier.py | 10 +-- resolvers/reader.py | 15 ++-- services/db.py | 6 +- services/notify.py | 8 +- services/search.py | 6 +- services/viewed.py | 5 +- utils/encoders.py | 5 +- 14 files changed, 245 insertions(+), 75 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2db60b2c..57c736da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ -#### [0.4.12] - 2025-02-12 +#### [0.4.12] - 2025-03-19 - `delete_reaction` detects comments and uses `deleted_at` update +- `check_to_unfeature` etc. update +- dogpile dep in `services/memorycache.py` optimized #### [0.4.11] - 2025-02-12 - `create_draft` resolver requires draft_id fixed diff --git a/cache/cache.py b/cache/cache.py index afd16990..75f7ec68 100644 --- a/cache/cache.py +++ b/cache/cache.py @@ -1,7 +1,7 @@ import asyncio -import json from typing import List +import orjson from sqlalchemy import and_, join, select from orm.author import Author, AuthorFollower @@ -35,7 +35,7 @@ CACHE_KEYS = { # Cache topic data async def cache_topic(topic: dict): - payload = json.dumps(topic, cls=CustomJSONEncoder) + payload = orjson.dumps(topic, cls=CustomJSONEncoder) await asyncio.gather( redis_operation("SET", f"topic:id:{topic['id']}", payload), redis_operation("SET", f"topic:slug:{topic['slug']}", payload), @@ -44,7 +44,7 @@ async def cache_topic(topic: dict): # Cache author data async def cache_author(author: dict): - payload = json.dumps(author, cls=CustomJSONEncoder) + payload = orjson.dumps(author, cls=CustomJSONEncoder) await asyncio.gather( redis_operation("SET", f"author:user:{author['user'].strip()}", str(author["id"])), redis_operation("SET", f"author:id:{author['id']}", payload), @@ -55,13 +55,13 @@ async def cache_author(author: dict): async def cache_follows(follower_id: int, entity_type: str, entity_id: int, is_insert=True): key = f"author:follows-{entity_type}s:{follower_id}" follows_str = await redis_operation("GET", key) - follows = json.loads(follows_str) if follows_str else DEFAULT_FOLLOWS[entity_type] + follows = orjson.loads(follows_str) if follows_str else DEFAULT_FOLLOWS[entity_type] if is_insert: if entity_id not in follows: follows.append(entity_id) else: follows = [eid for eid in follows if eid != entity_id] - await redis_operation("SET", key, json.dumps(follows, cls=CustomJSONEncoder)) + await redis_operation("SET", key, orjson.dumps(follows, cls=CustomJSONEncoder)) await update_follower_stat(follower_id, entity_type, len(follows)) @@ -69,7 +69,7 @@ async def cache_follows(follower_id: int, entity_type: str, entity_id: int, is_i async def update_follower_stat(follower_id, entity_type, count): follower_key = f"author:id:{follower_id}" follower_str = await redis_operation("GET", follower_key) - follower = json.loads(follower_str) if follower_str else None + follower = orjson.loads(follower_str) if follower_str else None if follower: follower["stat"] = {f"{entity_type}s": count} await cache_author(follower) @@ -80,7 +80,7 @@ async def get_cached_author(author_id: int, get_with_stat): author_key = f"author:id:{author_id}" result = await redis_operation("GET", author_key) if result: - return json.loads(result) + return orjson.loads(result) # Load from database if not found in cache q = select(Author).where(Author.id == author_id) authors = get_with_stat(q) @@ -105,14 +105,14 @@ async def get_cached_topic(topic_id: int): topic_key = f"topic:id:{topic_id}" cached_topic = await redis_operation("GET", topic_key) if cached_topic: - return json.loads(cached_topic) + return orjson.loads(cached_topic) # If not in cache, fetch from the database with local_session() as session: topic = session.execute(select(Topic).where(Topic.id == topic_id)).scalar_one_or_none() if topic: topic_dict = topic.dict() - await redis_operation("SET", topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder)) + await redis_operation("SET", topic_key, orjson.dumps(topic_dict, cls=CustomJSONEncoder)) return topic_dict return None @@ -123,7 +123,7 @@ async def get_cached_topic_by_slug(slug: str, get_with_stat): topic_key = f"topic:slug:{slug}" result = await redis_operation("GET", topic_key) if result: - return json.loads(result) + return orjson.loads(result) # Load from database if not found in cache topic_query = select(Topic).where(Topic.slug == slug) topics = get_with_stat(topic_query) @@ -139,7 +139,7 @@ async def get_cached_authors_by_ids(author_ids: List[int]) -> List[dict]: # Fetch all author data concurrently keys = [f"author:id:{author_id}" for author_id in author_ids] results = await asyncio.gather(*(redis_operation("GET", key) for key in keys)) - authors = [json.loads(result) if result else None for result in results] + authors = [orjson.loads(result) if result else None for result in results] # Load missing authors from database and cache missing_indices = [index for index, author in enumerate(authors) if author is None] if missing_indices: @@ -168,7 +168,7 @@ async def get_cached_topic_followers(topic_id: int): cached = await redis_operation("GET", cache_key) if cached: - followers_ids = json.loads(cached) + followers_ids = orjson.loads(cached) logger.debug(f"Found {len(followers_ids)} cached followers for topic #{topic_id}") return await get_cached_authors_by_ids(followers_ids) @@ -181,7 +181,7 @@ async def get_cached_topic_followers(topic_id: int): .all() ] - await redis_operation("SETEX", cache_key, value=json.dumps(followers_ids), ttl=CACHE_TTL) + await redis_operation("SETEX", cache_key, value=orjson.dumps(followers_ids), ttl=CACHE_TTL) followers = await get_cached_authors_by_ids(followers_ids) logger.debug(f"Cached {len(followers)} followers for topic #{topic_id}") return followers @@ -196,7 +196,7 @@ async def get_cached_author_followers(author_id: int): # Check cache for data cached = await redis_operation("GET", f"author:followers:{author_id}") if cached: - followers_ids = json.loads(cached) + followers_ids = orjson.loads(cached) followers = await get_cached_authors_by_ids(followers_ids) logger.debug(f"Cached followers for author #{author_id}: {len(followers)}") return followers @@ -210,7 +210,7 @@ async def get_cached_author_followers(author_id: int): .filter(AuthorFollower.author == author_id, Author.id != author_id) .all() ] - await redis_operation("SET", f"author:followers:{author_id}", json.dumps(followers_ids)) + await redis_operation("SET", f"author:followers:{author_id}", orjson.dumps(followers_ids)) followers = await get_cached_authors_by_ids(followers_ids) return followers @@ -220,7 +220,7 @@ async def get_cached_follower_authors(author_id: int): # Attempt to retrieve authors from cache cached = await redis_operation("GET", f"author:follows-authors:{author_id}") if cached: - authors_ids = json.loads(cached) + authors_ids = orjson.loads(cached) else: # Query authors from database with local_session() as session: @@ -232,7 +232,7 @@ async def get_cached_follower_authors(author_id: int): .where(AuthorFollower.follower == author_id) ).all() ] - await redis_operation("SET", f"author:follows-authors:{author_id}", json.dumps(authors_ids)) + await redis_operation("SET", f"author:follows-authors:{author_id}", orjson.dumps(authors_ids)) authors = await get_cached_authors_by_ids(authors_ids) return authors @@ -243,7 +243,7 @@ async def get_cached_follower_topics(author_id: int): # Attempt to retrieve topics from cache cached = await redis_operation("GET", f"author:follows-topics:{author_id}") if cached: - topics_ids = json.loads(cached) + topics_ids = orjson.loads(cached) else: # Load topics from database and cache them with local_session() as session: @@ -254,13 +254,13 @@ async def get_cached_follower_topics(author_id: int): .where(TopicFollower.follower == author_id) .all() ] - await redis_operation("SET", f"author:follows-topics:{author_id}", json.dumps(topics_ids)) + await redis_operation("SET", f"author:follows-topics:{author_id}", orjson.dumps(topics_ids)) topics = [] for topic_id in topics_ids: topic_str = await redis_operation("GET", f"topic:id:{topic_id}") if topic_str: - topic = json.loads(topic_str) + topic = orjson.loads(topic_str) if topic and topic not in topics: topics.append(topic) @@ -285,7 +285,7 @@ async def get_cached_author_by_user_id(user_id: str, get_with_stat): # If ID is found, get full author data by ID author_data = await redis_operation("GET", f"author:id:{author_id}") if author_data: - return json.loads(author_data) + return orjson.loads(author_data) # If data is not found in cache, query the database author_query = select(Author).where(Author.user == user_id) @@ -296,7 +296,7 @@ async def get_cached_author_by_user_id(user_id: str, get_with_stat): author_dict = author.dict() await asyncio.gather( redis_operation("SET", f"author:user:{user_id.strip()}", str(author.id)), - redis_operation("SET", f"author:id:{author.id}", json.dumps(author_dict)), + redis_operation("SET", f"author:id:{author.id}", orjson.dumps(author_dict)), ) return author_dict @@ -319,7 +319,7 @@ async def get_cached_topic_authors(topic_id: int): rkey = f"topic:authors:{topic_id}" cached_authors_ids = await redis_operation("GET", rkey) if cached_authors_ids: - authors_ids = json.loads(cached_authors_ids) + authors_ids = orjson.loads(cached_authors_ids) else: # If cache is empty, get data from the database with local_session() as session: @@ -331,7 +331,7 @@ async def get_cached_topic_authors(topic_id: int): ) authors_ids = [author_id for (author_id,) in session.execute(query).all()] # Cache the retrieved author IDs - await redis_operation("SET", rkey, json.dumps(authors_ids)) + await redis_operation("SET", rkey, orjson.dumps(authors_ids)) # Retrieve full author details from cached IDs if authors_ids: @@ -378,7 +378,7 @@ async def invalidate_shouts_cache(cache_keys: List[str]): async def cache_topic_shouts(topic_id: int, shouts: List[dict]): """Кэширует список публикаций для темы""" key = f"topic_shouts_{topic_id}" - payload = json.dumps(shouts, cls=CustomJSONEncoder) + payload = orjson.dumps(shouts, cls=CustomJSONEncoder) await redis_operation("SETEX", key, value=payload, ttl=CACHE_TTL) @@ -387,7 +387,7 @@ async def get_cached_topic_shouts(topic_id: int) -> List[dict]: key = f"topic_shouts_{topic_id}" cached = await redis_operation("GET", key) if cached: - return json.loads(cached) + return orjson.loads(cached) return None @@ -467,7 +467,7 @@ async def get_cached_entity(entity_type: str, entity_id: int, get_method, cache_ key = f"{entity_type}:id:{entity_id}" cached = await redis_operation("GET", key) if cached: - return json.loads(cached) + return orjson.loads(cached) entity = await get_method(entity_id) if entity: diff --git a/cache/memorycache.py b/cache/memorycache.py index 003c863d..7cfc94eb 100644 --- a/cache/memorycache.py +++ b/cache/memorycache.py @@ -1,11 +1,169 @@ -from dogpile.cache import make_region +""" +Модуль для кеширования данных с использованием Redis. +Предоставляет API, совместимый с dogpile.cache для поддержки обратной совместимости. +""" -from settings import REDIS_URL +import functools +import hashlib +import inspect +import logging +import pickle +from typing import Callable, Optional -# Создание региона кэша с TTL -cache_region = make_region() -cache_region.configure( - "dogpile.cache.redis", - arguments={"url": f"{REDIS_URL}/1"}, - expiration_time=3600, # Cache expiration time in seconds -) +import orjson + +from services.redis import redis +from utils.encoders import CustomJSONEncoder + +logger = logging.getLogger(__name__) + +DEFAULT_TTL = 300 # время жизни кеша в секундах (5 минут) + + +class RedisCache: + """ + Класс, предоставляющий API, совместимый с dogpile.cache, но использующий Redis. + + Примеры: + >>> cache_region = RedisCache() + >>> @cache_region.cache_on_arguments("my_key") + ... def my_func(arg1, arg2): + ... return arg1 + arg2 + """ + + def __init__(self, ttl: int = DEFAULT_TTL): + """ + Инициализация объекта кеша. + + Args: + ttl: Время жизни кеша в секундах + """ + self.ttl = ttl + + def cache_on_arguments(self, cache_key: Optional[str] = None) -> Callable: + """ + Декоратор для кеширования результатов функций с использованием Redis. + + Args: + cache_key: Опциональный базовый ключ кеша. Если не указан, генерируется из сигнатуры функции. + + Returns: + Декоратор для кеширования функции + + Примеры: + >>> @cache_region.cache_on_arguments("users") + ... def get_users(): + ... return db.query(User).all() + """ + + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def wrapper(*args, **kwargs): + # Генерация ключа кеша + key = self._generate_cache_key(func, cache_key, *args, **kwargs) + + # Попытка получить данные из кеша + cached_data = await redis.get(key) + if cached_data: + try: + return orjson.loads(cached_data) + except Exception: + # Если не удалось десериализовать как JSON, попробуем как pickle + return pickle.loads(cached_data.encode()) + + # Вызов оригинальной функции, если данных в кеше нет + result = func(*args, **kwargs) + + # Сохранение результата в кеш + try: + # Пытаемся сериализовать как JSON + serialized = orjson.dumps(result, cls=CustomJSONEncoder) + except (TypeError, ValueError): + # Если не удалось, используем pickle + serialized = pickle.dumps(result).decode() + + await redis.set(key, serialized, ex=self.ttl) + return result + + @functools.wraps(func) + def sync_wrapper(*args, **kwargs): + # Для функций, которые не являются корутинами + # Генерация ключа кеша + key = self._generate_cache_key(func, cache_key, *args, **kwargs) + + # Синхронная версия не использует await, поэтому результат всегда вычисляется + result = func(*args, **kwargs) + + # Асинхронно записываем в кэш (будет выполнено позже) + try: + import asyncio + + serialized = orjson.dumps(result, cls=CustomJSONEncoder) + asyncio.create_task(redis.set(key, serialized, ex=self.ttl)) + except Exception as e: + logger.error(f"Ошибка при кешировании результата: {e}") + + return result + + # Возвращаем асинхронный или синхронный враппер в зависимости от типа функции + if inspect.iscoroutinefunction(func): + return wrapper + else: + return sync_wrapper + + return decorator + + def _generate_cache_key(self, func: Callable, base_key: Optional[str], *args, **kwargs) -> str: + """ + Генерирует ключ кеша на основе функции и её аргументов. + + Args: + func: Кешируемая функция + base_key: Базовый ключ кеша + *args: Позиционные аргументы функции + **kwargs: Именованные аргументы функции + + Returns: + Строковый ключ для кеша + """ + if base_key: + key_prefix = f"cache:{base_key}" + else: + key_prefix = f"cache:{func.__module__}.{func.__name__}" + + # Создаем хеш аргументов + arg_hash = hashlib.md5() + + # Добавляем позиционные аргументы + for arg in args: + try: + arg_hash.update(str(arg).encode()) + except Exception: + arg_hash.update(str(id(arg)).encode()) + + # Добавляем именованные аргументы (сортируем для детерминированности) + for k in sorted(kwargs.keys()): + try: + arg_hash.update(f"{k}:{kwargs[k]}".encode()) + except Exception: + arg_hash.update(f"{k}:{id(kwargs[k])}".encode()) + + return f"{key_prefix}:{arg_hash.hexdigest()}" + + def invalidate(self, func: Callable, *args, **kwargs) -> None: + """ + Инвалидирует (удаляет) кеш для конкретной функции с конкретными аргументами. + + Args: + func: Кешированная функция + *args: Позиционные аргументы функции + **kwargs: Именованные аргументы функции + """ + key = self._generate_cache_key(func, None, *args, **kwargs) + import asyncio + + asyncio.create_task(redis.execute("DEL", key)) + + +# Экземпляр класса RedisCache для использования в коде +cache_region = RedisCache() diff --git a/cache/precache.py b/cache/precache.py index 94b39960..b0faec5f 100644 --- a/cache/precache.py +++ b/cache/precache.py @@ -1,6 +1,6 @@ import asyncio -import json +import orjson from sqlalchemy import and_, join, select from cache.cache import cache_author, cache_topic @@ -21,7 +21,7 @@ async def precache_authors_followers(author_id, session): result = session.execute(followers_query) authors_followers.update(row[0] for row in result if row[0]) - followers_payload = json.dumps(list(authors_followers), cls=CustomJSONEncoder) + followers_payload = orjson.dumps(list(authors_followers), cls=CustomJSONEncoder) await redis.execute("SET", f"author:followers:{author_id}", followers_payload) @@ -35,9 +35,9 @@ async def precache_authors_follows(author_id, session): follows_authors = {row[0] for row in session.execute(follows_authors_query) if row[0]} follows_shouts = {row[0] for row in session.execute(follows_shouts_query) if row[0]} - topics_payload = json.dumps(list(follows_topics), cls=CustomJSONEncoder) - authors_payload = json.dumps(list(follows_authors), cls=CustomJSONEncoder) - shouts_payload = json.dumps(list(follows_shouts), cls=CustomJSONEncoder) + topics_payload = orjson.dumps(list(follows_topics), cls=CustomJSONEncoder) + authors_payload = orjson.dumps(list(follows_authors), cls=CustomJSONEncoder) + shouts_payload = orjson.dumps(list(follows_shouts), cls=CustomJSONEncoder) await asyncio.gather( redis.execute("SET", f"author:follows-topics:{author_id}", topics_payload), @@ -62,7 +62,7 @@ async def precache_topics_authors(topic_id: int, session): ) topic_authors = {row[0] for row in session.execute(topic_authors_query) if row[0]} - authors_payload = json.dumps(list(topic_authors), cls=CustomJSONEncoder) + authors_payload = orjson.dumps(list(topic_authors), cls=CustomJSONEncoder) await redis.execute("SET", f"topic:authors:{topic_id}", authors_payload) @@ -71,7 +71,7 @@ async def precache_topics_followers(topic_id: int, session): followers_query = select(TopicFollower.follower).where(TopicFollower.topic == topic_id) topic_followers = {row[0] for row in session.execute(followers_query) if row[0]} - followers_payload = json.dumps(list(topic_followers), cls=CustomJSONEncoder) + followers_payload = orjson.dumps(list(topic_followers), cls=CustomJSONEncoder) await redis.execute("SET", f"topic:followers:{topic_id}", followers_payload) diff --git a/docs/features.md b/docs/features.md index 1970dbdb..b6d1bdc5 100644 --- a/docs/features.md +++ b/docs/features.md @@ -14,6 +14,16 @@ - Автоматическое определение сервера авторизации - Корректная обработка CORS для всех поддерживаемых доменов +## Система кеширования + +- Redis используется в качестве основного механизма кеширования +- Поддержка как синхронных, так и асинхронных функций в декораторе cache_on_arguments +- Автоматическая сериализация/десериализация данных в JSON с использованием CustomJSONEncoder +- Резервная сериализация через pickle для сложных объектов +- Генерация уникальных ключей кеша на основе сигнатуры функции и переданных аргументов +- Настраиваемое время жизни кеша (TTL) +- Возможность ручной инвалидации кеша для конкретных функций и аргументов + ## Webhooks - Автоматическая регистрация вебхука для события user.login diff --git a/requirements.txt b/requirements.txt index f6da22f5..d6ce74b9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,6 @@ bcrypt authlib passlib - opensearch-py google-analytics-data colorlog @@ -14,5 +13,5 @@ starlette gql ariadne granian - +orjson pydantic \ No newline at end of file diff --git a/resolvers/editor.py b/resolvers/editor.py index 89c6e4b2..1efc40cc 100644 --- a/resolvers/editor.py +++ b/resolvers/editor.py @@ -1,6 +1,6 @@ -import json import time +import orjson from sqlalchemy import and_, desc, select from sqlalchemy.orm import joinedload from sqlalchemy.sql.functions import coalesce @@ -106,7 +106,7 @@ async def get_my_shout(_, info, shout_id: int): if hasattr(shout, "media") and shout.media: if isinstance(shout.media, str): try: - shout.media = json.loads(shout.media) + shout.media = orjson.loads(shout.media) except Exception as e: logger.error(f"Error parsing shout media: {e}") shout.media = [] diff --git a/resolvers/notifier.py b/resolvers/notifier.py index 5cd2fbcb..569f0f7a 100644 --- a/resolvers/notifier.py +++ b/resolvers/notifier.py @@ -1,7 +1,7 @@ -import json import time from typing import List, Tuple +import orjson from sqlalchemy import and_, select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import aliased @@ -115,7 +115,7 @@ def get_notifications_grouped(author_id: int, after: int = 0, limit: int = 10, o if (groups_amount + offset) >= limit: break - payload = json.loads(str(notification.payload)) + payload = orjson.loads(str(notification.payload)) if str(notification.entity) == NotificationEntity.SHOUT.value: shout = payload @@ -177,7 +177,7 @@ def get_notifications_grouped(author_id: int, after: int = 0, limit: int = 10, o elif str(notification.entity) == "follower": thread_id = "followers" - follower = json.loads(payload) + follower = orjson.loads(payload) group = groups_by_thread.get(thread_id) if group: if str(notification.action) == "follow": @@ -293,11 +293,11 @@ async def notifications_seen_thread(_, info, thread: str, after: int): ) exclude = set() for nr in removed_reaction_notifications: - reaction = json.loads(str(nr.payload)) + reaction = orjson.loads(str(nr.payload)) reaction_id = reaction.get("id") exclude.add(reaction_id) for n in new_reaction_notifications: - reaction = json.loads(str(n.payload)) + reaction = orjson.loads(str(n.payload)) reaction_id = reaction.get("id") if ( reaction_id not in exclude diff --git a/resolvers/reader.py b/resolvers/reader.py index 0be80af4..003a50cd 100644 --- a/resolvers/reader.py +++ b/resolvers/reader.py @@ -1,5 +1,4 @@ -import json - +import orjson from graphql import GraphQLResolveInfo from sqlalchemy import and_, nulls_last, text from sqlalchemy.orm import aliased @@ -222,7 +221,7 @@ def get_shouts_with_links(info, q, limit=20, offset=0): if has_field(info, "stat"): stat = {} if isinstance(row.stat, str): - stat = json.loads(row.stat) + stat = orjson.loads(row.stat) elif isinstance(row.stat, dict): stat = row.stat viewed = ViewedStorage.get_shout(shout_id=shout_id) or 0 @@ -231,7 +230,7 @@ def get_shouts_with_links(info, q, limit=20, offset=0): # Обработка main_topic и topics topics = None if has_field(info, "topics") and hasattr(row, "topics"): - topics = json.loads(row.topics) if isinstance(row.topics, str) else row.topics + topics = orjson.loads(row.topics) if isinstance(row.topics, str) else row.topics # logger.debug(f"Shout#{shout_id} topics: {topics}") shout_dict["topics"] = topics @@ -240,7 +239,7 @@ def get_shouts_with_links(info, q, limit=20, offset=0): if hasattr(row, "main_topic"): # logger.debug(f"Raw main_topic for shout#{shout_id}: {row.main_topic}") main_topic = ( - json.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic + orjson.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic ) # logger.debug(f"Parsed main_topic for shout#{shout_id}: {main_topic}") @@ -260,7 +259,7 @@ def get_shouts_with_links(info, q, limit=20, offset=0): if has_field(info, "authors") and hasattr(row, "authors"): shout_dict["authors"] = ( - json.loads(row.authors) if isinstance(row.authors, str) else row.authors + orjson.loads(row.authors) if isinstance(row.authors, str) else row.authors ) if has_field(info, "media") and shout.media: @@ -268,8 +267,8 @@ def get_shouts_with_links(info, q, limit=20, offset=0): media_data = shout.media if isinstance(media_data, str): try: - media_data = json.loads(media_data) - except json.JSONDecodeError: + media_data = orjson.loads(media_data) + except orjson.JSONDecodeError: media_data = [] shout_dict["media"] = [media_data] if isinstance(media_data, dict) else media_data diff --git a/services/db.py b/services/db.py index bd3072e4..d598e7f2 100644 --- a/services/db.py +++ b/services/db.py @@ -1,10 +1,10 @@ -import json import math import time import traceback import warnings from typing import Any, Callable, Dict, TypeVar +import orjson import sqlalchemy from sqlalchemy import ( JSON, @@ -84,8 +84,8 @@ class Base(declarative_base()): # Check if the value is JSON and decode it if necessary if isinstance(value, (str, bytes)) and isinstance(self.__table__.columns[column_name].type, JSON): try: - data[column_name] = json.loads(value) - except (TypeError, json.JSONDecodeError) as e: + data[column_name] = orjson.loads(value) + except (TypeError, orjson.JSONDecodeError) as e: logger.error(f"Error decoding JSON for column '{column_name}': {e}") data[column_name] = value else: diff --git a/services/notify.py b/services/notify.py index 626afa7b..911bd6ec 100644 --- a/services/notify.py +++ b/services/notify.py @@ -1,4 +1,4 @@ -import json +import orjson from orm.notification import Notification from services.db import local_session @@ -18,7 +18,7 @@ async def notify_reaction(reaction, action: str = "create"): data = {"payload": reaction, "action": action} try: save_notification(action, channel_name, data.get("payload")) - await redis.publish(channel_name, json.dumps(data)) + await redis.publish(channel_name, orjson.dumps(data)) except Exception as e: logger.error(f"Failed to publish to channel {channel_name}: {e}") @@ -28,7 +28,7 @@ async def notify_shout(shout, action: str = "update"): data = {"payload": shout, "action": action} try: save_notification(action, channel_name, data.get("payload")) - await redis.publish(channel_name, json.dumps(data)) + await redis.publish(channel_name, orjson.dumps(data)) except Exception as e: logger.error(f"Failed to publish to channel {channel_name}: {e}") @@ -43,7 +43,7 @@ async def notify_follower(follower: dict, author_id: int, action: str = "follow" save_notification(action, channel_name, data.get("payload")) # Convert data to JSON string - json_data = json.dumps(data) + json_data = orjson.dumps(data) # Ensure the data is not empty before publishing if json_data: diff --git a/services/search.py b/services/search.py index 9c9b13e9..e92c387b 100644 --- a/services/search.py +++ b/services/search.py @@ -1,8 +1,8 @@ import asyncio -import json import logging import os +import orjson from opensearchpy import OpenSearch from services.redis import redis @@ -142,7 +142,7 @@ class SearchService: # Проверка и обновление структуры индекса, если необходимо result = self.client.indices.get_mapping(index=self.index_name) if isinstance(result, str): - result = json.loads(result) + result = orjson.loads(result) if isinstance(result, dict): mapping = result.get(self.index_name, {}).get("mappings") logger.info(f"Найдена структура индексации: {mapping['properties'].keys()}") @@ -210,7 +210,7 @@ class SearchService: "SETEX", redis_key, REDIS_TTL, - json.dumps(results, cls=CustomJSONEncoder), + orjson.dumps(results, cls=CustomJSONEncoder), ) return results return [] diff --git a/services/viewed.py b/services/viewed.py index f1942de0..f54927b2 100644 --- a/services/viewed.py +++ b/services/viewed.py @@ -1,10 +1,11 @@ import asyncio -import json import os import time from datetime import datetime, timedelta, timezone from typing import Dict +import orjson + # ga from google.analytics.data_v1beta import BetaAnalyticsDataClient from google.analytics.data_v1beta.types import ( @@ -84,7 +85,7 @@ class ViewedStorage: logger.warn(f" * {viewfile_path} is too old: {self.start_date}") with open(viewfile_path, "r") as file: - precounted_views = json.load(file) + precounted_views = orjson.load(file) self.precounted_by_slug.update(precounted_views) logger.info(f" * {len(precounted_views)} shouts with views was loaded.") diff --git a/utils/encoders.py b/utils/encoders.py index fe4c97d4..0a6d234c 100644 --- a/utils/encoders.py +++ b/utils/encoders.py @@ -1,8 +1,9 @@ -import json from decimal import Decimal +import orjson -class CustomJSONEncoder(json.JSONEncoder): + +class CustomJSONEncoder(orjson.JSONEncoder): def default(self, obj): if isinstance(obj, Decimal): return str(obj)