init-strawberry
Some checks failed
deploy / deploy (push) Failing after 1m9s

This commit is contained in:
Untone 2023-11-26 13:18:57 +03:00
parent 8ef12063b0
commit 62c8d51c5d
13 changed files with 207 additions and 485 deletions

View File

@ -2,9 +2,9 @@
### Что делает
- слушает redis PubSub каналы реакций и постов
- сохраняет уведомления
- формирует дайджесты
- сохраняет тех, кому уведомления уже были отправлены (redis: authors-online)
- формирует дайджесты для остальных
- слушает Redis PubSub канал с обновлениями реакций
### Что НЕ делает

14
main.py
View File

@ -1,20 +1,15 @@
import os
import asyncio
from importlib import import_module
from os.path import exists
from ariadne import load_schema_from_path, make_executable_schema
from ariadne.asgi import GraphQL
from strawberry.asgi import GraphQL
from starlette.applications import Starlette
from services.schema import resolvers
from services.rediscache import redis
from services.keeper import notification_service
from resolvers.listener import start as listener_start, stop as listener_stop
from resolvers.schema import schema
from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN, MODE
import_module("resolvers")
schema = make_executable_schema(load_schema_from_path("notifier.graphql"), resolvers) # type: ignore
async def start_up():
if MODE == "dev":
@ -26,7 +21,7 @@ async def start_up():
f.write(str(os.getpid()))
else:
await redis.connect()
notification_service_task = asyncio.create_task(notification_service.worker())
notification_service_task = asyncio.create_task(listener_start())
print(f"[main] {notification_service_task}")
try:
@ -39,6 +34,7 @@ async def start_up():
async def shutdown():
listener_stop()
await redis.disconnect()

View File

@ -1,4 +1,4 @@
# sigil ver 2.2 dufok 2022-10-15 (with location /connect nobuffer)
# sigil ver 2.2 dufok 2022-10-15 (with location /connect nobuffer)
# Proxy settings
{{ $proxy_settings := "proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $http_connection; proxy_set_header Host $http_host; proxy_set_header X-Request-Start $msec;" }}
# GZIP settings
@ -54,23 +54,6 @@ server {
{{ $cors_headers_get }}
}
# Custom location block for /connect
location /connect/ {
proxy_pass http://presence-8080/;
add_header 'Cache-Control' 'no-cache';
add_header 'Content-Type' 'text/event-stream';
add_header 'Connection' 'keep-alive';
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 36000s;
{{ $proxy_settings }}
{{ $cors_headers_options }}
{{ $cors_headers_post }}
{{ $cors_headers_get }}
}
# Error pages
error_page 400 401 402 403 405 406 407 408 409 410 411 412 413 414 415 416 417 418 420 422 423 424 426 428 429 431 444 449 450 451 /400-error.html;

View File

@ -1,46 +0,0 @@
enum NotificationAction {
CREATE,
UPDATE,
DELETE,
SEEN
}
enum NotificationEntity {
SHOUT,
REACTION
}
type Notification {
id: Int!
action: NotificationAction!
entity: NotificationEntity!
created_at: Int!
seen: Boolean!
data: String # JSON
occurrences: Int
}
input NotificationsQueryParams {
limit: Int
offset: Int
}
type NotificationsQueryResult {
notifications: [Notification]!
total: Int!
unread: Int!
}
type NotificationSeenResult {
error: String
}
type Query {
loadNotifications(params: NotificationsQueryParams!): NotificationsQueryResult!
}
type Mutation {
markNotificationAsRead(notification_id: Int!): NotificationSeenResult!
markAllNotificationsAsRead: NotificationSeenResult!
}

View File

