From f2a061787150417ff9289adca16a7cf789f79d96 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Sun, 4 Dec 2022 12:42:04 +0300 Subject: [PATCH 01/17] fix-error-messags --- resolvers/inbox/load.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py index a53e435d..7c281169 100644 --- a/resolvers/inbox/load.py +++ b/resolvers/inbox/load.py @@ -6,7 +6,6 @@ from auth.credentials import AuthCredentials from base.redis import redis from base.orm import local_session from base.resolvers import query -from base.exceptions import ObjectNotExist from orm.user import User from resolvers.zine.profile import followed_authors from .unread import get_unread_counter @@ -100,7 +99,10 @@ async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): if by_chat: chat = await redis.execute("GET", f"chats/{by_chat}") if not chat: - raise ObjectNotExist("Chat not exists") + return { + "messages": [], + "error": "chat not exist" + } # everyone's messages in filtered chat messages.union(set(await load_messages(by_chat, limit, offset))) From e278039ecef1b116f121e22b9caff27fce2d95a0 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Sun, 4 Dec 2022 13:09:08 +0300 Subject: [PATCH 02/17] fixed-load-msgs --- resolvers/inbox/load.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py index 7c281169..217efe2c 100644 --- a/resolvers/inbox/load.py +++ b/resolvers/inbox/load.py @@ -23,7 +23,7 @@ async def load_messages(chat_id: str, limit: int, offset: int): ] messages = await redis.mget(*message_keys) messages = [json.loads(msg) for msg in messages] - return messages + return set(messages) @query.field("loadChats") @@ -64,11 +64,11 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0): } -async def search_user_chats(by, messages: set, user_id: int, limit, offset): +async def search_user_chats(by, messages, user_id: int, limit, offset): cids = set([]) by_author = by.get('author') body_like = by.get('body') - cids.unioin(set(await redis.execute("SMEMBERS", "chats_by_user/" + str(user_id)))) + cids.union(set(await redis.execute("SMEMBERS", "chats_by_user/" + str(user_id)))) if by_author: # all author's messages cids.union(set(await redis.execute("SMEMBERS", f"chats_by_user/{by_author}"))) @@ -79,7 +79,7 @@ async def search_user_chats(by, messages: set, user_id: int, limit, offset): if body_like: # search in all messages in all user's chats for c in cids: - # FIXME: user redis scan here + # FIXME: use redis scan here mmm = set(await load_messages(c, limit, offset)) for m in mmm: if body_like in m["body"]: @@ -109,8 +109,7 @@ async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): auth: AuthCredentials = info.context["request"].auth if len(messages) == 0: - # FIXME - messages.union(search_user_chats(by, messages, auth.user_id, limit, offset)) + messages.union(set(await search_user_chats(by, messages, auth.user_id, limit, offset))) days = by.get("days") if days: @@ -120,8 +119,8 @@ async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): ))) return { "messages": sorted( - lambda m: m.createdAt, - list(messages) + list(messages), + key=lambda m: m.createdAt ), "error": None } From 1591b9548fa116c3fdd0cbb5765ab04585a0c34a Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Sun, 4 Dec 2022 13:18:56 +0300 Subject: [PATCH 03/17] -2tabs --- resolvers/inbox/load.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py index 217efe2c..5ef3ccc6 100644 --- a/resolvers/inbox/load.py +++ b/resolvers/inbox/load.py @@ -57,7 +57,7 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0): "name": a.name, "lastSeen": a.lastSeen, }) - chats.append(c) + chats.append(c) return { "chats": chats, "error": None From f59b8474e40dab59a0457a924ca7b385a061acf5 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Sun, 4 Dec 2022 13:38:18 +0300 Subject: [PATCH 04/17] create-msg-fix --- resolvers/inbox/messages.py | 1 - 1 file changed, 1 deletion(-) diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index 2772dcf6..9ffd341a 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -38,7 +38,6 @@ async def create_message(_, info, chat: str, body: str, replyTo=None): await redis.execute("LPUSH", f"chats/{chat['id']}/message_ids", str(message_id)) await redis.execute("SET", f"chats/{chat['id']}/next_message_id", str(message_id + 1)) - chat = json.loads(chat) users = chat["users"] for user_slug in users: await redis.execute( From 5b67d372adc76a6abf5fb5538037ff768c456d59 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Sun, 4 Dec 2022 14:16:41 +0300 Subject: [PATCH 05/17] fixd --- schema.graphql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schema.graphql b/schema.graphql index 7d7729ca..4c8261af 100644 --- a/schema.graphql +++ b/schema.graphql @@ -498,7 +498,7 @@ type Token { } type Message { - author: String! + author: Int! chatId: String! body: String! createdAt: Int! From a034eda220bc746ae212f9b84d237bd9eabbffca Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Sun, 4 Dec 2022 17:03:55 +0300 Subject: [PATCH 06/17] ws-subs --- auth/authenticate.py | 1 - main.py | 20 +++++++++--- resolvers/create/editor.py | 2 +- resolvers/inbox/load.py | 2 ++ resolvers/inbox/messages.py | 6 ++-- schema.graphql | 1 + services/inbox/helpers.py | 14 ++++++++ services/inbox/presence.py | 43 +++++++++++++++++++++++++ services/{inbox.py => inbox/storage.py} | 13 -------- settings.py | 3 +- 10 files changed, 83 insertions(+), 22 deletions(-) create mode 100644 services/inbox/helpers.py create mode 100644 services/inbox/presence.py rename services/{inbox.py => inbox/storage.py} (72%) diff --git a/auth/authenticate.py b/auth/authenticate.py index e11e821e..3e587225 100644 --- a/auth/authenticate.py +++ b/auth/authenticate.py @@ -3,7 +3,6 @@ from typing import Optional, Tuple from graphql.type import GraphQLResolveInfo from sqlalchemy.orm import joinedload, exc -from sqlalchemy import select, and_ from starlette.authentication import AuthenticationBackend from starlette.requests import HTTPConnection diff --git a/main.py b/main.py index 1c77edd7..c9d5ff2a 100644 --- a/main.py +++ b/main.py @@ -18,7 +18,11 @@ from resolvers.auth import confirm_email_handler from services.main import storages_init from services.stat.viewed import ViewedStorage from services.zine.gittask import GitTask -from settings import DEV_SERVER_STATUS_FILE_NAME, SENTRY_ID +from settings import DEV_SERVER_STATUS_FILE_NAME, SENTRY_DSN +from ariadne.asgi.handlers import GraphQLTransportWSHandler +from services.inbox.presence import on_connect, on_disconnect +# from services.inbox.sse import sse_messages + import_module("resolvers") schema = make_executable_schema(load_schema_from_path("schema.graphql"), resolvers) # type: ignore @@ -39,7 +43,7 @@ async def start_up(): print(git_task) try: import sentry_sdk - sentry_sdk.init("https://%s@testsentry.discours.io/2" % SENTRY_ID) + sentry_sdk.init(SENTRY_DSN) except Exception as e: print('[sentry] init error') print(e) @@ -63,7 +67,8 @@ async def shutdown(): routes = [ Route("/oauth/{provider}", endpoint=oauth_login), Route("/oauth-authorize", endpoint=oauth_authorize), - Route("/confirm/{token}", endpoint=confirm_email_handler) + Route("/confirm/{token}", endpoint=confirm_email_handler), + # Route("/chat/{chat_id}", endpoint=sse_messages) ] app = Starlette( @@ -73,7 +78,14 @@ app = Starlette( middleware=middleware, routes=routes, ) -app.mount("/", GraphQL(schema, debug=True)) +app.mount("/", GraphQL( + schema, + debug=True, + websocket_handler=GraphQLTransportWSHandler( + on_connect=on_connect, + on_disconnect=on_disconnect + ) +)) dev_app = app = Starlette( debug=True, diff --git a/resolvers/create/editor.py b/resolvers/create/editor.py index 82ccaf57..84d744a4 100644 --- a/resolvers/create/editor.py +++ b/resolvers/create/editor.py @@ -13,7 +13,7 @@ from orm.user import User from resolvers.zine.reactions import reactions_follow, reactions_unfollow from services.zine.gittask import GitTask from resolvers.inbox.chats import create_chat -from services.inbox import MessagesStorage +from services.inbox.storage import MessagesStorage from orm.collab import Collab diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py index 5ef3ccc6..ec032029 100644 --- a/resolvers/inbox/load.py +++ b/resolvers/inbox/load.py @@ -33,6 +33,7 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0): auth: AuthCredentials = info.context["request"].auth cids = await redis.execute("SMEMBERS", "chats_by_user/" + str(auth.user_id)) + onliners = await redis.execute("SMEMBERS", "users-online") if cids: cids = list(cids)[offset:offset + limit] if not cids: @@ -56,6 +57,7 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0): "userpic": a.userpic, "name": a.name, "lastSeen": a.lastSeen, + "online": a.id in onliners }) chats.append(c) return { diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index 9ffd341a..cc9304a5 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -6,7 +6,8 @@ from auth.authenticate import login_required from auth.credentials import AuthCredentials from base.redis import redis from base.resolvers import mutation, subscription -from services.inbox import ChatFollowing, MessageResult, MessagesStorage +from services.inbox.helpers import ChatFollowing, MessageResult +from services.inbox.storage import MessagesStorage @mutation.field("createMessage") @@ -18,7 +19,7 @@ async def create_message(_, info, chat: str, body: str, replyTo=None): chat = await redis.execute("GET", f"chats/{chat}") if not chat: return { - "error": "chat not exist" + "error": "chat is not exist" } else: chat = dict(json.loads(chat)) @@ -153,6 +154,7 @@ async def message_generator(obj, info): chat = await redis.execute("GET", f"chats/{chat_id}") updated[chat_id] = chat['updatedAt'] user_following_chats_sorted = sorted(user_following_chats, key=lambda x: updated[x], reverse=True) + for chat_id in user_following_chats_sorted: following_chat = ChatFollowing(chat_id) await MessagesStorage.register_chat(following_chat) diff --git a/schema.graphql b/schema.graphql index 4c8261af..647c9774 100644 --- a/schema.graphql +++ b/schema.graphql @@ -29,6 +29,7 @@ type ChatMember { name: String! userpic: String lastSeen: DateTime + online: Boolean # invitedAt: DateTime # invitedBy: String # user slug # TODO: keep invite databit diff --git a/services/inbox/helpers.py b/services/inbox/helpers.py new file mode 100644 index 00000000..56223160 --- /dev/null +++ b/services/inbox/helpers.py @@ -0,0 +1,14 @@ +import asyncio + + +class MessageResult: + def __init__(self, status, message): + self.status = status + self.message = message + + +class ChatFollowing: + queue = asyncio.Queue() + + def __init__(self, chat_id): + self.chat_id = chat_id diff --git a/services/inbox/presence.py b/services/inbox/presence.py new file mode 100644 index 00000000..b6d710ea --- /dev/null +++ b/services/inbox/presence.py @@ -0,0 +1,43 @@ +from base.exceptions import Unauthorized +from auth.tokenstorage import SessionToken +from base.redis import redis + + +async def set_online_status(user_id, status): + if user_id: + if status: + await redis.execute("SADD", "users-online", user_id) + else: + await redis.execute("SREM", "users-online", user_id) + + +async def on_connect(websocket, params): + if not isinstance(params, dict): + websocket.scope["connection_params"] = {} + return + token = params.get('token') + if not token: + raise Unauthorized("Please login") + else: + payload = await SessionToken.verify(token) + if payload and payload.user_id: + websocket.scope["user_id"] = payload.user_id + await set_online_status(payload.user_id, True) + + +async def on_disconnect(websocket): + user_id = websocket.scope.get("user_id") + await set_online_status(user_id, False) + + +# FIXME: not used yet +def context_value(request): + context = {} + print(f"[inbox.presense] request debug: {request}") + if request.scope["type"] == "websocket": + # request is an instance of WebSocket + context.update(request.scope["connection_params"]) + else: + context["token"] = request.META.get("authorization") + + return context diff --git a/services/inbox.py b/services/inbox/storage.py similarity index 72% rename from services/inbox.py rename to services/inbox/storage.py index d8222fa3..dd6e5fcf 100644 --- a/services/inbox.py +++ b/services/inbox/storage.py @@ -1,13 +1,6 @@ import asyncio -class ChatFollowing: - queue = asyncio.Queue() - - def __init__(self, chat_id): - self.chat_id = chat_id - - class MessagesStorage: lock = asyncio.Lock() chats = [] @@ -28,9 +21,3 @@ class MessagesStorage: for chat in MessagesStorage.chats: if message_result.message["chatId"] == chat.chat_id: chat.queue.put_nowait(message_result) - - -class MessageResult: - def __init__(self, status, message): - self.status = status - self.message = message diff --git a/settings.py b/settings.py index 196bc822..c712ddd1 100644 --- a/settings.py +++ b/settings.py @@ -26,6 +26,7 @@ FRONTEND_URL = environ.get("FRONTEND_URL") or "http://localhost:3000" SHOUTS_REPO = "content" SESSION_TOKEN_HEADER = "Authorization" +SENTRY_DSN = environ.get("SENTRY_DSN") + # for local development DEV_SERVER_STATUS_FILE_NAME = 'dev-server-status.txt' -SENTRY_ID = environ.get("SENTRY_ID") From 058fdcc64f147fdfc0e6a587f71d3495298e1d1b Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Mon, 5 Dec 2022 10:10:49 +0300 Subject: [PATCH 07/17] fix-load-messages --- resolvers/inbox/load.py | 96 +++++++++++++++++++++++-------------- resolvers/inbox/messages.py | 1 + 2 files changed, 62 insertions(+), 35 deletions(-) diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py index ec032029..a6cf46bf 100644 --- a/resolvers/inbox/load.py +++ b/resolvers/inbox/load.py @@ -1,5 +1,5 @@ import json -from datetime import datetime, timedelta, timezone +# from datetime import datetime, timedelta, timezone from auth.authenticate import login_required from auth.credentials import AuthCredentials @@ -14,16 +14,25 @@ from .unread import get_unread_counter async def load_messages(chat_id: str, limit: int, offset: int): ''' load :limit messages for :chat_id with :offset ''' messages = [] - message_ids = await redis.lrange( - f"chats/{chat_id}/message_ids", offset + limit, offset - ) + # print(f'[inbox] loading messages by chat: {chat_id}[{offset}:{offset + limit}]') + try: + message_ids = await redis.lrange(f"chats/{chat_id}/message_ids", + offset, + offset + limit + ) + + # print(f'[inbox] message_ids: {message_ids}') + except Exception as e: + print(e) if message_ids: message_keys = [ - f"chats/{chat_id}/messages/{mid}" for mid in message_ids + f"chats/{chat_id}/messages/{mid.decode('utf-8')}" for mid in message_ids ] + # print(message_keys) messages = await redis.mget(*message_keys) - messages = [json.loads(msg) for msg in messages] - return set(messages) + messages = [json.loads(msg.decode('utf-8')) for msg in messages] + # print('[inbox] messages \n%r' % messages) + return messages @query.field("loadChats") @@ -41,7 +50,8 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0): cids = [] chats = [] for cid in cids: - c = await redis.execute("GET", "chats/" + cid.decode("utf-8")) + cid = cid.decode("utf-8") + c = await redis.execute("GET", "chats/" + cid) if c: c = dict(json.loads(c)) c['messages'] = await load_messages(cid, 5, 0) @@ -71,17 +81,21 @@ async def search_user_chats(by, messages, user_id: int, limit, offset): by_author = by.get('author') body_like = by.get('body') cids.union(set(await redis.execute("SMEMBERS", "chats_by_user/" + str(user_id)))) + # messages_by_chat = [] if by_author: # all author's messages cids.union(set(await redis.execute("SMEMBERS", f"chats_by_user/{by_author}"))) # author's messages in filtered chat messages.union(set(filter(lambda m: m["author"] == by_author, list(messages)))) for c in cids: - messages.union(set(await load_messages(c, limit, offset))) + c = c.decode('utf-8') + # messages_by_chat = await load_messages(c, limit, offset) + if body_like: # search in all messages in all user's chats for c in cids: # FIXME: use redis scan here + c = c.decode('utf-8') mmm = set(await load_messages(c, limit, offset)) for m in mmm: if body_like in m["body"]: @@ -96,36 +110,48 @@ async def search_user_chats(by, messages, user_id: int, limit, offset): @login_required async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): ''' load :limit messages of :chat_id with :offset ''' - messages = set([]) - by_chat = by.get('chat') - if by_chat: - chat = await redis.execute("GET", f"chats/{by_chat}") - if not chat: - return { - "messages": [], - "error": "chat not exist" - } - # everyone's messages in filtered chat - messages.union(set(await load_messages(by_chat, limit, offset))) auth: AuthCredentials = info.context["request"].auth + userchats = await redis.execute("SMEMBERS", "chats_by_user/" + str(auth.user_id)) + userchats = [c.decode('utf-8') for c in userchats] + # print('[inbox] userchats: %r' % userchats) + if userchats: + # print('[inbox] loading messages by...') + messages = [] + by_chat = by.get('chat') + if by_chat in userchats: + chat = await redis.execute("GET", f"chats/{by_chat}") + # print(chat) + if not chat: + return { + "messages": [], + "error": "chat not exist" + } + # everyone's messages in filtered chat + messages = await load_messages(by_chat, limit, offset) - if len(messages) == 0: - messages.union(set(await search_user_chats(by, messages, auth.user_id, limit, offset))) + # if len(messages) == 0: + # messages.union(set(await search_user_chats(by, messages, auth.user_id, limit, offset))) - days = by.get("days") - if days: - messages.union(set(filter( - lambda m: datetime.now(tz=timezone.utc) - int(m["createdAt"]) < timedelta(days=by.get("days")), - list(messages) - ))) - return { - "messages": sorted( - list(messages), - key=lambda m: m.createdAt - ), - "error": None - } + # days = by.get("days") + # if days: + # messages.union(set(filter( + # list(messages), + # key=lambda m: ( + # datetime.now(tz=timezone.utc) - int(m["createdAt"]) < timedelta(days=by["days"]) + # ) + # ))) + return { + "messages": sorted( + list(messages), + key=lambda m: m['createdAt'] + ), + "error": None + } + else: + return { + "error": "Cannot access messages of this chat" + } @query.field("loadRecipients") diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index cc9304a5..d9ca6a4c 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -33,6 +33,7 @@ async def create_message(_, info, chat: str, body: str, replyTo=None): "replyTo": replyTo, "createdAt": int(datetime.now(tz=timezone.utc).timestamp()), } + print(f"[inbox] creating message {new_message}") await redis.execute( "SET", f"chats/{chat['id']}/messages/{message_id}", json.dumps(new_message) ) From 35f540b26a87a875543d8d599c81334e539d5329 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Mon, 5 Dec 2022 12:48:21 +0300 Subject: [PATCH 08/17] search-messages --- base/redis.py | 2 ++ resolvers/inbox/load.py | 43 +---------------------------------- resolvers/inbox/search.py | 48 ++++++++++++++++++++++++++++++++++++++- schema.graphql | 1 + 4 files changed, 51 insertions(+), 43 deletions(-) diff --git a/base/redis.py b/base/redis.py index ccc3758a..e1a4b903 100644 --- a/base/redis.py +++ b/base/redis.py @@ -31,9 +31,11 @@ class RedisCache: pass async def lrange(self, key, start, stop): + print(f"[redis] LRANGE {key} {start} {stop}") return await self._instance.lrange(key, start, stop) async def mget(self, key, *keys): + print(f"[redis] MGET {key} {keys}") return await self._instance.mget(key, *keys) diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py index a6cf46bf..4fce0f94 100644 --- a/resolvers/inbox/load.py +++ b/resolvers/inbox/load.py @@ -76,36 +76,6 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0): } -async def search_user_chats(by, messages, user_id: int, limit, offset): - cids = set([]) - by_author = by.get('author') - body_like = by.get('body') - cids.union(set(await redis.execute("SMEMBERS", "chats_by_user/" + str(user_id)))) - # messages_by_chat = [] - if by_author: - # all author's messages - cids.union(set(await redis.execute("SMEMBERS", f"chats_by_user/{by_author}"))) - # author's messages in filtered chat - messages.union(set(filter(lambda m: m["author"] == by_author, list(messages)))) - for c in cids: - c = c.decode('utf-8') - # messages_by_chat = await load_messages(c, limit, offset) - - if body_like: - # search in all messages in all user's chats - for c in cids: - # FIXME: use redis scan here - c = c.decode('utf-8') - mmm = set(await load_messages(c, limit, offset)) - for m in mmm: - if body_like in m["body"]: - messages.add(m) - else: - # search in chat's messages - messages.union(set(filter(lambda m: body_like in m["body"], list(messages)))) - return messages - - @query.field("loadMessagesBy") @login_required async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): @@ -129,18 +99,6 @@ async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): } # everyone's messages in filtered chat messages = await load_messages(by_chat, limit, offset) - - # if len(messages) == 0: - # messages.union(set(await search_user_chats(by, messages, auth.user_id, limit, offset))) - - # days = by.get("days") - # if days: - # messages.union(set(filter( - # list(messages), - # key=lambda m: ( - # datetime.now(tz=timezone.utc) - int(m["createdAt"]) < timedelta(days=by["days"]) - # ) - # ))) return { "messages": sorted( list(messages), @@ -161,6 +119,7 @@ async def load_recipients(_, info, limit=50, offset=0): try: chat_users += await followed_authors(auth.user_id) + print("[resolvers.inbox] ") limit = limit - len(chat_users) except Exception: pass diff --git a/resolvers/inbox/search.py b/resolvers/inbox/search.py index 20269e59..1ca340e5 100644 --- a/resolvers/inbox/search.py +++ b/resolvers/inbox/search.py @@ -1,11 +1,12 @@ import json - +from datetime import datetime, timezone, timedelta from auth.authenticate import login_required from auth.credentials import AuthCredentials from base.redis import redis from base.resolvers import query from base.orm import local_session from orm.user import AuthorFollower, User +from resolvers.inbox.load import load_messages @query.field("searchRecipients") @@ -47,3 +48,48 @@ async def search_recipients(_, info, query: str, limit: int = 50, offset: int = "members": list(result), "error": None } + + +@query.field("searchMessages") +@login_required +async def search_user_chats(by, messages, user_id: int, limit, offset): + cids = set([]) + cids.union(set(await redis.execute("SMEMBERS", "chats_by_user/" + str(user_id)))) + messages = [] + + by_author = by.get('author') + if by_author: + # all author's messages + cids.union(set(await redis.execute("SMEMBERS", f"chats_by_user/{by_author}"))) + # author's messages in filtered chat + messages.union(set(filter(lambda m: m["author"] == by_author, list(messages)))) + for c in cids: + c = c.decode('utf-8') + messages = await load_messages(c, limit, offset) + + body_like = by.get('body') + if body_like: + # search in all messages in all user's chats + for c in cids: + # FIXME: use redis scan here + c = c.decode('utf-8') + mmm = await load_messages(c, limit, offset) + for m in mmm: + if body_like in m["body"]: + messages.add(m) + else: + # search in chat's messages + messages.extend(filter(lambda m: body_like in m["body"], list(messages))) + + days = by.get("days") + if days: + messages.extend(filter( + list(messages), + key=lambda m: ( + datetime.now(tz=timezone.utc) - int(m["createdAt"]) < timedelta(days=by["days"]) + ) + )) + return { + "messages": messages, + "error": None + } diff --git a/schema.graphql b/schema.graphql index 647c9774..09cb934d 100644 --- a/schema.graphql +++ b/schema.graphql @@ -270,6 +270,7 @@ type Query { loadMessagesBy(by: MessagesBy!, limit: Int, offset: Int): Result! loadRecipients(limit: Int, offset: Int): Result! searchRecipients(query: String!, limit: Int, offset: Int): Result! + searchMessages(by: MessagesBy!, limit: Int, offset: Int): Result! # auth isEmailUsed(email: String!): Boolean! From f70633d36178ee748812fe0ff95673f600935af7 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Tue, 6 Dec 2022 07:53:20 +0300 Subject: [PATCH 09/17] update renew, schema update, admins fix --- resolvers/inbox/chats.py | 2 +- resolvers/inbox/messages.py | 2 ++ schema.graphql | 6 +++--- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/resolvers/inbox/chats.py b/resolvers/inbox/chats.py index c158e066..2a953a12 100644 --- a/resolvers/inbox/chats.py +++ b/resolvers/inbox/chats.py @@ -80,7 +80,7 @@ async def create_chat(_, info, title="", members=[]): "createdBy": auth.user_id, "createdAt": int(datetime.now(tz=timezone.utc).timestamp()), "updatedAt": int(datetime.now(tz=timezone.utc).timestamp()), - "admins": [] + "admins": members if (len(members) == 2 and title == "") else [] } for m in members: diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index d9ca6a4c..03285b67 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -33,6 +33,8 @@ async def create_message(_, info, chat: str, body: str, replyTo=None): "replyTo": replyTo, "createdAt": int(datetime.now(tz=timezone.utc).timestamp()), } + chat['updatedAt'] = new_message['createdAt'] + await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat)) print(f"[inbox] creating message {new_message}") await redis.execute( "SET", f"chats/{chat['id']}/messages/{message_id}", json.dumps(new_message) diff --git a/schema.graphql b/schema.graphql index 09cb934d..35f4663d 100644 --- a/schema.graphql +++ b/schema.graphql @@ -512,13 +512,13 @@ type Message { type Chat { id: String! createdAt: Int! - createdBy: String! + createdBy: Int! updatedAt: Int! title: String description: String - users: [String] + users: [Int] members: [ChatMember] - admins: [String] + admins: [Int] messages: [Message] unread: Int private: Boolean From 6782177ce698bcf4a19af58abfcf5c24a8fee9ff Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Tue, 6 Dec 2022 10:53:23 +0300 Subject: [PATCH 10/17] profile-error, authors commented stat --- resolvers/zine/profile.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/resolvers/zine/profile.py b/resolvers/zine/profile.py index 0944eb3f..b8d3e1ac 100644 --- a/resolvers/zine/profile.py +++ b/resolvers/zine/profile.py @@ -42,9 +42,9 @@ def add_author_stat_columns(q): # ) q = q.add_columns(literal(0).label('commented_stat')) - # q = q.outerjoin(Reaction, and_(Reaction.createdBy == User.id, Reaction.body.is_not(None))).add_columns( - # func.count(distinct(Reaction.id)).label('commented_stat') - # ) + q = q.outerjoin(Reaction, and_(Reaction.createdBy == User.id, Reaction.body.is_not(None))).add_columns( + func.count(distinct(Reaction.id)).label('commented_stat') + ) q = q.group_by(User.id) @@ -152,7 +152,7 @@ async def get_user_roles(slug): .all() ) - return [] # roles + return roles @mutation.field("updateProfile") @@ -161,9 +161,18 @@ async def update_profile(_, info, profile): auth = info.context["request"].auth user_id = auth.user_id with local_session() as session: - session.query(User).filter(User.id == user_id).update(profile) + user = session.query(User).filter(User.id == user_id).one() + slugowner = session.query(User).where(User.slug == profile['slug']).one() + if slugowner: + if slugowner.id != user_id: + return { + "error": "slug is used by another user" + } + user.update(profile) session.commit() - return {} + return { + "error": None + } @mutation.field("rateUser") From c89d226dee2edabfc5de08ed7d091468bcb4341b Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Tue, 6 Dec 2022 11:44:29 +0300 Subject: [PATCH 11/17] loadchats-fix+sse --- main.py | 4 ++-- resolvers/inbox/chats.py | 17 ++++++++++++++--- resolvers/zine/profile.py | 2 +- services/inbox/sse.py | 9 +++++++++ 4 files changed, 26 insertions(+), 6 deletions(-) create mode 100644 services/inbox/sse.py diff --git a/main.py b/main.py index c9d5ff2a..fdb0b4ca 100644 --- a/main.py +++ b/main.py @@ -21,7 +21,7 @@ from services.zine.gittask import GitTask from settings import DEV_SERVER_STATUS_FILE_NAME, SENTRY_DSN from ariadne.asgi.handlers import GraphQLTransportWSHandler from services.inbox.presence import on_connect, on_disconnect -# from services.inbox.sse import sse_messages +from services.inbox.sse import sse_messages import_module("resolvers") @@ -68,7 +68,7 @@ routes = [ Route("/oauth/{provider}", endpoint=oauth_login), Route("/oauth-authorize", endpoint=oauth_authorize), Route("/confirm/{token}", endpoint=confirm_email_handler), - # Route("/chat/{chat_id}", endpoint=sse_messages) + Route("/messages", endpoint=sse_messages) ] app = Starlette( diff --git a/resolvers/inbox/chats.py b/resolvers/inbox/chats.py index 2a953a12..56e80423 100644 --- a/resolvers/inbox/chats.py +++ b/resolvers/inbox/chats.py @@ -57,14 +57,25 @@ async def create_chat(_, info, title="", members=[]): # reuse chat craeted before if exists if len(members) == 2 and title == "": - chats1 = await redis.execute("SMEMBERS", f"chats_by_user/{members[0]}") - chats2 = await redis.execute("SMEMBERS", f"chats_by_user/{members[1]}") chat = None - for c in chats1.intersection(chats2): + print(members) + chatset1 = await redis.execute("SMEMBERS", f"chats_by_user/{members[0]}") + if not chatset1: + chatset1 = set([]) + print(chatset1) + chatset2 = await redis.execute("SMEMBERS", f"chats_by_user/{members[1]}") + if not chatset2: + chatset2 = set([]) + print(chatset2) + chatset = chatset1.intersection(chatset2) + print(chatset) + for c in chatset: chat = await redis.execute("GET", f"chats/{c.decode('utf-8')}") if chat: chat = json.loads(chat) if chat['title'] == "": + print('[inbox] craeteChat found old chat') + print(chat) break if chat: return { diff --git a/resolvers/zine/profile.py b/resolvers/zine/profile.py index b8d3e1ac..609ccccd 100644 --- a/resolvers/zine/profile.py +++ b/resolvers/zine/profile.py @@ -41,7 +41,7 @@ def add_author_stat_columns(q): # func.sum(user_rating_aliased.value).label('rating_stat') # ) - q = q.add_columns(literal(0).label('commented_stat')) + # q = q.add_columns(literal(0).label('commented_stat')) q = q.outerjoin(Reaction, and_(Reaction.createdBy == User.id, Reaction.body.is_not(None))).add_columns( func.count(distinct(Reaction.id)).label('commented_stat') ) diff --git a/services/inbox/sse.py b/services/inbox/sse.py new file mode 100644 index 00000000..32e4b8b9 --- /dev/null +++ b/services/inbox/sse.py @@ -0,0 +1,9 @@ +from sse_starlette.sse import EventSourceResponse +from resolvers.inbox.messages import message_generator + + +async def sse_messages(request): + print(f'[SSE] {request}') + # https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md + + return EventSourceResponse(message_generator) From 00054a94ec4c2e9894343fdd82bc31d03986f7d6 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Tue, 6 Dec 2022 14:58:52 +0300 Subject: [PATCH 12/17] try-to-sse --- requirements.txt | 1 + resolvers/inbox/load.py | 1 - resolvers/inbox/messages.py | 11 ++++++++--- services/inbox/sse.py | 14 +++++++++----- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/requirements.txt b/requirements.txt index 801121e9..2ef836aa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,3 +27,4 @@ python-dateutil~=2.8.2 beautifulsoup4~=4.11.1 lxml sentry-sdk>=0.10.2 +sse_starlette diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py index 4fce0f94..0db75f04 100644 --- a/resolvers/inbox/load.py +++ b/resolvers/inbox/load.py @@ -119,7 +119,6 @@ async def load_recipients(_, info, limit=50, offset=0): try: chat_users += await followed_authors(auth.user_id) - print("[resolvers.inbox] ") limit = limit - len(chat_users) except Exception: pass diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index 03285b67..36cde3ac 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -143,10 +143,14 @@ async def mark_as_read(_, info, chat_id: str, messages: [int]): @subscription.source("newMessage") @login_required async def message_generator(obj, info): - try: - auth: AuthCredentials = info.context["request"].auth + print(f"[resolvers.messages] generator {info}") + auth: AuthCredentials = info.context["request"].auth + return await messages_generator_by_user(auth.user_id) - user_following_chats = await redis.execute("GET", f"chats_by_user/{auth.user_id}") + +async def messages_generator_by_user(user_id): + try: + user_following_chats = await redis.execute("GET", f"chats_by_user/{user_id}") if user_following_chats: user_following_chats = list(json.loads(user_following_chats)) # chat ids else: @@ -170,3 +174,4 @@ async def message_generator(obj, info): yield msg finally: await MessagesStorage.remove_chat(following_chat) + diff --git a/services/inbox/sse.py b/services/inbox/sse.py index 32e4b8b9..e3dde165 100644 --- a/services/inbox/sse.py +++ b/services/inbox/sse.py @@ -1,9 +1,13 @@ from sse_starlette.sse import EventSourceResponse -from resolvers.inbox.messages import message_generator +from starlette.requests import Request +from resolvers.inbox.messages import messages_generator_by_user +from base.exceptions import Unauthorized -async def sse_messages(request): - print(f'[SSE] {request}') +async def sse_messages(request: Request): + print(f'[SSE] {request.scope}') # https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md - - return EventSourceResponse(message_generator) + if request['user']: + return EventSourceResponse(messages_generator_by_user(request['user'].user_id)) + else: + raise Unauthorized("Please login") From cf5789f2beda926285eb38b70801d797225199fd Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Wed, 7 Dec 2022 10:49:43 +0300 Subject: [PATCH 13/17] title-fix --- resolvers/inbox/messages.py | 3 +-- schema.graphql | 3 ++- services/inbox/helpers.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index 36cde3ac..de05b2e6 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -140,7 +140,7 @@ async def mark_as_read(_, info, chat_id: str, messages: [int]): } -@subscription.source("newMessage") +@subscription.source("newMessages") @login_required async def message_generator(obj, info): print(f"[resolvers.messages] generator {info}") @@ -174,4 +174,3 @@ async def messages_generator_by_user(user_id): yield msg finally: await MessagesStorage.remove_chat(following_chat) - diff --git a/schema.graphql b/schema.graphql index 35f4663d..0bc4e096 100644 --- a/schema.graphql +++ b/schema.graphql @@ -305,7 +305,7 @@ type Query { ############################################ Subscription type Subscription { - newMessage(chats: [Int!]): Message! + newMessages: Message! onlineUpdated: [User!]! shoutUpdated: Shout! userUpdated: User! @@ -507,6 +507,7 @@ type Message { id: Int! replyTo: String updatedAt: Int + seen: Boolean } type Chat { diff --git a/services/inbox/helpers.py b/services/inbox/helpers.py index 56223160..d8791218 100644 --- a/services/inbox/helpers.py +++ b/services/inbox/helpers.py @@ -3,7 +3,7 @@ import asyncio class MessageResult: def __init__(self, status, message): - self.status = status + self.seen = status self.message = message From c9400f23fb12cb743ed98579a475cda4c60613f7 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Wed, 7 Dec 2022 16:02:26 +0300 Subject: [PATCH 14/17] fix-schema --- schema.graphql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schema.graphql b/schema.graphql index 0bc4e096..092fdf87 100644 --- a/schema.graphql +++ b/schema.graphql @@ -305,7 +305,7 @@ type Query { ############################################ Subscription type Subscription { - newMessages: Message! + newMessages: Message onlineUpdated: [User!]! shoutUpdated: Shout! userUpdated: User! From 044bfa5bd46d7b3114d4df5c109706ab2ea6a52b Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Wed, 7 Dec 2022 21:51:38 +0300 Subject: [PATCH 15/17] subs-on-ws --- main.py | 13 ++++++++++--- requirements.txt | 3 ++- resolvers/inbox/messages.py | 7 ++----- schema.graphql | 8 +++----- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/main.py b/main.py index fdb0b4ca..f171ed77 100644 --- a/main.py +++ b/main.py @@ -21,7 +21,7 @@ from services.zine.gittask import GitTask from settings import DEV_SERVER_STATUS_FILE_NAME, SENTRY_DSN from ariadne.asgi.handlers import GraphQLTransportWSHandler from services.inbox.presence import on_connect, on_disconnect -from services.inbox.sse import sse_messages +# from services.inbox.sse import sse_messages import_module("resolvers") @@ -68,7 +68,7 @@ routes = [ Route("/oauth/{provider}", endpoint=oauth_login), Route("/oauth-authorize", endpoint=oauth_authorize), Route("/confirm/{token}", endpoint=confirm_email_handler), - Route("/messages", endpoint=sse_messages) + # Route("/messages", endpoint=sse_messages) ] app = Starlette( @@ -94,4 +94,11 @@ dev_app = app = Starlette( middleware=middleware, routes=routes, ) -dev_app.mount("/", GraphQL(schema, debug=True)) +dev_app.mount("/", GraphQL( + schema, + debug=True, + websocket_handler=GraphQLTransportWSHandler( + on_connect=on_connect, + on_disconnect=on_disconnect + ) +)) diff --git a/requirements.txt b/requirements.txt index 2ef836aa..69f5c1e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ PyYAML>=5.4 pyjwt>=2.6.0 starlette~=0.20.4 sqlalchemy>=1.4.41 -graphql-core +graphql-core>=3.0.3 gql uvicorn>=0.18.3 pydantic>=1.10.2 @@ -28,3 +28,4 @@ beautifulsoup4~=4.11.1 lxml sentry-sdk>=0.10.2 sse_starlette +graphql-ws diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index de05b2e6..de20567f 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -140,15 +140,12 @@ async def mark_as_read(_, info, chat_id: str, messages: [int]): } -@subscription.source("newMessages") +@subscription.source("newMessage") @login_required async def message_generator(obj, info): print(f"[resolvers.messages] generator {info}") auth: AuthCredentials = info.context["request"].auth - return await messages_generator_by_user(auth.user_id) - - -async def messages_generator_by_user(user_id): + user_id = auth.user_id try: user_following_chats = await redis.execute("GET", f"chats_by_user/{user_id}") if user_following_chats: diff --git a/schema.graphql b/schema.graphql index 092fdf87..61eaeb59 100644 --- a/schema.graphql +++ b/schema.graphql @@ -305,11 +305,9 @@ type Query { ############################################ Subscription type Subscription { - newMessages: Message - onlineUpdated: [User!]! - shoutUpdated: Shout! - userUpdated: User! - reactionUpdated(shout: String!): ReactionUpdating! + newMessage: Message # new messages in inbox + collabUpdate(collab: Int!): Reaction # new reactions in collaborative editor + } ############################################ Entities From feb184d8e89cb261886a297aed68e3cc78d8f28c Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Fri, 9 Dec 2022 08:54:26 +0300 Subject: [PATCH 16/17] about in scheme --- schema.graphql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/schema.graphql b/schema.graphql index 61eaeb59..cf589b00 100644 --- a/schema.graphql +++ b/schema.graphql @@ -51,6 +51,7 @@ type Author { userpic: String caption: String # only for full shout bio: String + about: String links: [String] stat: AuthorStat roles: [Role] # in different communities @@ -370,6 +371,7 @@ type User { updatedAt: DateTime ratings: [Rating] bio: String + about: String notifications: [Int] communities: [Int] # user participating communities oid: String From 9331c095268edc165b540ba41a213b2ecd5b172c Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Sat, 10 Dec 2022 10:35:29 +0300 Subject: [PATCH 17/17] schema-fix --- resolvers/inbox/messages.py | 3 ++- schema.graphql | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index de20567f..16cd100d 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -30,9 +30,10 @@ async def create_message(_, info, chat: str, body: str, replyTo=None): "id": message_id, "author": auth.user_id, "body": body, - "replyTo": replyTo, "createdAt": int(datetime.now(tz=timezone.utc).timestamp()), } + if replyTo: + new_message = int(replyTo) chat['updatedAt'] = new_message['createdAt'] await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat)) print(f"[inbox] creating message {new_message}") diff --git a/schema.graphql b/schema.graphql index cf589b00..47db95fc 100644 --- a/schema.graphql +++ b/schema.graphql @@ -505,7 +505,7 @@ type Message { body: String! createdAt: Int! id: Int! - replyTo: String + replyTo: Int updatedAt: Int seen: Boolean }