inspected

This commit is contained in:
Untone 2023-10-14 17:55:51 +03:00
parent 154633c114
commit 34dd4ec140
14 changed files with 148 additions and 199 deletions

View File

@ -6,7 +6,7 @@ from ariadne import load_schema_from_path, make_executable_schema
from ariadne.asgi import GraphQL
from starlette.applications import Starlette
from services.redis import redis
from services.rediscache import redis
from services.schema import resolvers
from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN, MODE

View File

@ -3,43 +3,42 @@ import uuid
from datetime import datetime, timezone
from services.auth import login_required
from services.redis import redis
from services.rediscache import redis
from services.schema import mutation
from validators.inbox import Chat
from validators.chat import Chat, ChatUpdate
@mutation.field("updateChat")
@login_required
async def update_chat(_, info, chat_new: Chat):
async def update_chat(_, info, chat_new: ChatUpdate):
"""
updating chat
requires info["request"].user.slug to be in chat["admins"]
requires info.context["author_id"] to be in chat["admins"]
:param _: not used
:param info: GraphQLInfo with request
:param chat_new: dict with chat data
:return: Result { error chat }
"""
author_id = info.context["author_id"]
chat_id = chat_new["id"]
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
chat_str = await redis.execute("GET", f"chats/{chat_id}")
if not chat_str:
return {"error": "chat not exist"}
else:
chat: Chat = json.loads(chat)
chat: Chat = json.loads(chat_str)
if author_id in chat["admins"]:
chat.update(
{
"title": chat_new.get("title", chat["title"]),
"description": chat_new.get("description", chat["description"]),
"updatedAt": int(datetime.now(tz=timezone.utc).timestamp()),
"admins": chat_new.get("admins", chat.get("admins") or []),
"members": chat_new.get("members", chat["members"]),
}
)
await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat))
if author_id in chat["admins"]:
chat.update(
{
"title": chat_new.get("title", chat["title"]),
"description": chat_new.get("description", chat["description"]),
"updatedAt": int(datetime.now(tz=timezone.utc).timestamp()),
"admins": chat_new.get("admins", chat.get("admins") or []),
"members": chat_new.get("members", chat["members"]),
}
)
await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat))
return {"error": None, "chat": chat}
return {"error": None, "chat": chat}
@mutation.field("createChat")
@ -47,7 +46,6 @@ async def update_chat(_, info, chat_new: Chat):
async def create_chat(_, info, title="", members=None):
if members is None:
members = []
chat = None
author_id = info.context["author_id"]
print("create_chat members: %r" % members)
if author_id not in members:
@ -56,28 +54,22 @@ async def create_chat(_, info, title="", members=None):
# NOTE: private chats has no title
# reuse private chat created before if exists
if len(members) == 2 and title == "":
chatset1 = set(
(await redis.execute("SMEMBERS", f"chats_by_author/{members[0]}")) or []
)
chatset2 = set(
(await redis.execute("SMEMBERS", f"chats_by_author/{members[1]}")) or []
)
chatset1 = set((await redis.execute("SMEMBERS", f"chats_by_author/{members[0]}")) or [])
chatset2 = set((await redis.execute("SMEMBERS", f"chats_by_author/{members[1]}")) or [])
for c in chatset1.intersection(chatset2):
chat = await redis.execute("GET", f"chats/{c.decode('utf-8')}")
if chat:
chat = json.loads(chat)
chat_data = await redis.execute("GET", f"chats/{c.decode('utf-8')}")
if chat_data:
chat = json.loads(chat_data)
if chat["title"] == "":
print("[inbox] createChat found old chat")
print(chat)
break
if chat:
return {"chat": chat, "error": "existed"}
return {"chat": chat, "error": "existed"}
chat_id = str(uuid.uuid4())
chat: Chat = {
"id": chat_id,
"members": members,
"title": title,
"description": "",
"createdBy": author_id,
"createdAt": int(datetime.now(tz=timezone.utc).timestamp()),
"updatedAt": int(datetime.now(tz=timezone.utc).timestamp()),
@ -100,9 +92,9 @@ async def create_chat(_, info, title="", members=None):
async def delete_chat(_, info, chat_id: str):
author_id = info.context["author_id"]
chat = await redis.execute("GET", f"/chats/{chat_id}")
if chat:
chat: Chat = json.loads(chat)
chat_str = await redis.execute("GET", f"/chats/{chat_id}")
if chat_str:
chat: Chat = json.loads(chat_str)
if author_id in chat["admins"]:
await redis.execute("DEL", f"chats/{chat_id}")
await redis.execute("SREM", f"chats_by_author/{author_id}", chat_id)

View File

@ -4,16 +4,21 @@ from typing import Any, Dict, List, Optional, Union
from services.auth import login_required
from services.core import get_author, get_network
from services.redis import redis
from services.rediscache import redis
from services.schema import query
from validators.inbox import Message, Chat, ChatMember
from validators.chat import Message, ChatPayload
from validators.member import ChatMember
from .chats import create_chat
from .unread import get_unread_counter
async def get_unread_counter(chat_id: str, author_id: int) -> int:
unread = await redis.execute("LLEN", f"chats/{chat_id}/unread/{author_id}")
return unread or 0
# NOTE: not an API handler
async def load_messages(
chat_id: str, limit: int = 5, offset: int = 0, ids: Optional[List[str]] = None
chat_id: str, limit: int = 5, offset: int = 0, ids: Optional[List[int]] = None
) -> List[Message]:
"""load :limit messages for :chat_id with :offset"""
if ids is None:
@ -22,11 +27,7 @@ async def load_messages(
try:
message_ids = [] + ids
if limit:
mids = (
await redis.lrange(
f"chats/{chat_id}/message_ids", offset, offset + limit
)
) or []
mids = (await redis.lrange(f"chats/{chat_id}/message_ids", offset, offset + limit)) or []
mids = [mid for mid in mids]
message_ids += mids
if message_ids:
@ -49,14 +50,12 @@ async def load_messages(
@query.field("loadChats")
@login_required
async def load_chats(
_, info, limit: int = 50, offset: int = 0
) -> Dict[str, Union[List[Dict[str, Any]], None]]:
async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Union[List[Dict[str, Any]], None]]:
"""load :limit chats of current user with :offset"""
author_id = info.context["author_id"]
cids = (await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or []
members_online = (await redis.execute("SMEMBERS", "authors-online")) or []
cids = list(cids)[offset: (offset + limit)]
cids = list(cids)[offset : (offset + limit)]
chats = []
lock = asyncio.Lock()
if len(cids) == 0:
@ -66,16 +65,16 @@ async def load_chats(
cids.append(r["chat"]["id"])
for cid in cids:
async with lock:
c = await redis.execute("GET", f"chats/{cid}")
print(f"[resolvers.load] redis GET by {cid}: {c}")
if c:
c: Chat = json.loads(c)
chat_str = await redis.execute("GET", f"chats/{cid}")
print(f"[resolvers.load] redis GET by {cid}: {chat_str}")
if chat_str:
c: ChatPayload = json.loads(chat_str)
c["messages"] = await load_messages(cid, 5, 0)
c["unread"] = await get_unread_counter(cid, author_id)
member_ids = c["members"].copy()
c["members"] = []
for member_id in member_ids:
a = await get_author(member_id)
a: ChatMember = await get_author(member_id)
if a:
a["online"] = a.get("id") in members_online
c["members"].append(a)
@ -88,9 +87,7 @@ async def load_chats(
async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0):
"""load :limit messages of :chat_id with :offset"""
author_id = info.context["author_id"]
user_chats = (
await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id))
) or []
user_chats = (await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id))) or []
user_chats = [c for c in user_chats]
if user_chats:
messages = []

