This commit is contained in:
tonyrewin 2022-11-12 00:27:17 +03:00
parent a88ede7a97
commit 6bc7b6f433
6 changed files with 325 additions and 164 deletions

View File

@ -60,6 +60,13 @@ from resolvers.zine import (
shouts_by_communities, shouts_by_communities,
) )
from resolvers.inbox.chats import load_chats, \
create_chat, delete_chat, update_chat, \
invite_to_chat, enter_chat
from resolvers.inbox.messages import load_chat_messages, \
create_message, delete_message, update_message, \
message_generator, mark_as_read
__all__ = [ __all__ = [
"follow", "follow",
"unfollow", "unfollow",
@ -116,4 +123,20 @@ __all__ = [
"create_reaction", "create_reaction",
"update_reaction", "update_reaction",
"delete_reaction", "delete_reaction",
# inbox
"create_chat",
"delete_chat",
"update_chat",
"load_chats",
"create_message",
"delete_message",
"update_message",
"load_chat_messages",
"message_generator",
"mark_as_read",
"search_users",
"search_chats",
"search_messages",
"enter_chat",
"invite_to_chat"
] ]

200
resolvers/inbox/chats.py Normal file
View File

@ -0,0 +1,200 @@
import json
import uuid
from datetime import datetime
from auth.authenticate import login_required
from base.redis import redis
from base.resolvers import mutation, query
from resolvers.inbox.messages import load_messages
async def get_unread_counter(chat_id: str, user_slug: str):
try:
return int(await redis.execute("LLEN", f"chats/{chat_id}/unread/{user_slug}"))
except Exception:
return 0
async def get_total_unread_counter(user_slug: str):
chats = await redis.execute("GET", f"chats_by_user/{user_slug}")
if not chats:
return 0
chats = json.loads(chats)
unread = 0
for chat_id in chats:
n = await get_unread_counter(chat_id, user_slug)
unread += n
return unread
async def add_user_to_chat(user_slug: str, chat_id: str, chat=None):
for member in chat["users"]:
chats_ids = await redis.execute("GET", f"chats_by_user/{member}")
if chats_ids:
chats_ids = list(json.loads(chats_ids))
else:
chats_ids = []
if chat_id not in chats_ids:
chats_ids.append(chat_id)
await redis.execute("SET", f"chats_by_user/{member}", json.dumps(chats_ids))
async def get_chats_by_user(slug: str):
chats = await redis.execute("GET", f"chats_by_user/{slug}")
if chats:
chats = list(json.loads(chats))
return chats or []
async def load_user_chats(slug, offset: int, amount: int):
""" load :amount chats of :slug user with :offset """
chats = await get_chats_by_user(slug)[offset:offset + amount]
if not chats:
chats = []
for c in chats:
c['messages'] = await load_messages(c['id'])
c['unread'] = await get_unread_counter(c['id'], slug)
return {
"chats": chats,
"error": None
}
@query.field("loadChats")
@login_required
async def load_chats(_, info):
user = info.context["request"].user
return await load_user_chats(user.slug)
@mutation.field("enterChat")
@login_required
async def enter_chat(_, info, chat_id: str):
''' enter to public chat with :chat_id '''
user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
return {
"error": "chat not exist"
}
else:
chat = dict(json.loads(chat))
if chat['private']:
return {
"error": "cannot enter private chat"
}
if user.slug not in chat["users"]:
chat["users"].append(user.slug)
await add_user_to_chat(user.slug, chat_id, chat)
await redis.execute("SET" f"chats/{chat_id}", json.dumps(chat))
chat['messages'] = await load_messages(chat_id)
return {
"chat": chat,
"error": None
}
@mutation.field("inviteChat")
async def invite_to_chat(_, info, invited: str, chat_id: str):
''' invite user with :slug to chat with :chat_id '''
user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
return {
"error": "chat not exist"
}
chat = dict(json.loads(chat))
if not chat['private'] and user.slug not in chat['admins']:
return {
"error": "only admins can invite to private chat",
"chat": chat
}
else:
chat["users"].append(invited)
await add_user_to_chat(user.slug, chat_id, chat)
await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat))
return {
"error": None,
"chat": chat
}
@mutation.field("updateChat")
@login_required
async def update_chat(_, info, chat_new: dict):
"""
updating chat
requires info["request"].user.slug to be in chat["admins"]
:param info: GraphQLInfo with request
:param chat_new: dict with chat data
:return: Result { error chat }
"""
user = info.context["request"].user
chat_id = chat_new["id"]
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
return {
"error": "chat not exist"
}
chat = dict(json.loads(chat))
if user.slug in chat["admins"]:
chat.update({
"title": chat_new.get("title", chat["title"]),
"description": chat_new.get("description", chat["description"]),
"updatedAt": int(datetime.now().timestamp()),
"admins": chat_new.get("admins", chat["admins"]),
"users": chat_new.get("users", chat["users"])
})
await add_user_to_chat(user.slug, chat_id, chat)
await redis.execute("SET", f"chats/{chat.id}", json.dumps(chat))
await redis.execute("SET", f"chats/{chat.id}/next_message_id", 0)
return {
"error": None,
"chat": chat
}
@mutation.field("createChat")
@login_required
async def create_chat(_, info, title="", members=[]):
user = info.context["request"].user
chat_id = str(uuid.uuid4())
if user.slug not in members:
members.append(user.slug)
chat = {
"title": title,
"createdAt": int(datetime.now().timestamp()),
"updatedAt": int(datetime.now().timestamp()),
"createdBy": user.slug,
"id": chat_id,
"users": members,
"admins": [user.slug, ]
}
await add_user_to_chat(user.slug, chat_id, chat)
await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat))
await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0))
return {
"error": None,
"chat": chat
}
@mutation.field("deleteChat")
@login_required
async def delete_chat(_, info, chat_id: str):
user = info.context["request"].user
chat = await redis.execute("GET", f"/chats/{chat_id}")
if chat:
chat = dict(json.loads(chat))
if user.slug in chat['admins']:
await redis.execute("DEL", f"chats/{chat_id}")
else:
return {
"error": "chat not exist"
}

