This commit is contained in:
Untone 2023-10-11 11:56:46 +03:00
commit 6252671b85
21 changed files with 651 additions and 254 deletions

View File

@ -36,7 +36,7 @@ class JWTCodec:
issuer="discours", issuer="discours",
) )
r = TokenPayload(**payload) r = TokenPayload(**payload)
print("[auth.jwtcodec] debug token %r" % r) # print('[auth.jwtcodec] debug token %r' % r)
return r return r
except jwt.InvalidIssuedAtError: except jwt.InvalidIssuedAtError:
print("[auth.jwtcodec] invalid issued at: %r" % payload) print("[auth.jwtcodec] invalid issued at: %r" % payload)

0
base/redis.py Normal file
View File

0
base/resolvers.py Normal file
View File

17
main.py
View File

@ -13,8 +13,6 @@ from orm import init_tables
from auth.authenticate import JWTAuthenticate from auth.authenticate import JWTAuthenticate
from auth.oauth import oauth_login, oauth_authorize from auth.oauth import oauth_login, oauth_authorize
from services.redis import redis
from services.schema import resolvers
from resolvers.auth import confirm_email_handler from resolvers.auth import confirm_email_handler
from resolvers.upload import upload_handler from resolvers.upload import upload_handler
from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN
@ -26,7 +24,7 @@ import_module("resolvers")
schema = make_executable_schema(load_schema_from_path("schemas/core.graphql"), resolvers) # type: ignore schema = make_executable_schema(load_schema_from_path("schemas/core.graphql"), resolvers) # type: ignore
middleware = [ middleware = [
Middleware(AuthenticationMiddleware, backend=JWTAuthenticate()), Middleware(AuthenticationMiddleware, backend=JWTAuthenticate()),
Middleware(SessionMiddleware, secret_key="!secret"), Middleware(SessionMiddleware, secret_key=SESSION_SECRET_KEY),
] ]
@ -39,7 +37,6 @@ async def start_up():
_views_stat_task = asyncio.create_task(ViewedStorage().worker()) _views_stat_task = asyncio.create_task(ViewedStorage().worker())
try: try:
import sentry_sdk import sentry_sdk
sentry_sdk.init(SENTRY_DSN) sentry_sdk.init(SENTRY_DSN)
print("[sentry] started") print("[sentry] started")
except Exception as e: except Exception as e:
@ -78,14 +75,12 @@ app = Starlette(
middleware=middleware, middleware=middleware,
routes=routes, routes=routes,
) )
app.mount( app.mount("/", GraphQL(
"/", schema,
GraphQL(schema, debug=True), debug=True
) ))
print("[main] app mounted") dev_app = Starlette(
dev_app = app = Starlette(
debug=True, debug=True,
on_startup=[dev_start_up], on_startup=[dev_start_up],
on_shutdown=[shutdown], on_shutdown=[shutdown],

View File

View File

View File

@ -7,7 +7,18 @@ from orm.shout import Shout
from orm.topic import Topic, TopicFollower from orm.topic import Topic, TopicFollower
from orm.user import User, UserRating from orm.user import User, UserRating
# NOTE: keep orm module isolated
def init_tables():
Base.metadata.create_all(engine)
Operation.init_table()
Resource.init_table()
User.init_table()
Community.init_table()
Role.init_table()
UserRating.init_table()
Shout.init_table()
print("[orm] tables initialized")
__all__ = [ __all__ = [
"User", "User",
@ -21,16 +32,5 @@ __all__ = [
"Notification", "Notification",
"Reaction", "Reaction",
"UserRating", "UserRating",
"init_tables"
] ]
def init_tables():
Base.metadata.create_all(engine)
Operation.init_table()
Resource.init_table()
User.init_table()
Community.init_table()
Role.init_table()
UserRating.init_table()
Shout.init_table()
print("[orm] tables initialized")

View File

@ -1,13 +1,25 @@
from datetime import datetime from datetime import datetime
from sqlalchemy import Column, String, JSON, ForeignKey, DateTime, Boolean from sqlalchemy import Column, Enum, ForeignKey, DateTime, Boolean, Integer
from services.db import Base from sqlalchemy.dialects.postgresql import JSONB
from base.orm import Base
from enum import Enum as Enumeration
class NotificationType(Enumeration):
NEW_REACTION = 1
NEW_SHOUT = 2
NEW_FOLLOWER = 3
class Notification(Base): class Notification(Base):
__tablename__ = "notification" __tablename__ = "notification"
shout = Column(ForeignKey("shout.id"), index=True)
reaction = Column(ForeignKey("reaction.id"), index=True)
user = Column(ForeignKey("user.id"), index=True) user = Column(ForeignKey("user.id"), index=True)
createdAt = Column(DateTime, nullable=False, default=datetime.now, index=True) createdAt = Column(DateTime, nullable=False, default=datetime.now, index=True)
seen = Column(Boolean, nullable=False, default=False, index=True) seen = Column(Boolean, nullable=False, default=False, index=True)
type = Column(String, nullable=False) type = Column(Enum(NotificationType), nullable=False)
data = Column(JSON, nullable=True) data = Column(JSONB, nullable=True)
occurrences = Column(Integer, default=1)

0
resetdb.sh Normal file → Executable file
View File

View File

View File

@ -137,7 +137,7 @@ async def load_shouts_by(_, info, options):
""" """
:param options: { :param options: {
filters: { filters: {
layout: 'audio', layout: 'music',
excludeLayout: 'article', excludeLayout: 'article',
visibility: "public", visibility: "public",
author: 'discours', author: 'discours',
@ -208,6 +208,7 @@ async def load_shouts_by(_, info, options):
@query.field("loadDrafts") @query.field("loadDrafts")
@login_required
async def get_drafts(_, info): async def get_drafts(_, info):
auth: AuthCredentials = info.context["request"].auth auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id user_id = auth.user_id

View File

@ -0,0 +1,84 @@
from sqlalchemy import select, desc, and_, update
from auth.credentials import AuthCredentials
from base.resolvers import query, mutation
from auth.authenticate import login_required
from base.orm import local_session
from orm import Notification
@query.field("loadNotifications")
@login_required
async def load_notifications(_, info, params=None):
if params is None:
params = {}
auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id
limit = params.get('limit', 50)
offset = params.get('offset', 0)
q = select(Notification).where(
Notification.user == user_id
).order_by(desc(Notification.createdAt)).limit(limit).offset(offset)
with local_session() as session:
total_count = session.query(Notification).where(
Notification.user == user_id
).count()
total_unread_count = session.query(Notification).where(
and_(
Notification.user == user_id,
Notification.seen is False
)
).count()
notifications = session.execute(q).fetchall()
return {
"notifications": notifications,
"totalCount": total_count,
"totalUnreadCount": total_unread_count
}
@mutation.field("markNotificationAsRead")
@login_required
async def mark_notification_as_read(_, info, notification_id: int):
auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id
with local_session() as session:
notification = session.query(Notification).where(
and_(Notification.id == notification_id, Notification.user == user_id)
).one()
notification.seen = True
session.commit()
return {}
@mutation.field("markAllNotificationsAsRead")
@login_required
async def mark_all_notifications_as_read(_, info):
auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id
statement = update(Notification).where(
and_(
Notification.user == user_id,
Notification.seen == False
)
).values(seen=True)
with local_session() as session:
try:
session.execute(statement)
session.commit()
except Exception as e:
session.rollback()
print(f"[mark_all_notifications_as_read] error: {str(e)}")
return {}

View File

@ -266,10 +266,20 @@ async def get_authors_all(_, _info):
@query.field("getAuthor") @query.field("getAuthor")
async def get_author(_, _info, slug): async def get_author(_, _info, slug):
q = select(User).where(User.slug == slug) q = select(User).where(User.slug == slug)
q = add_author_stat_columns(q, True) q = add_author_stat_columns(q)
authors = get_authors_from_query(q) [author] = get_authors_from_query(q)
return authors[0]
with local_session() as session:
comments_count = session.query(Reaction).where(
and_(
Reaction.createdBy == author.id,
Reaction.kind == ReactionKind.COMMENT
)
).count()
author.stat["commented"] = comments_count
return author
@query.field("loadAuthorsBy") @query.field("loadAuthorsBy")

View File

@ -10,6 +10,7 @@ from services.schema import mutation, query
from orm.reaction import Reaction, ReactionKind from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout, ShoutReactionsFollower from orm.shout import Shout, ShoutReactionsFollower
from orm.user import User from orm.user import User
from services.notifications.notification_service import notification_service
def add_reaction_stat_columns(q): def add_reaction_stat_columns(q):
@ -217,6 +218,8 @@ async def create_reaction(_, info, reaction):
r = Reaction.create(**reaction) r = Reaction.create(**reaction)
# Proposal accepting logix # Proposal accepting logix
# FIXME: will break if there will be 2 proposals
# FIXME: will break if shout will be changed
if ( if (
r.replyTo is not None r.replyTo is not None
and r.kind == ReactionKind.ACCEPT and r.kind == ReactionKind.ACCEPT
@ -237,12 +240,14 @@ async def create_reaction(_, info, reaction):
session.add(r) session.add(r)
session.commit() session.commit()
await notification_service.handle_new_reaction(r.id)
rdict = r.dict() rdict = r.dict()
rdict["shout"] = shout.dict() rdict["shout"] = shout.dict()
rdict["createdBy"] = author.dict() rdict["createdBy"] = author.dict()
# self-regulation mechanics # self-regulation mechanics
if check_to_hide(session, auth.user_id, r): if check_to_hide(session, auth.user_id, r):
set_hidden(session, r.shout) set_hidden(session, r.shout)
elif check_to_publish(session, auth.user_id, r): elif check_to_publish(session, auth.user_id, r):

View File

View File

@ -7,18 +7,18 @@ type _Service {
################################### Payload ################################### ################################### Payload ###################################
type UserFollowings { type UserFollowings {
unread: Int unread: Int
topics: [String] topics: [String]
authors: [String] authors: [String]
reactions: [Int] reactions: [Int]
communities: [String] communities: [String]
} }
type AuthResult { type AuthResult {
error: String error: String
token: String token: String
user: User user: User
news: UserFollowings news: UserFollowings
} }
type AuthorStat { type AuthorStat {
@ -61,107 +61,118 @@ type Result {
} }
enum ReactionStatus { enum ReactionStatus {
NEW NEW
UPDATED UPDATED
CHANGED CHANGED
EXPLAINED EXPLAINED
DELETED DELETED
} }
type ReactionUpdating { type ReactionUpdating {
error: String error: String
status: ReactionStatus status: ReactionStatus
reaction: Reaction reaction: Reaction
} }
################################### Inputs ################################### ################################### Inputs ###################################
input ShoutInput { input ShoutInput {
slug: String slug: String
title: String title: String
body: String body: String
lead: String lead: String
description: String description: String
layout: String layout: String
media: String media: String
authors: [String] authors: [String]
topics: [TopicInput] topics: [TopicInput]
community: Int community: Int
mainTopic: TopicInput mainTopic: TopicInput
subtitle: String subtitle: String
cover: String cover: String
} }
input ProfileInput { input ProfileInput {
slug: String slug: String
name: String name: String
userpic: String userpic: String
links: [String] links: [String]
bio: String bio: String
about: String about: String
} }
input TopicInput { input TopicInput {
id: Int, id: Int,
slug: String! slug: String!
# community: String! # community: String!
title: String title: String
body: String body: String
pic: String pic: String
# children: [String] # children: [String]
# parents: [String] # parents: [String]
} }
input ReactionInput { input ReactionInput {
kind: ReactionKind! kind: ReactionKind!
shout: Int! shout: Int!
range: String range: String
body: String body: String
replyTo: Int replyTo: Int
} }
enum FollowingEntity { enum FollowingEntity {
TOPIC TOPIC
AUTHOR AUTHOR
COMMUNITY COMMUNITY
REACTIONS REACTIONS
} }
################################### Mutation ################################### Mutation
type Mutation { type Mutation {
# inbox
createChat(title: String, members: [Int]!): Result!
updateChat(chat: ChatInput!): Result!
deleteChat(chatId: String!): Result!
# auth createMessage(chat: String!, body: String!, replyTo: Int): Result!
getSession: AuthResult! updateMessage(chatId: String!, id: Int!, body: String!): Result!
registerUser(email: String!, password: String, name: String): AuthResult! deleteMessage(chatId: String!, id: Int!): Result!
sendLink(email: String!, lang: String, template: String): Result! markAsRead(chatId: String!, ids: [Int]!): Result!
confirmEmail(token: String!): AuthResult!
# shout # auth
createShout(inp: ShoutInput!): Result! getSession: AuthResult!
updateShout(shout_id: Int!, shout_input: ShoutInput, publish: Boolean): Result! registerUser(email: String!, password: String, name: String): AuthResult!
deleteShout(shout_id: Int!): Result! sendLink(email: String!, lang: String, template: String): Result!
confirmEmail(token: String!): AuthResult!
# user profile # shout
rateUser(slug: String!, value: Int!): Result! createShout(inp: ShoutInput!): Result!
updateOnlineStatus: Result! updateShout(shout_id: Int!, shout_input: ShoutInput, publish: Boolean): Result!
updateProfile(profile: ProfileInput!): Result! deleteShout(shout_id: Int!): Result!
# topics # user profile
createTopic(input: TopicInput!): Result! rateUser(slug: String!, value: Int!): Result!
# TODO: mergeTopics(t1: String!, t2: String!): Result! updateProfile(profile: ProfileInput!): Result!
updateTopic(input: TopicInput!): Result!
destroyTopic(slug: String!): Result!
# reactions # topics
createReaction(reaction: ReactionInput!): Result! createTopic(input: TopicInput!): Result!
updateReaction(id: Int!, reaction: ReactionInput!): Result! # TODO: mergeTopics(t1: String!, t2: String!): Result!
deleteReaction(id: Int!): Result! updateTopic(input: TopicInput!): Result!
destroyTopic(slug: String!): Result!
# following # reactions
follow(what: FollowingEntity!, slug: String!): Result! createReaction(reaction: ReactionInput!): Result!
unfollow(what: FollowingEntity!, slug: String!): Result! updateReaction(id: Int!, reaction: ReactionInput!): Result!
deleteReaction(id: Int!): Result!
# following
follow(what: FollowingEntity!, slug: String!): Result!
unfollow(what: FollowingEntity!, slug: String!): Result!
markNotificationAsRead(notification_id: Int!): Result!
markAllNotificationsAsRead: Result!
} }
input AuthorsBy { input AuthorsBy {
@ -176,24 +187,24 @@ input AuthorsBy {
} }
input LoadShoutsFilters { input LoadShoutsFilters {
title: String title: String
body: String body: String
topic: String topic: String
author: String author: String
layout: String layout: String
excludeLayout: String excludeLayout: String
visibility: String visibility: String
days: Int days: Int
reacted: Boolean reacted: Boolean
} }
input LoadShoutsOptions { input LoadShoutsOptions {
filters: LoadShoutsFilters filters: LoadShoutsFilters
with_author_captions: Boolean with_author_captions: Boolean
limit: Int! limit: Int!
offset: Int offset: Int
order_by: String order_by: String
order_by_desc: Boolean order_by_desc: Boolean
} }
input ReactionBy { input ReactionBy {
@ -206,7 +217,17 @@ input ReactionBy {
days: Int # before days: Int # before
sort: String # how to sort, default createdAt sort: String # how to sort, default createdAt
} }
################################### Query
input NotificationsQueryParams {
limit: Int
offset: Int
}
type NotificationsQueryResult {
notifications: [Notification]!
totalCount: Int!
totalUnreadCount: Int!
}
type Query { type Query {
@ -245,178 +266,194 @@ type Query {
############################################ Entities ############################################ Entities
type Resource { type Resource {
id: Int! id: Int!
name: String! name: String!
} }
type Operation { type Operation {
id: Int! id: Int!
name: String! name: String!
} }
type Permission { type Permission {
operation: Int! operation: Int!
resource: Int! resource: Int!
} }
type Role { type Role {
id: Int! id: Int!
name: String! name: String!
community: String! community: String!
desc: String desc: String
permissions: [Permission!]! permissions: [Permission!]!
} }
type Rating { type Rating {
rater: String! rater: String!
value: Int! value: Int!
} }
type User { type User {
id: Int! id: Int!
username: String! # to login, ex. email, phone username: String! # to login, ex. email, phone
createdAt: DateTime! createdAt: DateTime!
lastSeen: DateTime lastSeen: DateTime
slug: String! slug: String!
name: String # to display name: String # to display
email: String email: String
password: String password: String
oauth: String # provider:token oauth: String # provider:token
userpic: String userpic: String
links: [String] links: [String]
emailConfirmed: Boolean # should contain all emails too emailConfirmed: Boolean # should contain all emails too
muted: Boolean muted: Boolean
updatedAt: DateTime updatedAt: DateTime
ratings: [Rating] ratings: [Rating]
bio: String bio: String
about: String about: String
communities: [Int] # user participating communities communities: [Int] # user participating communities
oid: String oid: String
} }
enum ReactionKind { enum ReactionKind {
LIKE LIKE
DISLIKE DISLIKE
AGREE AGREE
DISAGREE DISAGREE
PROOF PROOF
DISPROOF DISPROOF
COMMENT COMMENT
QUOTE QUOTE
PROPOSE PROPOSE
ASK ASK
REMARK REMARK
FOOTNOTE FOOTNOTE
ACCEPT ACCEPT
REJECT REJECT
} }
type Reaction { type Reaction {
id: Int! id: Int!
shout: Shout! shout: Shout!
createdAt: DateTime! createdAt: DateTime!
createdBy: User! createdBy: User!
updatedAt: DateTime updatedAt: DateTime
deletedAt: DateTime deletedAt: DateTime
deletedBy: User deletedBy: User
range: String # full / 0:2340 range: String # full / 0:2340
kind: ReactionKind! kind: ReactionKind!
body: String body: String
replyTo: Int replyTo: Int
stat: Stat stat: Stat
old_id: String old_id: String
old_thread: String old_thread: String
} }
# is publication # is publication
type Shout { type Shout {
id: Int! id: Int!
slug: String! slug: String!
body: String! body: String!
lead: String lead: String
description: String description: String
createdAt: DateTime! createdAt: DateTime!
topics: [Topic] topics: [Topic]
mainTopic: String mainTopic: String
title: String title: String
subtitle: String subtitle: String
authors: [Author] authors: [Author]
lang: String lang: String
community: String community: String
cover: String cover: String
layout: String # audio video literature image layout: String # music video literature image
versionOf: String # for translations and re-telling the same story versionOf: String # for translations and re-telling the same story
visibility: String # owner authors community public visibility: String # owner authors community public
updatedAt: DateTime updatedAt: DateTime
updatedBy: User updatedBy: User
deletedAt: DateTime deletedAt: DateTime
deletedBy: User deletedBy: User
publishedAt: DateTime publishedAt: DateTime
media: String # json [ { title pic url body }, .. ] media: String # json [ { title pic url body }, .. ]
stat: Stat stat: Stat
} }
type Stat { type Stat {
viewed: Int viewed: Int
reacted: Int reacted: Int
rating: Int rating: Int
commented: Int commented: Int
ranking: Int ranking: Int
} }
type Community { type Community {
id: Int! id: Int!
slug: String! slug: String!
name: String! name: String!
desc: String desc: String
pic: String! pic: String!
createdAt: DateTime! createdAt: DateTime!
createdBy: User! createdBy: User!
} }
type Collection { type Collection {
id: Int! id: Int!
slug: String! slug: String!
title: String! title: String!
desc: String desc: String
amount: Int amount: Int
publishedAt: DateTime publishedAt: DateTime
createdAt: DateTime! createdAt: DateTime!
createdBy: User! createdBy: User!
} }
type TopicStat { type TopicStat {
shouts: Int! shouts: Int!
followers: Int! followers: Int!
authors: Int! authors: Int!
# viewed: Int # viewed: Int
# reacted: Int! # reacted: Int!
# commented: Int # commented: Int
# rating: Int # rating: Int
} }
type Topic { type Topic {
id: Int! id: Int!
slug: String! slug: String!
title: String title: String
body: String body: String
pic: String pic: String
# community: Community! # community: Community!
stat: TopicStat stat: TopicStat
oid: String oid: String
} }
type Token { type Token {
createdAt: DateTime! createdAt: DateTime!
expiresAt: DateTime expiresAt: DateTime
id: Int! id: Int!
ownerId: Int! ownerId: Int!
usedAt: DateTime usedAt: DateTime
value: String! value: String!
}
enum NotificationType {
NEW_COMMENT,
NEW_REPLY
}
type Notification {
id: Int!
shout: Int
reaction: Int
type: NotificationType
createdAt: DateTime!
seen: Boolean!
data: String # JSON
occurrences: Int!
} }

View File

@ -44,7 +44,7 @@ log_settings = {
local_headers = [ local_headers = [
("Access-Control-Allow-Methods", "GET, POST, OPTIONS, HEAD"), ("Access-Control-Allow-Methods", "GET, POST, OPTIONS, HEAD"),
("Access-Control-Allow-Origin", "http://localhost:3000"), ("Access-Control-Allow-Origin", "https://localhost:3000"),
( (
"Access-Control-Allow-Headers", "Access-Control-Allow-Headers",
"DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,Authorization", "DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,Authorization",

View File

@ -0,0 +1,137 @@
import asyncio
import json
from datetime import datetime, timezone
from sqlalchemy import and_
from base.orm import local_session
from orm import Reaction, Shout, Notification, User
from orm.notification import NotificationType
from orm.reaction import ReactionKind
from services.notifications.sse import connection_manager
def update_prev_notification(notification, user):
notification_data = json.loads(notification.data)
notification_data["users"] = [
user for user in notification_data["users"] if user['id'] != user.id
]
notification_data["users"].append({
"id": user.id,
"name": user.name
})
notification.data = json.dumps(notification_data, ensure_ascii=False)
notification.seen = False
notification.occurrences = notification.occurrences + 1
notification.createdAt = datetime.now(tz=timezone.utc)
class NewReactionNotificator:
def __init__(self, reaction_id):
self.reaction_id = reaction_id
async def run(self):
with local_session() as session:
reaction = session.query(Reaction).where(Reaction.id == self.reaction_id).one()
shout = session.query(Shout).where(Shout.id == reaction.shout).one()
user = session.query(User).where(User.id == reaction.createdBy).one()
notify_user_ids = []
if reaction.kind == ReactionKind.COMMENT:
parent_reaction = None
if reaction.replyTo:
parent_reaction = session.query(Reaction).where(Reaction.id == reaction.replyTo).one()
if parent_reaction.createdBy != reaction.createdBy:
prev_new_reply_notification = session.query(Notification).where(
and_(
Notification.user == shout.createdBy,
Notification.type == NotificationType.NEW_REPLY,
Notification.shout == shout.id,
Notification.reaction == parent_reaction.id
)
).first()
if prev_new_reply_notification:
update_prev_notification(prev_new_reply_notification, user)
else:
reply_notification_data = json.dumps({
"shout": {
"title": shout.title
},
"users": [
{"id": user.id, "name": user.name}
]
}, ensure_ascii=False)
reply_notification = Notification.create(**{
"user": parent_reaction.createdBy,
"type": NotificationType.NEW_REPLY.name,
"shout": shout.id,
"reaction": parent_reaction.id,
"data": reply_notification_data
})
session.add(reply_notification)
notify_user_ids.append(parent_reaction.createdBy)
if reaction.createdBy != shout.createdBy and (
parent_reaction is None or parent_reaction.createdBy != shout.createdBy
):
prev_new_comment_notification = session.query(Notification).where(
and_(
Notification.user == shout.createdBy,
Notification.type == NotificationType.NEW_COMMENT,
Notification.shout == shout.id
)
).first()
if prev_new_comment_notification:
update_prev_notification(prev_new_comment_notification, user)
else:
notification_data_string = json.dumps({
"shout": {
"title": shout.title
},
"users": [
{"id": user.id, "name": user.name}
]
}, ensure_ascii=False)
author_notification = Notification.create(**{
"user": shout.createdBy,
"type": NotificationType.NEW_COMMENT.name,
"shout": shout.id,
"data": notification_data_string
})
session.add(author_notification)
notify_user_ids.append(shout.createdBy)
session.commit()
for user_id in notify_user_ids:
await connection_manager.notify_user(user_id)
class NotificationService:
def __init__(self):
self._queue = asyncio.Queue()
async def handle_new_reaction(self, reaction_id):
notificator = NewReactionNotificator(reaction_id)
await self._queue.put(notificator)
async def worker(self):
while True:
notificator = await self._queue.get()
try:
await notificator.run()
except Exception as e:
print(f'[NotificationService.worker] error: {str(e)}')
notification_service = NotificationService()

View File

@ -0,0 +1,72 @@
import json
from sse_starlette.sse import EventSourceResponse
from starlette.requests import Request
import asyncio
class ConnectionManager:
def __init__(self):
self.connections_by_user_id = {}
def add_connection(self, user_id, connection):
if user_id not in self.connections_by_user_id:
self.connections_by_user_id[user_id] = []
self.connections_by_user_id[user_id].append(connection)
def remove_connection(self, user_id, connection):
if user_id not in self.connections_by_user_id:
return
self.connections_by_user_id[user_id].remove(connection)
if len(self.connections_by_user_id[user_id]) == 0:
del self.connections_by_user_id[user_id]
async def notify_user(self, user_id):
if user_id not in self.connections_by_user_id:
return
for connection in self.connections_by_user_id[user_id]:
data = {
"type": "newNotifications"
}
data_string = json.dumps(data, ensure_ascii=False)
await connection.put(data_string)
async def broadcast(self, data: str):
for user_id in self.connections_by_user_id:
for connection in self.connections_by_user_id[user_id]:
await connection.put(data)
class Connection:
def __init__(self):
self._queue = asyncio.Queue()
async def put(self, data: str):
await self._queue.put(data)
async def listen(self):
data = await self._queue.get()
return data
connection_manager = ConnectionManager()
async def sse_subscribe_handler(request: Request):
user_id = int(request.path_params["user_id"])
connection = Connection()
connection_manager.add_connection(user_id, connection)
async def event_publisher():
try:
while True:
data = await connection.listen()
yield data
except asyncio.CancelledError as e:
connection_manager.remove_connection(user_id, connection)
raise e
return EventSourceResponse(event_publisher())

View File

@ -27,6 +27,7 @@ SHOUTS_REPO = "content"
SESSION_TOKEN_HEADER = "Authorization" SESSION_TOKEN_HEADER = "Authorization"
SENTRY_DSN = environ.get("SENTRY_DSN") SENTRY_DSN = environ.get("SENTRY_DSN")
SESSION_SECRET_KEY = environ.get("SESSION_SECRET_KEY") or "!secret"
# for local development # for local development
DEV_SERVER_PID_FILE_NAME = 'dev-server.pid' DEV_SERVER_PID_FILE_NAME = 'dev-server.pid'

43
test/test.json Normal file

File diff suppressed because one or more lines are too long