@ -1,30 +0,0 @@
import time
from sqlalchemy import JSON, Boolean, Column, Integer, String
from services.db import Base
class User(Base):
__tablename__ = "authorizer_users"
id = Column(String, primary_key=True, unique=True, nullable=False, default=None)
key = Column(String)
email = Column(String, unique=True)
email_verified_at = Column(Integer)
family_name = Column(String)
gender = Column(String)
given_name = Column(String)
is_multi_factor_auth_enabled = Column(Boolean)
middle_name = Column(String)
nickname = Column(String)
password = Column(String)
phone_number = Column(String, unique=True)
phone_number_verified_at = Column(Integer)
# preferred_username = Column(String, nullable=False)
picture = Column(String)
revoked_timestamp = Column(Integer)
roles = Column(JSON)
signup_methods = Column(String, default="magic_link_login")
created_at = Column(Integer, default=lambda: int(time.time()))
updated_at = Column(Integer, default=lambda: int(time.time()))

View File

@ -14,16 +14,14 @@ SQLAlchemy = "^2.0.22"
httpx = "^0.25.0"
psycopg2-binary = "^2.9.9"
redis = {extras = ["hiredis"], version = "^5.0.1"}
sentry-sdk = "^1.32.0"
gql = {git = "https://github.com/graphql-python/gql.git", rev = "master"}
ariadne = {git = "https://github.com/tonyrewin/ariadne.git", rev = "master"}
starlette = {git = "https://github.com/encode/starlette.git", rev = "master"}
uvicorn = "^0.24.0.post1"
strawberry-graphql = { extras = ["asgi"], version = "^0.215.1" }
sentry-sdk = "^1.37.1"
[tool.poetry.dev-dependencies]
pytest = "^7.4.2"
black = { version = "^23.9.1", python = ">=3.12" }
ruff = { version = "^0.1.0", python = ">=3.12" }
mypy = { version = "^1.7", python = ">=3.12" }
[tool.black]
line-length = 120
@ -57,12 +55,18 @@ use_parentheses = true
ensure_newline_before_comments = true
line_length = 120
[tool.ruff]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
select = ["E4", "E7", "E9", "F"]
[tool.pyright]
include = ["orm", "resolvers"]
exclude = ["**/__pycache__"]
ignore = []
line-length = 120
target-version = "py312"
defineConstant = { DEBUG = true }
reportMissingImports = true
reportMissingTypeStubs = false
pythonVersion = "312"
pythonPlatform = "Linux"
executionEnvironments = []
[tool.mypy]
python_version = "3.12"
warn_unused_configs = true
plugins = ["mypy_sqlalchemy.plugin"]

37
resolvers/listener.py Normal file
View File

@ -0,0 +1,37 @@
import json
from typing import List, Dict
from orm.notification import Notification
from services.db import local_session
from services.rediscache import redis
def handle_reaction(notification: Dict[str, str | int | List[int]]):
"""создаеёт новое хранимое уведомление"""
try:
with local_session() as session:
n = Notification(**notification)
session.add(n)
session.commit(n)
except Exception as e:
session.rollback()
print(f"[listener.handle_reaction] error: {str(e)}")
def stop(pubsub):
pubsub.unsubscribe()
pubsub.close()
def start():
pubsub = redis.pubsub()
pubsub.subscribe("reaction")
try:
# Бесконечный цикл прослушивания
while True:
msg = pubsub.get_message()
handle_reaction(json.loads(msg["data"]))
except Exception:
pass
finally:
stop(pubsub)

View File

@ -1,82 +0,0 @@
from sqlalchemy import and_, desc, select, update
from services.auth import login_required
from services.db import local_session
from services.schema import mutation, query
from orm.notification import Notification
# TODO: occurrencies?
# TODO: use of Author.id?
@query.field("loadNotifications")
@login_required
async def load_notifications(_, info, params=None):
if params is None:
params = {}
user_id = info.context["user_id"]
limit = params.get("limit", 50)
offset = params.get("offset", 0)
q = select(Notification).order_by(desc(Notification.created_at)).limit(limit).offset(offset)
notifications = []
with local_session() as session:
total_count = session.query(Notification).where(Notification.user == user_id).count()
total_unread_count = (
session.query(Notification)
.where(and_(Notification.user == user_id, Notification.seen == False)) # noqa: E712
.count()
)
for [notification] in session.execute(q):
notification.type = notification.type.name
notifications.append(notification)
return {
"notifications": notifications,
"total": total_count,
"unread": total_unread_count,
}
@mutation.field("markNotificationAsRead")
@login_required
async def mark_notification_as_read(_, info, notification_id: int):
user_id = info.context["user_id"]
with local_session() as session:
notification = (
session.query(Notification)
.where(and_(Notification.id == notification_id, Notification.user == user_id))
.one()
)
notification.seen = True
session.commit()
return {}
@mutation.field("markAllNotificationsAsRead")
@login_required
async def mark_all_notifications_as_read(_, info):
user_id = info.context["user_id"]
statement = (
update(Notification)
.where(and_(Notification.user == user_id, Notification.seen == False)) # noqa: E712
.values(seen=True)
)
with local_session() as session:
try:
session.execute(statement)
session.commit()
except Exception as e:
session.rollback()
print(f"[mark_all_notifications_as_read] error: {str(e)}")
return {}

