notifier-draft-2

This commit is contained in:
Untone 2023-11-24 05:18:02 +03:00
parent ec20a4ebcd
commit 73797752f2
9 changed files with 283 additions and 11 deletions

22
.gitea/workflows/main.yml Normal file
View File

@ -0,0 +1,22 @@
name: 'deploy'
on: [push]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Cloning repo
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Get Repo Name
id: repo_name
run: echo "::set-output name=repo::$(echo ${GITHUB_REPOSITORY##*/})"
- name: Push to dokku
uses: dokku/github-action@master
with:
branch: 'main'
git_remote_url: 'ssh://dokku@staging.discours.io:22/${{ steps.repo_name.outputs.repo }}'
ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}

View File

@ -1,3 +1,13 @@
# notifier
собирает уведомления и формирует дайджесты
### Что делает
- слушает redis PubSub каналы реакций и постов
- собирает уведомления
- формирует дайджесты
### Что НЕ делает
- не отправляет сообщения по SSE
- не определяет кому их отправлять

View File

@ -1,4 +1,5 @@
import os
import asyncio
from importlib import import_module
from os.path import exists
@ -8,10 +9,11 @@ from starlette.applications import Starlette
from services.schema import resolvers
from services.rediscache import redis
from services.keeper import notification_service
from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN, MODE
import_module("resolvers")
schema = make_executable_schema(load_schema_from_path("inbox.graphql"), resolvers) # type: ignore
schema = make_executable_schema(load_schema_from_path("notifier.graphql"), resolvers) # type: ignore
async def start_up():
@ -24,6 +26,9 @@ async def start_up():
f.write(str(os.getpid()))
else:
await redis.connect()
notification_service_task = asyncio.create_task(notification_service.worker())
print(f"[main] {notification_service_task}")
try:
import sentry_sdk

0
notifier.graphql Normal file
View File

View File