View File

@ -4,9 +4,9 @@ from typing import List
from services.auth import login_required
from services.presence import notify_message
from services.redis import redis
from services.rediscache import redis
from services.schema import mutation
from validators.inbox import Message
from validators.chat import Message
@mutation.field("createMessage")
@ -35,7 +35,7 @@ async def create_message(_, info, chat: str, body: str, reply_to=None):
"author": author_id,
"body": body,
"createdAt": int(datetime.now(tz=timezone.utc).timestamp()),
"updatedAt": None
"updatedAt": None,
}
if reply_to:
new_message["replyTo"] = reply_to
@ -47,18 +47,12 @@ async def create_message(_, info, chat: str, body: str, reply_to=None):
f"chats/{chat_dict['id']}/messages/{message_id}",
json.dumps(new_message),
)
await redis.execute(
"LPUSH", f"chats/{chat_dict['id']}/message_ids", str(message_id)
)
await redis.execute(
"SET", f"chats/{chat_dict['id']}/next_message_id", str(message_id + 1)
)
await redis.execute("LPUSH", f"chats/{chat_dict['id']}/message_ids", str(message_id))
await redis.execute("SET", f"chats/{chat_dict['id']}/next_message_id", str(message_id + 1))
members = chat_dict["members"]
for member_id in members:
await redis.execute(
"LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id)
)
await redis.execute("LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id))
# result = FollowingResult("NEW", "chat", new_message)
# await FollowingManager.push("chat", result)
@ -89,9 +83,7 @@ async def update_message(_, info, chat_id: str, message_id: int, body: str):
message["body"] = body
message["updatedAt"] = int(datetime.now(tz=timezone.utc).timestamp())
await redis.execute(
"SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message)
)
await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message))
# result = FollowingResult("UPDATE", "chat", new_message)
# await FollowingManager.push("chat", result)
@ -122,9 +114,7 @@ async def delete_message(_, info, chat_id: str, message_id: int):
members = chat["members"]
for member_id in members:
await redis.execute(
"LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id)
)
await redis.execute("LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id))
# result = FollowingResult("DELETED", "chat", message)
# await FollowingManager.push(result)
@ -148,8 +138,6 @@ async def mark_as_read(_, info, chat_id: str, messages: List[int]):
return {"error": "access denied"}
for message_id in messages:
await redis.execute(
"LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id)
)
await redis.execute("LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id))
return {"error": None}

View File

@ -5,7 +5,7 @@ from typing import Dict, Union, List, Any
from resolvers.load import load_messages
from services.auth import login_required
from services.core import get_network
from services.redis import redis
from services.rediscache import redis
from services.schema import query
@ -19,7 +19,7 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0
author_id = info.context["author_id"]
talk_before = await redis.execute("GET", f"/chats_by_author/{author_id}")
if talk_before:
talk_before = list(json.loads(talk_before))[offset: (offset + limit)]
talk_before = list(json.loads(talk_before))[offset : (offset + limit)]
for chat_id in talk_before:
members = await redis.execute("GET", f"/chats/{chat_id}/members")
if members:
@ -41,9 +41,7 @@ async def search_in_chats(
_, info, by: Dict[str, Union[str, int]], limit: int, offset: int
) -> Dict[str, Union[List[Dict[str, Any]], None]]:
author_id = info.context["author_id"]
lookup_chats = set(
(await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or []
)
lookup_chats = set((await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or [])
messages_set = set([])
by_member = by.get("author")
@ -68,9 +66,8 @@ async def search_in_chats(
if days_ago:
mmm = list(
filter(
lambda msg: int(datetime.now(tz=timezone.utc))
- int(msg["createdAt"])
< timedelta(days=days_ago),
lambda msg: int(datetime.now(tz=timezone.utc)) - int(msg["createdAt"])
< int(timedelta(days=days_ago)),
mmm,
)
)

View File

@ -1,30 +0,0 @@
import json
from services.auth import login_required
from services.redis import redis
async def get_unread_counter(chat_id: str, author_id: int) -> int:
try:
unread = await redis.execute("LLEN", f"chats/{chat_id}/unread/{author_id}")
return unread or 0
except Exception:
return 0
async def get_total_unread_counter(author_id: int) -> int:
chats = await redis.execute("GET", f"chats_by_author/{author_id}")
unread = 0
if chats:
chats = json.loads(chats)
for chat_id in chats:
n = await get_unread_counter(chat_id.decode("utf-8"), author_id)
unread += n
return unread
@login_required
async def resolve_total_unread_counter(_, info):
author_id = info.context["author_id"]
return get_total_unread_counter(author_id)

View File

@ -1,4 +1,3 @@
import json
from functools import wraps
from httpx import AsyncClient, HTTPError
@ -19,19 +18,13 @@ async def check_auth(req):
headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}
gql = {
"query": query_type
+ " "
+ operation
+ " { "
+ query_name
+ " { user { id } } "
+ " }",
"query": query_type + " " + operation + " { " + query_name + " { user { id } } " + " }",
"operationName": operation,
"variables": None,
}
async with AsyncClient() as client:
response = await client.post(AUTH_URL, headers=headers, data=json.dumps(gql))
response = await client.post(AUTH_URL, headers=headers, json=gql)
print(f"[services.auth] response: {response.status_code} {response.text}")
if response.status_code != 200:
return False, None
@ -40,10 +33,7 @@ async def check_auth(req):
user_id = (
r.get("data", {}).get(query_name, {}).get("user", {}).get("id", None)
if INTERNAL_AUTH_SERVER
else r.get("data", {})
.get(query_name, {})
.get("user", {})
.get("id", None)
else r.get("data", {}).get(query_name, {}).get("user", {}).get("id", None)
)
is_authenticated = user_id is not None
return is_authenticated, user_id

View File

@ -1,43 +1,38 @@
import json
from httpx import AsyncClient
from settings import API_BASE
from validators.member import ChatMember
headers = {"Content-Type": "application/json"}
async def get_author(author_id):
gql = {
"query": '''query GetAuthorById($author_id: Int!) {
getAuthorById(author_id: $author_id) { id slug userpic name lastSeen }
}''',
"query": """query GetAuthorById($author_id: Int!) {
getAuthorById(author_id: $author_id) {
id slug userpic name lastSeen
}
}""",
"operation": "GetAuthorById",
"variables": {"author_id": author_id},
}
async with AsyncClient() as client:
try:
response = await client.post(
API_BASE, headers=headers, data=json.dumps(gql)
)
print(f"[services.core] get_author: {response.status_code} {response.text}")
if response.status_code != 200:
return None
r = response.json()
author = r.get("data", {}).get("getAuthorById")
return author
except Exception:
response = await client.post(API_BASE, headers=headers, json=gql)
print(f"[services.core] get_author: {response.status_code} {response.text}")
if response.status_code != 200:
return None
r = response.json()
author: ChatMember | None = r.get("data", {}).get("getAuthorById")
return author
async def get_network(author_id: int, limit: int = 50, offset: int = 0) -> list:
gql = {
"query": '''query LoadAuthors($author_id: Int!, $limit: Int, $offset: Int) {
authorFollowings(author_id: $author_id, limit: $limit, offset: $offset) {
id slug userpic name
"query": """query LoadAuthors($author_id: Int!, $limit: Int, $offset: Int) {
authorFollowings(author_id: $author_id, limit: $limit, offset: $offset) {
id slug userpic name
}
}''',
}""",
"operation": "LoadAuthors",
"variables": {"author_id": author_id, "limit": limit, "offset": offset},
}
@ -45,9 +40,7 @@ async def get_network(author_id: int, limit: int = 50, offset: int = 0) -> list:
followings = []
try:
async with AsyncClient() as client:
response = await client.post(
API_BASE, headers=headers, data=json.dumps(gql)
)
response = await client.post(API_BASE, headers=headers, json=gql)
if response.status_code != 200:
return []
r = response.json()
@ -64,25 +57,21 @@ async def get_network(author_id: int, limit: int = 50, offset: int = 0) -> list:
async def get_followers(author_id, amount):
gql = {
"query": '''query LoadAuthors($author_id: Int!, $limit: Int, $offset: Int) {
"query": """query LoadAuthors($author_id: Int!, $limit: Int, $offset: Int) {
authorFollowers(author_id: $author_id, limit: $limit) {
id slug userpic name
}
}''',
}""",
"operation": "LoadAuthors",
"variables": {"author_id": author_id, "limit": amount},
}
followers = []
try:
async with AsyncClient() as client:
response = await client.post(
API_BASE, headers=headers, data=json.dumps(gql)
)
response = await client.post(API_BASE, headers=headers, json=gql)
if response.status_code != 200:
return []
r = response.json()
followers = r.get("data", {}).get("authorFollowers", [])
return r.get("data", {}).get("authorFollowers", [])
except Exception as e:
print(e)
followers = []
return followers
return []

View File

@ -1,7 +1,7 @@
import json
from services.redis import redis
from validators.inbox import Message
from services.rediscache import redis
from validators.chat import Message
async def notify_message(message: Message, chat_id: str):

36
validators/chat.py Normal file
View File

@ -0,0 +1,36 @@
from typing import TypedDict, Optional, List
from validators.member import ChatMember
from validators.message import Message
class Chat(TypedDict):
id: str
members: List[int]
admins: List[int]
title: str
updatedAt: Optional[int]
createdAt: int
createdBy: int
description: Optional[str]
class ChatPayload(TypedDict):
id: str
members: List[int | ChatMember]
admins: List[int]
title: str
updatedAt: Optional[int]
createdAt: int
createdBy: int
description: Optional[str]
messages: Optional[List[Message]]
unread: Optional[int]
class ChatUpdate(TypedDict):
id: str
members: List[int]
admins: List[int]
title: str
description: Optional[str]

View File

@ -1,32 +0,0 @@
from typing import TypedDict, Optional, List
class Message(TypedDict):
id: int
chat: str
author: int
body: str
createdAt: int
replyTo: Optional[int]
createdAt: int
updatedAt: Optional[int]
class Chat(TypedDict):
id: str
members: List[int]
admins: List[int]
title: str
updatedAt: Optional[int]
createdAt: int
createdBy: int
description: Optional[str]
class ChatMember(TypedDict):
id: int
slug: str
name: str
userpic: Optional[str]
lastSeen: int
online: Optional[bool]

10
validators/member.py Normal file
View File

@ -0,0 +1,10 @@
from typing import TypedDict, Optional
class ChatMember(TypedDict):
id: int
slug: str
name: str
userpic: Optional[str]
lastSeen: int
online: Optional[bool]

12
validators/message.py Normal file
View File

@ -0,0 +1,12 @@
from typing import TypedDict, Optional
class Message(TypedDict):
id: int
chat: str
author: int
body: str
createdAt: int
replyTo: Optional[int]
createdAt: int
updatedAt: Optional[int]