108
resolvers/schema.py Normal file
View File

@ -0,0 +1,108 @@
import strawberry
from sqlalchemy import and_
from orm.author import Author
from services.auth import login_required
from services.db import local_session
@strawberry.type
class NotificationSeen:
notification: int # notification id
viewer: int # author id
@strawberry.type
class Notification:
id: int
action: str # create update delete join follow etc.
entity: str # REACTION SHOUT
created_at: int
seen: list[NotificationSeen]
data: str # JSON data
occurrences: int
@strawberry.type
class NotificationsQueryResult:
notifications: list[Notification]
unread: int
@strawberry.type
class NotificationSeenResult:
error: str
def notification_seen_by_viewer(viewer_id, notification_id, session):
seen = (
session.query(NotificationSeen)
.filter(NotificationSeen.viewer == viewer_id, NotificationSeen.notification == notification_id)
.first()
)
return seen is not None
@strawberry.type
class Query:
@login_required
@strawberry.field
async def load_notifications(self, info, limit: int = 50, offset: int = 0) -> dict:
"""непрочитанные уведомления"""
user_id = info.context["user_id"]
with local_session() as session:
author = session.query(Author).filter(Author.user == user_id).first()
nslist = (
session.query(Notification)
.outerjoin(
NotificationSeen,
and_(NotificationSeen.viewer == author.id, NotificationSeen.notification == Notification.id),
)
.limit(limit)
.offset(offset)
.all()
)
for notification in nslist:
notification.seen_by_viewer = notification_seen_by_viewer(author.id, notification.id, session)
unread = sum(1 for n in nslist if not n.seen_by_viewer)
return {"notifications": nslist, "unread": unread}
@strawberry.type
class Mutation:
@strawberry.mutation
@login_required
async def mark_notification_as_read(self, info, notification_id: int) -> NotificationSeenResult:
user_id = info.context["user_id"]
try:
with local_session() as session:
author = session.query(Author).filter(Author.user == user_id).first()
ns = NotificationSeen({"notification": notification_id, "viewer": author.id})
session.add(ns)
session.commit()
except Exception as e:
session.rollback()
print(f"[mark_notification_as_read] error: {str(e)}")
return {}
@strawberry.mutation
@login_required
async def mark_all_notifications_as_read(self, info) -> NotificationSeenResult:
user_id = info.context["user_id"]
with local_session() as session:
try:
author = session.query(Author).filter(Author.user == user_id).first()
_nslist = session.quuery(NotificationSeen).filter(NotificationSeen.viewer == author.id).all()
except Exception as e:
session.rollback()
print(f"[mark_all_notifications_as_read] error: {str(e)}")
return {}
schema = strawberry.Schema(query=Query, mutation=Mutation)

View File