View File

@ -1,141 +1,16 @@
import asyncio import asyncio
import json import json
import uuid
from datetime import datetime from datetime import datetime
from auth.authenticate import login_required from auth.authenticate import login_required
from base.redis import redis from base.redis import redis
from base.resolvers import mutation, query, subscription from base.resolvers import mutation, query, subscription
from services.inbox import ChatFollowing, MessageResult, MessagesStorage from services.inbox import ChatFollowing, MessageResult, MessagesStorage
from resolvers.inbox.chats import get_chats_by_user
async def get_unread_counter(chat_id: str, user_slug: str):
try:
return int(await redis.execute("LLEN", f"chats/{chat_id}/unread/{user_slug}"))
except Exception:
return 0
async def get_total_unread_counter(user_slug: str):
chats = await redis.execute("GET", f"chats_by_user/{user_slug}")
if not chats:
return 0
chats = json.loads(chats)
unread = 0
for chat_id in chats:
n = await get_unread_counter(chat_id, user_slug)
unread += n
return unread
async def add_user_to_chat(user_slug: str, chat_id: str, chat=None):
for member in chat["users"]:
chats_ids = await redis.execute("GET", f"chats_by_user/{member}")
chats_ids = list(json.loads(chats_ids))
if chat_id not in chats_ids:
chats_ids.append(chat_id)
await redis.execute("SET", f"chats_by_user/{member}", json.dumps(chats_ids))
async def get_chats_by_user(slug: str):
chats = await redis.execute("GET", f"chats_by_user/{slug}")
return chats or []
@mutation.field("enterChat")
@login_required
async def enter_chat(_, info, chat_id: str):
user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
return {
"error": "chat not exist"
}
else:
chat = dict(json.loads(chat))
if user.slug not in chat["users"]:
chat["users"].append(user.slug)
await add_user_to_chat(user.slug, chat_id, chat)
await redis.execute("SET" f"chats/{chat_id}", json.dumps(chat))
chat['messages'] = await load_messages(chat_id)
return {
"chat": chat,
"error": None
}
@mutation.field("inviteChat")
async def invite_to_chat(_, info, invited: str, chat_id: str):
user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chat_id}")
if chat:
chat = dict(json.loads(chat))
if user.slug in chat['admins']:
chat["users"].append(invited)
await add_user_to_chat(user.slug, chat_id, chat)
await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat))
return {
"error": None,
"chat": chat
}
@mutation.field("updateChat")
@login_required
async def update_chat(_, info, chat_new: dict):
user = info.context["request"].user
chat_id = chat_new["id"]
chat = await redis.execute("GET", f"chats/{chat_id}")
if chat:
chat = dict(json.loads(chat))
if user.slug in chat["admins"]:
chat.update({
"title": chat_new.get("title", chat["title"]),
"description": chat_new.get("description", chat["description"]),
"updatedAt": int(datetime.now().timestamp()),
"admins": chat_new.get("admins", chat["admins"]),
"users": chat_new.get("users", chat["users"])
})
await add_user_to_chat(user.slug, chat_id, chat)
await redis.execute("SET", f"chats/{chat.id}", json.dumps(chat))
await redis.execute("SET", f"chats/{chat.id}/next_message_id", 0)
return {
"error": None,
"chat": chat
}
@mutation.field("createChat")
@login_required
async def create_chat(_, info, title="", members=[]):
user = info.context["request"].user
chat_id = str(uuid.uuid4())
if user.slug not in members:
members.append(user.slug)
chat = {
"title": title,
"createdAt": int(datetime.now().timestamp()),
"updatedAt": int(datetime.now().timestamp()),
"createdBy": user.slug,
"id": chat_id,
"users": members,
"admins": [user.slug, ]
}
await add_user_to_chat(user.slug, chat_id, chat)
await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat))
await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0))
return {
"error": None,
"chat": chat
}
async def load_messages(chatId: str, offset: int, amount: int): async def load_messages(chatId: str, offset: int, amount: int):
''' load :amount messages for :chatId with :offset '''
messages = [] messages = []
message_ids = await redis.lrange( message_ids = await redis.lrange(
f"chats/{chatId}/message_ids", 0 - offset - amount, 0 - offset f"chats/{chatId}/message_ids", 0 - offset - amount, 0 - offset
@ -152,18 +27,18 @@ async def load_messages(chatId: str, offset: int, amount: int):
} }
@query.field("myChats") @query.field("loadMessages")
@login_required @login_required
async def user_chats(_, info): async def load_chat_messages(_, info, chat_id: str, offset: int = 0, amount: int = 50):
user = info.context["request"].user ''' load [amount] chat's messages with [offset] '''
chats = await get_chats_by_user(user.slug) chat = await redis.execute("GET", f"chats/{chat_id}")
if not chats: if not chat:
chats = [] return {
for c in chats: "error": "chat not exist"
c['messages'] = await load_messages(c['id']) }
c['unread'] = await get_unread_counter(c['id'], user.slug) messages = await load_messages(chat_id, offset, amount)
return { return {
"chats": chats, "messages": messages,
"error": None "error": None
} }
@ -171,17 +46,15 @@ async def user_chats(_, info):
@mutation.field("createMessage") @mutation.field("createMessage")
@login_required @login_required
async def create_message(_, info, chat_id: str, body: str, replyTo=None): async def create_message(_, info, chat_id: str, body: str, replyTo=None):
""" create message with :body for :chat_id replying to :replyTo optionally """
user = info.context["request"].user user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chat_id}") chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat: if not chat:
return { return {
"error": "chat not exist" "error": "chat not exist"
} }
message_id = await redis.execute("GET", f"chats/{chat_id}/next_message_id") message_id = await redis.execute("GET", f"chats/{chat_id}/next_message_id")
message_id = int(message_id) message_id = int(message_id)
new_message = { new_message = {
"chatId": chat_id, "chatId": chat_id,
"id": message_id, "id": message_id,
@ -190,7 +63,6 @@ async def create_message(_, info, chat_id: str, body: str, replyTo=None):
"replyTo": replyTo, "replyTo": replyTo,
"createdAt": int(datetime.now().timestamp()), "createdAt": int(datetime.now().timestamp()),
} }
await redis.execute( await redis.execute(
"SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(new_message) "SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(new_message)
) )
@ -213,23 +85,6 @@ async def create_message(_, info, chat_id: str, body: str, replyTo=None):
} }
@query.field("loadChat")
@login_required
async def load_chat_messages(_, info, chat_id: str, offset: int = 0, amount: int = 50):
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
return {
"error": "chat not exist"
}
messages = await load_messages(chat_id, offset, amount)
return {
"messages": messages,
"error": None
}
@mutation.field("updateMessage") @mutation.field("updateMessage")
@login_required @login_required
async def update_message(_, info, chat_id: str, message_id: int, body: str): async def update_message(_, info, chat_id: str, message_id: int, body: str):

