profiling-db
Some checks failed: Deploy to core / deploy (push) failing after 2m5s

Untone 2024-02-21 17:55:54 +03:00
parent e69046a1f8
commit be9f62eb76
5 changed files with 56 additions and 28 deletions

View File

@@ -193,7 +193,7 @@ async def get_author(_, _info, slug="", author_id=None):
 async def get_author_by_user_id(user_id: str):
     redis_key = f"user:{user_id}:author"
-    res = await redis.execute('GET', redis_key)
+    res = await redis.execute("GET", redis_key)
     if isinstance(res, str):
         author = json.loads(res)
         if author.get("id"):
@@ -204,7 +204,7 @@ async def get_author_by_user_id(user_id: str):
     q = select(Author).filter(Author.user == user_id)
     author = await load_author_with_stats(q)
     if author:
-        await redis.execute('set', redis_key, json.dumps(author.dict()))
+        await redis.execute("set", redis_key, json.dumps(author.dict()))
         return author
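Both hunks above follow the same read-through cache pattern: try Redis first, fall back to the database, then write the result back. A minimal self-contained sketch of the same idea, assuming redis-py's asyncio client and a hypothetical load_from_db coroutine standing in for load_author_with_stats:

import json

from redis.asyncio import Redis

redis = Redis()


async def get_cached(key: str, load_from_db):
    # Cache hit: redis returns the stored bytes, or None on a miss.
    raw = await redis.get(key)
    if raw is not None:
        return json.loads(raw)
    # Cache miss: load from the database, then write back for next time.
    value = await load_from_db()
    await redis.set(key, json.dumps(value))
    return value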

View File

@@ -132,14 +132,14 @@ def query_follows(user_id: str):
 async def get_follows_by_user_id(user_id: str):
     if user_id:
         redis_key = f"user:{user_id}:follows"
-        res = await redis.execute('GET', redis_key)
+        res = await redis.execute("GET", redis_key)
         if isinstance(res, str):
            follows = json.loads(res)
            return follows
         logger.debug(f"getting follows for {user_id}")
         follows = query_follows(user_id)
-        await redis.execute('SET', redis_key, json.dumps(follows))
+        await redis.execute("SET", redis_key, json.dumps(follows))
         return follows
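Both caches are written with a bare SET, so an entry never expires on its own and stays until an event handler overwrites it. Since RedisCache.execute (last file below) forwards extra arguments straight to execute_command, an expiry could be attached at write time; a sketch, with 300 seconds as an arbitrary example value, not one taken from this commit:

# "EX" asks Redis to drop the key after the given number of seconds,
# bounding how stale a cached follows list can get.
await redis.execute("SET", redis_key, json.dumps(follows), "EX", 300)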

View File

@@ -1,11 +1,12 @@
 import math
+import pstats
 import time
 from functools import wraps
+from sqlalchemy import event, Engine
 from typing import Any, Callable, Dict, TypeVar
 from dogpile.cache import make_region
-from sqlalchemy import Column, Integer, create_engine, event
-from sqlalchemy.engine import Engine
+from sqlalchemy import Column, Integer, create_engine
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import Session
 from sqlalchemy.sql.schema import Table
@@ -21,26 +22,46 @@ T = TypeVar("T")
 REGISTRY: Dict[str, type] = {}
 Base = declarative_base()
+
+
+def profile_sqlalchemy_queries(threshold=0.1):
+    def decorator(fn):
+        @wraps(fn)
+        def wrapper(*args, **kw):
+            elapsed, stat_loader, result = _profile(fn, threshold, *args, **kw)
+            if elapsed is not None:
+                print(f"Query took {elapsed:.3f} seconds to execute.")
+                stats = stat_loader()
+                stats.sort_stats('cumulative')
+                stats.print_stats()
+            return result
+        return wrapper
+    return decorator
+
+
+def _profile(fn, threshold, *args, **kw):
+    began = time.time()
+    result = fn(*args, **kw)
+    ended = time.time()
+    if ended - began > threshold:
+        return ended - began, pstats.Stats, result
+    else:
+        return None, None, result
 # Hooks for logging SQLAlchemy queries
 @event.listens_for(Engine, "before_cursor_execute")
 def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
-    query_id = id(cursor)  # Unique identifier for the query
-    conn.info.setdefault(f"query_start_time:{query_id}", time.time())
+    conn._query_start_time = time.time()
 
 
 @event.listens_for(Engine, "after_cursor_execute")
 def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
-    query_id = id(cursor)  # Unique identifier for the query
-    if f"query_start_time:{query_id}" in conn.info:
-        total = time.time() - conn.info.get(f"query_start_time:{query_id}", time.time())
-        try:
-            del conn.info[f"query_start_time:{query_id}"]
-            stars = "*" * math.floor(total * 1000)
-            if stars:
-                logger.debug(f"\n{statement}\n {stars} {total*1000} s\n")
-        except Exception:
-            pass
+    if hasattr(conn, '_query_start_time'):
+        elapsed = time.time() - conn._query_start_time
+        del conn._query_start_time
+        if elapsed > 0.2:  # Adjust threshold as needed
+            print(f"{'*' * math.floor(elapsed)} {elapsed:.3f} seconds to execute.")
+            # Profile the query if execution time exceeds the threshold
+            profiler = profile_sqlalchemy_queries(threshold=0.2)(cursor.execute)
+            profiler(statement, parameters)
 
 
 def local_session(src=""):
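As committed, _profile only wall-clock-times the call: it hands back the pstats.Stats class as stat_loader, but no cProfile run is attached, so stats.print_stats() in the wrapper prints an effectively empty report. A sketch of a variant that actually collects per-function timings with cProfile (same decorator shape, not part of this commit):

import cProfile
import pstats
import time
from functools import wraps


def profile_sqlalchemy_queries(threshold=0.1):
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kw):
            profiler = cProfile.Profile()
            began = time.time()
            result = profiler.runcall(fn, *args, **kw)  # run fn under the profiler
            elapsed = time.time() - began
            if elapsed > threshold:
                print(f"Query took {elapsed:.3f} seconds to execute.")
                stats = pstats.Stats(profiler)  # now backed by real profile data
                stats.sort_stats("cumulative")
                stats.print_stats(20)  # top 20 entries is usually enough
            return result
        return wrapper
    return decorator

A second caveat: profiler(statement, parameters) inside after_cursor_execute calls cursor.execute again, so every slow statement runs twice, repeating the side effects of an INSERT or UPDATE. Timing alone needs no re-execution; a sketch following the conn.info recipe from the SQLAlchemy documentation:

import logging
import time

from sqlalchemy import event
from sqlalchemy.engine import Engine

logger = logging.getLogger(__name__)


@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    # conn.info is a per-connection dict SQLAlchemy reserves for user data;
    # a list works as a stack so nested executions do not clobber each other.
    conn.info.setdefault("query_start_time", []).append(time.time())


@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total = time.time() - conn.info["query_start_time"].pop(-1)
    if total > 0.2:  # report only slow queries; the threshold is an example
        logger.debug("slow query %.3fs:\n%s", total, statement)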

View File

@@ -16,7 +16,7 @@ from services.viewed import ViewedStorage
 @event.listens_for(Author, "after_update")
 def after_author_update(mapper, connection, target):
     redis_key = f"user:{target.user}:author"
-    asyncio.create_task(redis.execute('set', redis_key, json.dumps(vars(target))))
+    asyncio.create_task(redis.execute("set", redis_key, json.dumps(vars(target))))
 
 
 @event.listens_for(TopicFollower, "after_insert")
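ORM event hooks run as synchronous callbacks, and asyncio.create_task raises RuntimeError when no event loop is running on the calling thread, so the call above only works while the hook fires inside async request handling. A more defensive sketch (not from this commit; redis is the project's RedisCache wrapper from the last file below), which also drops the _sa_instance_state entry that vars(target) carries:

import asyncio
import json


def after_author_update(mapper, connection, target):
    redis_key = f"user:{target.user}:author"
    # vars(target) includes SQLAlchemy's _sa_instance_state, which
    # json.dumps cannot encode; strip it before serializing.
    payload = json.dumps(
        {k: v for k, v in vars(target).items() if k != "_sa_instance_state"}
    )
    try:
        # Schedule on the running loop when the hook fires in async code.
        asyncio.get_running_loop().create_task(redis.execute("SET", redis_key, payload))
    except RuntimeError:
        # No running loop on this thread: perform the write synchronously.
        asyncio.run(redis.execute("SET", redis_key, payload))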
@@ -67,7 +67,7 @@ async def update_follows_for_user(connection, user_id, entity_type, entity, is_insert):
         follows[f"{entity_type}s"] = [
             e for e in follows[f"{entity_type}s"] if e["id"] != entity.id
         ]
-    await redis.execute('set', redis_key, json.dumps(follows))
+    await redis.execute("set", redis_key, json.dumps(follows))
 
 
 async def handle_author_follower_change(connection, author_id, follower_id, is_insert):
@@ -125,14 +125,21 @@ class FollowsCached:
             redis_key = f"user:{author.user}:author"
             author_dict = author.dict()
             if isinstance(author_dict, dict):
-                filtered_author_dict = {k: v for k, v in author_dict.items() if v is not None}
-                await redis.execute('set', redis_key, json.dumps(filtered_author_dict))
+                filtered_author_dict = {
+                    k: v for k, v in author_dict.items() if v is not None
+                }
+                await redis.execute(
+                    "set", redis_key, json.dumps(filtered_author_dict)
+                )
                 follows = await get_author_follows(None, None, user=author.user)
                 if isinstance(follows, dict):
-                    filtered_follows = {k: v for k, v in follows.items() if v is not None}
+                    filtered_follows = {
+                        k: v for k, v in follows.items() if v is not None
+                    }
                     redis_key = f"user:{author.user}:follows"
-                    await redis.execute('set', redis_key, json.dumps(filtered_follows))
+                    await redis.execute(
+                        "set", redis_key, json.dumps(filtered_follows)
+                    )
 
     @staticmethod
     async def worker():
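The filter-None-then-serialize step now appears twice here and once more in the author-update hook. A small helper, hypothetical and not in the commit, would keep these Black-formatted call sites to one line each:

import json


async def cache_json(redis, key: str, data: dict) -> None:
    # Drop None values so the cached payload stays compact.
    filtered = {k: v for k, v in data.items() if v is not None}
    await redis.execute("SET", key, json.dumps(filtered))


# Usage at the call sites above:
#     await cache_json(redis, f"user:{author.user}:author", author_dict)
#     await cache_json(redis, f"user:{author.user}:follows", follows)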

View File

@@ -23,8 +23,8 @@ class RedisCache:
         logger.debug(f"{command} {args} {kwargs}")
         for arg in args:
             if isinstance(arg, dict):
-                if arg.get('_sa_instance_state'):
-                    del arg['_sa_instance_state']
+                if arg.get("_sa_instance_state"):
+                    del arg["_sa_instance_state"]
         r = await self._client.execute_command(command, *args, **kwargs)
         logger.debug(type(r))
         logger.debug(r)
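This guard exists because a dict built from vars(model) still carries SQLAlchemy's _sa_instance_state, which neither json.dumps nor the Redis protocol can represent. Mutating arguments inside the wrapper can be avoided by serializing only mapped columns before the value reaches it; a sketch assuming a declarative model instance:

import json

from sqlalchemy import inspect


def model_to_json(obj) -> str:
    # Serialize only mapped column attributes so internal SQLAlchemy
    # state never leaks into the cached payload.
    data = {attr.key: getattr(obj, attr.key) for attr in inspect(obj).mapper.column_attrs}
    return json.dumps(data, default=str)  # default=str covers datetimes, UUIDs, etc.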