@ -1,82 +1,18 @@
from httpx import AsyncClient
from settings import API_BASE
from typing import List
from models.member import ChatMember
from httpx import AsyncClient
from orm.author import Author
from orm.shout import Shout
from settings import API_BASE
headers = {"Content-Type": "application/json"}
async def get_all_authors() -> List[ChatMember]:
query_name = "authorsAll"
query_type = "query"
operation = "AuthorsAll"
query_fields = "id slug pic name"
gql = {
"query": query_type + " " + operation + " { " + query_name + " { " + query_fields + " } " + " }",
"operationName": operation,
"variables": None,
}
async def _request_endpoint(query_name, body):
async with AsyncClient() as client:
try:
response = await client.post(API_BASE, headers=headers, json=gql)
print(f"[services.core] {query_name}: [{response.status_code}] {len(response.text)} bytes")
if response.status_code != 200:
return []
r = response.json()
if r:
return r.get("data", {}).get(query_name, [])
except Exception:
import traceback
traceback.print_exc()
return []
async def get_my_followings() -> List[ChatMember]:
query_name = "loadMySubscriptions"
query_type = "query"
operation = "LoadMySubscriptions"
query_fields = "id slug pic name"
gql = {
"query": query_type + " " + operation + " { " + query_name + " { " + query_fields + " } " + " }",
"operationName": operation,
"variables": None,
}
async with AsyncClient() as client:
try:
response = await client.post(API_BASE, headers=headers, json=gql)
print(f"[services.core] {query_name}: [{response.status_code}] {len(response.text)} bytes")
if response.status_code != 200:
return []
r = response.json()
if r:
return r.get("data", {}).get(query_name, {}).get("authors", [])
except Exception:
import traceback
traceback.print_exc()
return []
async def get_author(author_id) -> Author:
query_name = "getAuthor"
query_type = "query"
operation = "GetAuthor"
query_fields = "id slug pic name"
gql = {
"query": query_type + " " + operation + " { " + query_name + " { authors {" + query_fields + "} } " + " }",
"operationName": operation,
"variables": None,
}
async with AsyncClient() as client:
try:
response = await client.post(API_BASE, headers=headers, json=gql)
response = await client.post(API_BASE, headers=headers, json=body)
print(f"[services.core] {query_name}: [{response.status_code}] {len(response.text)} bytes")
if response.status_code != 200:
return []
@ -89,3 +25,31 @@ async def get_author(author_id) -> Author:
import traceback
traceback.print_exc()
async def get_author(author_id) -> Author:
query_name = "getAuthor"
query_type = "query"
operation = "GetAuthor"
query_fields = "id slug pic name"
gql = {
"query": query_type + " " + operation + " { " + query_name + " { " + query_fields + "м} " + " }",
"operationName": operation,
"variables": None,
}
return await _request_endpoint(query_name, gql)
async def get_followed_shouts(author_id: int) -> List[Shout]:
query_name = "getFollowedShouts"
query_type = "query"
operation = "GetFollowedShouts"
query_fields = "id slug title"
query = f"{query_type} {operation}($author_id: Int!) {{ {query_name}(author_id: $author_id) {{ {query_fields} }} }}"
body = {"query": query, "operationName": operation, "variables": {"author_id": author_id}}
return await _request_endpoint(query_name, body)

View File