77
resolvers/inbox/search.py Normal file
View File

@ -0,0 +1,77 @@
import json
from auth.authenticate import login_required
from base.redis import redis
from base.resolvers import query, session
from orm.zine import AuthorFollower
@query.field("searchUsers")
@login_required
async def search_user(_, info, query: str, offset: int = 0, amount: int = 50):
result = []
# TODO: maybe redis scan?
user = info.context["request"].user
talk_before = await redis.execute("GET", f"/chats_by_user/{user.slug}")
if talk_before:
talk_before = list(json.loads(talk_before))[offset:offset + amount]
for chat_id in talk_before:
members = await redis.execute("GET", f"/chats/{chat_id}/users")
if members:
members = list(json.loads(members))
for member in members:
if member.startswith(query):
if member not in result:
result.append(member)
user = info.context["request"].user
more_amount = amount - len(result)
# followings
result += session.query(AuthorFollower.author).where(AuthorFollower.follower.startswith(query))\
.offset(offset + len(result)).limit(more_amount)
more_amount = amount
# followers
result += session.query(AuthorFollower.follower).where(AuthorFollower.author.startswith(query))\
.offset(offset + len(result)).limit(offset + len(result) + amount)
return {
"slugs": list(result),
"error": None
}
@query.field("searchChats")
@login_required
async def search_chat(_, info, query: str, offset: int = 0, amount: int = 50):
user = info.context["request"].user
my_chats = await redis.execute("GET", f"/chats_by_user/{user.slug}")
chats = []
for chat_id in my_chats:
chat = await redis.execute("GET", f"chats/{chat_id}")
if chat:
chat = dict(json.loads(chat))
chats.append(chat)
return {
"chats": chats,
"error": None
}
@query.field("searchMessages")
@login_required
async def search_messages(_, info, query: str, offset: int = 0, amount: int = 50):
user = info.context["request"].user
my_chats = await redis.execute("GET", f"/chats_by_user/{user.slug}")
chats = []
if my_chats:
my_chats = list(json.loads(my_chats))
for chat_id in my_chats:
chat = await redis.execute("GET", f"chats/{chat_id}")
if chat:
chat = dict(json.loads(chat))
chats.append(chat)
return {
"chats": chats,
"error": None
}

