diff --git a/auth/authenticate.py b/auth/authenticate.py index e11e821e..3e587225 100644 --- a/auth/authenticate.py +++ b/auth/authenticate.py @@ -3,7 +3,6 @@ from typing import Optional, Tuple from graphql.type import GraphQLResolveInfo from sqlalchemy.orm import joinedload, exc -from sqlalchemy import select, and_ from starlette.authentication import AuthenticationBackend from starlette.requests import HTTPConnection diff --git a/base/redis.py b/base/redis.py index ccc3758a..e1a4b903 100644 --- a/base/redis.py +++ b/base/redis.py @@ -31,9 +31,11 @@ class RedisCache: pass async def lrange(self, key, start, stop): + print(f"[redis] LRANGE {key} {start} {stop}") return await self._instance.lrange(key, start, stop) async def mget(self, key, *keys): + print(f"[redis] MGET {key} {keys}") return await self._instance.mget(key, *keys) diff --git a/main.py b/main.py index 1c77edd7..f171ed77 100644 --- a/main.py +++ b/main.py @@ -18,7 +18,11 @@ from resolvers.auth import confirm_email_handler from services.main import storages_init from services.stat.viewed import ViewedStorage from services.zine.gittask import GitTask -from settings import DEV_SERVER_STATUS_FILE_NAME, SENTRY_ID +from settings import DEV_SERVER_STATUS_FILE_NAME, SENTRY_DSN +from ariadne.asgi.handlers import GraphQLTransportWSHandler +from services.inbox.presence import on_connect, on_disconnect +# from services.inbox.sse import sse_messages + import_module("resolvers") schema = make_executable_schema(load_schema_from_path("schema.graphql"), resolvers) # type: ignore @@ -39,7 +43,7 @@ async def start_up(): print(git_task) try: import sentry_sdk - sentry_sdk.init("https://%s@testsentry.discours.io/2" % SENTRY_ID) + sentry_sdk.init(SENTRY_DSN) except Exception as e: print('[sentry] init error') print(e) @@ -63,7 +67,8 @@ async def shutdown(): routes = [ Route("/oauth/{provider}", endpoint=oauth_login), Route("/oauth-authorize", endpoint=oauth_authorize), - Route("/confirm/{token}", endpoint=confirm_email_handler) + Route("/confirm/{token}", endpoint=confirm_email_handler), + # Route("/messages", endpoint=sse_messages) ] app = Starlette( @@ -73,7 +78,14 @@ app = Starlette( middleware=middleware, routes=routes, ) -app.mount("/", GraphQL(schema, debug=True)) +app.mount("/", GraphQL( + schema, + debug=True, + websocket_handler=GraphQLTransportWSHandler( + on_connect=on_connect, + on_disconnect=on_disconnect + ) +)) dev_app = app = Starlette( debug=True, @@ -82,4 +94,11 @@ dev_app = app = Starlette( middleware=middleware, routes=routes, ) -dev_app.mount("/", GraphQL(schema, debug=True)) +dev_app.mount("/", GraphQL( + schema, + debug=True, + websocket_handler=GraphQLTransportWSHandler( + on_connect=on_connect, + on_disconnect=on_disconnect + ) +)) diff --git a/requirements.txt b/requirements.txt index 801121e9..69f5c1e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ PyYAML>=5.4 pyjwt>=2.6.0 starlette~=0.20.4 sqlalchemy>=1.4.41 -graphql-core +graphql-core>=3.0.3 gql uvicorn>=0.18.3 pydantic>=1.10.2 @@ -27,3 +27,5 @@ python-dateutil~=2.8.2 beautifulsoup4~=4.11.1 lxml sentry-sdk>=0.10.2 +sse_starlette +graphql-ws diff --git a/resolvers/create/editor.py b/resolvers/create/editor.py index 82ccaf57..84d744a4 100644 --- a/resolvers/create/editor.py +++ b/resolvers/create/editor.py @@ -13,7 +13,7 @@ from orm.user import User from resolvers.zine.reactions import reactions_follow, reactions_unfollow from services.zine.gittask import GitTask from resolvers.inbox.chats import create_chat -from services.inbox import MessagesStorage +from services.inbox.storage import MessagesStorage from orm.collab import Collab diff --git a/resolvers/inbox/chats.py b/resolvers/inbox/chats.py index c158e066..56e80423 100644 --- a/resolvers/inbox/chats.py +++ b/resolvers/inbox/chats.py @@ -57,14 +57,25 @@ async def create_chat(_, info, title="", members=[]): # reuse chat craeted before if exists if len(members) == 2 and title == "": - chats1 = await redis.execute("SMEMBERS", f"chats_by_user/{members[0]}") - chats2 = await redis.execute("SMEMBERS", f"chats_by_user/{members[1]}") chat = None - for c in chats1.intersection(chats2): + print(members) + chatset1 = await redis.execute("SMEMBERS", f"chats_by_user/{members[0]}") + if not chatset1: + chatset1 = set([]) + print(chatset1) + chatset2 = await redis.execute("SMEMBERS", f"chats_by_user/{members[1]}") + if not chatset2: + chatset2 = set([]) + print(chatset2) + chatset = chatset1.intersection(chatset2) + print(chatset) + for c in chatset: chat = await redis.execute("GET", f"chats/{c.decode('utf-8')}") if chat: chat = json.loads(chat) if chat['title'] == "": + print('[inbox] craeteChat found old chat') + print(chat) break if chat: return { @@ -80,7 +91,7 @@ async def create_chat(_, info, title="", members=[]): "createdBy": auth.user_id, "createdAt": int(datetime.now(tz=timezone.utc).timestamp()), "updatedAt": int(datetime.now(tz=timezone.utc).timestamp()), - "admins": [] + "admins": members if (len(members) == 2 and title == "") else [] } for m in members: diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py index a53e435d..0db75f04 100644 --- a/resolvers/inbox/load.py +++ b/resolvers/inbox/load.py @@ -1,12 +1,11 @@ import json -from datetime import datetime, timedelta, timezone +# from datetime import datetime, timedelta, timezone from auth.authenticate import login_required from auth.credentials import AuthCredentials from base.redis import redis from base.orm import local_session from base.resolvers import query -from base.exceptions import ObjectNotExist from orm.user import User from resolvers.zine.profile import followed_authors from .unread import get_unread_counter @@ -15,15 +14,24 @@ from .unread import get_unread_counter async def load_messages(chat_id: str, limit: int, offset: int): ''' load :limit messages for :chat_id with :offset ''' messages = [] - message_ids = await redis.lrange( - f"chats/{chat_id}/message_ids", offset + limit, offset - ) + # print(f'[inbox] loading messages by chat: {chat_id}[{offset}:{offset + limit}]') + try: + message_ids = await redis.lrange(f"chats/{chat_id}/message_ids", + offset, + offset + limit + ) + + # print(f'[inbox] message_ids: {message_ids}') + except Exception as e: + print(e) if message_ids: message_keys = [ - f"chats/{chat_id}/messages/{mid}" for mid in message_ids + f"chats/{chat_id}/messages/{mid.decode('utf-8')}" for mid in message_ids ] + # print(message_keys) messages = await redis.mget(*message_keys) - messages = [json.loads(msg) for msg in messages] + messages = [json.loads(msg.decode('utf-8')) for msg in messages] + # print('[inbox] messages \n%r' % messages) return messages @@ -34,6 +42,7 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0): auth: AuthCredentials = info.context["request"].auth cids = await redis.execute("SMEMBERS", "chats_by_user/" + str(auth.user_id)) + onliners = await redis.execute("SMEMBERS", "users-online") if cids: cids = list(cids)[offset:offset + limit] if not cids: @@ -41,7 +50,8 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0): cids = [] chats = [] for cid in cids: - c = await redis.execute("GET", "chats/" + cid.decode("utf-8")) + cid = cid.decode("utf-8") + c = await redis.execute("GET", "chats/" + cid) if c: c = dict(json.loads(c)) c['messages'] = await load_messages(cid, 5, 0) @@ -57,72 +67,49 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0): "userpic": a.userpic, "name": a.name, "lastSeen": a.lastSeen, + "online": a.id in onliners }) - chats.append(c) + chats.append(c) return { "chats": chats, "error": None } -async def search_user_chats(by, messages: set, user_id: int, limit, offset): - cids = set([]) - by_author = by.get('author') - body_like = by.get('body') - cids.unioin(set(await redis.execute("SMEMBERS", "chats_by_user/" + str(user_id)))) - if by_author: - # all author's messages - cids.union(set(await redis.execute("SMEMBERS", f"chats_by_user/{by_author}"))) - # author's messages in filtered chat - messages.union(set(filter(lambda m: m["author"] == by_author, list(messages)))) - for c in cids: - messages.union(set(await load_messages(c, limit, offset))) - if body_like: - # search in all messages in all user's chats - for c in cids: - # FIXME: user redis scan here - mmm = set(await load_messages(c, limit, offset)) - for m in mmm: - if body_like in m["body"]: - messages.add(m) - else: - # search in chat's messages - messages.union(set(filter(lambda m: body_like in m["body"], list(messages)))) - return messages - - @query.field("loadMessagesBy") @login_required async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): ''' load :limit messages of :chat_id with :offset ''' - messages = set([]) - by_chat = by.get('chat') - if by_chat: - chat = await redis.execute("GET", f"chats/{by_chat}") - if not chat: - raise ObjectNotExist("Chat not exists") - # everyone's messages in filtered chat - messages.union(set(await load_messages(by_chat, limit, offset))) auth: AuthCredentials = info.context["request"].auth - - if len(messages) == 0: - # FIXME - messages.union(search_user_chats(by, messages, auth.user_id, limit, offset)) - - days = by.get("days") - if days: - messages.union(set(filter( - lambda m: datetime.now(tz=timezone.utc) - int(m["createdAt"]) < timedelta(days=by.get("days")), - list(messages) - ))) - return { - "messages": sorted( - lambda m: m.createdAt, - list(messages) - ), - "error": None - } + userchats = await redis.execute("SMEMBERS", "chats_by_user/" + str(auth.user_id)) + userchats = [c.decode('utf-8') for c in userchats] + # print('[inbox] userchats: %r' % userchats) + if userchats: + # print('[inbox] loading messages by...') + messages = [] + by_chat = by.get('chat') + if by_chat in userchats: + chat = await redis.execute("GET", f"chats/{by_chat}") + # print(chat) + if not chat: + return { + "messages": [], + "error": "chat not exist" + } + # everyone's messages in filtered chat + messages = await load_messages(by_chat, limit, offset) + return { + "messages": sorted( + list(messages), + key=lambda m: m['createdAt'] + ), + "error": None + } + else: + return { + "error": "Cannot access messages of this chat" + } @query.field("loadRecipients") diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index 2772dcf6..16cd100d 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -6,7 +6,8 @@ from auth.authenticate import login_required from auth.credentials import AuthCredentials from base.redis import redis from base.resolvers import mutation, subscription -from services.inbox import ChatFollowing, MessageResult, MessagesStorage +from services.inbox.helpers import ChatFollowing, MessageResult +from services.inbox.storage import MessagesStorage @mutation.field("createMessage") @@ -18,7 +19,7 @@ async def create_message(_, info, chat: str, body: str, replyTo=None): chat = await redis.execute("GET", f"chats/{chat}") if not chat: return { - "error": "chat not exist" + "error": "chat is not exist" } else: chat = dict(json.loads(chat)) @@ -29,16 +30,19 @@ async def create_message(_, info, chat: str, body: str, replyTo=None): "id": message_id, "author": auth.user_id, "body": body, - "replyTo": replyTo, "createdAt": int(datetime.now(tz=timezone.utc).timestamp()), } + if replyTo: + new_message = int(replyTo) + chat['updatedAt'] = new_message['createdAt'] + await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat)) + print(f"[inbox] creating message {new_message}") await redis.execute( "SET", f"chats/{chat['id']}/messages/{message_id}", json.dumps(new_message) ) await redis.execute("LPUSH", f"chats/{chat['id']}/message_ids", str(message_id)) await redis.execute("SET", f"chats/{chat['id']}/next_message_id", str(message_id + 1)) - chat = json.loads(chat) users = chat["users"] for user_slug in users: await redis.execute( @@ -140,10 +144,11 @@ async def mark_as_read(_, info, chat_id: str, messages: [int]): @subscription.source("newMessage") @login_required async def message_generator(obj, info): + print(f"[resolvers.messages] generator {info}") + auth: AuthCredentials = info.context["request"].auth + user_id = auth.user_id try: - auth: AuthCredentials = info.context["request"].auth - - user_following_chats = await redis.execute("GET", f"chats_by_user/{auth.user_id}") + user_following_chats = await redis.execute("GET", f"chats_by_user/{user_id}") if user_following_chats: user_following_chats = list(json.loads(user_following_chats)) # chat ids else: @@ -154,6 +159,7 @@ async def message_generator(obj, info): chat = await redis.execute("GET", f"chats/{chat_id}") updated[chat_id] = chat['updatedAt'] user_following_chats_sorted = sorted(user_following_chats, key=lambda x: updated[x], reverse=True) + for chat_id in user_following_chats_sorted: following_chat = ChatFollowing(chat_id) await MessagesStorage.register_chat(following_chat) diff --git a/resolvers/inbox/search.py b/resolvers/inbox/search.py index 20269e59..1ca340e5 100644 --- a/resolvers/inbox/search.py +++ b/resolvers/inbox/search.py @@ -1,11 +1,12 @@ import json - +from datetime import datetime, timezone, timedelta from auth.authenticate import login_required from auth.credentials import AuthCredentials from base.redis import redis from base.resolvers import query from base.orm import local_session from orm.user import AuthorFollower, User +from resolvers.inbox.load import load_messages @query.field("searchRecipients") @@ -47,3 +48,48 @@ async def search_recipients(_, info, query: str, limit: int = 50, offset: int = "members": list(result), "error": None } + + +@query.field("searchMessages") +@login_required +async def search_user_chats(by, messages, user_id: int, limit, offset): + cids = set([]) + cids.union(set(await redis.execute("SMEMBERS", "chats_by_user/" + str(user_id)))) + messages = [] + + by_author = by.get('author') + if by_author: + # all author's messages + cids.union(set(await redis.execute("SMEMBERS", f"chats_by_user/{by_author}"))) + # author's messages in filtered chat + messages.union(set(filter(lambda m: m["author"] == by_author, list(messages)))) + for c in cids: + c = c.decode('utf-8') + messages = await load_messages(c, limit, offset) + + body_like = by.get('body') + if body_like: + # search in all messages in all user's chats + for c in cids: + # FIXME: use redis scan here + c = c.decode('utf-8') + mmm = await load_messages(c, limit, offset) + for m in mmm: + if body_like in m["body"]: + messages.add(m) + else: + # search in chat's messages + messages.extend(filter(lambda m: body_like in m["body"], list(messages))) + + days = by.get("days") + if days: + messages.extend(filter( + list(messages), + key=lambda m: ( + datetime.now(tz=timezone.utc) - int(m["createdAt"]) < timedelta(days=by["days"]) + ) + )) + return { + "messages": messages, + "error": None + } diff --git a/resolvers/zine/profile.py b/resolvers/zine/profile.py index 0944eb3f..609ccccd 100644 --- a/resolvers/zine/profile.py +++ b/resolvers/zine/profile.py @@ -41,10 +41,10 @@ def add_author_stat_columns(q): # func.sum(user_rating_aliased.value).label('rating_stat') # ) - q = q.add_columns(literal(0).label('commented_stat')) - # q = q.outerjoin(Reaction, and_(Reaction.createdBy == User.id, Reaction.body.is_not(None))).add_columns( - # func.count(distinct(Reaction.id)).label('commented_stat') - # ) + # q = q.add_columns(literal(0).label('commented_stat')) + q = q.outerjoin(Reaction, and_(Reaction.createdBy == User.id, Reaction.body.is_not(None))).add_columns( + func.count(distinct(Reaction.id)).label('commented_stat') + ) q = q.group_by(User.id) @@ -152,7 +152,7 @@ async def get_user_roles(slug): .all() ) - return [] # roles + return roles @mutation.field("updateProfile") @@ -161,9 +161,18 @@ async def update_profile(_, info, profile): auth = info.context["request"].auth user_id = auth.user_id with local_session() as session: - session.query(User).filter(User.id == user_id).update(profile) + user = session.query(User).filter(User.id == user_id).one() + slugowner = session.query(User).where(User.slug == profile['slug']).one() + if slugowner: + if slugowner.id != user_id: + return { + "error": "slug is used by another user" + } + user.update(profile) session.commit() - return {} + return { + "error": None + } @mutation.field("rateUser") diff --git a/schema.graphql b/schema.graphql index 98b82af4..47db95fc 100644 --- a/schema.graphql +++ b/schema.graphql @@ -29,6 +29,7 @@ type ChatMember { name: String! userpic: String lastSeen: DateTime + online: Boolean # invitedAt: DateTime # invitedBy: String # user slug # TODO: keep invite databit @@ -270,6 +271,7 @@ type Query { loadMessagesBy(by: MessagesBy!, limit: Int, offset: Int): Result! loadRecipients(limit: Int, offset: Int): Result! searchRecipients(query: String!, limit: Int, offset: Int): Result! + searchMessages(by: MessagesBy!, limit: Int, offset: Int): Result! # auth isEmailUsed(email: String!): Boolean! @@ -304,11 +306,9 @@ type Query { ############################################ Subscription type Subscription { - newMessage(chats: [Int!]): Message! - onlineUpdated: [User!]! - shoutUpdated: Shout! - userUpdated: User! - reactionUpdated(shout: String!): ReactionUpdating! + newMessage: Message # new messages in inbox + collabUpdate(collab: Int!): Reaction # new reactions in collaborative editor + } ############################################ Entities @@ -371,6 +371,7 @@ type User { updatedAt: DateTime ratings: [Rating] bio: String + about: String notifications: [Int] communities: [Int] # user participating communities oid: String @@ -499,25 +500,26 @@ type Token { } type Message { - author: String! + author: Int! chatId: String! body: String! createdAt: Int! id: Int! - replyTo: String + replyTo: Int updatedAt: Int + seen: Boolean } type Chat { id: String! createdAt: Int! - createdBy: String! + createdBy: Int! updatedAt: Int! title: String description: String - users: [String] + users: [Int] members: [ChatMember] - admins: [String] + admins: [Int] messages: [Message] unread: Int private: Boolean diff --git a/services/inbox/helpers.py b/services/inbox/helpers.py new file mode 100644 index 00000000..d8791218 --- /dev/null +++ b/services/inbox/helpers.py @@ -0,0 +1,14 @@ +import asyncio + + +class MessageResult: + def __init__(self, status, message): + self.seen = status + self.message = message + + +class ChatFollowing: + queue = asyncio.Queue() + + def __init__(self, chat_id): + self.chat_id = chat_id diff --git a/services/inbox/presence.py b/services/inbox/presence.py new file mode 100644 index 00000000..b6d710ea --- /dev/null +++ b/services/inbox/presence.py @@ -0,0 +1,43 @@ +from base.exceptions import Unauthorized +from auth.tokenstorage import SessionToken +from base.redis import redis + + +async def set_online_status(user_id, status): + if user_id: + if status: + await redis.execute("SADD", "users-online", user_id) + else: + await redis.execute("SREM", "users-online", user_id) + + +async def on_connect(websocket, params): + if not isinstance(params, dict): + websocket.scope["connection_params"] = {} + return + token = params.get('token') + if not token: + raise Unauthorized("Please login") + else: + payload = await SessionToken.verify(token) + if payload and payload.user_id: + websocket.scope["user_id"] = payload.user_id + await set_online_status(payload.user_id, True) + + +async def on_disconnect(websocket): + user_id = websocket.scope.get("user_id") + await set_online_status(user_id, False) + + +# FIXME: not used yet +def context_value(request): + context = {} + print(f"[inbox.presense] request debug: {request}") + if request.scope["type"] == "websocket": + # request is an instance of WebSocket + context.update(request.scope["connection_params"]) + else: + context["token"] = request.META.get("authorization") + + return context diff --git a/services/inbox/sse.py b/services/inbox/sse.py new file mode 100644 index 00000000..e3dde165 --- /dev/null +++ b/services/inbox/sse.py @@ -0,0 +1,13 @@ +from sse_starlette.sse import EventSourceResponse +from starlette.requests import Request +from resolvers.inbox.messages import messages_generator_by_user +from base.exceptions import Unauthorized + + +async def sse_messages(request: Request): + print(f'[SSE] {request.scope}') + # https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md + if request['user']: + return EventSourceResponse(messages_generator_by_user(request['user'].user_id)) + else: + raise Unauthorized("Please login") diff --git a/services/inbox.py b/services/inbox/storage.py similarity index 72% rename from services/inbox.py rename to services/inbox/storage.py index d8222fa3..dd6e5fcf 100644 --- a/services/inbox.py +++ b/services/inbox/storage.py @@ -1,13 +1,6 @@ import asyncio -class ChatFollowing: - queue = asyncio.Queue() - - def __init__(self, chat_id): - self.chat_id = chat_id - - class MessagesStorage: lock = asyncio.Lock() chats = [] @@ -28,9 +21,3 @@ class MessagesStorage: for chat in MessagesStorage.chats: if message_result.message["chatId"] == chat.chat_id: chat.queue.put_nowait(message_result) - - -class MessageResult: - def __init__(self, status, message): - self.status = status - self.message = message diff --git a/settings.py b/settings.py index 196bc822..c712ddd1 100644 --- a/settings.py +++ b/settings.py @@ -26,6 +26,7 @@ FRONTEND_URL = environ.get("FRONTEND_URL") or "http://localhost:3000" SHOUTS_REPO = "content" SESSION_TOKEN_HEADER = "Authorization" +SENTRY_DSN = environ.get("SENTRY_DSN") + # for local development DEV_SERVER_STATUS_FILE_NAME = 'dev-server-status.txt' -SENTRY_ID = environ.get("SENTRY_ID")