@ -25,11 +25,11 @@ async def load_notifications(_, info, params=None):
notifications = []
with local_session() as session:
total_count = session.query(Notification).where(Notification.author == author_id).count()
total_count = session.query(Notification).where(Notification.user == user_id).count()
total_unread_count = (
session.query(Notification)
.where(and_(Notification.author == author_id, Notification.seen == False)) # noqa: E712
.where(and_(Notification.user == user_id, Notification.seen == False)) # noqa: E712
.count()
)
@ -47,7 +47,7 @@ async def load_notifications(_, info, params=None):
@mutation.field("markNotificationAsRead")
@login_required
async def mark_notification_as_read(_, info, notification_id: int):
author_id = info.context["author_id"]
user_id = info.context["user_id"]
with local_session() as session:
notification = (
@ -64,8 +64,7 @@ async def mark_notification_as_read(_, info, notification_id: int):
@mutation.field("markAllNotificationsAsRead")
@login_required
async def mark_all_notifications_as_read(_, info):
auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id
user_id = info.context["user_id"]
statement = (
update(Notification)

View File

@ -58,7 +58,7 @@ def auth_request(f):
if not is_authenticated:
raise HTTPError("please, login first")
else:
req["author_id"] = user_id
req["user_id"] = user_id
return await f(*args, **kwargs)
return decorated_function

View File

@ -10,7 +10,7 @@ async def get_all_authors() -> List[ChatMember]:
query_name = "authorsAll"
query_type = "query"
operation = "AuthorsAll"
query_fields = "id slug userpic name"
query_fields = "id slug pic name"
gql = {
"query": query_type + " " + operation + " { " + query_name + " { " + query_fields + " } " + " }",
@ -38,10 +38,10 @@ async def get_my_followings() -> List[ChatMember]:
query_name = "loadMySubscriptions"
query_type = "query"
operation = "LoadMySubscriptions"
query_fields = "id slug userpic name"
query_fields = "id slug pic name"
gql = {
"query": query_type + " " + operation + " { " + query_name + " { authors {" + query_fields + "} } " + " }",
"query": query_type + " " + operation + " { " + query_name + " { " + query_fields + " } " + " }",
"operationName": operation,
"variables": None,
}
@ -60,3 +60,32 @@ async def get_my_followings() -> List[ChatMember]:
traceback.print_exc()
return []
async def get_author(author_id) -> Author:
query_name = "getAuthor"
query_type = "query"
operation = "GetAuthor"
query_fields = "id slug pic name"
gql = {
"query": query_type + " " + operation + " { " + query_name + " { authors {" + query_fields + "} } " + " }",
"operationName": operation,
"variables": None,
}
async with AsyncClient() as client:
try:
response = await client.post(API_BASE, headers=headers, json=gql)
print(f"[services.core] {query_name}: [{response.status_code}] {len(response.text)} bytes")
if response.status_code != 200:
return []
r = response.json()
if r:
return r.get("data", {}).get(query_name, {})
else:
raise Exception("json response error")
except Exception:
import traceback
traceback.print_exc()

164
services/keeper.py Normal file
View File

@ -0,0 +1,164 @@
import asyncio
import json
from datetime import datetime, timezone
from sqlalchemy import and_
from services.db import local_session
from orm.shout import Shout
from orm.author import Author
from orm.user import User
from orm.notification import NotificationType, Notification
from orm.reaction import ReactionKind, Reaction
def shout_to_shout_data(shout):
return {"title": shout.title, "slug": shout.slug}
def user_to_user_data(user):
return {"id": user.id, "name": user.name, "slug": user.slug, "userpic": user.userpic}
def update_prev_notification(notification, user, reaction):
notification_data = json.loads(notification.data)
notification_data["users"] = [u for u in notification_data["users"] if u["id"] != user.id]
notification_data["users"].append(user_to_user_data(user))
if notification_data["reactionIds"] is None:
notification_data["reactionIds"] = []
notification_data["reactionIds"].append(reaction.id)
notification.data = json.dumps(notification_data, ensure_ascii=False)
notification.seen = False
notification.occurrences = notification.occurrences + 1
notification.createdAt = datetime.now(tz=timezone.utc)
class NewReactionNotificator:
def __init__(self, reaction_id):
self.reaction_id = reaction_id
async def run(self):
with local_session() as session:
reaction = session.query(Reaction).where(Reaction.id == self.reaction_id).one()
shout = session.query(Shout).where(Shout.id == reaction.shout).one()
user = session.query(User).where(User.id == reaction.created_by).one()
notify_user_ids = []
if reaction.kind == ReactionKind.COMMENT:
parent_reaction = None
if reaction.replyTo:
parent_reaction = session.query(Reaction).where(Reaction.id == reaction.replyTo).one()
if parent_reaction.createdBy != reaction.createdBy:
prev_new_reply_notification = (
session.query(Notification)
.where(
and_(
Notification.user == shout.created_by,
Notification.action == NotificationAction.CREATE,
Notification.entity == NotificationAction.REACTION,
# Notification.shout == shout.id,
# Notification.reaction == parent_reaction.id,
# TODO: filter by payload content
Notification.seen == False, # noqa: E712
)
)
.first()
)
if prev_new_reply_notification:
update_prev_notification(prev_new_reply_notification, user, reaction)
else:
reply_notification_data = json.dumps(
{
"shout": shout_to_shout_data(shout),
"users": [user_to_user_data(user)],
"reactionIds": [reaction.id],
},
ensure_ascii=False,
)
reply_notification = Notification.create(
**{
"user": parent_reaction.created_by,
"action": NotificationAction.CREATE,
"entity": NotificationEntity.REACTION,
# TODO: filter by payload content
# "shout": shout.id,
# "reaction": parent_reaction.id,
"data": reply_notification_data,
}
)
session.add(reply_notification)
notify_user_ids.append(parent_reaction.createdBy)
if reaction.createdBy != shout.createdBy and (
parent_reaction is None or parent_reaction.created_by != shout.created_by
):
prev_new_comment_notification = (
session.query(Notification)
.where(
and_(
Notification.user == shout.created_by,
Notification.action == NotificationAction.CREATE,
Notification.entity == NotificationEntity.REACTION,
Notification.seen == False, # noqa: E712
)
)
.first()
)
if prev_new_comment_notification:
update_prev_notification(prev_new_comment_notification, user, reaction)
else:
notification_data_string = json.dumps(
{
"shout": shout_to_shout_data(shout),
"users": [user_to_user_data(user)],
"reactionIds": [reaction.id],
},
ensure_ascii=False,
)
author_notification = Notification.create(
**{
"user": shout.created_by,
"entity": NotificationEntity.REACTION,
"action": NotificationAction.CREATE,
"shout": shout.id,
"data": notification_data_string,
}
)
session.add(author_notification)
notify_user_ids.append(shout.created_by)
session.commit()
for user_id in notify_user_ids:
await connection_manager.notify_user(user_id)
class NotificationService:
def __init__(self):
self._queue = asyncio.Queue(maxsize=1000)
async def handle_reaction(self, reaction_id):
notificator = NewReactionNotificator(reaction_id)
await self._queue.put(notificator)
async def worker(self):
while True:
notificator = await self._queue.get()
try:
await notificator.run()
except Exception as e:
print(f"[NotificationService.worker] error: {str(e)}")
notification_service = NotificationService()

43
services/listener.py Normal file
View File

@ -0,0 +1,43 @@
import json
from services.rediscache import redis
from servies.notifier import notification_service
# Каналы для прослушивания
channels = ["reaction", "shout"]
pubsubs = []
def create_notification_channel(redis_conn, channel_name):
pubsub = redis_conn.pubsub()
pubsub.subscribe(channel_name)
return pubsub
def close_notification_channel(pubsub):
pubsub.unsubscribe()
pubsub.close()
def start():
# Подписка на каналы
pubsubs = [create_notification_channel(redis_conn, channel) for channel in channels]
try:
# Бесконечный цикл прослушивания
while True:
for pubsub in pubsubs:
msg = pubsub.get_message()
notification_service.handle_reaction(msg["data"])
except Exception:
pass
finally:
# Отписка от каналов при завершении
for pubsub in pubsubs:
close_notification_channel(pubsub)
def stop():
for pubsub in pubsubs:
close_notification_channel(pubsub)