fixed-replyto
This commit is contained in:
parent
c812db810b
commit
4d1759c188
20
main.py
20
main.py
|
@ -19,8 +19,8 @@ from services.main import storages_init
|
|||
from services.stat.viewed import ViewedStorage
|
||||
from services.zine.gittask import GitTask
|
||||
from settings import DEV_SERVER_STATUS_FILE_NAME, SENTRY_DSN
|
||||
from ariadne.asgi.handlers import GraphQLTransportWSHandler
|
||||
from services.inbox.presence import on_connect, on_disconnect
|
||||
# from sse.transport import GraphQLSSEHandler
|
||||
# from services.inbox.presence import on_connect, on_disconnect
|
||||
# from services.inbox.sse import sse_messages
|
||||
|
||||
|
||||
|
@ -81,10 +81,10 @@ app = Starlette(
|
|||
app.mount("/", GraphQL(
|
||||
schema,
|
||||
debug=True,
|
||||
websocket_handler=GraphQLTransportWSHandler(
|
||||
on_connect=on_connect,
|
||||
on_disconnect=on_disconnect
|
||||
)
|
||||
# websocket_handler=GraphQLTransportWSHandler(
|
||||
# on_connect=on_connect,
|
||||
# on_disconnect=on_disconnect
|
||||
# )
|
||||
))
|
||||
|
||||
dev_app = app = Starlette(
|
||||
|
@ -97,8 +97,8 @@ dev_app = app = Starlette(
|
|||
dev_app.mount("/", GraphQL(
|
||||
schema,
|
||||
debug=True,
|
||||
websocket_handler=GraphQLTransportWSHandler(
|
||||
on_connect=on_connect,
|
||||
on_disconnect=on_disconnect
|
||||
)
|
||||
# websocket_handler=GraphQLTransportWSHandler(
|
||||
# on_connect=on_connect,
|
||||
# on_disconnect=on_disconnect
|
||||
# )
|
||||
))
|
||||
|
|
|
@ -11,27 +11,35 @@ from resolvers.zine.profile import followed_authors
|
|||
from .unread import get_unread_counter
|
||||
|
||||
|
||||
async def load_messages(chat_id: str, limit: int, offset: int):
|
||||
async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids=[]):
|
||||
''' load :limit messages for :chat_id with :offset '''
|
||||
messages = []
|
||||
# print(f'[inbox] loading messages by chat: {chat_id}[{offset}:{offset + limit}]')
|
||||
message_ids = []
|
||||
if ids:
|
||||
message_ids += ids
|
||||
try:
|
||||
message_ids = await redis.lrange(f"chats/{chat_id}/message_ids",
|
||||
offset,
|
||||
offset + limit
|
||||
)
|
||||
|
||||
# print(f'[inbox] message_ids: {message_ids}')
|
||||
if limit:
|
||||
message_ids = await redis.lrange(f"chats/{chat_id}/message_ids",
|
||||
offset,
|
||||
offset + limit
|
||||
)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
if message_ids:
|
||||
message_keys = [
|
||||
f"chats/{chat_id}/messages/{mid.decode('utf-8')}" for mid in message_ids
|
||||
]
|
||||
# print(message_keys)
|
||||
messages = await redis.mget(*message_keys)
|
||||
messages = [json.loads(msg.decode('utf-8')) for msg in messages]
|
||||
# print('[inbox] messages \n%r' % messages)
|
||||
replies = []
|
||||
for m in messages:
|
||||
rt = m.get('replyTo')
|
||||
if rt:
|
||||
rt = int(rt)
|
||||
if rt not in message_ids:
|
||||
replies.append(rt)
|
||||
if replies:
|
||||
messages += await load_messages(chat_id, limit=0, ids=replies)
|
||||
return messages
|
||||
|
||||
|
||||
|
|
|
@ -30,10 +30,10 @@ async def create_message(_, info, chat: str, body: str, replyTo=None):
|
|||
"id": message_id,
|
||||
"author": auth.user_id,
|
||||
"body": body,
|
||||
"createdAt": int(datetime.now(tz=timezone.utc).timestamp()),
|
||||
"createdAt": int(datetime.now(tz=timezone.utc).timestamp())
|
||||
}
|
||||
if replyTo:
|
||||
new_message = int(replyTo)
|
||||
new_message['replyTo'] = replyTo
|
||||
chat['updatedAt'] = new_message['createdAt']
|
||||
await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat))
|
||||
print(f"[inbox] creating message {new_message}")
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
from datetime import datetime, timedelta, timezone
|
||||
from sqlalchemy import and_, asc, desc, select, text, func
|
||||
from sqlalchemy.orm import aliased
|
||||
from auth.authenticate import login_required
|
||||
from auth.credentials import AuthCredentials
|
||||
from base.orm import local_session
|
||||
|
|
|
@ -11,9 +11,9 @@ async def set_online_status(user_id, status):
|
|||
await redis.execute("SREM", "users-online", user_id)
|
||||
|
||||
|
||||
async def on_connect(websocket, params):
|
||||
async def on_connect(req, params):
|
||||
if not isinstance(params, dict):
|
||||
websocket.scope["connection_params"] = {}
|
||||
req.scope["connection_params"] = {}
|
||||
return
|
||||
token = params.get('token')
|
||||
if not token:
|
||||
|
@ -21,12 +21,12 @@ async def on_connect(websocket, params):
|
|||
else:
|
||||
payload = await SessionToken.verify(token)
|
||||
if payload and payload.user_id:
|
||||
websocket.scope["user_id"] = payload.user_id
|
||||
req.scope["user_id"] = payload.user_id
|
||||
await set_online_status(payload.user_id, True)
|
||||
|
||||
|
||||
async def on_disconnect(websocket):
|
||||
user_id = websocket.scope.get("user_id")
|
||||
async def on_disconnect(req):
|
||||
user_id = req.scope.get("user_id")
|
||||
await set_online_status(user_id, False)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user