@ -1,164 +0,0 @@
import asyncio
import json
from datetime import datetime, timezone
from sqlalchemy import and_
from services.db import local_session
from orm.shout import Shout
from orm.author import Author
from orm.user import User
from orm.notification import NotificationAction, NotificationEntity, Notification
from orm.reaction import ReactionKind, Reaction
def shout_to_shout_data(shout):
return {"title": shout.title, "slug": shout.slug}
def user_to_user_data(user):
return {"id": user.id, "name": user.name, "slug": user.slug, "userpic": user.userpic}
def update_prev_notification(notification, user, reaction):
notification_data = json.loads(notification.data)
notification_data["users"] = [u for u in notification_data["users"] if u["id"] != user.id]
notification_data["users"].append(user_to_user_data(user))
if notification_data["reactionIds"] is None:
notification_data["reactionIds"] = []
notification_data["reactionIds"].append(reaction.id)
notification.data = json.dumps(notification_data, ensure_ascii=False)
notification.seen = False
notification.occurrences = notification.occurrences + 1
notification.createdAt = datetime.now(tz=timezone.utc)
class NewReactionNotificator:
def __init__(self, reaction_id):
self.reaction_id = reaction_id
async def run(self):
with local_session() as session:
reaction = session.query(Reaction).where(Reaction.id == self.reaction_id).one()
shout = session.query(Shout).where(Shout.id == reaction.shout).one()
user = session.query(User).where(User.id == reaction.created_by).one()
notify_user_ids = []
if reaction.kind == ReactionKind.COMMENT:
parent_reaction = None
if reaction.replyTo:
parent_reaction = session.query(Reaction).where(Reaction.id == reaction.replyTo).one()
if parent_reaction.createdBy != reaction.createdBy:
prev_new_reply_notification = (
session.query(Notification)
.where(
and_(
Notification.user == shout.created_by,
Notification.action == NotificationAction.CREATE,
Notification.entity == NotificationAction.REACTION,
# Notification.shout == shout.id,
# Notification.reaction == parent_reaction.id,
# TODO: filter by payload content
Notification.seen == False, # noqa: E712
)
)
.first()
)
if prev_new_reply_notification:
update_prev_notification(prev_new_reply_notification, user, reaction)
else:
reply_notification_data = json.dumps(
{
"shout": shout_to_shout_data(shout),
"users": [user_to_user_data(user)],
"reactionIds": [reaction.id],
},
ensure_ascii=False,
)
reply_notification = Notification.create(
**{
"user": parent_reaction.created_by,
"action": NotificationAction.CREATE,
"entity": NotificationEntity.REACTION,
# TODO: filter by payload content
# "shout": shout.id,
# "reaction": parent_reaction.id,
"data": reply_notification_data,
}
)
session.add(reply_notification)
notify_user_ids.append(parent_reaction.createdBy)
if reaction.createdBy != shout.createdBy and (
parent_reaction is None or parent_reaction.created_by != shout.created_by
):
prev_new_comment_notification = (
session.query(Notification)
.where(
and_(
Notification.user == shout.created_by,
Notification.action == NotificationAction.CREATE,
Notification.entity == NotificationEntity.REACTION,
Notification.seen == False, # noqa: E712
)
)
.first()
)
if prev_new_comment_notification:
update_prev_notification(prev_new_comment_notification, user, reaction)
else:
notification_data_string = json.dumps(
{
"shout": shout_to_shout_data(shout),
"users": [user_to_user_data(user)],
"reactionIds": [reaction.id],
},
ensure_ascii=False,
)
author_notification = Notification.create(
**{
"user": shout.created_by,
"entity": NotificationEntity.REACTION,
"action": NotificationAction.CREATE,
"shout": shout.id,
"data": notification_data_string,
}
)
session.add(author_notification)
notify_user_ids.append(shout.created_by)
session.commit()
for user_id in notify_user_ids:
await connection_manager.notify_user(user_id)
class NotificationService:
def __init__(self):
self._queue = asyncio.Queue(maxsize=1000)
async def handle_reaction(self, reaction_id):
notificator = NewReactionNotificator(reaction_id)
await self._queue.put(notificator)
async def worker(self):
while True:
notificator = await self._queue.get()
try:
await notificator.run()
except Exception as e:
print(f"[NotificationService.worker] error: {str(e)}")
notification_service = NotificationService()

View File

@ -1,43 +0,0 @@
import json
from services.rediscache import redis
from servies.notifier import notification_service
# Каналы для прослушивания
channels = ["reaction", "shout"]
pubsubs = []
def create_notification_channel(redis_conn, channel_name):
pubsub = redis_conn.pubsub()
pubsub.subscribe(channel_name)
return pubsub
def close_notification_channel(pubsub):
pubsub.unsubscribe()
pubsub.close()
def start():
# Подписка на каналы
pubsubs = [create_notification_channel(redis_conn, channel) for channel in channels]
try:
# Бесконечный цикл прослушивания
while True:
for pubsub in pubsubs:
msg = pubsub.get_message()
notification_service.handle_reaction(msg["data"])
except Exception:
pass
finally:
# Отписка от каналов при завершении
for pubsub in pubsubs:
close_notification_channel(pubsub)
def stop():
for pubsub in pubsubs:
close_notification_channel(pubsub)

View File

@ -1,5 +0,0 @@
from ariadne import QueryType, MutationType
query = QueryType()
mutation = MutationType()
resolvers = [query, mutation]