core/resolvers/inbox.py

118 lines
2.7 KiB
Python
Raw Normal View History

2021-07-01 18:26:04 +00:00
from orm import Message, User
2021-08-05 16:49:08 +00:00
from orm.base import local_session
2021-07-02 09:16:43 +00:00
from resolvers.base import mutation, query, subscription
2021-07-01 18:26:04 +00:00
from auth.authenticate import login_required
2021-07-02 09:16:43 +00:00
import asyncio
2021-07-08 07:58:49 +00:00
class MessageQueue:
    """Process-wide queues connecting the message mutations to subscriptions.

    Each attribute is a single ``asyncio.Queue`` created once, when the class
    body is executed, and shared by every resolver in this module.

    NOTE(review): an item put on an ``asyncio.Queue`` is handed to exactly one
    ``get()`` caller, so if several subscription generators wait on the same
    queue only one of them receives a given message -- confirm this is the
    intended delivery model.
    """

    # Filled by create_message, drained by new_message_generator.
    new_message = asyncio.Queue()

    # Filled by update_message, drained by updated_message_generator.
    updated_message = asyncio.Queue()

    # Filled by delete_message, drained by deleted_message_generator.
    deleted_message = asyncio.Queue()
2021-07-02 09:16:43 +00:00
2021-07-01 18:26:04 +00:00
@mutation.field("createMessage")
@login_required
async def create_message(_, info, body, replyTo=None):
    """Create a message authored by the authenticated user and enqueue it.

    Args:
        _: Parent value passed by the GraphQL executor (unused).
        info: Resolver info; the author id is read from
            ``info.context["request"].auth.user_id``.
        body: Value stored as the message body.
        replyTo: Optional value stored as the message's ``replyTo``.

    Returns:
        A dict of the form ``{"message": <created message>}``.
    """
    author_id = info.context["request"].auth.user_id
    created = Message.create(author=author_id, body=body, replyTo=replyTo)

    # Make the new message available to consumers of MessageQueue.new_message.
    MessageQueue.new_message.put_nowait(created)

    return {"message": created}
2021-07-01 18:26:04 +00:00
@query.field("getMessages")
@login_required
async def get_messages(_, info, count, page):
    """Return the messages authored by the authenticated user.

    Args:
        _: Parent value passed by the GraphQL executor (unused).
        info: Resolver info; the user id is read from
            ``info.context["request"].auth.user_id``.
        count: Accepted for the GraphQL signature; currently not applied.
        page: Accepted for the GraphQL signature; currently not applied.

    Returns:
        A list of ``Message`` rows whose ``author`` equals the caller's id.
    """
    user_id = info.context["request"].auth.user_id

    with local_session() as session:
        # Execute the query while the session is still open. Returning the
        # lazy Query object would defer the SELECT until the GraphQL layer
        # iterates it, i.e. after this ``with`` block has already exited.
        #
        # NOTE(review): ``count`` and ``page`` look like pagination arguments
        # but are never used; applying limit/offset needs a defined ordering
        # and a decision on whether ``page`` is 0- or 1-based -- confirm
        # against the schema before wiring them in.
        return session.query(Message).filter(Message.author == user_id).all()
2021-08-05 16:49:08 +00:00
def check_and_get_message(message_id, user_id, session):
    """Fetch a message by id and verify that ``user_id`` is its author.

    Args:
        message_id: Value compared against ``Message.id``.
        user_id: Value compared against the found message's ``author``.
        session: Session object used to run the query.

    Returns:
        The first ``Message`` matching ``message_id``.

    Raises:
        Exception: ``"invalid id"`` when no message matches, or
            ``"access denied"`` when the message's author differs from
            ``user_id``.
    """
    lookup = session.query(Message).filter(Message.id == message_id)
    found = lookup.first()

    if found:
        if found.author != user_id:
            raise Exception("access denied")
        return found

    raise Exception("invalid id")
