0.3.0+sentry
Untone 2024-04-08 09:30:57 +03:00
parent 106f1bfbde
commit c8f65ca0c9
15 changed files with 373 additions and 322 deletions

.pre-commit-config.yaml
@@ -10,7 +10,6 @@ repos:
       - id: trailing-whitespace
       - id: check-added-large-files
       - id: detect-private-key
-      - id: double-quote-string-fixer
       - id: check-ast
       - id: check-merge-conflict

main.py
@@ -3,55 +3,33 @@ from importlib import import_module
 from os.path import exists
 
 from ariadne import load_schema_from_path, make_executable_schema
-from ariadne.asgi import GraphQL
-from sentry_sdk.integrations.aiohttp import AioHttpIntegration
-from sentry_sdk.integrations.ariadne import AriadneIntegration
-from sentry_sdk.integrations.redis import RedisIntegration
 from starlette.applications import Starlette
 
-from services.core import CacheStorage
 from services.rediscache import redis
 from services.schema import resolvers
-from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN
+from services.sentry import start_sentry
+from settings import DEV_SERVER_PID_FILE_NAME, MODE
 
-import_module('resolvers')
-schema = make_executable_schema(load_schema_from_path('inbox.graphql'), resolvers)  # type: ignore
+import_module("resolvers")
+schema = make_executable_schema(load_schema_from_path("inbox.graphql"), resolvers)
 
 
-async def start_up():
-    try:
-        await redis.connect()
-
-        await CacheStorage.init()
-
-        if MODE == 'dev':
-            if not exists(DEV_SERVER_PID_FILE_NAME):
-                with open(DEV_SERVER_PID_FILE_NAME, 'w', encoding='utf-8') as f:
-                    f.write(str(os.getpid()))
-        else:
-            # startup sentry monitoring services
-            import sentry_sdk
-
-            sentry_sdk.init(
-                SENTRY_DSN,
-                enable_tracing=True,
-                integrations=[
-                    AriadneIntegration(),
-                    RedisIntegration(),
-                    AioHttpIntegration(),
-                ],
-            )
-    except Exception:
-        print('STARTUP FAILED')
-        import traceback
-
-        traceback.print_exc()
-
-
-async def shutdown():
-    await redis.disconnect()
-
-
-app = Starlette(debug=True, on_startup=[start_up], on_shutdown=[shutdown])
-app.mount('/', GraphQL(schema, debug=True))
+async def start():
+    if MODE == "development":
+        if not exists(DEV_SERVER_PID_FILE_NAME):
+            # pid file management
+            with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f:
+                f.write(str(os.getpid()))
+    print(f"[main] process started in {MODE} mode")
+
+
+# main starlette app object with ariadne mounted in root
+app = Starlette(
+    on_startup=[
+        redis.connect,
+        start_sentry,
+        start,
+    ],
+    on_shutdown=[redis.disconnect],
+    debug=True,
+)
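The new main.py replaces the old monolithic start_up handler with plain callables: Starlette runs each on_startup entry in order (redis.connect, then start_sentry, then start) and runs the on_shutdown entries on exit. A minimal sketch of the same wiring, with hypothetical handlers standing in for the real ones:

# Sketch only: connect_db / init_monitoring / close_db are illustrative
# stand-ins for redis.connect, start_sentry and redis.disconnect.
from starlette.applications import Starlette
from starlette.responses import PlainTextResponse
from starlette.routing import Route


async def connect_db():
    print("runs first, in list order")


def init_monitoring():
    print("sync callables are allowed too")


async def close_db():
    print("runs on shutdown")


async def ping(request):
    return PlainTextResponse("ok")


app = Starlette(
    routes=[Route("/", ping)],
    on_startup=[connect_db, init_monitoring],  # executed in order at startup
    on_shutdown=[close_db],
    debug=True,
)

The important detail is that the functions themselves are passed, not their results; Starlette awaits async callables and calls sync ones.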

pyproject.toml
@@ -6,18 +6,18 @@ authors = ["Tony Rewin <anton.rewin@gmail.com>"]
 
 [tool.poetry.dependencies]
 python = "^3.12"
-sentry-sdk = "^1.39.1"
-redis = { extras = ["hiredis"], version = "^5.0.1" }
-ariadne = "^0.21"
-starlette = "^0.36.1"
+sentry-sdk = "^1.44.1"
+redis = "^5.0.3"
+ariadne = "^0.23.0"
+starlette = "^0.37.2"
 itsdangerous = "^2.1.2"
-aiohttp = "^3.9.1"
+aiohttp = "^3.9.3"
 requests = "^2.31.0"
-granian = "^1.0.1"
-pre-commit = "^3.6.0"
+granian = "^1.2.1"
 
 [tool.poetry.group.dev.dependencies]
-ruff = "^0.2.1"
+pre-commit = "^3.6.0"
+ruff = "^0.3.5"
 
 [build-system]
 requires = ["poetry-core"]

resolvers/__init__.py
@@ -1,20 +1,25 @@
 from resolvers.chats import create_chat, delete_chat, update_chat
 from resolvers.load import load_chats, load_messages_by
-from resolvers.messages import create_message, delete_message, mark_as_read, update_message
+from resolvers.messages import (
+    create_message,
+    delete_message,
+    mark_as_read,
+    update_message,
+)
 from resolvers.search import search_messages, search_recipients
 
 __all__ = [
     # inbox
-    'load_chats',
-    'load_messages_by',
-    'create_chat',
-    'delete_chat',
-    'update_chat',
-    'create_message',
-    'delete_message',
-    'update_message',
-    'mark_as_read',
-    'search_recipients',
-    'search_messages',
+    "load_chats",
+    "load_messages_by",
+    "create_chat",
+    "delete_chat",
+    "update_chat",
+    "create_message",
+    "delete_message",
+    "update_message",
+    "mark_as_read",
+    "search_recipients",
+    "search_messages",
 ]

resolvers/chats.py
@@ -10,11 +10,11 @@ from services.rediscache import redis
 from services.schema import mutation
 
-logger = logging.getLogger('[resolvers.chats] ')
+logger = logging.getLogger("[resolvers.chats] ")
 logger.setLevel(logging.DEBUG)
 
 
-@mutation.field('update_chat')
+@mutation.field("update_chat")
 @login_required
 async def update_chat(_, info, chat_new: ChatUpdate):
     """
@@ -25,38 +25,38 @@ async def update_chat(_, info, chat_new: ChatUpdate):
     :param chat_new: dict with chat data
     :return: Result { error chat }
     """
-    logger.info('update_chat')
-    author_id = info.context['author_id']
-    chat_id = chat_new['id']
-    chat_str = await redis.execute('GET', f'chats/{chat_id}')
+    logger.info("update_chat")
+    author_id = info.context["author_id"]
+    chat_id = chat_new["id"]
+    chat_str = await redis.execute("GET", f"chats/{chat_id}")
     if not chat_str:
-        return {'error': 'chat not exist'}
+        return {"error": "chat not exist"}
     elif isinstance(chat_str, str):
         chat: Chat = json.loads(chat_str)
-        if author_id in chat['admins']:
+        if author_id in chat["admins"]:
             chat.update(
                 {
-                    'title': chat_new.get('title', chat['title']),
-                    'description': chat_new.get('description', chat['description']),
-                    'updated_at': int(time.time()),
-                    'admins': chat_new.get('admins', chat.get('admins') or []),
-                    'members': chat_new.get('members', chat['members']),
+                    "title": chat_new.get("title", chat["title"]),
+                    "description": chat_new.get("description", chat["description"]),
+                    "updated_at": int(time.time()),
+                    "admins": chat_new.get("admins", chat.get("admins") or []),
+                    "members": chat_new.get("members", chat["members"]),
                 }
             )
-            await redis.execute('SET', f"chats/{chat['id']}", json.dumps(chat))
-            for member_id in chat['members']:
-                await notify_chat(chat, member_id, 'update')
+            await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat))
+            for member_id in chat["members"]:
+                await notify_chat(chat, member_id, "update")
 
-    return {'error': None, 'chat': chat}
+    return {"error": None, "chat": chat}
 
 
-@mutation.field('create_chat')
+@mutation.field("create_chat")
 @login_required
-async def create_chat(_, info, title='', members=None):
-    logger.info('create_chat')
+async def create_chat(_, info, title="", members=None):
+    logger.info("create_chat")
     members = members or []
-    author_id = info.context['author_id']
+    author_id = info.context["author_id"]
     chat: Chat
     if author_id:
         if author_id not in members:
@@ -64,51 +64,51 @@ async def create_chat(_, info, title='', members=None):
 
         # NOTE: private chats has no title
         # reuse private chat created before if exists
-        if len(members) == 2 and title == '':
-            chatset1 = await redis.execute('SMEMBERS', f'chats_by_author/{members[0]}')
-            chatset2 = await redis.execute('SMEMBERS', f'chats_by_author/{members[1]}')
+        if len(members) == 2 and title == "":
+            chatset1 = await redis.execute("SMEMBERS", f"chats_by_author/{members[0]}")
+            chatset2 = await redis.execute("SMEMBERS", f"chats_by_author/{members[1]}")
             for c in chatset1.intersection(chatset2):
-                chat = await redis.execute('GET', f'chats/{c}')
-                if chat['title'] == '':
-                    logger.info('[inbox] createChat found old chat')
-                    return {'chat': chat, 'error': 'existed'}
+                chat = await redis.execute("GET", f"chats/{c}")
+                if chat["title"] == "":
+                    logger.info("[inbox] createChat found old chat")
+                    return {"chat": chat, "error": "existed"}
 
         chat_id = str(uuid.uuid4())
         chat: Chat = {
-            'id': chat_id,
-            'members': members,
-            'title': title,
-            'description': '',
-            'created_by': author_id,
-            'created_at': int(time.time()),
-            'updated_at': int(time.time()),
-            'admins': members if (len(members) == 2 and title == '') else [],
+            "id": chat_id,
+            "members": members,
+            "title": title,
+            "description": "",
+            "created_by": author_id,
+            "created_at": int(time.time()),
+            "updated_at": int(time.time()),
+            "admins": members if (len(members) == 2 and title == "") else [],
         }
 
         for member_id in members:
-            await redis.execute('SADD', f'chats_by_author/{member_id}', chat_id)
-            await notify_chat(chat, member_id, 'create')
+            await redis.execute("SADD", f"chats_by_author/{member_id}", chat_id)
+            await notify_chat(chat, member_id, "create")
 
-        print(f'\n\n[resolvers.chats] creating: {chat}\n\n')
+        print(f"\n\n[resolvers.chats] creating: {chat}\n\n")
 
-        await redis.execute('SET', f'chats/{chat_id}', json.dumps(chat))
-        await redis.execute('SET', f'chats/{chat_id}/next_message_id', str(0))
+        await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat))
+        await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0))
 
-        return {'error': None, 'chat': chat}
+        return {"error": None, "chat": chat}
 
-    return {'error': 'no chat was created'}
+    return {"error": "no chat was created"}
 
 
-@mutation.field('delete_chat')
+@mutation.field("delete_chat")
 @login_required
 async def delete_chat(_, info, chat_id: str):
-    logger.info('delete_chat')
-    author_id = info.context['author_id']
-    chat_str = await redis.execute('GET', f'chats/{chat_id}')
+    logger.info("delete_chat")
+    author_id = info.context["author_id"]
+    chat_str = await redis.execute("GET", f"chats/{chat_id}")
     if isinstance(chat_str, str):
        chat: Chat = json.loads(chat_str)
-        if author_id in chat['admins']:
-            await redis.execute('DEL', f'chats/{chat_id}')
-            await redis.execute('SREM', f'chats_by_author/{author_id}', chat_id)
-            for member_id in chat['members']:
-                await notify_chat(chat, member_id, 'delete')
+        if author_id in chat["admins"]:
+            await redis.execute("DEL", f"chats/{chat_id}")
+            await redis.execute("SREM", f"chats_by_author/{author_id}", chat_id)
+            for member_id in chat["members"]:
+                await notify_chat(chat, member_id, "delete")
 
-    return {'error': 'chat not exist'}
+    return {"error": "chat not exist"}

resolvers/load.py
@@ -11,12 +11,12 @@ from services.rediscache import redis
 from services.schema import query
 
-logger = logging.getLogger('[resolvers.load] ')
+logger = logging.getLogger("[resolvers.load] ")
 logger.setLevel(logging.DEBUG)
 
 
 async def get_unread_counter(chat_id: str, member_id: int) -> int:
-    unread = await redis.execute('LLEN', f'chats/{chat_id}/unread/{member_id}')
+    unread = await redis.execute("LLEN", f"chats/{chat_id}/unread/{member_id}")
     if isinstance(unread, int):
         return unread
     else:
@@ -24,25 +24,31 @@ async def get_unread_counter(chat_id: str, member_id: int) -> int:
 
 # NOTE: not an API handler
-async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids: Optional[List[int]] = None):
+async def load_messages(
+    chat_id: str, limit: int = 5, offset: int = 0, ids: Optional[List[int]] = None
+):
     """load :limit messages for :chat_id with :offset"""
-    logger.info('load_messages')
+    logger.info("load_messages")
     messages = []
     try:
         message_ids = [] + (ids or [])
         if limit:
-            mids = await redis.execute('LRANGE', f'chats/{chat_id}/message_ids', offset, offset + limit)
+            mids = await redis.execute(
+                "LRANGE", f"chats/{chat_id}/message_ids", offset, offset + limit
+            )
             if isinstance(mids, list):
                 message_ids.extend(mids)
         if message_ids:
-            message_keys = [f'chats/{chat_id}/messages/{mid}' for mid in message_ids]
-            messages = await redis.execute('MGET', *message_keys)
+            message_keys = [f"chats/{chat_id}/messages/{mid}" for mid in message_ids]
+            messages = await redis.execute("MGET", *message_keys)
             if isinstance(messages, list):
-                messages = [json.loads(m) if isinstance(m, str) else m for m in messages]
+                messages = [
+                    json.loads(m) if isinstance(m, str) else m for m in messages
+                ]
                 replies = []
                 for m in messages:
                     if m:
-                        reply_to = m.get('reply_to')
+                        reply_to = m.get("reply_to")
                         if reply_to:
                             reply_to = int(reply_to)
                             if reply_to not in message_ids:
@@ -59,85 +65,91 @@ async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids: Opti
     return messages
 
 
-@query.field('load_chats')
+@query.field("load_chats")
 @login_required
-async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Union[List[Dict[str, Any]], None]]:
+async def load_chats(
+    _, info, limit: int = 50, offset: int = 0
+) -> Dict[str, Union[List[Dict[str, Any]], None]]:
     """load :limit chats of current user with :offset"""
-    logger.info('load_chats')
-    author_id = info.context['author_id']
+    logger.info("load_chats")
+    author_id = info.context["author_id"]
     chats = []
     try:
         if author_id:
-            logger.debug('got author', author_id)
-            cids = await redis.execute('SMEMBERS', f'chats_by_author/{author_id}')
-            logger.debug('got cids', cids)
-            members_online = (await redis.execute('SMEMBERS', 'authors-online')) or []  # to show online status
-            logger.debug('members online', members_online)
+            logger.debug("got author", author_id)
+            cids = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
+            logger.debug("got cids", cids)
+            members_online = (
+                await redis.execute("SMEMBERS", "authors-online")
+            ) or []  # to show online status
+            logger.debug("members online", members_online)
             if isinstance(cids, set):
                 # TODO: add sort by chat.created_at with in-memory caching chats service
                 cids = list(cids)[offset : (offset + limit)]
                 lock = asyncio.Lock()
                 if len(cids) == 0:
-                    logger.debug(f'no chats for user with id={author_id}')
-                    r = await create_chat(None, info, members=[2])  # member with id = 2 is discours
+                    logger.debug(f"no chats for user with id={author_id}")
+                    r = await create_chat(
+                        None, info, members=[2]
+                    )  # member with id = 2 is discours
                     logger.debug(f"created chat: {r['chat_id']}")
-                    cids.append(r['chat']['id'])
+                    cids.append(r["chat"]["id"])
 
                 logger.debug(f"getting data for {len(cids)} user's chats")
                 for cid in cids:
                     async with lock:
-                        chat_str = await redis.execute('GET', f'chats/{cid}')
+                        chat_str = await redis.execute("GET", f"chats/{cid}")
                         if isinstance(chat_str, str):
-                            logger.debug(f'redis GET by {cid}: {chat_str}')
+                            logger.debug(f"redis GET by {cid}: {chat_str}")
                             c: ChatPayload = json.loads(chat_str)
-                            c['messages'] = (await load_messages(cid, 5, 0)) or []
-                            c['unread'] = await get_unread_counter(cid, author_id)
-                            member_ids = c['members'].copy()
-                            c['members'] = []
+                            c["messages"] = (await load_messages(cid, 5, 0)) or []
+                            c["unread"] = await get_unread_counter(cid, author_id)
+                            member_ids = c["members"].copy()
+                            c["members"] = []
                             for member_id in member_ids:
                                 a = CacheStorage.authors_by_id.get(str(member_id))
                                 if a:
-                                    a['online'] = a.get('id') in members_online
-                                    c['members'].append(a)
+                                    a["online"] = a.get("id") in members_online
+                                    c["members"].append(a)
                                 else:
-                                    logger.error(f'cant find author by id {member_id}')
+                                    logger.error(f"cant find author by id {member_id}")
                             chats.append(c)
                         else:
-                            logger.error(f'cant find chat by id {cid}')
+                            logger.error(f"cant find chat by id {cid}")
     except Exception:
         import traceback
 
         traceback.print_exc()
-    return {'chats': chats, 'error': None}
+    return {"chats": chats, "error": None}
 
 
-@query.field('load_messages_by')
+@query.field("load_messages_by")
 @login_required
 async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0):
     """load :limit messages of :chat_id with :offset"""
-    logger.info('load_messages_by')
-    author_id = info.context['author_id']
-    author_chats = await redis.execute('SMEMBERS', f'chats_by_author/{author_id}')
+    logger.info("load_messages_by")
+    author_id = info.context["author_id"]
+    author_chats = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
     try:
         if isinstance(author_chats, set):
             author_chats = list(author_chats)
             messages = []
-            by_chat = by.get('chat')
+            by_chat = by.get("chat")
             if by_chat in author_chats:
-                chat = await redis.execute('GET', f'chats/{by_chat}')
+                chat = await redis.execute("GET", f"chats/{by_chat}")
                 if not chat:
-                    return {'messages': [], 'error': 'chat not exist'}
+                    return {"messages": [], "error": "chat not exist"}
                 # everyone's messages in filtered chat
                 messages = await load_messages(by_chat, limit, offset)
                 if isinstance(messages, list):
-                    sorted_messages = [m for m in messages if m and m.get('created_at')]
+                    sorted_messages = [m for m in messages if m and m.get("created_at")]
                     return {
-                        'messages': sorted(
+                        "messages": sorted(
                             sorted_messages,
-                            key=lambda m: m.get('created_at'),
+                            key=lambda m: m.get("created_at"),
                         ),
-                        'error': None,
+                        "error": None,
                     }
     except Exception as exc:
@@ -145,4 +157,4 @@ async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0):
         import traceback
 
         traceback.print_exc()
-    return {'error': 'Cannot get messages of this chat'}
+    return {"error": "Cannot get messages of this chat"}
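Unread counts in this module are just Redis list lengths: create_message in resolvers/messages.py (below) LPUSHes every new message id onto chats/{chat_id}/unread/{member_id} for each member, get_unread_counter above reads LLEN on that key, and mark_as_read LREMs the id again. A small illustrative sketch of that lifecycle (the ids are example values, not part of the commit):

# Sketch of the unread-counter lifecycle implied by these resolvers.
from services.rediscache import redis


async def unread_example(chat_id: str, member_id: int, message_id: int):
    key = f"chats/{chat_id}/unread/{member_id}"
    # create_message: push the new message id for every chat member
    await redis.execute("LPUSH", key, str(message_id))
    # load_chats -> get_unread_counter: the badge is just the list length
    count = await redis.execute("LLEN", key)
    # mark_as_read: drop that id again for the reading member
    await redis.execute("LREM", key, 0, str(message_id))
    return count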

resolvers/messages.py
@@ -9,161 +9,183 @@ from services.rediscache import redis
 from services.schema import mutation
 
-logger = logging.getLogger('[resolvers.messages] ')
+logger = logging.getLogger("[resolvers.messages] ")
 logger.setLevel(logging.DEBUG)
 
 
-@mutation.field('create_message')
+@mutation.field("create_message")
 @login_required
 async def create_message(_, info, chat_id: str, body: str, reply_to=None):
     """Create a message with body :body in chat :chat_id, optionally replying to :reply_to"""
-    author_id = info.context['author_id']
+    author_id = info.context["author_id"]
 
     # Fetch the chat data from Redis
-    chat_data = await redis.execute('GET', f'chats/{chat_id}')
-    logger.debug(f'chat data: {chat_data}')
+    chat_data = await redis.execute("GET", f"chats/{chat_id}")
+    logger.debug(f"chat data: {chat_data}")
 
     # If there is no chat data, return an error
     if not chat_data:
-        return {'error': 'chat is not exist'}
+        return {"error": "chat is not exist"}
     elif isinstance(chat_data, str):
         # Parse the chat data from a JSON string into a dict
         chat_dict = json.loads(chat_data)
-        chat_id = chat_dict['id']
+        chat_id = chat_dict["id"]
 
         # Get the id of the next message
-        message_id = await redis.execute('GET', f"chats/{chat_dict['id']}/next_message_id")
+        message_id = await redis.execute(
+            "GET", f"chats/{chat_dict['id']}/next_message_id"
+        )
         if isinstance(message_id, str) or isinstance(message_id, int):
             message_id = int(message_id) if message_id else 0
 
             # Create the new message
             new_message: Message = {
-                'chat_id': chat_id,
-                'id': message_id,
-                'created_by': author_id,
-                'body': body,
-                'created_at': int(time.time()),
-                'updated_at': None,
-                'reply_to': None,
+                "chat_id": chat_id,
+                "id": message_id,
+                "created_by": author_id,
+                "body": body,
+                "created_at": int(time.time()),
+                "updated_at": None,
+                "reply_to": None,
             }
 
             # If this is a reply, store it on the message
            if reply_to:
-                new_message['reply_to'] = reply_to
+                new_message["reply_to"] = reply_to
 
             # Update the chat's last-updated timestamp
-            chat_dict['updated_at'] = new_message['created_at']
+            chat_dict["updated_at"] = new_message["created_at"]
 
             # Write the updated chat data back to Redis
-            await redis.execute('SET', f'chats/{chat_id}', json.dumps(chat_dict))
-            logger.debug(f'creating message {new_message}')
+            await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat_dict))
+            logger.debug(f"creating message {new_message}")
 
             # Write the new message to Redis
             await redis.execute(
-                'SET',
-                f'chats/{chat_id}/messages/{message_id}',
+                "SET",
+                f"chats/{chat_id}/messages/{message_id}",
                 json.dumps(new_message),
             )
 
             # Add the new message id to the chat's list of message ids
-            await redis.execute('LPUSH', f'chats/{chat_id}/message_ids', str(message_id))
+            await redis.execute(
+                "LPUSH", f"chats/{chat_id}/message_ids", str(message_id)
+            )
 
             # Bump the id of the next message
-            await redis.execute('SET', f'chats/{chat_id}/next_message_id', str(message_id + 1))
+            await redis.execute(
+                "SET", f"chats/{chat_id}/next_message_id", str(message_id + 1)
+            )
 
             # Add the new message to the unread list of every chat member
-            members = chat_dict['members']
+            members = chat_dict["members"]
             for member_id in members:
-                await redis.execute('LPUSH', f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id))
+                await redis.execute(
+                    "LPUSH",
+                    f"chats/{chat_dict['id']}/unread/{member_id}",
+                    str(message_id),
+                )
 
             # Send a notification about the new message
-            new_message['chat_id'] = chat_id
-            await notify_message(new_message, 'create')
+            new_message["chat_id"] = chat_id
+            await notify_message(new_message, "create")
 
-            return {'message': new_message, 'error': None}
-    return {'error': 'cannot create message'}
+            return {"message": new_message, "error": None}
+    return {"error": "cannot create message"}
 
 
-@mutation.field('update_message')
+@mutation.field("update_message")
 @login_required
 async def update_message(_, info, message):
-    author_id = info.context['author_id']
-    chat_id = message.get('chat_id')
-    chat_str = ''
+    author_id = info.context["author_id"]
+    chat_id = message.get("chat_id")
+    chat_str = ""
     if chat_id:
-        chat_str = await redis.execute('GET', f'chats/{chat_id}')
+        chat_str = await redis.execute("GET", f"chats/{chat_id}")
     if not chat_str:
-        return {'error': 'chat not exist'}
+        return {"error": "chat not exist"}
 
-    message_id = message.get('id')
-    body = message.get('body')
+    message_id = message.get("id")
+    body = message.get("body")
     if message_id:
-        message = await redis.execute('GET', f'chats/{chat_id}/messages/{message_id}')
+        message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}")
        if isinstance(message, str):
            message = json.loads(message)
-            if message['created_by'] != author_id:
-                return {'error': 'access denied'}
+            if message["created_by"] != author_id:
+                return {"error": "access denied"}
             if body:
-                message['body'] = body
-            message['updated_at'] = int(time.time())
-            await redis.execute('SET', f'chats/{chat_id}/messages/{message_id}', json.dumps(message))
+                message["body"] = body
+            message["updated_at"] = int(time.time())
+            await redis.execute(
+                "SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message)
+            )
 
             # Send a notification
-            message['chat_id'] = chat_id
-            await notify_message(message, 'update')
+            message["chat_id"] = chat_id
+            await notify_message(message, "update")
 
-            return {'message': message, 'error': None}
+            return {"message": message, "error": None}
 
-    return {'message': message, 'error': 'cannot update'}
+    return {"message": message, "error": "cannot update"}
 
 
-@mutation.field('delete_message')
+@mutation.field("delete_message")
 @login_required
 async def delete_message(_, info, chat_id: str, message_id: int):
-    author_id = info.context['author_id']
-    chat_str = await redis.execute('GET', f'chats/{chat_id}')
+    author_id = info.context["author_id"]
+    chat_str = await redis.execute("GET", f"chats/{chat_id}")
     if isinstance(chat_str, str):
         chat = json.loads(chat_str)
-        message_data = await redis.execute('GET', f'chats/{chat_id}/messages/{str(message_id)}')
+        message_data = await redis.execute(
+            "GET", f"chats/{chat_id}/messages/{str(message_id)}"
+        )
         if isinstance(message_data, str):
             message: Message = json.loads(message_data)
-            if message['created_by'] != author_id:
-                return {'error': 'access denied'}
+            if message["created_by"] != author_id:
+                return {"error": "access denied"}
 
-            await redis.execute('LREM', f'chats/{chat_id}/message_ids', 0, str(message_id))
-            await redis.execute('DEL', f'chats/{chat_id}/messages/{str(message_id)}')
+            await redis.execute(
+                "LREM", f"chats/{chat_id}/message_ids", 0, str(message_id)
+            )
+            await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}")
 
-            members = chat['members']
+            members = chat["members"]
             for member_id in members:
-                await redis.execute('LREM', f'chats/{chat_id}/unread/{member_id}', 0, str(message_id))
+                await redis.execute(
+                    "LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id)
+                )
 
-            message['chat_id'] = chat_id
-            await notify_message(message, 'delete')
+            message["chat_id"] = chat_id
+            await notify_message(message, "delete")
 
     return {}
 
 
-@mutation.field('mark_as_read')
+@mutation.field("mark_as_read")
 @login_required
 async def mark_as_read(_, info, chat_id: str, message_id: int):
-    author_id = info.context['author_id']
-    chat_str = await redis.execute('GET', f'chats/{chat_id}')
+    author_id = info.context["author_id"]
+    chat_str = await redis.execute("GET", f"chats/{chat_id}")
     if isinstance(chat_str, str):
         chat = json.loads(chat_str)
-        members = set(chat['members'])
+        members = set(chat["members"])
         if author_id not in members:
-            return {'error': 'access denied'}
+            return {"error": "access denied"}
 
-        await redis.execute('LREM', f'chats/{chat_id}/unread/{author_id}', 0, str(message_id))
+        await redis.execute(
+            "LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id)
+        )
 
-        message_data = await redis.execute('GET', f'chats/{chat_id}/messages/{str(message_id)}')
+        message_data = await redis.execute(
+            "GET", f"chats/{chat_id}/messages/{str(message_id)}"
+        )
         if isinstance(message_data, str):
             message: Message = json.loads(message_data)
-            await notify_message(message, 'seen')
+            await notify_message(message, "seen")
 
-    return {'error': None}
+    return {"error": None}
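The message resolvers above rely on a small set of Redis keys per chat: chats/{chat_id} for the chat JSON, chats/{chat_id}/next_message_id for the id counter, chats/{chat_id}/messages/{message_id} for each message, and chats/{chat_id}/message_ids for ordering. A hedged helper sketch, not part of the commit, that reads a single message back using that layout:

# Illustrative only: fetch one message with the key layout used above.
import json

from services.rediscache import redis


async def get_message(chat_id: str, message_id: int) -> dict | None:
    raw = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}")
    return json.loads(raw) if isinstance(raw, str) else None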

resolvers/search.py
@@ -8,47 +8,47 @@ from services.rediscache import redis
 from services.schema import query
 
 
-@query.field('search_recipients')
+@query.field("search_recipients")
 @login_required
 async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0):
     result = set()
     # TODO: maybe redis scan?
-    author_id = info.context['author_id']
+    author_id = info.context["author_id"]
 
-    existed_chats = await redis.execute('SMEMBERS', f'/chats_by_author/{author_id}')
+    existed_chats = await redis.execute("SMEMBERS", f"/chats_by_author/{author_id}")
     if isinstance(existed_chats, set):
         chats_list = list(existed_chats)
         for chat_id in chats_list[offset : (offset + limit)]:
-            members_ids = await redis.execute('SMEMBERS', f'/chats/{chat_id}/members')
+            members_ids = await redis.execute("SMEMBERS", f"/chats/{chat_id}/members")
             if isinstance(members_ids, set):
                 for member_id in members_ids:
                     author = CacheStorage.authors_by_id.get(str(member_id))
                     if author:
-                        if author['name'].startswith(text):
+                        if author["name"].startswith(text):
                             result.add(author)
 
     more_amount = limit - len(result)
 
     if more_amount > 0:
         result.update(CacheStorage.authors[0:more_amount])
 
-    return {'members': list(result), 'error': None}
+    return {"members": list(result), "error": None}
 
 
-@query.field('search_messages')
+@query.field("search_messages")
 @login_required
 async def search_messages(
     _, info, by: Dict[str, Union[str, int]], limit: int, offset: int
 ) -> Dict[str, Union[List[Dict[str, Any]], None]]:
     messages_set = set()
-    author_id = info.context['author_id']
-    lookup_chats = await redis.execute('SMEMBERS', f'chats_by_author/{author_id}')
+    author_id = info.context["author_id"]
+    lookup_chats = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
     if isinstance(lookup_chats, set):
         # pre-filter lookup chats
-        by_member = by.get('author')
+        by_member = by.get("author")
         if by_member:
             lookup_chats = filter(
-                lambda ca: by_member in ca['members'],
+                lambda ca: by_member in ca["members"],
                 list(lookup_chats),
             )
@@ -58,18 +58,18 @@ async def search_messages(
             fltr = None
             now = int(time.time())
             if by_member:
-                fltr = lambda mx: mx and mx['created_by'] == by_member  # noqa E731
+                fltr = lambda mx: mx and mx["created_by"] == by_member  # noqa E731
-            body_like = by.get('body') or ''
+            body_like = by.get("body") or ""
             if isinstance(body_like, str):
-                fltr = lambda mx: mx and body_like in mx['body']  # noqa E731
+                fltr = lambda mx: mx and body_like in mx["body"]  # noqa E731
-            days_ago = int(by.get('days') or '0')
+            days_ago = int(by.get("days") or "0")
             if days_ago:
                 ts = days_ago * 24 * 60 * 60
-                fltr = lambda mx: mx and now - mx['created_by'] < ts  # noqa E731
+                fltr = lambda mx: mx and now - mx["created_by"] < ts  # noqa E731
             if fltr:
                 mmm = await load_messages(chat_id, limit, offset)
                 if isinstance(mmm, list):
                     mmm = list(filter(fltr, mmm))
                     messages_set |= set(mmm)
 
-    return {'messages': sorted(messages_set), 'error': None}
+    return {"messages": sorted(messages_set), "error": None}
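search_messages reads three optional keys from the by argument (author, body, days), while load_messages_by in resolvers/load.py reads chat. Example filter dicts, with made-up values, assuming an authenticated resolver context:

# Example filter shapes accepted by these resolvers (values are invented).
by_author = {"author": 2}      # only messages created_by author 2
by_text = {"body": "hello"}    # substring match against the message body
by_recent = {"days": 7}        # messages from roughly the last 7 days
# e.g. from another resolver: await search_messages(None, info, by_text, limit=10, offset=0)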

@@ -4,15 +4,15 @@ from granian.server import Granian
 
 from settings import PORT
 
-if __name__ == '__main__':
-    print('[server] starting...')
+if __name__ == "__main__":
+    print("[server] starting...")
     granian_instance = Granian(
-        'main:app',
-        address='0.0.0.0',  # noqa S104
+        "main:app",
+        address="0.0.0.0",  # noqa S104
         port=PORT,
         threads=2,
         websockets=False,
-        interface=Interfaces.ASGI
+        interface=Interfaces.ASGI,
     )
     granian_instance.serve()

services/auth.py
@@ -8,34 +8,34 @@ from services.core import get_author_by_user
 from settings import AUTH_URL
 
-logger = logging.getLogger('[services.auth] ')
+logger = logging.getLogger("[services.auth] ")
 logger.setLevel(logging.DEBUG)
 
 
-async def check_auth(req) -> str | None:
-    logger.debug('checking auth...')
-    user_id = ''
+async def check_auth(req):
+    logger.debug("checking auth...")
+    user_id = ""
     try:
-        token = req.headers.get('Authorization')
+        token = req.headers.get("Authorization")
         if token:
             # Logging the authentication token
-            query_name = 'validate_jwt_token'
-            operation = 'ValidateToken'
+            query_name = "validate_jwt_token"
+            operation = "ValidateToken"
             headers = {
-                'Content-Type': 'application/json',
+                "Content-Type": "application/json",
             }
 
             variables = {
-                'params': {
-                    'token_type': 'access_token',
-                    'token': token,
+                "params": {
+                    "token_type": "access_token",
+                    "token": token,
                 }
             }
 
             gql = {
-                'query': f'query {operation}($params: ValidateJWTTokenInput!) {{ {query_name}(params: $params) {{ is_valid claims }} }}',
-                'variables': variables,
-                'operationName': operation,
+                "query": f"query {operation}($params: ValidateJWTTokenInput!) {{ {query_name}(params: $params) {{ is_valid claims }} }}",
+                "variables": variables,
+                "operationName": operation,
             }
             # Asynchronous HTTP request to the authentication server
             async with ClientSession() as session:
@@ -44,24 +44,24 @@ async def check_auth(req) -> str | None:
                 ) as response:
                     if response.status == 200:
                         data = await response.json()
-                        errors = data.get('errors')
+                        errors = data.get("errors")
                         if errors:
-                            logger.error(f'{errors}')
+                            logger.error(f"{errors}")
                         else:
                             user_id = (
-                                data.get('data', {})
+                                data.get("data", {})
                                 .get(query_name, {})
-                                .get('claims', {})
-                                .get('sub')
+                                .get("claims", {})
+                                .get("sub")
                             )
-                            logger.info(f'got user_id: {user_id}')
+                            logger.info(f"got user_id: {user_id}")
                             return user_id
     except Exception as e:
         # Handling and logging exceptions during authentication check
         logger.error(e)
 
     if not user_id:
-        raise HTTPException(status_code=401, detail='Unauthorized')
+        raise HTTPException(status_code=401, detail="Unauthorized")
 
 
 def login_required(f):
@@ -69,16 +69,16 @@ def login_required(f):
     async def decorated_function(*args, **kwargs):
         info = args[1]
         context = info.context
-        req = context.get('request')
+        req = context.get("request")
         user_id = await check_auth(req)
         if user_id:
-            context['user_id'] = user_id.strip()
+            context["user_id"] = user_id.strip()
             author = get_author_by_user(user_id)
-            if author and 'id' in author:
-                context['author_id'] = author['id']
+            if author and "id" in author:
+                context["author_id"] = author["id"]
             else:
                 logger.debug(author)
-                HTTPException(status_code=401, detail='Unauthorized')
+                HTTPException(status_code=401, detail="Unauthorized")
 
         return await f(*args, **kwargs)
 
     return decorated_function
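check_auth only inspects req.headers and raises HTTPException(status_code=401) when the auth service returns no user id. A rough test sketch outside Starlette; DummyRequest is a made-up stand-in and AUTH_URL must point at a reachable auth endpoint:

# Hedged sketch, not part of the commit: exercise check_auth directly.
import asyncio

from services.auth import check_auth


class DummyRequest:
    def __init__(self, token: str | None):
        # check_auth only reads headers.get("Authorization")
        self.headers = {"Authorization": token} if token else {}


async def demo():
    try:
        user_id = await check_auth(DummyRequest("some-jwt-token"))
        print("authenticated user:", user_id)
    except Exception as e:  # HTTPException(401) when the token is missing or invalid
        print("rejected:", e)


asyncio.run(demo())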

services/core.py
@@ -9,59 +9,61 @@ from models.member import ChatMember
 from settings import API_BASE
 
-logger = logging.getLogger('[services.core] ')
+logger = logging.getLogger("[services.core] ")
 logger.setLevel(logging.DEBUG)
 
 
 def _request_endpoint(query_name, body) -> dict:
-    logger.debug(f'requesting {query_name}...')
-    response = requests.post(API_BASE, headers={'Content-Type': 'application/json'}, json=body, timeout=30.0)
+    logger.debug(f"requesting {query_name}...")
+    response = requests.post(
+        API_BASE, headers={"Content-Type": "application/json"}, json=body, timeout=30.0
+    )
 
     if response.status_code == 200:
         try:
             r = response.json()
-            result = r.get('data', {}).get(query_name, {})
+            result = r.get("data", {}).get(query_name, {})
             if result:
-                logger.info(f'entries amount in result: {len(result)} ')
+                logger.info(f"entries amount in result: {len(result)} ")
                 return result
         except ValueError as e:
-            logger.error(f'Error decoding JSON response: {e}')
+            logger.error(f"Error decoding JSON response: {e}")
 
     return {}
 
 
 def get_all_authors():
-    query_name = 'get_authors_all'
+    query_name = "get_authors_all"
 
     gql = {
-        'query': 'query { ' + query_name + '{ id slug pic name user } }',
-        'variables': None,
+        "query": "query { " + query_name + "{ id slug pic name user } }",
+        "variables": None,
     }
 
     return _request_endpoint(query_name, gql)
 
 
 def get_author_by_user(user: str):
-    operation = 'GetAuthorId'
-    query_name = 'get_author_id'
+    operation = "GetAuthorId"
+    query_name = "get_author_id"
     gql = {
-        'query': f'query {operation}($user: String!) {{ {query_name}(user: $user){{ id }} }}',  # noqa E201, E202
-        'operationName': operation,
-        'variables': {'user': user.strip()},
+        "query": f"query {operation}($user: String!) {{ {query_name}(user: $user){{ id }} }}",  # noqa E201, E202
+        "operationName": operation,
+        "variables": {"user": user.strip()},
     }
 
     return _request_endpoint(query_name, gql)
 
 
 def get_my_followed() -> List[ChatMember]:
-    query_name = 'get_my_followed'
+    query_name = "get_my_followed"
 
     gql = {
-        'query': 'query { ' + query_name + ' { authors { id slug pic name } } }',
-        'variables': None,
+        "query": "query { " + query_name + " { authors { id slug pic name } } }",
+        "variables": None,
    }
 
     result = _request_endpoint(query_name, gql)
 
-    return result.get('authors', [])
+    return result.get("authors", [])
 
 
 class CacheStorage:
@@ -85,12 +87,12 @@ class CacheStorage:
         self = CacheStorage
         async with self.lock:
             result = get_all_authors()
-            logger.info(f'cache loaded {len(result)}')
+            logger.info(f"cache loaded {len(result)}")
             if result:
                 CacheStorage.authors = result
                 for a in result:
-                    user_id = a.get('user')
-                    author_id = str(a.get('id'))
+                    user_id = a.get("user")
+                    author_id = str(a.get("id"))
                     self.authors_by_user[user_id] = a
                     self.authors_by_id[author_id] = a
@@ -101,14 +103,14 @@ class CacheStorage:
         self = CacheStorage
         while True:
             try:
-                logger.info(' - updating profiles data...')
+                logger.info(" - updating profiles data...")
                 await self.update_authors()
                 failed = 0
            except Exception as er:
                 failed += 1
-                logger.error(f'{er} - update failed #{failed}, wait 10 seconds')
+                logger.error(f"{er} - update failed #{failed}, wait 10 seconds")
                 if failed > 3:
-                    logger.error(' - not trying to update anymore')
+                    logger.error(" - not trying to update anymore")
                     import traceback
 
                     traceback.print_exc()
@@ -116,8 +118,11 @@ class CacheStorage:
             if failed == 0:
                 when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
                 t = format(when.astimezone().isoformat())
-                logger.info(' ⎩ next update: %s' % (t.split('T')[0] + ' ' + t.split('T')[1].split('.')[0]))
+                logger.info(
+                    " ⎩ next update: %s"
+                    % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])
+                )
                 await asyncio.sleep(self.period)
             else:
                 await asyncio.sleep(10)
-                logger.info(' - trying to update data again')
+                logger.info(" - trying to update data again")
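CacheStorage keeps the author profiles fetched from the core API in memory, keyed by stringified id and by user, and the resolvers read it through authors_by_id (see load_chats and search_recipients). A hedged sketch of warming and querying the cache, using init() the way the previous main.py did at startup:

# Illustrative only: warm the author cache once and look an author up.
import asyncio

from services.core import CacheStorage


async def demo():
    await CacheStorage.init()  # as the previous main.py did on startup
    author = CacheStorage.authors_by_id.get("2")  # ids are stored as strings
    print(author and author.get("slug"))


asyncio.run(demo())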

services/presence.py
@@ -4,21 +4,21 @@ from models.chat import ChatUpdate, Message
 from services.rediscache import redis
 
 
-async def notify_message(message: Message, action='create'):
+async def notify_message(message: Message, action="create"):
     channel_name = f"message:{message['chat_id']}"
-    data = {'payload': message, 'action': action}
+    data = {"payload": message, "action": action}
     try:
         await redis.publish(channel_name, json.dumps(data))
-        print(f'[services.presence] ok {data}')
+        print(f"[services.presence] ok {data}")
     except Exception as e:
-        print(f'Failed to publish to channel {channel_name}: {e}')
+        print(f"Failed to publish to channel {channel_name}: {e}")
 
 
-async def notify_chat(chat: ChatUpdate, member_id: int, action='create'):
-    channel_name = f'chat:{member_id}'
-    data = {'payload': chat, 'action': action}
+async def notify_chat(chat: ChatUpdate, member_id: int, action="create"):
+    channel_name = f"chat:{member_id}"
+    data = {"payload": chat, "action": action}
     try:
         await redis.publish(channel_name, json.dumps(data))
-        print(f'[services.presence] ok {data}')
+        print(f"[services.presence] ok {data}")
     except Exception as e:
-        print(f'Failed to publish to channel {channel_name}: {e}')
+        print(f"Failed to publish to channel {channel_name}: {e}")
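These helpers only publish; whatever consumes the chat:{member_id} and message:{chat_id} channels lives outside this service. A hedged sketch of what such a subscriber could look like with plain redis.asyncio (the channel names match the publishers above, everything else is illustrative):

# Illustrative subscriber, not part of this repo.
import json

import redis.asyncio as aredis

from settings import REDIS_URL


async def listen_for_member(member_id: int):
    client = aredis.from_url(REDIS_URL)
    pubsub = client.pubsub()
    await pubsub.subscribe(f"chat:{member_id}")
    async for event in pubsub.listen():
        if event["type"] == "message":
            payload = json.loads(event["data"])
            print(payload["action"], payload["payload"])


# asyncio.run(listen_for_member(2))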

services/rediscache.py
@@ -5,7 +5,7 @@ import redis.asyncio as aredis
 from settings import REDIS_URL
 
-logger = logging.getLogger('[services.redis] ')
+logger = logging.getLogger("[services.redis] ")
 logger.setLevel(logging.DEBUG)
@@ -25,7 +25,7 @@ class RedisCache:
     async def execute(self, command, *args, **kwargs):
         if self._client:
             try:
-                logger.debug(f'{command} {args} {kwargs}')
+                logger.debug(f"{command} {args} {kwargs}")
                 r = await self._client.execute_command(command, *args, **kwargs)
                 logger.debug(type(r))
                 logger.debug(r)
@@ -56,4 +56,4 @@ class RedisCache:
 
 redis = RedisCache()
 
-__all__ = ['redis']
+__all__ = ["redis"]

services/sentry.py (new file)
@@ -0,0 +1,30 @@
+import sentry_sdk
+from sentry_sdk.integrations.ariadne import AriadneIntegration
+from sentry_sdk.integrations.redis import RedisIntegration
+from sentry_sdk.integrations.starlette import StarletteIntegration
+
+from settings import SENTRY_DSN
+
+
+def start_sentry():
+    # sentry monitoring
+    try:
+        sentry_sdk.init(
+            SENTRY_DSN,
+            # Set traces_sample_rate to 1.0 to capture 100%
+            # of transactions for performance monitoring.
+            traces_sample_rate=1.0,
+            # Set profiles_sample_rate to 1.0 to profile 100%
+            # of sampled transactions.
+            # We recommend adjusting this value in production.
+            profiles_sample_rate=1.0,
+            enable_tracing=True,
+            integrations=[
+                StarletteIntegration(),
+                AriadneIntegration(),
+                RedisIntegration(),
+            ],
+        )
+    except Exception as e:
+        print("[services.sentry] init error")
+        print(e)
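start_sentry is registered as a Starlette on_startup hook in main.py, so it runs once per process; with an empty SENTRY_DSN, sentry_sdk.init effectively disables reporting. A hedged smoke-test sketch (the event texts are arbitrary and a valid SENTRY_DSN must be set for anything to arrive):

# Quick manual check that Sentry is wired up; illustrative only.
import sentry_sdk

from services.sentry import start_sentry

start_sentry()
sentry_sdk.capture_message("inbox: sentry smoke test")
try:
    raise RuntimeError("inbox: test exception")
except RuntimeError:
    sentry_sdk.capture_exception()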

settings.py
@@ -2,9 +2,9 @@ from os import environ
 
 PORT = 8000
-REDIS_URL = environ.get('REDIS_URL') or 'redis://127.0.0.1'
-API_BASE = environ.get('API_BASE') or 'http://127.0.0.1:8001/'
-AUTH_URL = environ.get('AUTH_URL') or 'http://127.0.0.1:8080/graphql/'
-MODE = environ.get('MODE') or 'production'
-SENTRY_DSN = environ.get('SENTRY_DSN')
+REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1"
+API_BASE = environ.get("API_BASE") or "http://127.0.0.1:8001/"
+AUTH_URL = environ.get("AUTH_URL") or "http://127.0.0.1:8080/graphql/"
+MODE = environ.get("MODE") or "production"
+SENTRY_DSN = environ.get("SENTRY_DSN")
 
-DEV_SERVER_PID_FILE_NAME = 'dev-server.pid'
+DEV_SERVER_PID_FILE_NAME = "dev-server.pid"