core/cache/precache.py
import asyncio
import json
from sqlalchemy import and_, join, select
from cache.cache import cache_author, cache_topic
from orm.author import Author, AuthorFollower
from orm.shout import Shout, ShoutAuthor, ShoutReactionsFollower, ShoutTopic
from orm.topic import Topic, TopicFollower
from resolvers.stat import get_with_stat
from services.db import local_session
from services.redis import redis
from utils.encoders import CustomJSONEncoder
from utils.logger import root_logger as logger


# Precache an author's followers
async def precache_authors_followers(author_id, session):
    authors_followers = set()
    followers_query = select(AuthorFollower.follower).where(AuthorFollower.author == author_id)
    result = session.execute(followers_query)
    authors_followers.update(row[0] for row in result if row[0])

    followers_payload = json.dumps(list(authors_followers), cls=CustomJSONEncoder)
    await redis.execute("SET", f"author:followers:{author_id}", followers_payload)


# Precache an author's follows: topics, authors, and shouts
async def precache_authors_follows(author_id, session):
    follows_topics_query = select(TopicFollower.topic).where(TopicFollower.follower == author_id)
    follows_authors_query = select(AuthorFollower.author).where(AuthorFollower.follower == author_id)
    follows_shouts_query = select(ShoutReactionsFollower.shout).where(ShoutReactionsFollower.follower == author_id)

    follows_topics = {row[0] for row in session.execute(follows_topics_query) if row[0]}
    follows_authors = {row[0] for row in session.execute(follows_authors_query) if row[0]}
    follows_shouts = {row[0] for row in session.execute(follows_shouts_query) if row[0]}

    topics_payload = json.dumps(list(follows_topics), cls=CustomJSONEncoder)
    authors_payload = json.dumps(list(follows_authors), cls=CustomJSONEncoder)
    shouts_payload = json.dumps(list(follows_shouts), cls=CustomJSONEncoder)

    await asyncio.gather(
        redis.execute("SET", f"author:follows-topics:{author_id}", topics_payload),
        redis.execute("SET", f"author:follows-authors:{author_id}", authors_payload),
        redis.execute("SET", f"author:follows-shouts:{author_id}", shouts_payload),
    )


# Precache a topic's authors
async def precache_topics_authors(topic_id: int, session):
    topic_authors_query = (
        select(ShoutAuthor.author)
        .select_from(join(ShoutTopic, Shout, ShoutTopic.shout == Shout.id))
        .join(ShoutAuthor, ShoutAuthor.shout == Shout.id)
        .filter(
            and_(
                ShoutTopic.topic == topic_id,
                Shout.published_at.is_not(None),
                Shout.deleted_at.is_(None),
            )
        )
    )
    topic_authors = {row[0] for row in session.execute(topic_authors_query) if row[0]}

    authors_payload = json.dumps(list(topic_authors), cls=CustomJSONEncoder)
    await redis.execute("SET", f"topic:authors:{topic_id}", authors_payload)


# Precache a topic's followers
async def precache_topics_followers(topic_id: int, session):
    followers_query = select(TopicFollower.follower).where(TopicFollower.topic == topic_id)
    topic_followers = {row[0] for row in session.execute(followers_query) if row[0]}

    followers_payload = json.dumps(list(topic_followers), cls=CustomJSONEncoder)
    await redis.execute("SET", f"topic:followers:{topic_id}", followers_payload)


async def precache_data():
    logger.info("precaching...")
    try:
        key = "authorizer_env"
        # cache reset
        value = await redis.execute("HGETALL", key)
        await redis.execute("FLUSHDB")
        logger.info("redis: FLUSHDB")

        # Convert the saved hash back into a list of arguments for HSET
        if value:
            # If the value is a dict, flatten it into field/value pairs for HSET
            if isinstance(value, dict):
                flattened = []
                for field, val in value.items():
                    flattened.extend([field, val])
                await redis.execute("HSET", key, *flattened)
            else:
                # Assume the value is already a flat list of arguments
                await redis.execute("HSET", key, *value)
            logger.info(f"redis hash '{key}' was restored")

        # Set a start time to track total execution time
        import time

        start_time = time.time()

        with local_session() as session:
            # topics
            q = select(Topic).where(Topic.community == 1)
            topics = get_with_stat(q)
            for topic in topics:
                topic_dict = topic.dict() if hasattr(topic, "dict") else topic
                await cache_topic(topic_dict)
                await asyncio.gather(
                    precache_topics_followers(topic_dict["id"], session),
                    precache_topics_authors(topic_dict["id"], session),
                )
            logger.info(f"{len(topics)} topics and their followings precached")

            # authors
            try:
                authors = get_with_stat(select(Author).where(Author.user.is_not(None)))
                logger.info(f"{len(authors)} authors found in database")

                # Process authors in smaller batches to avoid long-running operations
                batch_size = 50
                total_processed = 0

                # Create batches
                author_batches = [authors[i : i + batch_size] for i in range(0, len(authors), batch_size)]
                logger.info(f"Processing authors in {len(author_batches)} batches of {batch_size}")

                for batch_idx, author_batch in enumerate(author_batches):
                    batch_tasks = []
                    for author in author_batch:
                        if isinstance(author, Author):
                            profile = author.dict()
                            author_id = profile.get("id")
                            user_id = profile.get("user", "").strip()
                            if author_id and user_id:
                                # Add this author's tasks to the batch
                                cache_task = cache_author(profile)
                                follower_task = precache_authors_followers(author_id, session)
                                follows_task = precache_authors_follows(author_id, session)
                                batch_tasks.extend([cache_task, follower_task, follows_task])
                            else:
                                logger.error(f"fail caching {author}")

                    # Run all tasks for this batch with a timeout
                    if batch_tasks:
                        try:
                            await asyncio.wait_for(asyncio.gather(*batch_tasks), timeout=30)
                            total_processed += len(author_batch)
                            logger.info(
                                f"Processed batch {batch_idx + 1}/{len(author_batches)} "
                                f"({total_processed}/{len(authors)} authors)"
                            )
                        except asyncio.TimeoutError:
                            logger.error(
                                f"Timeout processing author batch {batch_idx + 1}, continuing with next batch"
                            )

                logger.info(f"{total_processed} authors and their followings precached (out of {len(authors)} total)")
            except Exception as author_exc:
                import traceback

                logger.error(f"Error processing authors: {author_exc}")
                logger.error(traceback.format_exc())

        # Calculate total execution time and log it
        end_time = time.time()
        execution_time = end_time - start_time
        logger.info(f"Total precache execution time: {execution_time:.2f} seconds")

        # Double-check that we're actually returning and not getting stuck somewhere
        logger.info("Precache operation complete - returning to caller")
        return True
    except Exception as exc:
        import traceback

        traceback.print_exc()
        logger.error(f"Error in precache_data: {exc}")
        return False
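

# A minimal usage sketch (assumption, not in the original file): running the precache
# step standalone. In the real service, precache_data is presumably invoked from the
# application's startup hook rather than as a script.
if __name__ == "__main__":
    asyncio.run(precache_data())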