View File

@ -11,7 +11,7 @@ from orm.shout import Shout
from orm.topic import Topic, TopicFollower from orm.topic import Topic, TopicFollower
from orm.user import User, UserRole, Role, UserRating, AuthorFollower from orm.user import User, UserRole, Role, UserRating, AuthorFollower
from .community import followed_communities from .community import followed_communities
from .inbox import get_total_unread_counter from .inbox.messages import get_total_unread_counter
from .topics import get_topic_stat from .topics import get_topic_stat
from services.auth.users import UserStorage from services.auth.users import UserStorage
from services.zine.shoutscache import ShoutsCache from services.zine.shoutscache import ShoutsCache
@ -33,12 +33,13 @@ async def get_author_stat(slug):
with local_session() as session: with local_session() as session:
return { return {
"followers": session.query(AuthorFollower).where(AuthorFollower.author == slug).count(), "followers": session.query(AuthorFollower).where(AuthorFollower.author == slug).count(),
"followings": session.query(AuthorFollower).where(AuthorFollower.follower == slug).count(),
"rating": session.query(func.sum(UserRating.value)).where(UserRating.user == slug).first() "rating": session.query(func.sum(UserRating.value)).where(UserRating.user == slug).first()
} }
@query.field("userReactedShouts") @query.field("userReactedShouts")
async def get_user_reacted_shouts(_, slug, offset, limit) -> List[Shout]: async def get_user_reacted_shouts(_, slug: str, offset: int, limit: int) -> List[Shout]:
user = await UserStorage.get_user_by_slug(slug) user = await UserStorage.get_user_by_slug(slug)
if not user: if not user:
return [] return []
@ -49,7 +50,7 @@ async def get_user_reacted_shouts(_, slug, offset, limit) -> List[Shout]:
.where(Reaction.createdBy == user.slug) .where(Reaction.createdBy == user.slug)
.order_by(desc(Reaction.createdAt)) .order_by(desc(Reaction.createdAt))
.limit(limit) .limit(limit)
.offset() .offset(offset)
.all() .all()
) )
return shouts return shouts

View File

@ -34,6 +34,8 @@ type ChatMember {
type Result { type Result {
error: String error: String
uids: [String]
slugs: [String]
chat: Chat chat: Chat
chats: [Chat] chats: [Chat]
message: Message message: Message
@ -197,9 +199,11 @@ type Mutation {
type Query { type Query {
# inbox # inbox
myChats: Result! loadChats(offset: Int, amount: Int): Result!
searchRecipient(q: String!): Result! loadMessages(chatId: String!, offset: Int, amount: Int): Result!
loadChat(chatId: String!, offset: Int, amount: Int): Result! searchUsers(q: String!, offset: Int, amount: Int): Result!
searchChats(q: String!, offset: Int, amount: Int): Result!
searchMessages(q: String!, offset: Int, amount: Int): Result!
# auth # auth
isEmailUsed(email: String!): Boolean! isEmailUsed(email: String!): Boolean!
@ -503,4 +507,5 @@ type Chat {
admins: [User] admins: [User]
messages: [Message]! messages: [Message]!
unread: Int unread: Int
private: Boolean
} }