fmt+follows-refactored

2024-05-30 07:12:00 +03:00
parent a9ab2e8bb2
commit e638ad81e2
17 changed files with 178 additions and 451 deletions

View File

@@ -26,9 +26,7 @@ async def cache_author(author: dict):
     if isinstance(followers, list):
         for follower in followers:
             follower_follows_authors = []
-            follower_follows_authors_str = await redis.execute(
-                "GET", f"author:{author_id}:follows-authors"
-            )
+            follower_follows_authors_str = await redis.execute("GET", f"author:{author_id}:follows-authors")
             if isinstance(follower_follows_authors_str, str):
                 follower_follows_authors = json.loads(follower_follows_authors_str)
             c = 0
@@ -49,9 +47,7 @@ async def cache_author(author: dict):
     if isinstance(follows_authors, list):
         for followed_author in follows_authors:
             followed_author_followers = []
-            followed_author_followers_str = await redis.execute(
-                "GET", f"author:{author_id}:followers"
-            )
+            followed_author_followers_str = await redis.execute("GET", f"author:{author_id}:followers")
             if isinstance(followed_author_followers_str, str):
                 followed_author_followers = json.loads(followed_author_followers_str)
             c = 0
@@ -139,18 +135,11 @@ async def cache_follow_author_change(follower: dict, author: dict, is_insert=Tru
 
 async def cache_topic(topic_dict: dict):
     # update stat all field for followers' caches in <topics> list
-    followers = (
-        local_session()
-        .query(TopicFollower)
-        .filter(TopicFollower.topic == topic_dict.get("id"))
-        .all()
-    )
+    followers = local_session().query(TopicFollower).filter(TopicFollower.topic == topic_dict.get("id")).all()
     for tf in followers:
         follower_id = tf.follower
         follower_follows_topics = []
-        follower_follows_topics_str = await redis.execute(
-            "GET", f"author:{follower_id}:follows-topics"
-        )
+        follower_follows_topics_str = await redis.execute("GET", f"author:{follower_id}:follows-topics")
         if isinstance(follower_follows_topics_str, str):
             follower_follows_topics = json.loads(follower_follows_topics_str)
         c = 0

View File

@@ -45,9 +45,7 @@ class Base(declarative_base()):
         REGISTRY[cls.__name__] = cls
 
     def dict(self) -> Dict[str, Any]:
-        column_names = filter(
-            lambda x: x not in FILTERED_FIELDS, self.__table__.columns.keys()
-        )
+        column_names = filter(lambda x: x not in FILTERED_FIELDS, self.__table__.columns.keys())
         try:
             data = {}
             for c in column_names:
@@ -76,9 +74,7 @@ Base.metadata.create_all(bind=engine)
 
 
 # Функция для вывода полного трейсбека при предупреждениях
-def warning_with_traceback(
-    message: Warning | str, category, filename: str, lineno: int, file=None, line=None
-):
+def warning_with_traceback(message: Warning | str, category, filename: str, lineno: int, file=None, line=None):
     tb = traceback.format_stack()
     tb_str = "".join(tb)
     return f"{message} ({filename}, {lineno}): {category.__name__}\n{tb_str}"

View File

@@ -127,9 +127,7 @@ class SearchService:
         logger.debug(f"Проверяем индекс {self.index_name}...")
         if not self.client.indices.exists(index=self.index_name):
             self.create_index()
-            self.client.indices.put_mapping(
-                index=self.index_name, body=expected_mapping
-            )
+            self.client.indices.put_mapping(index=self.index_name, body=expected_mapping)
         else:
             logger.info(f"Найден существующий индекс {self.index_name}")
             # Проверка и обновление структуры индекса, если необходимо
@@ -138,17 +136,9 @@ class SearchService:
             result = json.loads(result)
             if isinstance(result, dict):
                 mapping = result.get(self.index_name, {}).get("mappings")
-                logger.debug(
-                    f"Найдена структура индексации: {mapping['properties'].keys()}"
-                )
-                if (
-                    mapping
-                    and mapping["properties"].keys()
-                    != expected_mapping["properties"].keys()
-                ):
-                    logger.debug(
-                        f"Ожидаемая структура индексации: {expected_mapping}"
-                    )
+                logger.debug(f"Найдена структура индексации: {mapping['properties'].keys()}")
+                if mapping and mapping["properties"].keys() != expected_mapping["properties"].keys():
+                    logger.debug(f"Ожидаемая структура индексации: {expected_mapping}")
                     logger.warn("[!!!] Требуется переиндексация всех данных")
                     self.delete_index()
                     self.client = None
@@ -177,9 +167,7 @@ class SearchService:
         logger.debug(f"Ищем: {text}")
         search_body = {"query": {"match": {"_all": text}}}
         if self.client:
-            search_response = self.client.search(
-                index=self.index_name, body=search_body, size=limit, from_=offset
-            )
+            search_response = self.client.search(index=self.index_name, body=search_body, size=limit, from_=offset)
             hits = search_response["hits"]["hits"]
             results = [{**hit["_source"], "score": hit["_score"]} for hit in hits]

View File

@@ -20,9 +20,7 @@ DEFAULT_FOLLOWS = {
 }
 
 
-async def handle_author_follower_change(
-    author_id: int, follower_id: int, is_insert: bool
-):
+async def handle_author_follower_change(author_id: int, follower_id: int, is_insert: bool):
     logger.info(author_id)
     author_query = select(Author).select_from(Author).filter(Author.id == author_id)
     [author] = get_with_stat(author_query)
@@ -30,17 +28,13 @@ async def handle_author_follower_change(
     [follower] = get_with_stat(follower_query)
     if follower and author:
         await cache_author(author.dict())
-        await cache_follows(
-            follower.dict(), "author", author.dict(), is_insert
-        )  # cache_author(follower_dict) inside
+        await cache_follows(follower.dict(), "author", author.dict(), is_insert)  # cache_author(follower_dict) inside
         await cache_follow_author_change(
             follower.dict(), author.dict(), is_insert
         )  # cache_author(follower_dict) inside
 
 
-async def handle_topic_follower_change(
-    topic_id: int, follower_id: int, is_insert: bool
-):
+async def handle_topic_follower_change(topic_id: int, follower_id: int, is_insert: bool):
     logger.info(topic_id)
     topic_query = select(Topic).filter(Topic.id == topic_id)
     [topic] = get_with_stat(topic_query)
@@ -48,9 +42,7 @@ async def handle_topic_follower_change(
     [follower] = get_with_stat(follower_query)
     if follower and topic:
         await cache_author(follower.dict())
-        await redis.execute(
-            "SET", f"topic:{topic.id}", json.dumps(topic.dict(), cls=CustomJSONEncoder)
-        )
+        await redis.execute("SET", f"topic:{topic.id}", json.dumps(topic.dict(), cls=CustomJSONEncoder))
         await cache_follows(follower.dict(), "topic", topic.dict(), is_insert)
@@ -84,9 +76,7 @@ def after_reaction_update(mapper, connection, reaction: Reaction):
         # reaction repliers
         replied_author_subquery = (
-            select(Author)
-            .join(Reaction, Author.id == Reaction.created_by)
-            .where(Reaction.id == reaction.reply_to)
+            select(Author).join(Reaction, Author.id == Reaction.created_by).where(Reaction.id == reaction.reply_to)
         )
         authors_with_stat = get_with_stat(replied_author_subquery)
         for author_with_stat in authors_with_stat:

View File

@@ -60,25 +60,19 @@ class ViewedStorage:
         try:
             if os.path.exists(VIEWS_FILEPATH):
                 start_date_int = os.path.getmtime(VIEWS_FILEPATH)
-                start_date_str = datetime.fromtimestamp(start_date_int).strftime(
-                    "%Y-%m-%d"
-                )
+                start_date_str = datetime.fromtimestamp(start_date_int).strftime("%Y-%m-%d")
                 self.start_date = start_date_str
                 now_date = datetime.now().strftime("%Y-%m-%d")
 
                 if now_date == self.start_date:
                     logger.info(" * Данные актуализованы!")
                 else:
-                    logger.warn(
-                        f" * Файл просмотров {VIEWS_FILEPATH} устарел: {self.start_date}"
-                    )
+                    logger.warn(f" * Файл просмотров {VIEWS_FILEPATH} устарел: {self.start_date}")
 
                 with open(VIEWS_FILEPATH, "r") as file:
                     precounted_views = json.load(file)
                     self.views_by_shout.update(precounted_views)
-                    logger.info(
-                        f" * {len(precounted_views)} публикаций с просмотрами успешно загружены."
-                    )
+                    logger.info(f" * {len(precounted_views)} публикаций с просмотрами успешно загружены.")
             else:
                 logger.info(" * Файл просмотров не найден.")
         except Exception as e:
@@ -99,9 +93,7 @@ class ViewedStorage:
                 property=f"properties/{GOOGLE_PROPERTY_ID}",
                 dimensions=[Dimension(name="pagePath")],
                 metrics=[Metric(name="screenPageViews")],
-                date_ranges=[
-                    DateRange(start_date=self.start_date, end_date="today")
-                ],
+                date_ranges=[DateRange(start_date=self.start_date, end_date="today")],
             )
             response = self.analytics_client.run_report(request)
             if response and isinstance(response.rows, list):
@@ -118,9 +110,7 @@ class ViewedStorage:
                     views_count = int(row.metric_values[0].value)
 
                     # Обновление данных в хранилище
-                    self.views_by_shout[slug] = self.views_by_shout.get(
-                        slug, 0
-                    )
+                    self.views_by_shout[slug] = self.views_by_shout.get(slug, 0)
                     self.views_by_shout[slug] += views_count
                     self.update_topics(slug)
@@ -179,20 +169,12 @@ class ViewedStorage:
             # Обновление тем и авторов с использованием вспомогательной функции
             for [_shout_topic, topic] in (
-                session.query(ShoutTopic, Topic)
-                .join(Topic)
-                .join(Shout)
-                .where(Shout.slug == shout_slug)
-                .all()
+                session.query(ShoutTopic, Topic).join(Topic).join(Shout).where(Shout.slug == shout_slug).all()
             ):
                 update_groups(self.shouts_by_topic, topic.slug, shout_slug)
 
             for [_shout_topic, author] in (
-                session.query(ShoutAuthor, Author)
-                .join(Author)
-                .join(Shout)
-                .where(Shout.slug == shout_slug)
-                .all()
+                session.query(ShoutAuthor, Author).join(Author).join(Shout).where(Shout.slug == shout_slug).all()
             ):
                 update_groups(self.shouts_by_author, author.slug, shout_slug)
@@ -219,8 +201,7 @@ class ViewedStorage:
                 when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
                 t = format(when.astimezone().isoformat())
                 logger.info(
-                    " ⎩ Следующее обновление: %s"
-                    % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])
+                    " ⎩ Следующее обновление: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])
                 )
                 await asyncio.sleep(self.period)
             else:

