diff --git a/auth/authenticate.py b/auth/authenticate.py index 95695604..242d2793 100644 --- a/auth/authenticate.py +++ b/auth/authenticate.py @@ -9,7 +9,7 @@ from auth.credentials import AuthCredentials, AuthUser from services.auth.users import UserStorage from settings import SESSION_TOKEN_HEADER from auth.tokenstorage import SessionToken -from base.exceptions import InvalidToken +from base.exceptions import InvalidToken, OperationNotAllowed, Unauthorized class JWTAuthenticate(AuthenticationBackend): @@ -30,27 +30,26 @@ class JWTAuthenticate(AuthenticationBackend): try: if len(token.split('.')) > 1: payload = await SessionToken.verify(token) + if payload is None: + return AuthCredentials(scopes=[]), AuthUser(user_id=None) + user = await UserStorage.get_user(payload.user_id) + if not user: + return AuthCredentials(scopes=[]), AuthUser(user_id=None) + scopes = await user.get_permission() + return ( + AuthCredentials( + user_id=payload.user_id, + scopes=scopes, + logged_in=True + ), + user, + ) else: InvalidToken("please try again") except Exception as exc: print("[auth.authenticate] session token verify error") print(exc) - return AuthCredentials(scopes=[], error_message=str(exc)), AuthUser( - user_id=None - ) - - if payload is None: - return AuthCredentials(scopes=[]), AuthUser(user_id=None) - - user = await UserStorage.get_user(payload.user_id) - if not user: - return AuthCredentials(scopes=[]), AuthUser(user_id=None) - - scopes = await user.get_permission() - return ( - AuthCredentials(user_id=payload.user_id, scopes=scopes, logged_in=True), - user, - ) + return AuthCredentials(scopes=[], error_message=str(exc)), AuthUser(user_id=None) def login_required(func): @@ -58,10 +57,9 @@ def login_required(func): async def wrap(parent, info: GraphQLResolveInfo, *args, **kwargs): # print('[auth.authenticate] login required for %r with info %r' % (func, info)) # debug only auth: AuthCredentials = info.context["request"].auth - if auth and auth.user_id: - print(auth) # debug only + # print(auth) if not auth.logged_in: - return {"error": auth.error_message or "Please login"} + raise OperationNotAllowed(auth.error_message or "Please login") return await func(parent, info, *args, **kwargs) return wrap @@ -73,9 +71,9 @@ def permission_required(resource, operation, func): print('[auth.authenticate] permission_required for %r with info %r' % (func, info)) # debug only auth: AuthCredentials = info.context["request"].auth if not auth.logged_in: - return {"error": auth.error_message or "Please login"} + raise Unauthorized(auth.error_message or "Please login") - # TODO: add check permission logix + # TODO: add actual check permission logix here return await func(parent, info, *args, **kwargs) diff --git a/auth/credentials.py b/auth/credentials.py index 401ae420..15738d16 100644 --- a/auth/credentials.py +++ b/auth/credentials.py @@ -2,7 +2,7 @@ from typing import List, Optional, Text from pydantic import BaseModel -from base.exceptions import OperationNotAllowed +from base.exceptions import Unauthorized class Permission(BaseModel): @@ -17,11 +17,13 @@ class AuthCredentials(BaseModel): @property def is_admin(self): + # TODO: check admin logix return True async def permissions(self) -> List[Permission]: if self.user_id is None: - raise OperationNotAllowed("Please login first") + raise Unauthorized("Please login first") + # TODO: implement permissions logix return NotImplemented() diff --git a/auth/jwtcodec.py b/auth/jwtcodec.py index c2feacd3..387df057 100644 --- a/auth/jwtcodec.py +++ b/auth/jwtcodec.py @@ -34,7 +34,7 @@ class JWTCodec: issuer="discours" ) r = TokenPayload(**payload) - print('[auth.jwtcodec] debug payload %r' % r) + # print('[auth.jwtcodec] debug payload %r' % r) return r except jwt.InvalidIssuedAtError: print('[auth.jwtcodec] invalid issued at: %r' % r) diff --git a/base/redis.py b/base/redis.py index 7468af0a..ccc3758a 100644 --- a/base/redis.py +++ b/base/redis.py @@ -12,6 +12,7 @@ class RedisCache: if self._instance is not None: return self._instance = await from_url(self._uri, encoding="utf-8") + # print(self._instance) async def disconnect(self): if self._instance is None: @@ -23,10 +24,11 @@ class RedisCache: async def execute(self, command, *args, **kwargs): while not self._instance: await sleep(1) - try: - await self._instance.execute_command(command, *args, **kwargs) - except Exception: - pass + try: + print("[redis] " + command + ' ' + ' '.join(args)) + return await self._instance.execute_command(command, *args, **kwargs) + except Exception: + pass async def lrange(self, key, start, stop): return await self._instance.lrange(key, start, stop) diff --git a/main.py b/main.py index 3e1deaae..49746195 100644 --- a/main.py +++ b/main.py @@ -20,6 +20,7 @@ from services.stat.reacted import ReactedStorage from services.stat.topicstat import TopicStat from services.stat.viewed import ViewedStorage from services.zine.gittask import GitTask +from services.zine.shoutauthor import ShoutAuthorStorage from settings import DEV_SERVER_STATUS_FILE_NAME import_module("resolvers") @@ -39,11 +40,14 @@ async def start_up(): print(views_stat_task) reacted_storage_task = asyncio.create_task(ReactedStorage.worker()) print(reacted_storage_task) + shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker()) + print(shout_author_task) topic_stat_task = asyncio.create_task(TopicStat.worker()) print(topic_stat_task) git_task = asyncio.create_task(GitTask.git_task_worker()) print(git_task) + async def dev_start_up(): if exists(DEV_SERVER_STATUS_FILE_NAME): return diff --git a/migration/tables/users.py b/migration/tables/users.py index fe9b7374..6db7a243 100644 --- a/migration/tables/users.py +++ b/migration/tables/users.py @@ -17,7 +17,7 @@ def migrate(entry): "username": email, "email": email, "createdAt": parse(entry["createdAt"]), - "emailConfirmed": bool(entry["emails"][0]["verified"]), + "emailConfirmed": ("@discours.io" in email) or bool(entry["emails"][0]["verified"]), "muted": False, # amnesty "bio": entry["profile"].get("bio", ""), "notifications": [], diff --git a/resolvers/__init__.py b/resolvers/__init__.py index fccca3a0..da8800a2 100644 --- a/resolvers/__init__.py +++ b/resolvers/__init__.py @@ -48,8 +48,8 @@ from resolvers.zine.load import ( from resolvers.inbox.chats import ( create_chat, delete_chat, - update_chat, - invite_to_chat + update_chat + ) from resolvers.inbox.messages import ( create_message, @@ -111,7 +111,6 @@ __all__ = [ # inbox "load_chats", "load_messages_by", - "invite_to_chat", "create_chat", "delete_chat", "update_chat", diff --git a/resolvers/auth.py b/resolvers/auth.py index d6f1d40b..317d2733 100644 --- a/resolvers/auth.py +++ b/resolvers/auth.py @@ -13,7 +13,7 @@ from auth.identity import Identity, Password from auth.jwtcodec import JWTCodec from auth.tokenstorage import TokenStorage from base.exceptions import (BaseHttpException, InvalidPassword, InvalidToken, - ObjectNotExist, OperationNotAllowed) + ObjectNotExist, OperationNotAllowed, Unauthorized) from base.orm import local_session from base.resolvers import mutation, query from orm import Role, User @@ -37,7 +37,7 @@ async def get_current_user(_, info): "news": await user_subscriptions(user.slug), } else: - raise OperationNotAllowed("No session token present in request, try to login") + raise Unauthorized("No session token present in request, try to login") @mutation.field("confirmEmail") diff --git a/resolvers/inbox/chats.py b/resolvers/inbox/chats.py index f4b1ca1e..e24acf0c 100644 --- a/resolvers/inbox/chats.py +++ b/resolvers/inbox/chats.py @@ -7,43 +7,6 @@ from base.redis import redis from base.resolvers import mutation -async def add_user_to_chat(user_slug: str, chat_id: str, chat=None): - for member in chat["users"]: - chats_ids = await redis.execute("GET", f"chats_by_user/{member}") - if chats_ids: - chats_ids = list(json.loads(chats_ids)) - else: - chats_ids = [] - if chat_id not in chats_ids: - chats_ids.append(chat_id) - await redis.execute("SET", f"chats_by_user/{member}", json.dumps(chats_ids)) - - -@mutation.field("inviteChat") -async def invite_to_chat(_, info, invited: str, chat_id: str): - ''' invite user with :slug to chat with :chat_id ''' - user = info.context["request"].user - chat = await redis.execute("GET", f"chats/{chat_id}") - if not chat: - return { - "error": "chat not exist" - } - chat = dict(json.loads(chat)) - if not chat['private'] and user.slug not in chat['admins']: - return { - "error": "only admins can invite to private chat", - "chat": chat - } - else: - chat["users"].append(invited) - await add_user_to_chat(user.slug, chat_id, chat) - await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) - return { - "error": None, - "chat": chat - } - - @mutation.field("updateChat") @login_required async def update_chat(_, info, chat_new: dict): @@ -71,9 +34,8 @@ async def update_chat(_, info, chat_new: dict): "admins": chat_new.get("admins", chat["admins"]), "users": chat_new.get("users", chat["users"]) }) - await add_user_to_chat(user.slug, chat_id, chat) await redis.execute("SET", f"chats/{chat.id}", json.dumps(chat)) - await redis.execute("SET", f"chats/{chat.id}/next_message_id", 0) + await redis.execute("COMMIT") return { "error": None, @@ -97,11 +59,22 @@ async def create_chat(_, info, title="", members=[]): "users": members, "admins": [user.slug, ] } + # double creation protection + cids = await redis.execute("SMEMBERS", f"chats_by_user/{user.slug}") + for cid in cids: + c = await redis.execute("GET", F"chats/{cid.decode('utf-8')}") + isc = [x for x in c["users"] if x not in chat["users"]] + if isc == [] and chat["title"] == c["title"]: + return { + "error": "chat was created before", + "chat": chat + } - await add_user_to_chat(user.slug, chat_id, chat) + for m in members: + await redis.execute("SADD", f"chats_by_user/{m}", chat_id) await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0)) - + await redis.execute("COMMIT") return { "error": None, "chat": chat @@ -117,6 +90,8 @@ async def delete_chat(_, info, chat_id: str): chat = dict(json.loads(chat)) if user.slug in chat['admins']: await redis.execute("DEL", f"chats/{chat_id}") + await redis.execute("SREM", "chats_by_user/" + user, chat_id) + await redis.execute("COMMIT") else: return { "error": "chat not exist" diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py index cd38690b..871b0700 100644 --- a/resolvers/inbox/load.py +++ b/resolvers/inbox/load.py @@ -5,52 +5,51 @@ from auth.authenticate import login_required from base.redis import redis from base.orm import local_session from base.resolvers import query +from base.exceptions import ObjectNotExist from orm.user import User from resolvers.zine.profile import followed_authors from .unread import get_unread_counter -async def load_messages(chatId: str, limit: int, offset: int): - ''' load :limit messages for :chatId with :offset ''' +async def load_messages(chat_id: str, limit: int, offset: int): + ''' load :limit messages for :chat_id with :offset ''' messages = [] message_ids = await redis.lrange( - f"chats/{chatId}/message_ids", 0 - offset - limit, 0 - offset + f"chats/{chat_id}/message_ids", offset + limit, offset ) if message_ids: message_keys = [ - f"chats/{chatId}/messages/{mid}" for mid in message_ids + f"chats/{chat_id}/messages/{mid}" for mid in message_ids ] messages = await redis.mget(*message_keys) messages = [json.loads(msg) for msg in messages] - return { - "messages": messages, - "error": None - } + return messages @query.field("loadChats") @login_required -async def load_chats(_, info, limit: int, offset: int): +async def load_chats(_, info, limit: int = 50, offset: int = 0): """ load :limit chats of current user with :offset """ user = info.context["request"].user - if user: - chats = await redis.execute("GET", f"chats_by_user/{user.slug}") - if chats: - chats = list(json.loads(chats))[offset:offset + limit] - if not chats: - chats = [] - for c in chats: - c['messages'] = await load_messages(c['id'], limit, offset) - c['unread'] = await get_unread_counter(c['id'], user.slug) - return { - "chats": chats, - "error": None - } - else: - return { - "error": "please login", - "chats": [] - } + print('[inbox] load user\'s chats') + cids = await redis.execute("SMEMBERS", "chats_by_user/" + user.slug) + if cids: + cids = list(cids)[offset:offset + limit] + if not cids: + print('[inbox.load] no chats were found') + cids = [] + chats = [] + for cid in cids: + c = await redis.execute("GET", "chats/" + cid.decode("utf-8")) + if c: + c = json.loads(c) + c['messages'] = await load_messages(cid, 50, 0) + c['unread'] = await get_unread_counter(cid, user.slug) + chats.append(c) + return { + "chats": chats, + "error": None + } @query.field("loadMessagesBy") @@ -58,28 +57,36 @@ async def load_chats(_, info, limit: int, offset: int): async def load_messages_by(_, info, by, limit: int = 50, offset: int = 0): ''' load :amolimitunt messages of :chat_id with :offset ''' user = info.context["request"].user - my_chats = await redis.execute("GET", f"chats_by_user/{user.slug}") - chat_id = by.get('chat') - if chat_id: - chat = await redis.execute("GET", f"chats/{chat_id}") + cids = await redis.execute("SMEMBERS", "chats_by_user/" + user.slug) + by_chat = by.get('chat') + messages = [] + if by_chat: + chat = await redis.execute("GET", f"chats/{by_chat}") if not chat: - return { - "error": "chat not exist" - } - messages = await load_messages(chat_id, limit, offset) - user_id = by.get('author') - if user_id: - chats = await redis.execute("GET", f"chats_by_user/{user_id}") - our_chats = list(set(chats) & set(my_chats)) - for c in our_chats: - messages += await load_messages(c, limit, offset) + raise ObjectNotExist("Chat not exists") + messages = await load_messages(by_chat, limit, offset) + by_author = by.get('author') + if by_author: + if not by_chat: + # all author's messages + by_author_cids = await redis.execute("SMEMBERS", f"chats_by_user/{by_author}") + for c in list(by_author_cids & cids): + messages += await load_messages(c, limit, offset) + else: + # author's messages in chat + messages = filter(lambda m: m["author"] == by_author, messages) body_like = by.get('body') if body_like: - for c in my_chats: - mmm = await load_messages(c, limit, offset) - for m in mmm: - if body_like in m["body"]: - messages.append(m) + if not by_chat: + # search in all messages in all user's chats + for c in list(cids): + mmm = await load_messages(c, limit, offset) + for m in mmm: + if body_like in m["body"]: + messages.append(m) + else: + # search in chat's messages + messages = filter(lambda m: body_like in m["body"], messages) days = by.get("days") if days: messages = filter( diff --git a/resolvers/inbox/unread.py b/resolvers/inbox/unread.py index 0075a877..d5d2e553 100644 --- a/resolvers/inbox/unread.py +++ b/resolvers/inbox/unread.py @@ -17,6 +17,6 @@ async def get_total_unread_counter(user_slug: str): if chats: chats = json.loads(chats) for chat_id in chats: - n = await get_unread_counter(chat_id, user_slug) + n = await get_unread_counter(chat_id.decode('utf-8'), user_slug) unread += n return unread diff --git a/schema.graphql b/schema.graphql index e1d30873..e08342ec 100644 --- a/schema.graphql +++ b/schema.graphql @@ -151,7 +151,6 @@ type Mutation { createChat(title: String, members: [String]!): Result! updateChat(chat: ChatInput!): Result! deleteChat(chatId: String!): Result! - inviteChat(chatId: String!, userslug: String!): Result! createMessage(chat: String!, body: String!, replyTo: String): Result! updateMessage(chatId: String!, id: Int!, body: String!): Result! @@ -515,13 +514,13 @@ type Message { type Chat { id: String! createdAt: Int! - createdBy: User! + createdBy: String! updatedAt: Int! title: String description: String - users: [User]! - admins: [User] - messages: [Message]! + users: [String]! + admins: [String] + messages: [Message] unread: Int private: Boolean } diff --git a/services/stat/topicstat.py b/services/stat/topicstat.py index c95d0850..3a5689e6 100644 --- a/services/stat/topicstat.py +++ b/services/stat/topicstat.py @@ -3,7 +3,7 @@ import time from base.orm import local_session from orm.shout import Shout, ShoutTopic, ShoutAuthor from orm.topic import TopicFollower -from sqlalchemy.sql.expression import select +# from sqlalchemy.sql.expression import select class TopicStat: @@ -20,21 +20,19 @@ class TopicStat: start = time.time() self = TopicStat shout_topics = session.query(ShoutTopic, Shout).join(Shout).all() - all_shout_authors = session.query(ShoutAuthor).all() print("[stat.topics] %d links for shouts" % len(shout_topics)) for [shout_topic, shout] in shout_topics: tpc = shout_topic.topic - # shouts by topics - # shout = session.query(Shout).where(Shout.slug == shout_topic.shout).first() self.shouts_by_topic[tpc] = self.shouts_by_topic.get(tpc, dict()) self.shouts_by_topic[tpc][shout.slug] = shout - - # authors by topics - shout_authors = filter(lambda asa: asa.shout == shout.slug, all_shout_authors) - self.authors_by_topic[tpc] = self.authors_by_topic.get(tpc, dict()) - for sa in shout_authors: - self.authors_by_topic[tpc][sa.shout] = sa.caption + authors = session.query( + ShoutAuthor.user, ShoutAuthor.caption + ).filter( + ShoutAuthor.shout == shout.slug + ).all() + for a in authors: + self.authors_by_topic[tpc][a[0]] = a[1] self.followers_by_topic = {} followings = session.query(TopicFollower).all() diff --git a/services/zine/shoutauthor.py b/services/zine/shoutauthor.py new file mode 100644 index 00000000..477ff384 --- /dev/null +++ b/services/zine/shoutauthor.py @@ -0,0 +1,46 @@ +import asyncio +from base.orm import local_session +from orm.shout import ShoutAuthor, Shout + + +class ShoutAuthorStorage: + authors_by_shout = {} + lock = asyncio.Lock() + period = 30 * 60 # sec + + @staticmethod + async def load_captions(session): + self = ShoutAuthorStorage + sas = session.query(ShoutAuthor).join(Shout).all() + for sa in sas: + self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, []) + self.authors_by_shout[sa.shout].append([sa.user, sa.caption]) + print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout)) + + @staticmethod + async def get_authors(shout): + self = ShoutAuthorStorage + async with self.lock: + return self.authors_by_shout.get(shout, []) + + @staticmethod + async def get_author_caption(shout, author): + self = ShoutAuthorStorage + async with self.lock: + for a in self.authors_by_shout.get(shout, []): + if author in a: + return a[1] + return {"error": "author caption not found"} + + @staticmethod + async def worker(): + self = ShoutAuthorStorage + while True: + try: + with local_session() as session: + async with self.lock: + await self.load_captions(session) + print("[zine.authors] index by authors was updated") + except Exception as err: + print("[zine.authors] error indexing by author: %s" % (err)) + await asyncio.sleep(self.period)