From f0c76a2e7e35dc0251e5944b83097161df80d2ae Mon Sep 17 00:00:00 2001 From: Tony Rewin Date: Tue, 3 Oct 2023 17:15:17 +0300 Subject: [PATCH] isolated --- .gitignore | 5 ++ CHANGELOG.txt | 17 +++++ Dockerfile | 7 ++ README.md | 16 +++++ inbox.graphql | 84 ++++++++++++++++++++++ main.py | 30 ++++++++ orm/__init__.py | 6 ++ orm/author.py | 43 ++++++++++++ requirements.txt | 5 ++ resolvers/__init__.py | 45 ++++++++++++ resolvers/chats.py | 114 ++++++++++++++++++++++++++++++ resolvers/load.py | 133 +++++++++++++++++++++++++++++++++++ resolvers/messages.py | 157 ++++++++++++++++++++++++++++++++++++++++++ resolvers/search.py | 96 ++++++++++++++++++++++++++ resolvers/unread.py | 36 ++++++++++ services/auth.py | 79 +++++++++++++++++++++ services/db.py | 54 +++++++++++++++ services/redis.py | 56 +++++++++++++++ settings.py | 11 +++ 19 files changed, 994 insertions(+) create mode 100644 .gitignore create mode 100644 CHANGELOG.txt create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 inbox.graphql create mode 100644 main.py create mode 100644 orm/__init__.py create mode 100644 orm/author.py create mode 100644 requirements.txt create mode 100644 resolvers/__init__.py create mode 100644 resolvers/chats.py create mode 100644 resolvers/load.py create mode 100644 resolvers/messages.py create mode 100644 resolvers/search.py create mode 100644 resolvers/unread.py create mode 100644 services/auth.py create mode 100644 services/db.py create mode 100644 services/redis.py create mode 100644 settings.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c7ea411 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +venv +.DS_Store +__pycache__ +.idea +.vscode \ No newline at end of file diff --git a/CHANGELOG.txt b/CHANGELOG.txt new file mode 100644 index 0000000..3368306 --- /dev/null +++ b/CHANGELOG.txt @@ -0,0 +1,17 @@ +[0.2.8] +- sse removed to presence service +- bugfixes +- pydantic removed as not used + +[0.2.7] +- search messages fix +- context with author_id fix +- redis pubsub new_message event announce +- sse new_message events broadcast + +[0.2.6] +- authors / members / users terms revision +- auth service connection + +[0.2.5] +- dummy isolation \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..6ae1a41 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,7 @@ +FROM python:slim +WORKDIR /app +COPY requirements.txt . +RUN pip install -r requirements.txt +COPY . . +EXPOSE 8080 +CMD ["python", "main.py"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..cd17cda --- /dev/null +++ b/README.md @@ -0,0 +1,16 @@ +### `inbox`: Сервер для внутренних переписок + +Для + +### ENV + - REDIS_URL + - AUTH_URL + - API_BASE + +### Как это работает + +__Redis__: + - Для каждого пользователя создаётся запись в хранилищах `chats_by_author/` и `chats/` и канал redis `chat:`, в котором публикуюутся обновления всех переписок. + +__SSE__: + - Каждый пользователь подписывается на свой канал по урлу `/sse/` \ No newline at end of file diff --git a/inbox.graphql b/inbox.graphql new file mode 100644 index 0000000..d4bc472 --- /dev/null +++ b/inbox.graphql @@ -0,0 +1,84 @@ +scalar DateTime + +enum MessageStatus { + NEW + UPDATED + DELETED +} + +type ChatMember { + id: Int! + slug: String! + name: String! + userpic: String + lastSeen: DateTime + online: Boolean + # invitedAt: DateTime + # invitedBy: String # user slug + # TODO: keep invite databit +} + +input ChatInput { + id: String! + title: String + description: String +} + + +type Mutation { + # inbox + createChat(title: String, members: [Int]!): Result! + updateChat(chat: ChatInput!): Result! + deleteChat(chatId: String!): Result! + + createMessage(chat: String!, body: String!, replyTo: Int): Result! + updateMessage(chatId: String!, id: Int!, body: String!): Result! + deleteMessage(chatId: String!, id: Int!): Result! + markAsRead(chatId: String!, ids: [Int]!): Result! + +} + +input MessagesBy { + author: String + body: String + chat: String + order: String + days: Int + stat: String +} + +type Query { + # inbox + loadChats( limit: Int, offset: Int): Result! # your chats + loadMessagesBy(by: MessagesBy!, limit: Int, offset: Int): Result! + loadRecipients(limit: Int, offset: Int): Result! + searchRecipients(query: String!, limit: Int, offset: Int): Result! + searchMessages(by: MessagesBy!, limit: Int, offset: Int): Result! + +} + +type Message { + author: Int! + chatId: String! + body: String! + createdAt: Int! + id: Int! + replyTo: Int + updatedAt: Int + seen: Boolean +} + +type Chat { + id: String! + createdAt: Int! + createdBy: Int! + updatedAt: Int! + title: String + description: String + users: [Int] + members: [ChatMember] + admins: [Int] + messages: [Message] + unread: Int + private: Boolean +} diff --git a/main.py b/main.py new file mode 100644 index 0000000..eb18334 --- /dev/null +++ b/main.py @@ -0,0 +1,30 @@ +from aiohttp import web +from ariadne import make_executable_schema, load_schema_from_path +from ariadne.asgi import GraphQL + +from services.redis import redis +from resolvers import resolvers + +type_defs = load_schema_from_path("inbox.graphql") +schema = make_executable_schema(type_defs, resolvers) + + +async def on_startup(_app): + await redis.connect() + + +async def on_cleanup(_app): + await redis.disconnect() + + +# Run the aiohttp server +if __name__ == "__main__": + app = web.Application() + app.on_startup.append(on_startup) + app.on_cleanup.append(on_cleanup) + app.router.add_route( + "*", + "/graphql", + GraphQL(schema), + ) + web.run_app(app) diff --git a/orm/__init__.py b/orm/__init__.py new file mode 100644 index 0000000..f4fc8f2 --- /dev/null +++ b/orm/__init__.py @@ -0,0 +1,6 @@ +from services.db import Base, engine + + +def init_tables(): + Base.metadata.create_all(engine) + print("[orm] tables initialized") diff --git a/orm/author.py b/orm/author.py new file mode 100644 index 0000000..b2b5a99 --- /dev/null +++ b/orm/author.py @@ -0,0 +1,43 @@ +from datetime import datetime +from sqlalchemy import JSON as JSONType +from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String +from sqlalchemy.orm import relationship +from services.db import Base + + +class AuthorRating(Base): + __tablename__ = "author_rating" + + id = None # type: ignore + rater = Column(ForeignKey("author.id"), primary_key=True, index=True) + author = Column(ForeignKey("author.id"), primary_key=True, index=True) + value = Column(Integer) + + +class AuthorFollower(Base): + __tablename__ = "author_follower" + + id = None # type: ignore + follower = Column(ForeignKey("author.id"), primary_key=True, index=True) + author = Column(ForeignKey("author.id"), primary_key=True, index=True) + createdAt = Column(DateTime, nullable=False, default=datetime.now) + auto = Column(Boolean, nullable=False, default=False) + + +class Author(Base): + __tablename__ = "author" + + user = Column(Integer, nullable=False) # unbounded link with authorizer's User type + bio = Column(String, nullable=True, comment="Bio") # status description + about = Column(String, nullable=True, comment="About") # long and formatted + userpic = Column(String, nullable=True, comment="Userpic") + name = Column(String, nullable=True, comment="Display name") + slug = Column(String, unique=True, comment="Author's slug") + muted = Column(Boolean, default=False) + createdAt = Column(DateTime, nullable=False, default=datetime.now) + lastSeen = Column(DateTime, nullable=False, default=datetime.now) + deletedAt = Column(DateTime, nullable=True, comment="Deleted at") + links = Column(JSONType, nullable=True, comment="Links") + ratings = relationship( + AuthorRating, foreign_keys=AuthorRating.author, nullable=True + ) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..1bde1ec --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +aiohttp +aioredis +ariadne +sqlalchemy +gql diff --git a/resolvers/__init__.py b/resolvers/__init__.py new file mode 100644 index 0000000..b40ad96 --- /dev/null +++ b/resolvers/__init__.py @@ -0,0 +1,45 @@ +from .chats import chats_resolvers +from .load import load_resolvers +from .messages import messages_resolvers +from .search import search_resolvers +from .unread import unread_resolvers +import json +from ariadne import MutationType, QueryType, SubscriptionType +from ariadne import ScalarType + + +datetime_scalar = ScalarType("DateTime") + + +@datetime_scalar.serializer +def serialize_datetime(value): + return value.isoformat() + + +query = QueryType() +mutation = MutationType() +subscription = SubscriptionType() + + +dict_scalar = ScalarType("Dict") + + +@dict_scalar.serializer +def serialize_dict(value): + return json.dumps(value) + + +@dict_scalar.value_parser +def parse_dict_value(value): + return json.loads(value) + + +resolvers = { + **chats_resolvers, + **load_resolvers, + **messages_resolvers, + **search_resolvers, + **unread_resolvers, + "DateTime": datetime_scalar, + "Dict": dict_scalar, +} diff --git a/resolvers/chats.py b/resolvers/chats.py new file mode 100644 index 0000000..7d6914b --- /dev/null +++ b/resolvers/chats.py @@ -0,0 +1,114 @@ +import json +import uuid +from datetime import datetime, timezone + +from services.auth import login_required +from services.redis import redis +from resolvers import mutation + + +@mutation.field("updateChat") +@login_required +async def update_chat(_, info, chat_new): + """ + updating chat + requires info["request"].user.slug to be in chat["admins"] + + :param info: GraphQLInfo with request + :param chat_new: dict with chat data + :return: Result { error chat } + """ + author_id = info.context["author_id"] + chat_id = chat_new["id"] + chat = await redis.execute("GET", f"chats/{chat_id}") + if not chat: + return {"error": "chat not exist"} + chat = dict(json.loads(chat)) + + # TODO + if author_id in chat["admins"]: + chat.update( + { + "title": chat_new.get("title", chat["title"]), + "description": chat_new.get("description", chat["description"]), + "updatedAt": int(datetime.now(tz=timezone.utc).timestamp()), + "admins": chat_new.get("admins", chat.get("admins") or []), + "users": chat_new.get("users", chat["users"]), + } + ) + await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat)) + await redis.execute("COMMIT") + + return {"error": None, "chat": chat} + + +@mutation.field("createChat") +@login_required +async def create_chat(_, info, title="", members=[]): + chat = None + author_id = info.context["author_id"] + print("create_chat members: %r" % members) + if author_id not in members: + members.append(int(author_id)) + # NOTE: private chats has no title + # reuse private chat created before if exists + if len(members) == 2 and title == "": + chatset1 = set( + (await redis.execute("SMEMBERS", f"chats_by_author/{members[0]}")) or [] + ) + chatset2 = set( + (await redis.execute("SMEMBERS", f"chats_by_author/{members[1]}")) or [] + ) + for c in chatset1.intersection(chatset2): + chat = await redis.execute("GET", f"chats/{c.decode('utf-8')}") + if chat: + chat = json.loads(chat) + if chat["title"] == "": + print("[inbox] createChat found old chat") + print(chat) + break + if chat: + return {"chat": chat, "error": "existed"} + + chat_id = str(uuid.uuid4()) + chat = { + "id": chat_id, + "members": members, + "title": title, + "createdBy": author_id, + "createdAt": int(datetime.now(tz=timezone.utc).timestamp()), + "updatedAt": int(datetime.now(tz=timezone.utc).timestamp()), + "admins": members if (len(members) == 2 and title == "") else [], + } + + for m in members: + await redis.execute("SADD", f"chats_by_author/{m}", chat_id) + await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) + await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0)) + await redis.execute("COMMIT") + return {"error": None, "chat": chat} + + +@mutation.field("deleteChat") +@login_required +async def delete_chat(_, info, chat_id: str): + author_id = info.context["author_id"] + + chat = await redis.execute("GET", f"/chats/{chat_id}") + if chat: + chat = dict(json.loads(chat)) + if author_id in chat["admins"]: + await redis.execute("DEL", f"chats/{chat_id}") + await redis.execute("SREM", f"chats_by_author/{author_id}", chat_id) + await redis.execute("COMMIT") + else: + return {"error": "chat not exist"} + + +chats_resolvers = { + "Mutation": { + "deleteChat": delete_chat, + "createChat": create_chat, + "updateChat": update_chat, + }, +} diff --git a/resolvers/load.py b/resolvers/load.py new file mode 100644 index 0000000..51f26a3 --- /dev/null +++ b/resolvers/load.py @@ -0,0 +1,133 @@ +import json + +from services.db import local_session +from services.redis import redis +from resolvers import query +from orm.author import Author +from services.auth import login_required +from .chats import create_chat +from .unread import get_unread_counter + + +# NOTE: not an API handler +async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids=[]): + """load :limit messages for :chat_id with :offset""" + messages = [] + try: + message_ids = [] + ids + if limit: + mids = await redis.lrange( + f"chats/{chat_id}/message_ids", offset, offset + limit + ) + mids = [mid.decode("utf-8") for mid in mids] + message_ids += mids + if message_ids: + message_keys = [f"chats/{chat_id}/messages/{mid}" for mid in message_ids] + messages = await redis.mget(*message_keys) + messages = [json.loads(msg.decode("utf-8")) for msg in messages] + replies = [] + for m in messages: + rt = m.get("replyTo") + if rt: + rt = int(rt) + if rt not in message_ids: + replies.append(rt) + if replies: + messages += await load_messages(redis, chat_id, limit=0, ids=replies) + except Exception as e: + print(e) + return messages + + +@query.field("loadChats") +@login_required +async def load_chats(_, info, limit: int = 50, offset: int = 0): + """load :limit chats of current user with :offset""" + author_id = info.context["author_id"] + cids = list( + (await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or [] + )[offset : offset + limit] + members_online = (await redis.execute("SMEMBERS", "authors-online")) or [] + chats = [] + if len(cids) == 0: + r = await create_chat(None, info, members=[1, int(author_id)]) + cids.append(r["chat"]["id"]) + with local_session() as session: + for cid in cids: + cid = cid.decode("utf-8") + c = await redis.execute("GET", "chats/" + cid) + if c: + c = dict(json.loads(c)) + c["messages"] = await load_messages(cid, 5, 0) + c["unread"] = await get_unread_counter(cid, author_id) + member_ids = c["members"].copy() + c["members"] = [] + for member_id in member_ids: + a = session.query(Author).where(Author.id == member_id).first() + if a: + c["members"].append( + { + "id": a.id, + "slug": a.slug, + "userpic": a.userpic, + "name": a.name, + "lastSeen": a.lastSeen, + "online": a.id in members_online, + } + ) + chats.append(c) + return {"chats": chats, "error": None} + + +@query.field("loadMessagesBy") +@login_required +async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): + """load :limit messages of :chat_id with :offset""" + author_id = info.context["author_id"] + user_chats = await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id)) + user_chats = [c.decode("utf-8") for c in user_chats] + if user_chats: + messages = [] + by_chat = by.get("chat") + if by_chat in user_chats: + chat = await redis.execute("GET", f"chats/{by_chat}") + if not chat: + return {"messages": [], "error": "chat not exist"} + # everyone's messages in filtered chat + messages = await load_messages(by_chat, limit, offset) + return { + "messages": sorted(list(messages), key=lambda m: m["createdAt"]), + "error": None, + } + else: + return {"error": "Cannot access messages of this chat"} + + +@query.field("loadRecipients") +async def load_recipients(_, _info, limit=50, offset=0): + """load possible chat participants""" + onliners = (await redis.execute("SMEMBERS", "authors-online")) or [] + members = [] + with local_session() as session: + all_authors = session.query(Author).limit(limit).offset(offset) + for a in all_authors: + members.append( + { + "id": a.id, + "slug": a.slug, + "userpic": a.userpic, + "name": a.name, + "lastSeen": a.lastSeen, + "online": a.id in onliners, + } + ) + return {"members": members, "error": None} + + +load_resolvers = { + "Query": { + "loadRecipients": load_recipients, + "loadMessagesBy": load_messages_by, + "loadChats": load_chats, + } +} diff --git a/resolvers/messages.py b/resolvers/messages.py new file mode 100644 index 0000000..ca5dc0d --- /dev/null +++ b/resolvers/messages.py @@ -0,0 +1,157 @@ +import json +from datetime import datetime, timezone +from services.auth import login_required +from services.redis import redis +from resolvers import mutation + + +@mutation.field("createMessage") +@login_required +async def create_message(_, info, chat: str, body: str, reply_to=None): + """ + create message with + :body for + :chat_id replying to + :reply_to optionally + """ + author_id = info.context["author_id"] + + chat = await redis.execute("GET", f"chats/{chat}") + if not chat: + return {"error": "chat is not exist"} + else: + chat = dict(json.loads(chat)) + message_id = await redis.execute("GET", f"chats/{chat['id']}/next_message_id") + message_id = int(message_id) + new_message = { + "chat": chat["id"], + "id": message_id, + "author": author_id, + "body": body, + "createdAt": int(datetime.now(tz=timezone.utc).timestamp()), + } + if reply_to: + new_message["replyTo"] = reply_to + chat["updatedAt"] = new_message["createdAt"] + await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat)) + print(f"[inbox] creating message {new_message}") + await redis.execute( + "SET", f"chats/{chat['id']}/messages/{message_id}", json.dumps(new_message) + ) + await redis.execute("LPUSH", f"chats/{chat['id']}/message_ids", str(message_id)) + await redis.execute( + "SET", f"chats/{chat['id']}/next_message_id", str(message_id + 1) + ) + + members = chat["members"] + for member_id in members: + await redis.execute( + "LPUSH", f"chats/{chat['id']}/unread/{member_id}", str(message_id) + ) + + # result = FollowingResult("NEW", "chat", new_message) + # await FollowingManager.push("chat", result) + + # subscribe on updates + channel_name = ( + f"private:{author_id}" if not chat["title"] else f"group:{chat['id']}" + ) + redis.execute("PUBLISH", channel_name, json.dumps(new_message)) + + return {"message": new_message, "error": None} + + +@mutation.field("updateMessage") +@login_required +async def update_message(_, info, chat_id: str, message_id: int, body: str): + author_id = info.context["author_id"] + + chat = await redis.execute("GET", f"chats/{chat_id}") + if not chat: + return {"error": "chat not exist"} + + message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}") + if not message: + return {"error": "message not exist"} + + message = json.loads(message) + if message["author"] != author_id: + return {"error": "access denied"} + + message["body"] = body + message["updatedAt"] = int(datetime.now(tz=timezone.utc).timestamp()) + + await redis.execute( + "SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message) + ) + + # result = FollowingResult("UPDATE", "chat", new_message) + # await FollowingManager.push("chat", result) + # TODO: subscribe on updates + + return {"message": message, "error": None} + + +@mutation.field("deleteMessage") +@login_required +async def delete_message(_, info, chat_id: str, message_id: int): + author_id = info.context["author_id"] + + chat = await redis.execute("GET", f"chats/{chat_id}") + if not chat: + return {"error": "chat not exist"} + chat = json.loads(chat) + + message = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}") + if not message: + return {"error": "message not exist"} + message = json.loads(message) + if message["author"] != author_id: + return {"error": "access denied"} + + await redis.execute("LREM", f"chats/{chat_id}/message_ids", 0, str(message_id)) + await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}") + + members = chat["members"] + for member_id in members: + await redis.execute( + "LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id) + ) + + # result = FollowingResult("DELETED", "chat", message) + # await FollowingManager.push(result) + # TODO ? + + return {} + + +@mutation.field("markAsRead") +@login_required +async def mark_as_read(_, info, chat_id: str, messages: [int]): + author_id = info.context["author_id"] + + chat = await redis.execute("GET", f"chats/{chat_id}") + if not chat: + return {"error": "chat not exist"} + + chat = json.loads(chat) + members = set(chat["members"]) + if author_id not in members: + return {"error": "access denied"} + + for message_id in messages: + await redis.execute( + "LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id) + ) + + return {"error": None} + + +messages_resolvers = { + "Mutation": { + "markAsRead": mark_as_read, + "deleteMessage": delete_message, + "updateMessage": update_message, + "createMessage": create_message, + } +} diff --git a/resolvers/search.py b/resolvers/search.py new file mode 100644 index 0000000..ed591dc --- /dev/null +++ b/resolvers/search.py @@ -0,0 +1,96 @@ +import json +from datetime import datetime, timezone, timedelta +from services.auth import login_required +from services.redis import redis +from resolvers import query +from services.db import local_session +from orm.author import AuthorFollower, Author +from resolvers.load import load_messages + + +@query.field("searchRecipients") +@login_required +async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0): + result = [] + # TODO: maybe redis scan? + author = info.context["author"] + talk_before = await redis.execute("GET", f"/chats_by_author/{author.id}") + if talk_before: + talk_before = list(json.loads(talk_before))[offset : (offset + limit)] + for chat_id in talk_before: + members = await redis.execute("GET", f"/chats/{chat_id}/members") + if members: + members = list(json.loads(members)) + for member in members: + if member.startswith(text): + if member not in result: + result.append(member) + + more_amount = limit - len(result) + + with local_session() as session: + # followings + result += ( + session.query(AuthorFollower.author) + .join(Author, Author.id == AuthorFollower.follower) + .where(Author.slug.startswith(text)) + .offset(offset + len(result)) + .limit(more_amount) + ) + + # followers + result += ( + session.query(AuthorFollower.follower) + .join(Author, Author.id == AuthorFollower.author) + .where(Author.slug.startswith(text)) + .offset(offset + len(result)) + .limit(offset + len(result) + limit) + ) + return {"members": list(result), "error": None} + + +@query.field("searchMessages") +@login_required +async def search_in_chats(_, info, by, limit, offset): + author_id = info.context["author_id"] + lookup_chats = set(await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) + messages_set = set([]) + + by_member = by.get("author") + body_like = by.get("body") + days_ago = by.get("days") + + # pre-filter lookup chats + if by_member: + # all author's chats where reqeusting author is participating + lookup_chats = filter( + lambda ca: by_member in ca["members"], + list(lookup_chats), + ) + + # load the messages from lookup chats + for c in lookup_chats: + chat_id = c.decode("utf-8") + mmm = await load_messages(chat_id, limit, offset) + if by_member: + mmm = filter(lambda mx: mx["author"] == by_member, mmm) + if body_like: + mmm = filter(lambda mx: body_like in mx["body"], mmm) + if days_ago: + mmm = filter( + lambda msg: datetime.now(tz=timezone.utc) - int(msg["createdAt"]) + < timedelta(days=days_ago), + mmm, + ) + + messages_set.union(set(mmm)) + + return {"messages": messages_sorted, "error": None} + + +search_resolvers = { + "Query": { + "searchMessages": search_in_chats, + "searchRecipients": search_recipients, + } +} diff --git a/resolvers/unread.py b/resolvers/unread.py new file mode 100644 index 0000000..c19f30a --- /dev/null +++ b/resolvers/unread.py @@ -0,0 +1,36 @@ +from services.redis import redis +import json + +from services.auth import login_required + + +async def get_unread_counter(chat_id: str, author_id: int): + try: + unread = await redis.execute( + "LLEN", f"chats/{chat_id.decode('utf-8')}/unread/{author_id}" + ) + if unread: + return unread + except Exception: + return 0 + + +async def get_total_unread_counter(author_id: int): + chats = await redis.execute("GET", f"chats_by_author/{author_id}") + unread = 0 + if chats: + chats = json.loads(chats) + for chat_id in chats: + n = await get_unread_counter(chat_id.decode("utf-8"), author_id) + unread += n + return unread + + +@login_required +async def resolve_total_unread_counter(_, info): + author_id = info.context["author_id"] + + return get_total_unread_counter(author_id) + + +unread_resolvers = {"Query": {"totalUnreadCounter": resolve_total_unread_counter}} diff --git a/services/auth.py b/services/auth.py new file mode 100644 index 0000000..057c3f1 --- /dev/null +++ b/services/auth.py @@ -0,0 +1,79 @@ +from functools import wraps +from gql.transport import aiohttp +import aiohttp +import json +from services.db import local_session +from settings import AUTH_URL +from orm.author import Author +from graphql.error import GraphQLError + + +class BaseHttpException(GraphQLError): + code = 500 + message = "500 Server error" + + +class Unauthorized(BaseHttpException): + code = 401 + message = "401 Unauthorized" + + +async def check_auth(req): + token = req.headers.get("Authorization") + gql = ( + {"mutation": "{ getSession { user { id } } }"} + if "v2" in AUTH_URL + else {"query": "{ session { user { id } } }"} + ) + headers = {"Authorization": token, "Content-Type": "application/json"} + async with aiohttp.ClientSession(headers=headers) as session: + async with session.post(AUTH_URL, data=json.dumps(gql)) as response: + if response.status != 200: + return False, None + r = await response.json() + user_id = ( + r.get("data", {}).get("session", {}).get("user", {}).get("id", None) + ) + is_authenticated = user_id is not None + return is_authenticated, user_id + + +def author_id_by_user_id(user_id): + async with local_session() as session: + author = session(Author).where(Author.user == user_id).first() + return author.id + + +def login_required(f): + @wraps(f) + async def decorated_function(*args, **kwargs): + info = args[1] + context = info.context + req = context.get("request") + is_authenticated, user_id = await check_auth(req) + if not is_authenticated: + raise Exception("You are not logged in") + else: + # Добавляем author_id в контекст + author_id = await author_id_by_user_id(user_id) + context["author_id"] = author_id + + # Если пользователь аутентифицирован, выполняем резолвер + return await f(*args, **kwargs) + + return decorated_function + + +def auth_request(f): + @wraps(f) + async def decorated_function(*args, **kwargs): + req = args[0] + is_authenticated, user_id = await check_auth(req) + if not is_authenticated: + raise Unauthorized("You are not logged in") + else: + author_id = await author_id_by_user_id(user_id) + req["author_id"] = author_id + return await f(*args, **kwargs) + + return decorated_function diff --git a/services/db.py b/services/db.py new file mode 100644 index 0000000..297ca4a --- /dev/null +++ b/services/db.py @@ -0,0 +1,54 @@ +from typing import TypeVar, Any, Dict, Generic, Callable + +from sqlalchemy import create_engine, Column, Integer +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import Session +from sqlalchemy.sql.schema import Table + +from settings import DB_URL + +engine = create_engine(DB_URL, echo=False, pool_size=10, max_overflow=20) + +T = TypeVar("T") + +REGISTRY: Dict[str, type] = {} + + +def local_session(): + return Session(bind=engine, expire_on_commit=False) + + +class Base(declarative_base()): + __table__: Table + __tablename__: str + __new__: Callable + __init__: Callable + __allow_unmapped__ = True + __abstract__ = True + __table_args__ = {"extend_existing": True} + + id = Column(Integer, primary_key=True) + + def __init_subclass__(cls, **kwargs): + REGISTRY[cls.__name__] = cls + + @classmethod + def create(cls: Generic[T], **kwargs) -> Generic[T]: + instance = cls(**kwargs) + return instance.save() + + def save(self) -> Generic[T]: + with local_session() as session: + session.add(self) + session.commit() + return self + + def update(self, input): + column_names = self.__table__.columns.keys() + for name, value in input.items(): + if name in column_names: + setattr(self, name, value) + + def dict(self) -> Dict[str, Any]: + column_names = self.__table__.columns.keys() + return {c: getattr(self, c) for c in column_names} diff --git a/services/redis.py b/services/redis.py new file mode 100644 index 0000000..07bd266 --- /dev/null +++ b/services/redis.py @@ -0,0 +1,56 @@ +import asyncio +import aioredis +from settings import REDIS_URL + + +class RedisCache: + def __init__(self, uri=REDIS_URL): + self._uri: str = uri + self.pubsub_channels = [] + self._redis = None + + async def connect(self): + pool = aioredis.ConnectionPool.from_url( + self._uri, encoding="utf-8", max_connections=10 + ) + self._redis = aioredis.Redis(connection_pool=pool) + + async def disconnect(self): + await self._redis.wait_closed() + self._redis = None + + async def execute(self, command, *args, **kwargs): + while not self._redis: + await asyncio.sleep(1) + try: + print("[redis] " + command + " " + " ".join(args)) + return await self._redis.execute_command(command, *args, **kwargs) + except Exception: + pass + + async def subscribe(self, *channels): + if not self._redis: + await self.connect() + for channel in channels: + await self._redis.execute_pubsub("SUBSCRIBE", channel) + self.pubsub_channels.append(channel) + + async def unsubscribe(self, *channels): + if not self._redis: + return + for channel in channels: + await self._redis.execute_pubsub("UNSUBSCRIBE", channel) + self.pubsub_channels.remove(channel) + + async def lrange(self, key, start, stop): + print(f"[redis] LRANGE {key} {start} {stop}") + return await self._redis.lrange(key, start, stop) + + async def mget(self, key, *keys): + print(f"[redis] MGET {key} {keys}") + return await self._redis.mget(key, *keys) + + +redis = RedisCache() + +__all__ = ["redis"] diff --git a/settings.py b/settings.py new file mode 100644 index 0000000..23550d3 --- /dev/null +++ b/settings.py @@ -0,0 +1,11 @@ +from os import environ + +PORT = 8080 +DB_URL = ( + environ.get("DATABASE_URL") + or environ.get("DB_URL") + or "postgresql://postgres@localhost:5432/discoursio" +) +REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1" +API_BASE = environ.get("API_BASE") or "" +AUTH_URL = environ.get("AUTH_URL") or ""