2021-07-01 18:26:04 +00:00
@mutation.field("updateMessage")
@login_required
async def update_message(_, info, id, body):
    """Replace the body of one of the authenticated user's messages.

    Args:
        _: Parent value passed by the GraphQL executor (unused).
        info: Resolver info; the user id is read from
            ``info.context["request"].auth.user_id``.
        id: Identifier of the message to update.
        body: New value assigned to ``message.body``.

    Returns:
        ``{"message": <updated message>}`` on success, or
        ``{"error": <text>}`` when the lookup/ownership check fails.
    """
    user_id = info.context["request"].auth.user_id

    with local_session() as session:
        try:
            message = check_and_get_message(id, user_id, session)
        except Exception as err:
            # check_and_get_message reports failures as plain Exception.
            # Return the message text rather than the exception object so the
            # payload is a serialisable string.
            # NOTE(review): assumes the schema's ``error`` field is a String.
            return {"error": str(err)}

        message.body = body
        session.commit()

    # Make the change available to consumers of MessageQueue.updated_message.
    MessageQueue.updated_message.put_nowait(message)

    return {"message": message}
2021-07-01 18:26:04 +00:00
@mutation.field("deleteMessage")
@login_required
async def delete_message(_, info, id):
    """Delete one of the authenticated user's messages.

    Args:
        _: Parent value passed by the GraphQL executor (unused).
        info: Resolver info; the user id is read from
            ``info.context["request"].auth.user_id``.
        id: Identifier of the message to delete.

    Returns:
        An empty dict on success, or ``{"error": <text>}`` when the
        lookup/ownership check fails.
    """
    user_id = info.context["request"].auth.user_id

    with local_session() as session:
        try:
            message = check_and_get_message(id, user_id, session)
        except Exception as err:
            # check_and_get_message reports failures as plain Exception.
            # Return the message text rather than the exception object so the
            # payload is a serialisable string.
            # NOTE(review): assumes the schema's ``error`` field is a String.
            return {"error": str(err)}

        session.delete(message)
        session.commit()

    # Make the deletion available to consumers of MessageQueue.deleted_message.
    MessageQueue.deleted_message.put_nowait(message)

    return {}
2021-07-02 09:16:43 +00:00
@subscription.source("messageCreated")
async def new_message_generator(obj, info):
    """Yield messages taken from ``MessageQueue.new_message``, indefinitely.

    Args:
        obj: Root value passed by the GraphQL executor (unused).
        info: Resolver info (unused).

    Yields:
        Each item as it is taken off ``MessageQueue.new_message``.
    """
    while True:
        # Suspends until the queue has an item.
        yield await MessageQueue.new_message.get()
@subscription.source("messageUpdated")
async def updated_message_generator(obj, info):
    """Yield messages taken from ``MessageQueue.updated_message``, indefinitely.

    Args:
        obj: Root value passed by the GraphQL executor (unused).
        info: Resolver info (unused).

    Yields:
        Each item as it is taken off ``MessageQueue.updated_message``.
    """
    while True:
        # Suspends until the queue has an item.
        yield await MessageQueue.updated_message.get()
@subscription.source("messageDeleted")
async def deleted_message_generator(obj, info):
    """Yield messages taken from ``MessageQueue.deleted_message``, indefinitely.

    Args:
        obj: Root value passed by the GraphQL executor (unused).
        info: Resolver info (unused).

    Yields:
        Each item as it is taken off ``MessageQueue.deleted_message``.
    """
    while True:
        # Suspends until the queue has an item.
        message = await MessageQueue.deleted_message.get()
        # Yield the item just dequeued; the previous code yielded the
        # undefined name ``new_message`` and raised NameError here.
        yield message
@subscription.field("messageCreated")
@subscription.field("messageUpdated")
@subscription.field("messageDeleted")
def message_resolver(message, info):
    """Return the value produced by the subscription source unchanged.

    Registered for the ``messageCreated``, ``messageUpdated`` and
    ``messageDeleted`` subscription fields.

    Args:
        message: Value yielded by the matching subscription source.
        info: Resolver info (unused).

    Returns:
        ``message`` as received.
    """
    return message