View File

@@ -21,15 +21,11 @@ class WebhookEndpoint(HTTPEndpoint):
                 raise HTTPException(status_code=400, detail="Request body is empty")
             auth = request.headers.get("Authorization")
             if not auth or auth != os.environ.get("WEBHOOK_SECRET"):
-                raise HTTPException(
-                    status_code=401, detail="Invalid Authorization header"
-                )
+                raise HTTPException(status_code=401, detail="Invalid Authorization header")
             # logger.debug(data)
             user = data.get("user")
             if not isinstance(user, dict):
-                raise HTTPException(
-                    status_code=400, detail="User data is not a dictionary"
-                )
+                raise HTTPException(status_code=400, detail="User data is not a dictionary")
             #
             name: str = (
                 f"{user.get('given_name', user.get('slug'))} {user.get('middle_name', '')}"
@@ -40,19 +36,13 @@ class WebhookEndpoint(HTTPEndpoint):
             pic: str = user.get("picture", "")
             if user_id:
                 with local_session() as session:
-                    author = (
-                        session.query(Author).filter(Author.user == user_id).first()
-                    )
+                    author = session.query(Author).filter(Author.user == user_id).first()
                     if not author:
                         # If the author does not exist, create a new one
                         slug: str = email.split("@")[0].replace(".", "-").lower()
                         slug: str = re.sub("[^0-9a-z]+", "-", slug)
                         while True:
-                            author = (
-                                session.query(Author)
-                                .filter(Author.slug == slug)
-                                .first()
-                            )
+                            author = session.query(Author).filter(Author.slug == slug).first()
                             if not author:
                                 break
                             slug = f"{slug}-{len(session.query(Author).filter(Author.email == email).all()) + 1}"
@@ -66,9 +56,7 @@ class WebhookEndpoint(HTTPEndpoint):
             return JSONResponse({"status": "success"})
         except HTTPException as e:
-            return JSONResponse(
-                {"status": "error", "message": str(e.detail)}, status_code=e.status_code
-            )
+            return JSONResponse({"status": "error", "message": str(e.detail)}, status_code=e.status_code)
         except Exception as e:
             import traceback