From 06437ab5bd3cafd2dfd6c0b5de21c20f902af836 Mon Sep 17 00:00:00 2001 From: knst-kotov Date: Fri, 28 Jan 2022 12:49:46 +0300 Subject: [PATCH 1/8] subscription on comments --- orm/comment.py | 2 +- resolvers/comments.py | 81 +++++++++++++++++++++++++++++++++++++++---- schema.graphql | 14 ++++++++ 3 files changed, 90 insertions(+), 7 deletions(-) diff --git a/orm/comment.py b/orm/comment.py index a36d24e3..c71f4ac6 100644 --- a/orm/comment.py +++ b/orm/comment.py @@ -25,7 +25,7 @@ class Comment(Base): updatedBy = Column(ForeignKey("user.id"), nullable=True, comment="Last Editor") deletedAt = Column(DateTime, nullable=True, comment="Deleted at") deletedBy = Column(ForeignKey("user.id"), nullable=True, comment="Deleted by") - shout: int = Column(ForeignKey("shout.slug"), nullable=False) + shout = Column(ForeignKey("shout.slug"), nullable=False) replyTo: int = Column(ForeignKey("comment.id"), nullable=True, comment="comment ID") ratings = relationship(CommentRating, foreign_keys=CommentRating.comment_id) diff --git a/resolvers/comments.py b/resolvers/comments.py index 68344ce6..b47137c2 100644 --- a/resolvers/comments.py +++ b/resolvers/comments.py @@ -5,6 +5,42 @@ from auth.authenticate import login_required import asyncio from datetime import datetime +class CommentResult: + def __init__(self, status, comment): + self.status = status + self.comment = comment + +class CommentSubscription: + queue = asyncio.Queue() + + def __init__(self, shout_slug): + self.shout_slug = shout_slug + +#TODO: one class for MessageSubscription and CommentSubscription +class CommentSubscriptions: + lock = asyncio.Lock() + subscriptions = [] + + @staticmethod + async def register_subscription(subs): + self = CommentSubscriptions + async with self.lock: + self.subscriptions.append(subs) + + @staticmethod + async def del_subscription(subs): + self = CommentSubscriptions + async with self.lock: + self.subscriptions.remove(subs) + + @staticmethod + async def put(comment_result): + self = CommentSubscriptions + async with self.lock: + for subs in self.subscriptions: + if comment_result.comment.shout == subs.shout_slug: + subs.queue.put_nowait(comment_result) + @mutation.field("createComment") @login_required async def create_comment(_, info, body, shout, replyTo = None): @@ -18,6 +54,9 @@ async def create_comment(_, info, body, shout, replyTo = None): replyTo = replyTo ) + result = CommentResult("NEW", comment) + await CommentSubscriptions.put(result) + return {"comment": comment} @mutation.field("updateComment") @@ -38,6 +77,11 @@ async def update_comment(_, info, id, body): session.commit() + result = CommentResult("UPDATED", comment) + await CommentSubscriptions.put(result) + + return {"comment": comment} + @mutation.field("deleteComment") @login_required async def delete_comment(_, info, id): @@ -54,6 +98,9 @@ async def delete_comment(_, info, id): comment.deletedAt = datetime.now() session.commit() + result = CommentResult("DELETED", comment) + await CommentSubscriptions.put(result) + return {} @mutation.field("rateComment") @@ -63,16 +110,38 @@ async def rate_comment(_, info, id, value): user_id = auth.user_id with local_session() as session: + comment = session.query(Comment).filter(Comment.id == id).first() + if not comment: + return {"error": "invalid comment id"} + rating = session.query(CommentRating).\ filter(CommentRating.comment_id == id and CommentRating.createdBy == user_id).first() if rating: rating.value = value session.commit() - return {} - - CommentRating.create( - comment_id = id, - createdBy = user_id, - value = value) + if not rating: + CommentRating.create( + comment_id = id, + createdBy = user_id, + value = value) + + result = CommentResult("UPDATED_RATING", comment) + await CommentSubscriptions.put(result) + return {} + +@subscription.source("commentUpdated") +async def comment_generator(obj, info, shout): + try: + subs = CommentSubscription(shout) + await CommentSubscriptions.register_subscription(subs) + while True: + result = await subs.queue.get() + yield result + finally: + await CommentSubscriptions.del_subscription(subs) + +@subscription.field("commentUpdated") +def comment_resolver(result, info, shout): + return result diff --git a/schema.graphql b/schema.graphql index 4b4ef942..71245cd3 100644 --- a/schema.graphql +++ b/schema.graphql @@ -95,6 +95,19 @@ type TopicResult { topic: Topic } +enum CommentStatus { + NEW + UPDATED + UPDATED_RATING + DELETED +} + +type CommentUpdatedResult { + error: String + status: CommentStatus + comment: Comment +} + ################################### Mutation type Mutation { @@ -192,6 +205,7 @@ type Subscription { shoutUpdated: Shout! userUpdated: User! topicUpdated(user_id: Int!): Shout! + commentUpdated(shout: String!): CommentUpdatedResult! } ############################################ Entities From ab42c3b60cc0e620af60b406fa8453453a12b03e Mon Sep 17 00:00:00 2001 From: knst-kotov Date: Fri, 28 Jan 2022 13:33:16 +0300 Subject: [PATCH 2/8] signIn logs --- resolvers/auth.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/resolvers/auth.py b/resolvers/auth.py index f32d78a4..b3f6504e 100644 --- a/resolvers/auth.py +++ b/resolvers/auth.py @@ -86,9 +86,11 @@ async def login(_, info: GraphQLResolveInfo, email: str, password: str = ""): with local_session() as session: orm_user = session.query(User).filter(User.email == email).first() if orm_user is None: + print(f"signIn {email}: invalid email") return {"error" : "invalid email"} if not password: + print(f"signIn {email}: send auth email") await send_auth_email(orm_user) return {} @@ -101,9 +103,11 @@ async def login(_, info: GraphQLResolveInfo, email: str, password: str = ""): try: user = Identity.identity(orm_user, password) except InvalidPassword: + print(f"signIn {email}: invalid password") return {"error" : "invalid password"} token = await Authorize.authorize(user, device=device, auto_delete=auto_delete) + print(f"signIn {email}: OK") return {"token" : token, "user": orm_user} From 3828e92c198558922b0461562f0e974fdfcc27f8 Mon Sep 17 00:00:00 2001 From: knst-kotov Date: Fri, 28 Jan 2022 15:52:14 +0300 Subject: [PATCH 3/8] get messages from redis --- resolvers/inbox.py | 48 ++++++++++++++++++++-------------------------- schema.graphql | 6 ++---- 2 files changed, 23 insertions(+), 31 deletions(-) diff --git a/resolvers/inbox.py b/resolvers/inbox.py index 6f0da3c5..086e1785 100644 --- a/resolvers/inbox.py +++ b/resolvers/inbox.py @@ -60,20 +60,25 @@ async def create_chat(_, info, description): return { "chatId" : chat_id } -@query.field("enterChat") -@login_required -async def enter_chat(_, info, chatId): - chat = await redis.execute("GET", f"chats/{chatId}") - if not chat: - return { "error" : "chat not exist" } - chat = json.loads(chat) - - message_ids = await redis.lrange(f"chats/{chatId}/message_ids", 0, 10) +async def load_messages(chatId, size, page): + message_ids = await redis.lrange(f"chats/{chatId}/message_ids", + size * (page -1), size * page - 1) messages = [] if message_ids: message_keys = [f"chats/{chatId}/messages/{id.decode('UTF-8')}" for id in message_ids] messages = await redis.mget(*message_keys) messages = [json.loads(msg) for msg in messages] + return messages + +@query.field("enterChat") +@login_required +async def enter_chat(_, info, chatId, size): + chat = await redis.execute("GET", f"chats/{chatId}") + if not chat: + return { "error" : "chat not exist" } + chat = json.loads(chat) + + messages = await load_messages(chatId, size, 1) return { "chat" : chat, @@ -112,25 +117,14 @@ async def create_message(_, info, chatId, body, replyTo = None): @query.field("getMessages") @login_required -async def get_messages(_, info, count, page): - auth = info.context["request"].auth - user_id = auth.user_id - - with local_session() as session: - messages = session.query(Message).filter(Message.author == user_id) - - return messages +async def get_messages(_, info, chatId, size, page): + chat = await redis.execute("GET", f"chats/{chatId}") + if not chat: + return { "error" : "chat not exist" } -def check_and_get_message(message_id, user_id, session) : - message = session.query(Message).filter(Message.id == message_id).first() - - if not message : - raise Exception("invalid id") - - if message.author != user_id : - raise Exception("access denied") - - return message + messages = await load_messages(chatId, size, page) + + return messages @mutation.field("updateMessage") @login_required diff --git a/schema.graphql b/schema.graphql index 71245cd3..07c06c7e 100644 --- a/schema.graphql +++ b/schema.graphql @@ -167,7 +167,8 @@ type Query { getUserRoles(slug: String!): [Role]! # messages - getMessages(count: Int = 100, page: Int = 1): [Message!]! + enterChat(chatId: String!, size: Int = 50): EnterChatResult! + getMessages(chatId: String!, size: Int!, page: Int!): [Message]! # shouts getShoutBySlug(slug: String!): Shout! @@ -192,9 +193,6 @@ type Query { # communities getCommunity(slug: String): Community! getCommunities: [Community]! - - #messages - enterChat(chatId: String!): EnterChatResult! } ############################################ Subscription From 32becea4da91fbd739baf1bf91b7de8e04cbfaac Mon Sep 17 00:00:00 2001 From: knst-kotov Date: Sun, 30 Jan 2022 11:35:49 +0300 Subject: [PATCH 4/8] add author subscription --- orm/user.py | 8 ++++++++ resolvers/profile.py | 31 ++++++++++++++++++++++++++++++- schema.graphql | 3 +++ 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/orm/user.py b/orm/user.py index 47f2c9a2..a9eb78c3 100644 --- a/orm/user.py +++ b/orm/user.py @@ -33,6 +33,14 @@ class UserRole(Base): user_id = Column(ForeignKey('user.id'), primary_key = True) role_id = Column(ForeignKey('role.id'), primary_key = True) +class AuthorSubscription(Base): + __tablename__ = "author_subscription" + + id = None + subscriber = Column(ForeignKey('user.slug'), primary_key = True) + author = Column(ForeignKey('user.slug'), primary_key = True) + createdAt = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") + class User(Base): __tablename__ = "user" diff --git a/resolvers/profile.py b/resolvers/profile.py index f9284214..feb31573 100644 --- a/resolvers/profile.py +++ b/resolvers/profile.py @@ -1,9 +1,10 @@ from orm import User, UserRole, Role, UserRating +from orm.user import AuthorSubscription from orm.base import local_session from resolvers.base import mutation, query, subscription from auth.authenticate import login_required -from sqlalchemy import func +from sqlalchemy import func, and_ from sqlalchemy.orm import selectinload import asyncio @@ -46,3 +47,31 @@ async def update_profile(_, info, profile): session.commit() return {} + +@mutation.field("authorSubscribe") +@login_required +async def author_subscribe(_, info, slug): + user = info.context["request"].user + + AuthorSubscription.create( + subscriber = user.slug, + author = slug + ) + + return {} + +@mutation.field("authorUnsubscribe") +@login_required +async def author_unsubscribe(_, info, slug): + user = info.context["request"].user + + with local_session() as session: + sub = session.query(AuthorSubscription).\ + filter(and_(AuthorSubscription.subscriber == user.slug, AuthorSubscription.author == slug)).\ + first() + if not sub: + return { "error" : "subscription not exist" } + session.delete(sub) + session.commit() + + return {} diff --git a/schema.graphql b/schema.graphql index 07c06c7e..888e5419 100644 --- a/schema.graphql +++ b/schema.graphql @@ -150,6 +150,9 @@ type Mutation { createCommunity(title: String!, desc: String!): Community! updateCommunity(community: CommunityInput!): Community! deleteCommunity(id: Int!): Result! + + authorSubscribe(slug: String!): Result! + authorUnsubscribe(slug: String!): Result! } ################################### Query From 68f7733f912ce047d700c28237f25bfab0cce417 Mon Sep 17 00:00:00 2001 From: knst-kotov Date: Sun, 30 Jan 2022 11:50:29 +0300 Subject: [PATCH 5/8] fix topic subscription --- orm/topic.py | 2 +- resolvers/topics.py | 32 +++++++++++++++++++++----------- schema.graphql | 2 +- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/orm/topic.py b/orm/topic.py index b7be86c3..5d95988a 100644 --- a/orm/topic.py +++ b/orm/topic.py @@ -9,8 +9,8 @@ class TopicSubscription(Base): __tablename__ = "topic_subscription" id = None + subscriber = Column(ForeignKey('user.slug'), primary_key = True) topic = Column(ForeignKey('topic.slug'), primary_key = True) - user = Column(ForeignKey('user.id'), primary_key = True) createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") class Topic(Base): diff --git a/resolvers/topics.py b/resolvers/topics.py index 0eaaa678..c4773743 100644 --- a/resolvers/topics.py +++ b/resolvers/topics.py @@ -7,6 +7,8 @@ from resolvers.zine import ShoutSubscriptions from auth.authenticate import login_required import asyncio +from sqlalchemy import func, and_ + @query.field("topicsBySlugs") async def topics_by_slugs(_, info, slugs = None): with local_session() as session: @@ -62,27 +64,35 @@ async def update_topic(_, info, input): @mutation.field("topicSubscribe") @login_required async def topic_subscribe(_, info, slug): - auth = info.context["request"].auth - user_id = auth.user_id - sub = TopicSubscription.create({ user: user_id, topic: slug }) - return {} # type Result + user = info.context["request"].user + + TopicSubscription.create( + subscriber = user.slug, + topic = slug) + + return {} @mutation.field("topicUnsubscribe") @login_required async def topic_unsubscribe(_, info, slug): - auth = info.context["request"].auth - user_id = auth.user_id - sub = session.query(TopicSubscription).filter(TopicSubscription.user == user_id and TopicSubscription.topic == slug).first() + user = info.context["request"].user + with local_session() as session: + sub = session.query(TopicSubscription).\ + filter(and_(TopicSubscription.subscriber == user.slug, TopicSubscription.topic == slug)).\ + first() + if not sub: + return { "error" : "subscription not exist" } session.delete(sub) - return {} # type Result - return { "error": "session error" } + session.commit() + + return {} @subscription.source("topicUpdated") -async def new_shout_generator(obj, info, user_id): +async def new_shout_generator(obj, info, user_slug): try: with local_session() as session: - topics = session.query(TopicSubscription.topic).filter(TopicSubscription.user == user_id).all() + topics = session.query(TopicSubscription.topic).filter(TopicSubscription.subscriber == user_slug).all() topics = set([item.topic for item in topics]) shouts_queue = asyncio.Queue() await ShoutSubscriptions.register_subscription(shouts_queue) diff --git a/schema.graphql b/schema.graphql index 888e5419..10b0892f 100644 --- a/schema.graphql +++ b/schema.graphql @@ -205,7 +205,7 @@ type Subscription { onlineUpdated: [User!]! shoutUpdated: Shout! userUpdated: User! - topicUpdated(user_id: Int!): Shout! + topicUpdated(user_slug: String!): Shout! commentUpdated(shout: String!): CommentUpdatedResult! } From 073c5252c3ed4979df183bff3d776c06517185f4 Mon Sep 17 00:00:00 2001 From: knst-kotov Date: Sun, 30 Jan 2022 14:28:27 +0300 Subject: [PATCH 6/8] add shoutsByUserRatingOrComment --- orm/user.py | 8 ++++++++ resolvers/zine.py | 20 ++++++++++++++++++++ schema.graphql | 3 +++ 3 files changed, 31 insertions(+) diff --git a/orm/user.py b/orm/user.py index a9eb78c3..febf6a22 100644 --- a/orm/user.py +++ b/orm/user.py @@ -104,6 +104,14 @@ class UserStorage: async with self.lock: return self.users.get(id) + @staticmethod + async def get_user_by_slug(slug): + self = UserStorage + async with self.lock: + for user in self.users.values(): + if user.slug == slug: + return user + @staticmethod async def add_user(user): self = UserStorage diff --git a/resolvers/zine.py b/resolvers/zine.py index 32d5fa86..d2b6813a 100644 --- a/resolvers/zine.py +++ b/resolvers/zine.py @@ -396,3 +396,23 @@ async def shouts_by_community(_, info, community, page, size): limit(size).\ offset(page * size) return shouts + +@query.field("shoutsByUserRatingOrComment") +async def shouts_by_user_rating_or_comment(_, info, userSlug, page, size): + user = await UserStorage.get_user_by_slug(userSlug) + if not user: + return + + with local_session() as session: + shouts_by_rating = session.query(Shout).\ + join(ShoutRating).\ + where(and_(Shout.publishedAt != None, ShoutRating.rater == userSlug)) + shouts_by_comment = session.query(Shout).\ + join(Comment).\ + where(and_(Shout.publishedAt != None, Comment.author == user.id)) + shouts = shouts_by_rating.union(shouts_by_comment).\ + order_by(desc(Shout.publishedAt)).\ + limit(size).\ + offset( (page - 1) * size) + + return shouts diff --git a/schema.graphql b/schema.graphql index 10b0892f..5671e1fd 100644 --- a/schema.graphql +++ b/schema.graphql @@ -196,6 +196,9 @@ type Query { # communities getCommunity(slug: String): Community! getCommunities: [Community]! + + # shoutsByUserSubscriptions(): [Shout]! + shoutsByUserRatingOrComment(userSlug: String!, page: Int!, size: Int!): [Shout]! } ############################################ Subscription From 1f3983fc07aa6e2f565022f5eb1126ff87b4306d Mon Sep 17 00:00:00 2001 From: knst-kotov Date: Mon, 31 Jan 2022 12:16:46 +0300 Subject: [PATCH 7/8] add newShoutsWithoutRating --- resolvers/zine.py | 16 ++++++++++++++++ schema.graphql | 1 + 2 files changed, 17 insertions(+) diff --git a/resolvers/zine.py b/resolvers/zine.py index d2b6813a..231e8532 100644 --- a/resolvers/zine.py +++ b/resolvers/zine.py @@ -416,3 +416,19 @@ async def shouts_by_user_rating_or_comment(_, info, userSlug, page, size): offset( (page - 1) * size) return shouts + +@query.field("newShoutsWithoutRating") +async def new_shouts_without_rating(_, info, userSlug, size): + user = await UserStorage.get_user_by_slug(userSlug) + if not user: + return + + #TODO: postgres heavy load + with local_session() as session: + shouts = session.query(Shout).distinct().\ + outerjoin(ShoutRating).\ + where(and_(Shout.publishedAt != None, ShoutRating.rater != userSlug)).\ + order_by(desc(Shout.publishedAt)).\ + limit(size) + + return shouts diff --git a/schema.graphql b/schema.graphql index 5671e1fd..5427fbf4 100644 --- a/schema.graphql +++ b/schema.graphql @@ -199,6 +199,7 @@ type Query { # shoutsByUserSubscriptions(): [Shout]! shoutsByUserRatingOrComment(userSlug: String!, page: Int!, size: Int!): [Shout]! + newShoutsWithoutRating(userSlug: String!, size: Int = 10): [Shout]! } ############################################ Subscription From 020a3a5abfb3d01f7cdf028fee9f8a631f33a2d2 Mon Sep 17 00:00:00 2001 From: knst-kotov Date: Mon, 31 Jan 2022 14:34:43 +0300 Subject: [PATCH 8/8] add shoutsByUserSubscriptions; use user slug in shout_author table --- orm/shout.py | 4 ++-- resolvers/zine.py | 29 ++++++++++++++++++++++++++--- schema.graphql | 2 +- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/orm/shout.py b/orm/shout.py index be28bb9d..0631c7bc 100644 --- a/orm/shout.py +++ b/orm/shout.py @@ -16,7 +16,7 @@ class ShoutAuthor(Base): id = None shout = Column(ForeignKey('shout.slug'), primary_key = True) - user = Column(ForeignKey('user.id'), primary_key = True) + user = Column(ForeignKey('user.slug'), primary_key = True) class ShoutViewer(Base): __tablename__ = "shout_viewer" @@ -198,7 +198,7 @@ class TopicStat: subs = session.query(TopicSubscription) for sub in subs: topic = sub.topic - user = sub.user + user = sub.subscriber if topic in self.subs_by_topic: self.subs_by_topic[topic].append(user) else: diff --git a/resolvers/zine.py b/resolvers/zine.py index 231e8532..ed6413c8 100644 --- a/resolvers/zine.py +++ b/resolvers/zine.py @@ -1,7 +1,8 @@ from orm import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay, User, Community, Resource,\ ShoutRatingStorage, ShoutViewStorage, Comment, CommentRating, Topic from orm.base import local_session -from orm.user import UserStorage +from orm.user import UserStorage, AuthorSubscription +from orm.topic import TopicSubscription from resolvers.base import mutation, query @@ -222,7 +223,7 @@ async def create_shout(_, info, input): new_shout = Shout.create(**input) ShoutAuthor.create( shout = new_shout.slug, - user = user.id) + user = user.slug) if "mainTopic" in input: topic_slugs.append(input["mainTopic"]) @@ -375,7 +376,7 @@ async def shouts_by_author(_, info, author, page, size): shouts = session.query(Shout).\ join(ShoutAuthor).\ - where(and_(ShoutAuthor.user == user.id, Shout.publishedAt != None)).\ + where(and_(ShoutAuthor.user == author, Shout.publishedAt != None)).\ order_by(desc(Shout.publishedAt)).\ limit(size).\ offset(page * size) @@ -397,6 +398,28 @@ async def shouts_by_community(_, info, community, page, size): offset(page * size) return shouts +@query.field("shoutsByUserSubscriptions") +async def shouts_by_user_subscriptions(_, info, userSlug, page, size): + user = await UserStorage.get_user_by_slug(userSlug) + if not user: + return + + with local_session() as session: + shouts_by_topic = session.query(Shout).\ + join(ShoutTopic).\ + join(TopicSubscription, ShoutTopic.topic == TopicSubscription.topic).\ + where(and_(Shout.publishedAt != None, TopicSubscription.subscriber == userSlug)) + shouts_by_author = session.query(Shout).\ + join(ShoutAuthor).\ + join(AuthorSubscription, ShoutAuthor.user == AuthorSubscription.author).\ + where(and_(Shout.publishedAt != None, AuthorSubscription.subscriber == userSlug)) + shouts = shouts_by_topic.union(shouts_by_author).\ + order_by(desc(Shout.publishedAt)).\ + limit(size).\ + offset( (page - 1) * size) + + return shouts + @query.field("shoutsByUserRatingOrComment") async def shouts_by_user_rating_or_comment(_, info, userSlug, page, size): user = await UserStorage.get_user_by_slug(userSlug) diff --git a/schema.graphql b/schema.graphql index 5427fbf4..099702e3 100644 --- a/schema.graphql +++ b/schema.graphql @@ -197,7 +197,7 @@ type Query { getCommunity(slug: String): Community! getCommunities: [Community]! - # shoutsByUserSubscriptions(): [Shout]! + shoutsByUserSubscriptions(userSlug: String!, page: Int!, size: Int!): [Shout]! shoutsByUserRatingOrComment(userSlug: String!, page: Int!, size: Int!): [Shout]! newShoutsWithoutRating(userSlug: String!, size: Int = 10): [Shout]! }