ws-subs
This commit is contained in:
parent
5b67d372ad
commit
a034eda220
|
@ -3,7 +3,6 @@ from typing import Optional, Tuple
|
|||
|
||||
from graphql.type import GraphQLResolveInfo
|
||||
from sqlalchemy.orm import joinedload, exc
|
||||
from sqlalchemy import select, and_
|
||||
from starlette.authentication import AuthenticationBackend
|
||||
from starlette.requests import HTTPConnection
|
||||
|
||||
|
|
20
main.py
20
main.py
|
@ -18,7 +18,11 @@ from resolvers.auth import confirm_email_handler
|
|||
from services.main import storages_init
|
||||
from services.stat.viewed import ViewedStorage
|
||||
from services.zine.gittask import GitTask
|
||||
from settings import DEV_SERVER_STATUS_FILE_NAME, SENTRY_ID
|
||||
from settings import DEV_SERVER_STATUS_FILE_NAME, SENTRY_DSN
|
||||
from ariadne.asgi.handlers import GraphQLTransportWSHandler
|
||||
from services.inbox.presence import on_connect, on_disconnect
|
||||
# from services.inbox.sse import sse_messages
|
||||
|
||||
|
||||
import_module("resolvers")
|
||||
schema = make_executable_schema(load_schema_from_path("schema.graphql"), resolvers) # type: ignore
|
||||
|
@ -39,7 +43,7 @@ async def start_up():
|
|||
print(git_task)
|
||||
try:
|
||||
import sentry_sdk
|
||||
sentry_sdk.init("https://%s@testsentry.discours.io/2" % SENTRY_ID)
|
||||
sentry_sdk.init(SENTRY_DSN)
|
||||
except Exception as e:
|
||||
print('[sentry] init error')
|
||||
print(e)
|
||||
|
@ -63,7 +67,8 @@ async def shutdown():
|
|||
routes = [
|
||||
Route("/oauth/{provider}", endpoint=oauth_login),
|
||||
Route("/oauth-authorize", endpoint=oauth_authorize),
|
||||
Route("/confirm/{token}", endpoint=confirm_email_handler)
|
||||
Route("/confirm/{token}", endpoint=confirm_email_handler),
|
||||
# Route("/chat/{chat_id}", endpoint=sse_messages)
|
||||
]
|
||||
|
||||
app = Starlette(
|
||||
|
@ -73,7 +78,14 @@ app = Starlette(
|
|||
middleware=middleware,
|
||||
routes=routes,
|
||||
)
|
||||
app.mount("/", GraphQL(schema, debug=True))
|
||||
app.mount("/", GraphQL(
|
||||
schema,
|
||||
debug=True,
|
||||
websocket_handler=GraphQLTransportWSHandler(
|
||||
on_connect=on_connect,
|
||||
on_disconnect=on_disconnect
|
||||
)
|
||||
))
|
||||
|
||||
dev_app = app = Starlette(
|
||||
debug=True,
|
||||
|
|
|
@ -13,7 +13,7 @@ from orm.user import User
|
|||
from resolvers.zine.reactions import reactions_follow, reactions_unfollow
|
||||
from services.zine.gittask import GitTask
|
||||
from resolvers.inbox.chats import create_chat
|
||||
from services.inbox import MessagesStorage
|
||||
from services.inbox.storage import MessagesStorage
|
||||
from orm.collab import Collab
|
||||
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0):
|
|||
auth: AuthCredentials = info.context["request"].auth
|
||||
|
||||
cids = await redis.execute("SMEMBERS", "chats_by_user/" + str(auth.user_id))
|
||||
onliners = await redis.execute("SMEMBERS", "users-online")
|
||||
if cids:
|
||||
cids = list(cids)[offset:offset + limit]
|
||||
if not cids:
|
||||
|
@ -56,6 +57,7 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0):
|
|||
"userpic": a.userpic,
|
||||
"name": a.name,
|
||||
"lastSeen": a.lastSeen,
|
||||
"online": a.id in onliners
|
||||
})
|
||||
chats.append(c)
|
||||
return {
|
||||
|
|
|
@ -6,7 +6,8 @@ from auth.authenticate import login_required
|
|||
from auth.credentials import AuthCredentials
|
||||
from base.redis import redis
|
||||
from base.resolvers import mutation, subscription
|
||||
from services.inbox import ChatFollowing, MessageResult, MessagesStorage
|
||||
from services.inbox.helpers import ChatFollowing, MessageResult
|
||||
from services.inbox.storage import MessagesStorage
|
||||
|
||||
|
||||
@mutation.field("createMessage")
|
||||
|
@ -18,7 +19,7 @@ async def create_message(_, info, chat: str, body: str, replyTo=None):
|
|||
chat = await redis.execute("GET", f"chats/{chat}")
|
||||
if not chat:
|
||||
return {
|
||||
"error": "chat not exist"
|
||||
"error": "chat is not exist"
|
||||
}
|
||||
else:
|
||||
chat = dict(json.loads(chat))
|
||||
|
@ -153,6 +154,7 @@ async def message_generator(obj, info):
|
|||
chat = await redis.execute("GET", f"chats/{chat_id}")
|
||||
updated[chat_id] = chat['updatedAt']
|
||||
user_following_chats_sorted = sorted(user_following_chats, key=lambda x: updated[x], reverse=True)
|
||||
|
||||
for chat_id in user_following_chats_sorted:
|
||||
following_chat = ChatFollowing(chat_id)
|
||||
await MessagesStorage.register_chat(following_chat)
|
||||
|
|
|
@ -29,6 +29,7 @@ type ChatMember {
|
|||
name: String!
|
||||
userpic: String
|
||||
lastSeen: DateTime
|
||||
online: Boolean
|
||||
# invitedAt: DateTime
|
||||
# invitedBy: String # user slug
|
||||
# TODO: keep invite databit
|
||||
|
|
14
services/inbox/helpers.py
Normal file
14
services/inbox/helpers.py
Normal file
|
@ -0,0 +1,14 @@
|
|||
import asyncio
|
||||
|
||||
|
||||
class MessageResult:
|
||||
def __init__(self, status, message):
|
||||
self.status = status
|
||||
self.message = message
|
||||
|
||||
|
||||
class ChatFollowing:
|
||||
queue = asyncio.Queue()
|
||||
|
||||
def __init__(self, chat_id):
|
||||
self.chat_id = chat_id
|
43
services/inbox/presence.py
Normal file
43
services/inbox/presence.py
Normal file
|
@ -0,0 +1,43 @@
|
|||
from base.exceptions import Unauthorized
|
||||
from auth.tokenstorage import SessionToken
|
||||
from base.redis import redis
|
||||
|
||||
|
||||
async def set_online_status(user_id, status):
|
||||
if user_id:
|
||||
if status:
|
||||
await redis.execute("SADD", "users-online", user_id)
|
||||
else:
|
||||
await redis.execute("SREM", "users-online", user_id)
|
||||
|
||||
|
||||
async def on_connect(websocket, params):
|
||||
if not isinstance(params, dict):
|
||||
websocket.scope["connection_params"] = {}
|
||||
return
|
||||
token = params.get('token')
|
||||
if not token:
|
||||
raise Unauthorized("Please login")
|
||||
else:
|
||||
payload = await SessionToken.verify(token)
|
||||
if payload and payload.user_id:
|
||||
websocket.scope["user_id"] = payload.user_id
|
||||
await set_online_status(payload.user_id, True)
|
||||
|
||||
|
||||
async def on_disconnect(websocket):
|
||||
user_id = websocket.scope.get("user_id")
|
||||
await set_online_status(user_id, False)
|
||||
|
||||
|
||||
# FIXME: not used yet
|
||||
def context_value(request):
|
||||
context = {}
|
||||
print(f"[inbox.presense] request debug: {request}")
|
||||
if request.scope["type"] == "websocket":
|
||||
# request is an instance of WebSocket
|
||||
context.update(request.scope["connection_params"])
|
||||
else:
|
||||
context["token"] = request.META.get("authorization")
|
||||
|
||||
return context
|
|
@ -1,13 +1,6 @@
|
|||
import asyncio
|
||||
|
||||
|
||||
class ChatFollowing:
|
||||
queue = asyncio.Queue()
|
||||
|
||||
def __init__(self, chat_id):
|
||||
self.chat_id = chat_id
|
||||
|
||||
|
||||
class MessagesStorage:
|
||||
lock = asyncio.Lock()
|
||||
chats = []
|
||||
|
@ -28,9 +21,3 @@ class MessagesStorage:
|
|||
for chat in MessagesStorage.chats:
|
||||
if message_result.message["chatId"] == chat.chat_id:
|
||||
chat.queue.put_nowait(message_result)
|
||||
|
||||
|
||||
class MessageResult:
|
||||
def __init__(self, status, message):
|
||||
self.status = status
|
||||
self.message = message
|
|
@ -26,6 +26,7 @@ FRONTEND_URL = environ.get("FRONTEND_URL") or "http://localhost:3000"
|
|||
SHOUTS_REPO = "content"
|
||||
SESSION_TOKEN_HEADER = "Authorization"
|
||||
|
||||
SENTRY_DSN = environ.get("SENTRY_DSN")
|
||||
|
||||
# for local development
|
||||
DEV_SERVER_STATUS_FILE_NAME = 'dev-server-status.txt'
|
||||
SENTRY_ID = environ.get("SENTRY_ID")
|
||||
|
|
Loading…
Reference in New Issue
Block a user