precommit-setup

Untone 2024-01-25 12:25:52 +03:00
parent 8435c8e6b5
commit 152d5a4e99
14 changed files with 320 additions and 309 deletions

View File

@@ -14,11 +14,9 @@ repos:
- id: check-ast
- id: check-merge-conflict
- repo: local
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.6
hooks:
- id: lint-python
name: Lint Python
entry: poetry run ruff check .
types: [python]
language: system
pass_filenames: false
- id: ruff
args: [--fix]
- id: ruff-format
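
In short, the local "poetry run ruff check ." hook is replaced by the pinned astral-sh/ruff-pre-commit hooks, which run the linter with autofix and the formatter. Assembled, the new repo entry would read roughly as the sketch below (the file name is not shown in the hunk; .pre-commit-config.yaml is assumed from the content, and the surrounding check-ast / check-merge-conflict hooks are omitted):

  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.1.6
    hooks:
      - id: ruff
        args: [--fix]
      - id: ruff-format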

main.py
View File

@@ -2,24 +2,23 @@ import os
from importlib import import_module
from os.path import exists
from granian import Granian
from ariadne import load_schema_from_path, make_executable_schema
from ariadne.asgi import GraphQL
from granian import Granian
from granian.server import Interfaces
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.ariadne import AriadneIntegration
from sentry_sdk.integrations.redis import RedisIntegration
from starlette.applications import Starlette
import asyncio
from services.core import CacheStorage
from services.rediscache import redis
from services.schema import resolvers
from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN
from services.core import CacheStorage
import_module("resolvers")
schema = make_executable_schema(load_schema_from_path("inbox.graphql"), resolvers) # type: ignore
import_module('resolvers')
schema = make_executable_schema(load_schema_from_path('inbox.graphql'), resolvers) # type: ignore
async def start_up():
@@ -28,9 +27,9 @@ async def start_up():
await CacheStorage.init()
if MODE == "dev":
if MODE == 'dev':
if not exists(DEV_SERVER_PID_FILE_NAME):
with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f:
with open(DEV_SERVER_PID_FILE_NAME, 'w', encoding='utf-8') as f:
f.write(str(os.getpid()))
else:
# startup sentry monitoring services
@@ -45,8 +44,8 @@ async def start_up():
AioHttpIntegration(),
],
)
except Exception as e:
print("STARTUP FAILED")
except Exception:
print('STARTUP FAILED')
import traceback
traceback.print_exc()
@@ -57,12 +56,7 @@ async def shutdown():
app = Starlette(debug=True, on_startup=[start_up], on_shutdown=[shutdown])
app.mount("/", GraphQL(schema, debug=True))
app.mount('/', GraphQL(schema, debug=True))
if __name__ == "__main__":
Granian(
target="main:app",
port=8888,
interface=Interfaces.ASGI,
reload=True
).serve()
if __name__ == '__main__':
Granian(target='main:app', port=8888, interface=Interfaces.ASGI, reload=True).serve()

View File

@@ -51,11 +51,18 @@ extend-ignore = [
'E501', # leave line length to black
'N818', # leave to us exceptions naming
'S101', # assert is fine
'RUF100', # black's noqa
]
flake8-quotes = { multiline-quotes = 'double' }
flake8-quotes = { inline-quotes = 'single', multiline-quotes = 'double' }
mccabe = { max-complexity = 13 }
target-version = "py312"
[tool.ruff.format]
quote-style = 'single'
[tool.black]
skip-string-normalization = true
[tool.ruff.isort]
combine-as-imports = true
lines-after-imports = 2
@@ -64,6 +71,13 @@ known-first-party = ['resolvers', 'services', 'orm', 'tests']
[tool.ruff.per-file-ignores]
'tests/**' = ['B018', 'S110', 'S501']
[tool.mypy]
python_version = "3.12"
warn_return_any = true
warn_unused_configs = true
ignore_missing_imports = true
exclude = ["nb"]
[tool.pytest.ini_options]
asyncio_mode = 'auto'
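
Taken together, the quote-handling changes in this hunk amount to roughly the following pyproject.toml fragment (placing flake8-quotes directly under [tool.ruff] is inferred from the hunk context): ruff format now rewrites code to single quotes, the flake8-quotes lint settings accept single inline quotes, and black's string normalization is switched off so it will not convert them back.

[tool.ruff]
target-version = "py312"
flake8-quotes = { inline-quotes = 'single', multiline-quotes = 'double' }

[tool.ruff.format]
quote-style = 'single'

[tool.black]
skip-string-normalization = true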

View File

@@ -3,17 +3,18 @@ from resolvers.load import load_chats, load_messages_by
from resolvers.messages import create_message, delete_message, mark_as_read, update_message
from resolvers.search import search_messages, search_recipients
__all__ = [
# inbox
"load_chats",
"load_messages_by",
"create_chat",
"delete_chat",
"update_chat",
"create_message",
"delete_message",
"update_message",
"mark_as_read",
"search_recipients",
"search_messages",
'load_chats',
'load_messages_by',
'create_chat',
'delete_chat',
'update_chat',
'create_message',
'delete_message',
'update_message',
'mark_as_read',
'search_recipients',
'search_messages',
]

View File

@@ -1,7 +1,7 @@
import json
import logging
import time
import uuid
import logging
from models.chat import Chat, ChatUpdate
from services.auth import login_required
@@ -9,11 +9,12 @@ from services.presence import notify_chat
from services.rediscache import redis
from services.schema import mutation
logger = logging.getLogger("[resolvers.chats] ")
logger = logging.getLogger('[resolvers.chats] ')
logger.setLevel(logging.DEBUG)
@mutation.field("update_chat")
@mutation.field('update_chat')
@login_required
async def update_chat(_, info, chat_new: ChatUpdate):
"""
@@ -24,38 +25,38 @@ async def update_chat(_, info, chat_new: ChatUpdate):
:param chat_new: dict with chat data
:return: Result { error chat }
"""
logger.info("update_chat")
author_id = info.context["author_id"]
chat_id = chat_new["id"]
chat_str = await redis.execute("GET", f"chats/{chat_id}")
logger.info('update_chat')
author_id = info.context['author_id']
chat_id = chat_new['id']
chat_str = await redis.execute('GET', f'chats/{chat_id}')
if not chat_str:
return {"error": "chat not exist"}
return {'error': 'chat not exist'}
elif isinstance(chat_str, str):
chat: Chat = json.loads(chat_str)
if author_id in chat["admins"]:
if author_id in chat['admins']:
chat.update(
{
"title": chat_new.get("title", chat["title"]),
"description": chat_new.get("description", chat["description"]),
"updated_at": int(time.time()),
"admins": chat_new.get("admins", chat.get("admins") or []),
"members": chat_new.get("members", chat["members"]),
'title': chat_new.get('title', chat['title']),
'description': chat_new.get('description', chat['description']),
'updated_at': int(time.time()),
'admins': chat_new.get('admins', chat.get('admins') or []),
'members': chat_new.get('members', chat['members']),
}
)
await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat))
for member_id in chat["members"]:
await notify_chat(chat, member_id, "update")
await redis.execute('SET', f"chats/{chat['id']}", json.dumps(chat))
for member_id in chat['members']:
await notify_chat(chat, member_id, 'update')
return {"error": None, "chat": chat}
return {'error': None, 'chat': chat}
@mutation.field("create_chat")
@mutation.field('create_chat')
@login_required
async def create_chat(_, info, title="", members=None):
logger.info("create_chat")
async def create_chat(_, info, title='', members=None):
logger.info('create_chat')
members = members or []
author_id = info.context["author_id"]
author_id = info.context['author_id']
chat: Chat
if author_id:
if author_id not in members:
@@ -63,51 +64,51 @@ async def create_chat(_, info, title="", members=None):
# NOTE: private chats have no title
# reuse private chat created before if exists
if len(members) == 2 and title == "":
chatset1 = await redis.execute("SMEMBERS", f"chats_by_author/{members[0]}")
chatset2 = await redis.execute("SMEMBERS", f"chats_by_author/{members[1]}")
if len(members) == 2 and title == '':
chatset1 = await redis.execute('SMEMBERS', f'chats_by_author/{members[0]}')
chatset2 = await redis.execute('SMEMBERS', f'chats_by_author/{members[1]}')
for c in chatset1.intersection(chatset2):
chat = await redis.execute("GET", f"chats/{c}")
if chat["title"] == "":
logger.info("[inbox] createChat found old chat")
return {"chat": chat, "error": "existed"}
chat = await redis.execute('GET', f'chats/{c}')
if chat['title'] == '':
logger.info('[inbox] createChat found old chat')
return {'chat': chat, 'error': 'existed'}
chat_id = str(uuid.uuid4())
chat: Chat = {
"id": chat_id,
"members": members,
"title": title,
"description": "",
"created_by": author_id,
"created_at": int(time.time()),
"updated_at": int(time.time()),
"admins": members if (len(members) == 2 and title == "") else [],
'id': chat_id,
'members': members,
'title': title,
'description': '',
'created_by': author_id,
'created_at': int(time.time()),
'updated_at': int(time.time()),
'admins': members if (len(members) == 2 and title == '') else [],
}
for member_id in members:
await redis.execute("SADD", f"chats_by_author/{member_id}", chat_id)
await notify_chat(chat, member_id, "create")
await redis.execute('SADD', f'chats_by_author/{member_id}', chat_id)
await notify_chat(chat, member_id, 'create')
print(f"\n\n[resolvers.chats] creating: {chat}\n\n")
print(f'\n\n[resolvers.chats] creating: {chat}\n\n')
await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat))
await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0))
await redis.execute('SET', f'chats/{chat_id}', json.dumps(chat))
await redis.execute('SET', f'chats/{chat_id}/next_message_id', str(0))
return {"error": None, "chat": chat}
return {"error": "no chat was created"}
return {'error': None, 'chat': chat}
return {'error': 'no chat was created'}
@mutation.field("delete_chat")
@mutation.field('delete_chat')
@login_required
async def delete_chat(_, info, chat_id: str):
logger.info("delete_chat")
author_id = info.context["author_id"]
chat_str = await redis.execute("GET", f"chats/{chat_id}")
logger.info('delete_chat')
author_id = info.context['author_id']
chat_str = await redis.execute('GET', f'chats/{chat_id}')
if isinstance(chat_str, str):
chat: Chat = json.loads(chat_str)
if author_id in chat["admins"]:
await redis.execute("DEL", f"chats/{chat_id}")
await redis.execute("SREM", f"chats_by_author/{author_id}", chat_id)
for member_id in chat["members"]:
await notify_chat(chat, member_id, "delete")
return {"error": "chat not exist"}
if author_id in chat['admins']:
await redis.execute('DEL', f'chats/{chat_id}')
await redis.execute('SREM', f'chats_by_author/{author_id}', chat_id)
for member_id in chat['members']:
await notify_chat(chat, member_id, 'delete')
return {'error': 'chat not exist'}

View File

@@ -1,22 +1,22 @@
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional, Union
from models.chat import ChatPayload, Message
from models.chat import ChatPayload
from resolvers.chats import create_chat
from services.auth import login_required
from services.core import CacheStorage
from services.rediscache import redis
from services.schema import query
import logging
logger = logging.getLogger("[resolvers.load] ")
logger = logging.getLogger('[resolvers.load] ')
logger.setLevel(logging.DEBUG)
async def get_unread_counter(chat_id: str, member_id: int) -> int:
unread = await redis.execute("LLEN", f"chats/{chat_id}/unread/{member_id}")
unread = await redis.execute('LLEN', f'chats/{chat_id}/unread/{member_id}')
if isinstance(unread, int):
return unread
else:
@@ -24,27 +24,25 @@ async def get_unread_counter(chat_id: str, member_id: int) -> int:
# NOTE: not an API handler
async def load_messages(
chat_id: str, limit: int = 5, offset: int = 0, ids: Optional[List[int]] = None
):
async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids: Optional[List[int]] = None):
"""load :limit messages for :chat_id with :offset"""
logger.info("load_messages")
logger.info('load_messages')
messages = []
try:
message_ids = [] + (ids or [])
if limit:
mids = await redis.execute("LRANGE", f"chats/{chat_id}/message_ids", offset, offset + limit)
mids = await redis.execute('LRANGE', f'chats/{chat_id}/message_ids', offset, offset + limit)
if isinstance(mids, list):
message_ids.extend(mids)
if message_ids:
message_keys = [f"chats/{chat_id}/messages/{mid}" for mid in message_ids]
messages = await redis.execute("MGET", *message_keys)
message_keys = [f'chats/{chat_id}/messages/{mid}' for mid in message_ids]
messages = await redis.execute('MGET', *message_keys)
if isinstance(messages, list):
messages = [json.loads(m) if isinstance(m, str) else m for m in messages]
replies = []
for m in messages:
if m:
reply_to = m.get("reply_to")
reply_to = m.get('reply_to')
if reply_to:
reply_to = int(reply_to)
if reply_to not in message_ids:
@@ -53,96 +51,98 @@ async def load_messages(
more_messages = await load_messages(chat_id, offset, limit, replies)
if isinstance(more_messages, list):
messages.extend(more_messages)
except Exception:
except Exception as ex:
logger.error(ex)
import traceback
traceback.print_exc()
return messages
@query.field("load_chats")
@query.field('load_chats')
@login_required
async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Union[List[Dict[str, Any]], None]]:
"""load :limit chats of current user with :offset"""
logger.info("load_chats")
author_id = info.context["author_id"]
logger.info('load_chats')
author_id = info.context['author_id']
chats = []
try:
if author_id:
logger.debug("got author", author_id)
cids = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
logger.debug("got cids", cids)
members_online = (await redis.execute("SMEMBERS", "authors-online")) or [] # to show online status
logger.debug("members online", members_online)
logger.debug('got author', author_id)
cids = await redis.execute('SMEMBERS', f'chats_by_author/{author_id}')
logger.debug('got cids', cids)
members_online = (await redis.execute('SMEMBERS', 'authors-online')) or [] # to show online status
logger.debug('members online', members_online)
if isinstance(cids, set):
# TODO: add sort by chat.created_at with in-memory caching chats service
cids = list(cids)[offset : (offset + limit)]
lock = asyncio.Lock()
if len(cids) == 0:
logger.debug(f"no chats for user with id={author_id}")
logger.debug(f'no chats for user with id={author_id}')
r = await create_chat(None, info, members=[2]) # member with id = 2 is discours
logger.debug(f"created chat: {r['chat_id']}")
cids.append(r["chat"]["id"])
cids.append(r['chat']['id'])
logger.debug(f"getting data for {len(cids)} user's chats")
for cid in cids:
async with lock:
chat_str = await redis.execute("GET", f"chats/{cid}")
chat_str = await redis.execute('GET', f'chats/{cid}')
if isinstance(chat_str, str):
logger.debug(f"redis GET by {cid}: {chat_str}")
logger.debug(f'redis GET by {cid}: {chat_str}')
c: ChatPayload = json.loads(chat_str)
c["messages"] = (await load_messages(cid, 5, 0)) or []
c["unread"] = await get_unread_counter(cid, author_id)
member_ids = c["members"].copy()
c["members"] = []
c['messages'] = (await load_messages(cid, 5, 0)) or []
c['unread'] = await get_unread_counter(cid, author_id)
member_ids = c['members'].copy()
c['members'] = []
for member_id in member_ids:
a = CacheStorage.authors_by_id.get(str(member_id))
if a:
a["online"] = a.get("id") in members_online
c["members"].append(a)
a['online'] = a.get('id') in members_online
c['members'].append(a)
else:
logger.error(f"cant find author by id {member_id}")
logger.error(f'cant find author by id {member_id}')
chats.append(c)
else:
logger.error(f"cant find chat by id {cid}")
except Exception as error:
logger.error(f'cant find chat by id {cid}')
except Exception:
import traceback
traceback.print_exc()
return {"chats": chats, "error": None}
return {'chats': chats, 'error': None}
@query.field("load_messages_by")
@query.field('load_messages_by')
@login_required
async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0):
"""load :limit messages of :chat_id with :offset"""
logger.info("load_messages_by")
author_id = info.context["author_id"]
author_chats = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
logger.info('load_messages_by')
author_id = info.context['author_id']
author_chats = await redis.execute('SMEMBERS', f'chats_by_author/{author_id}')
try:
if isinstance(author_chats, set):
author_chats = list(author_chats)
messages = []
by_chat = by.get("chat")
by_chat = by.get('chat')
if by_chat in author_chats:
chat = await redis.execute("GET", f"chats/{by_chat}")
chat = await redis.execute('GET', f'chats/{by_chat}')
if not chat:
return {"messages": [], "error": "chat not exist"}
return {'messages': [], 'error': 'chat not exist'}
# everyone's messages in filtered chat
messages = await load_messages(by_chat, limit, offset)
if isinstance(messages, list):
sorted_messages = [m for m in messages if m and m.get("created_at")]
sorted_messages = [m for m in messages if m and m.get('created_at')]
return {
"messages": sorted(
'messages': sorted(
sorted_messages,
key=lambda m: m.get("created_at"),
key=lambda m: m.get('created_at'),
),
"error": None,
'error': None,
}
except Exception as error:
except Exception as exc:
logger.error(exc)
import traceback
traceback.print_exc()
return {"error": "Cannot get messages of this chat"}
return {'error': 'Cannot get messages of this chat'}

View File

@@ -1,4 +1,5 @@
import json
import logging
import time
from models.chat import Message
@@ -7,163 +8,162 @@ from services.presence import notify_message
from services.rediscache import redis
from services.schema import mutation
import logging
logger = logging.getLogger("[resolvers.messages] ")
logger = logging.getLogger('[resolvers.messages] ')
logger.setLevel(logging.DEBUG)
@mutation.field("create_message")
@mutation.field('create_message')
@login_required
async def create_message(_, info, chat_id: str, body: str, reply_to=None):
"""Создание сообщения с телом :body для чата :chat_id с возможным ответом на :reply_to"""
author_id = info.context["author_id"]
author_id = info.context['author_id']
# Fetch the chat data from Redis
chat_data = await redis.execute("GET", f"chats/{chat_id}")
logger.debug(f"chat data: {chat_data}")
chat_data = await redis.execute('GET', f'chats/{chat_id}')
logger.debug(f'chat data: {chat_data}')
# If there is no chat data, return an error
if not chat_data:
return {"error": "chat is not exist"}
return {'error': 'chat is not exist'}
elif isinstance(chat_data, str):
# Parse the chat data from a JSON string into a dict
chat_dict = json.loads(chat_data)
chat_id = chat_dict["id"]
chat_id = chat_dict['id']
# Get the next message ID
message_id = await redis.execute("GET", f"chats/{chat_dict['id']}/next_message_id")
message_id = await redis.execute('GET', f"chats/{chat_dict['id']}/next_message_id")
if isinstance(message_id, str) or isinstance(message_id, int):
message_id = int(message_id) if message_id else 0
# Build the new message
new_message: Message = {
"chat_id": chat_id,
"id": message_id,
"created_by": author_id,
"body": body,
"created_at": int(time.time()),
"updated_at": None,
"reply_to": None,
'chat_id': chat_id,
'id': message_id,
'created_by': author_id,
'body': body,
'created_at': int(time.time()),
'updated_at': None,
'reply_to': None,
}
# If this is a reply, add it to the message
if reply_to:
new_message["reply_to"] = reply_to
new_message['reply_to'] = reply_to
# Update the chat's last-updated timestamp
chat_dict["updated_at"] = new_message["created_at"]
chat_dict['updated_at'] = new_message['created_at']
# Write the updated chat data back to Redis
await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat_dict))
logger.debug(f"creating message {new_message}")
await redis.execute('SET', f'chats/{chat_id}', json.dumps(chat_dict))
logger.debug(f'creating message {new_message}')
# Write the new message to Redis
await redis.execute(
"SET",
f"chats/{chat_id}/messages/{message_id}",
'SET',
f'chats/{chat_id}/messages/{message_id}',
json.dumps(new_message),
)
# Append the new message ID to the chat's list of message IDs
await redis.execute("LPUSH", f"chats/{chat_id}/message_ids", str(message_id))
await redis.execute('LPUSH', f'chats/{chat_id}/message_ids', str(message_id))
# Update the next message ID
await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(message_id + 1))
await redis.execute('SET', f'chats/{chat_id}/next_message_id', str(message_id + 1))
# Add the new message to each chat member's unread list
members = chat_dict["members"]
members = chat_dict['members']
for member_id in members:
await redis.execute("LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id))
await redis.execute('LPUSH', f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id))
# Send a notification about the new message
new_message["chat_id"] = chat_id
await notify_message(new_message, "create")
new_message['chat_id'] = chat_id
await notify_message(new_message, 'create')
return {"message": new_message, "error": None}
return {"error": "cannot create message"}
return {'message': new_message, 'error': None}
return {'error': 'cannot create message'}
@mutation.field("update_message")
@mutation.field('update_message')
@login_required
async def update_message(_, info, message):
author_id = info.context["author_id"]
chat_id = message.get("chat_id")
chat_str = ""
author_id = info.context['author_id']
chat_id = message.get('chat_id')
chat_str = ''
if chat_id:
chat_str = await redis.execute("GET", f"chats/{chat_id}")
chat_str = await redis.execute('GET', f'chats/{chat_id}')
if not chat_str:
return {"error": "chat not exist"}
return {'error': 'chat not exist'}
message_id = message.get("id")
body = message.get("body")
message_id = message.get('id')
body = message.get('body')
if message_id:
message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}")
message = await redis.execute('GET', f'chats/{chat_id}/messages/{message_id}')
if isinstance(message, str):
message = json.loads(message)
if message["created_by"] != author_id:
return {"error": "access denied"}
if message['created_by'] != author_id:
return {'error': 'access denied'}
if body:
message["body"] = body
message["updated_at"] = int(time.time())
message['body'] = body
message['updated_at'] = int(time.time())
await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message))
await redis.execute('SET', f'chats/{chat_id}/messages/{message_id}', json.dumps(message))
# Send a notification
message["chat_id"] = chat_id
await notify_message(message, "update")
message['chat_id'] = chat_id
await notify_message(message, 'update')
return {"message": message, "error": None}
return {'message': message, 'error': None}
return {"message": message, "error": "cannot update"}
return {'message': message, 'error': 'cannot update'}
@mutation.field("delete_message")
@mutation.field('delete_message')
@login_required
async def delete_message(_, info, chat_id: str, message_id: int):
author_id = info.context["author_id"]
author_id = info.context['author_id']
chat_str = await redis.execute("GET", f"chats/{chat_id}")
chat_str = await redis.execute('GET', f'chats/{chat_id}')
if isinstance(chat_str, str):
chat = json.loads(chat_str)
message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}")
message_data = await redis.execute('GET', f'chats/{chat_id}/messages/{str(message_id)}')
if isinstance(message_data, str):
message: Message = json.loads(message_data)
if message["created_by"] != author_id:
return {"error": "access denied"}
if message['created_by'] != author_id:
return {'error': 'access denied'}
await redis.execute("LREM", f"chats/{chat_id}/message_ids", 0, str(message_id))
await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}")
await redis.execute('LREM', f'chats/{chat_id}/message_ids', 0, str(message_id))
await redis.execute('DEL', f'chats/{chat_id}/messages/{str(message_id)}')
members = chat["members"]
members = chat['members']
for member_id in members:
await redis.execute("LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id))
await redis.execute('LREM', f'chats/{chat_id}/unread/{member_id}', 0, str(message_id))
message["chat_id"] = chat_id
await notify_message(message, "delete")
message['chat_id'] = chat_id
await notify_message(message, 'delete')
return {}
@mutation.field("mark_as_read")
@mutation.field('mark_as_read')
@login_required
async def mark_as_read(_, info, chat_id: str, message_id: int):
author_id = info.context["author_id"]
author_id = info.context['author_id']
chat_str = await redis.execute("GET", f"chats/{chat_id}")
chat_str = await redis.execute('GET', f'chats/{chat_id}')
if isinstance(chat_str, str):
chat = json.loads(chat_str)
members = set(chat["members"])
members = set(chat['members'])
if author_id not in members:
return {"error": "access denied"}
return {'error': 'access denied'}
await redis.execute("LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id))
await redis.execute('LREM', f'chats/{chat_id}/unread/{author_id}', 0, str(message_id))
message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}")
message_data = await redis.execute('GET', f'chats/{chat_id}/messages/{str(message_id)}')
if isinstance(message_data, str):
message: Message = json.loads(message_data)
await notify_message(message, "seen")
await notify_message(message, 'seen')
return {"error": None}
return {'error': None}

View File

@@ -1,4 +1,3 @@
import json
import time
from typing import Any, Dict, List, Union
@@ -9,67 +8,68 @@ from services.rediscache import redis
from services.schema import query
@query.field("search_recipients")
@query.field('search_recipients')
@login_required
async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0):
result = set([])
result = set()
# TODO: maybe redis scan?
author_id = info.context["author_id"]
author_id = info.context['author_id']
existed_chats = await redis.execute("SMEMBERS", f"/chats_by_author/{author_id}")
existed_chats = await redis.execute('SMEMBERS', f'/chats_by_author/{author_id}')
if isinstance(existed_chats, set):
chats_list = list(existed_chats)
for chat_id in chats_list[offset : (offset + limit)]:
members_ids = await redis.execute("SMEMBERS", f"/chats/{chat_id}/members")
members_ids = await redis.execute('SMEMBERS', f'/chats/{chat_id}/members')
if isinstance(members_ids, set):
for member_id in members_ids:
author = CacheStorage.authors_by_id.get(str(member_id))
if author:
if author["name"].startswith(text):
if author['name'].startswith(text):
result.add(author)
more_amount = limit - len(result)
if more_amount > 0:
result.update(CacheStorage.authors[0:more_amount])
return {"members": list(result), "error": None}
return {'members': list(result), 'error': None}
@query.field("search_messages")
@query.field('search_messages')
@login_required
async def search_messages(
_, info, by: Dict[str, Union[str, int]], limit: int, offset: int
) -> Dict[str, Union[List[Dict[str, Any]], None]]:
messages_set = set([])
author_id = info.context["author_id"]
lookup_chats = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
messages_set = set()
author_id = info.context['author_id']
lookup_chats = await redis.execute('SMEMBERS', f'chats_by_author/{author_id}')
if isinstance(lookup_chats, set):
# pre-filter lookup chats
by_member = by.get("author")
by_member = by.get('author')
if by_member:
lookup_chats = filter(
lambda ca: by_member in ca["members"],
lambda ca: by_member in ca['members'],
list(lookup_chats),
)
# load the messages from lookup chats
for c in lookup_chats:
chat_id = c.decode()
filter_method = None
fltr = None
now = int(time.time())
if by_member:
filter_method = lambda mx: mx and mx["created_by"] == by_member
body_like = by.get("body") or ""
fltr = lambda mx: mx and mx['created_by'] == by_member # noqa E731
body_like = by.get('body') or ''
if isinstance(body_like, str):
filter_method = lambda mx: mx and body_like in mx["body"]
days_ago = int(by.get("days") or "0")
fltr = lambda mx: mx and body_like in mx['body'] # noqa E731
days_ago = int(by.get('days') or '0')
if days_ago:
filter_method = lambda mx: mx and (int(time.time()) - mx["created_by"] < days_ago * 24 * 60 * 60)
if filter_method:
ts = days_ago * 24 * 60 * 60
fltr = lambda mx: mx and now - mx['created_by'] < ts # noqa E731
if fltr:
mmm = await load_messages(chat_id, limit, offset)
if isinstance(mmm, list):
mmm = list(filter(filter_method, mmm))
mmm = list(filter(fltr, mmm))
messages_set |= set(mmm)
return {"messages": sorted(list(messages_set)), "error": None}
return {'messages': sorted(messages_set), 'error': None}
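
The noqa E731 markers above silence ruff's rule against assigning lambda expressions (the usual spelling includes a colon, noqa: E731). A named predicate would avoid the suppressions; the sketch below is one possible rewrite, with two caveats: it combines the three conditions, whereas the code above overwrites fltr so only the last applicable lambda wins, and it assumes created_at is the intended field in the age check rather than created_by.

import time

def build_message_filter(by_member=None, body_like='', days_ago=0):
    """Return a single predicate instead of reassigning suppressed lambdas."""
    now = int(time.time())
    max_age = days_ago * 24 * 60 * 60

    def fltr(mx):
        if not mx:
            return False
        if by_member and mx['created_by'] != by_member:
            return False
        if body_like and body_like not in mx['body']:
            return False
        if days_ago and now - mx['created_at'] >= max_age:
            return False
        return True

    return fltr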

View File

@@ -1,59 +1,60 @@
import logging
from functools import wraps
from aiohttp import ClientSession
from starlette.exceptions import HTTPException
from services.core import get_author_by_user
from settings import AUTH_URL
import logging
logger = logging.getLogger("[services.auth] ")
logger = logging.getLogger('[services.auth] ')
logger.setLevel(logging.DEBUG)
async def check_auth(req) -> str | None:
logger.debug("checking auth...")
user_id = ""
logger.debug('checking auth...')
user_id = ''
try:
token = req.headers.get("Authorization")
token = req.headers.get('Authorization')
if token:
# Logging the authentication token
query_name = "validate_jwt_token"
operation = "ValidateToken"
query_name = 'validate_jwt_token'
operation = 'ValidateToken'
headers = {
"Content-Type": "application/json",
'Content-Type': 'application/json',
}
variables = {
"params": {
"token_type": "access_token",
"token": token,
'params': {
'token_type': 'access_token',
'token': token,
}
}
gql = {
"query": f"query {operation}($params: ValidateJWTTokenInput!) {{ {query_name}(params: $params) {{ is_valid claims }} }}",
"variables": variables,
"operationName": operation,
'query': f'query {operation}($params: ValidateJWTTokenInput!) {{ {query_name}(params: $params) {{ is_valid claims }} }}',
'variables': variables,
'operationName': operation,
}
# Asynchronous HTTP request to the authentication server
async with ClientSession() as session:
async with session.post(AUTH_URL, json=gql, headers=headers) as response:
if response.status == 200:
data = await response.json()
errors = data.get("errors")
errors = data.get('errors')
if errors:
logger.error(f"{errors}")
logger.error(f'{errors}')
else:
user_id = data.get("data", {}).get(query_name, {}).get("claims", {}).get("sub")
logger.info(f"[services.auth] got user_id: {user_id}")
user_id = data.get('data', {}).get(query_name, {}).get('claims', {}).get('sub')
logger.info(f'[services.auth] got user_id: {user_id}')
return user_id
except Exception as e:
# Handling and logging exceptions during authentication check
logger.error(e)
if not user_id:
raise HTTPException(status_code=401, detail="Unauthorized")
raise HTTPException(status_code=401, detail='Unauthorized')
def login_required(f):
@@ -61,16 +62,16 @@ def login_required(f):
async def decorated_function(*args, **kwargs):
info = args[1]
context = info.context
req = context.get("request")
req = context.get('request')
user_id = await check_auth(req)
if user_id:
context["user_id"] = user_id.strip()
context['user_id'] = user_id.strip()
author = get_author_by_user(user_id)
if author and "id" in author:
context["author_id"] = author["id"]
if author and 'id' in author:
context['author_id'] = author['id']
else:
logger.debug(author)
HTTPException(status_code=401, detail="Unauthorized")
HTTPException(status_code=401, detail='Unauthorized')
return await f(*args, **kwargs)
return decorated_function

View File

@@ -1,71 +1,67 @@
from typing import List
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import List
import requests
from datetime import datetime, timezone, timedelta
from models.member import ChatMember
from settings import API_BASE
import time
import logging
logger = logging.getLogger("[services.core] ")
logger = logging.getLogger('[services.core] ')
logger.setLevel(logging.DEBUG)
def _request_endpoint(query_name, body) -> dict:
ts1 = time.time()
logger.debug(f"requesting {query_name}...")
response = requests.post(API_BASE, headers={"Content-Type": "application/json"}, json=body)
ts2 = time.time()
logger.debug(f"{query_name} response in {ts1-ts2} secs: <{response.status_code}> {response.text[:32]}..")
logger.debug(f'requesting {query_name}...')
response = requests.post(API_BASE, headers={'Content-Type': 'application/json'}, json=body, timeout=30.0)
if response.status_code == 200:
try:
r = response.json()
result = r.get("data", {}).get(query_name, {})
result = r.get('data', {}).get(query_name, {})
if result:
logger.info(f"entries amount in result: {len(result)} ")
logger.info(f'entries amount in result: {len(result)} ')
return result
except ValueError as e:
logger.error(f"Error decoding JSON response: {e}")
logger.error(f'Error decoding JSON response: {e}')
return {}
def get_all_authors():
query_name = "get_authors_all"
query_name = 'get_authors_all'
gql = {
"query": "query { " + query_name + "{ id slug pic name user } }",
"variables": None,
'query': 'query { ' + query_name + '{ id slug pic name user } }',
'variables': None,
}
return _request_endpoint(query_name, gql)
def get_author_by_user(user: str):
operation = "GetAuthorId"
query_name = "get_author_id"
operation = 'GetAuthorId'
query_name = 'get_author_id'
gql = {
"query": f"query {operation}($user: String!) {{ {query_name}(user: $user){{ id }} }}",
"operationName": operation,
"variables": {"user": user.strip()},
'query': f'query {operation}($user: String!) {{ {query_name}(user: $user){{ id }} }}', # noqa E201, E202
'operationName': operation,
'variables': {'user': user.strip()},
}
return _request_endpoint(query_name, gql)
def get_my_followed() -> List[ChatMember]:
query_name = "get_my_followed"
query_name = 'get_my_followed'
gql = {
"query": "query { " + query_name + " { authors { id slug pic name } } }",
"variables": None,
'query': 'query { ' + query_name + ' { authors { id slug pic name } } }',
'variables': None,
}
result = _request_endpoint(query_name, gql)
return result.get("authors", [])
return result.get('authors', [])
class CacheStorage:
@@ -89,12 +85,12 @@ class CacheStorage:
self = CacheStorage
async with self.lock:
result = get_all_authors()
logger.info(f"cache loaded {len(result)}")
logger.info(f'cache loaded {len(result)}')
if result:
CacheStorage.authors = result
for a in result:
user_id = a.get("user")
author_id = str(a.get("id"))
user_id = a.get('user')
author_id = str(a.get('id'))
self.authors_by_user[user_id] = a
self.authors_by_id[author_id] = a
@@ -105,22 +101,23 @@ class CacheStorage:
self = CacheStorage
while True:
try:
logger.info(" - updating profiles data...")
logger.info(' - updating profiles data...')
await self.update_authors()
failed = 0
except Exception as er:
failed += 1
logger.error(f"{er} - update failed #{failed}, wait 10 seconds")
logger.error(f'{er} - update failed #{failed}, wait 10 seconds')
if failed > 3:
logger.error(" - not trying to update anymore")
logger.error(' - not trying to update anymore')
import traceback
traceback.print_exc()
break
if failed == 0:
when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
t = format(when.astimezone().isoformat())
logger.info(" ⎩ next update: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0]))
logger.info(' ⎩ next update: %s' % (t.split('T')[0] + ' ' + t.split('T')[1].split('.')[0]))
await asyncio.sleep(self.period)
else:
await asyncio.sleep(10)
logger.info(" - trying to update data again")
logger.info(' - trying to update data again')

View File

@@ -4,21 +4,21 @@ from models.chat import ChatUpdate, Message
from services.rediscache import redis
async def notify_message(message: Message, action="create"):
async def notify_message(message: Message, action='create'):
channel_name = f"message:{message['chat_id']}"
data = {"payload": message, "action": action}
data = {'payload': message, 'action': action}
try:
await redis.publish(channel_name, json.dumps(data))
print(f"[services.presence] ok {data}")
print(f'[services.presence] ok {data}')
except Exception as e:
print(f"Failed to publish to channel {channel_name}: {e}")
print(f'Failed to publish to channel {channel_name}: {e}')
async def notify_chat(chat: ChatUpdate, member_id: int, action="create"):
channel_name = f"chat:{member_id}"
data = {"payload": chat, "action": action}
async def notify_chat(chat: ChatUpdate, member_id: int, action='create'):
channel_name = f'chat:{member_id}'
data = {'payload': chat, 'action': action}
try:
await redis.publish(channel_name, json.dumps(data))
print(f"[services.presence] ok {data}")
print(f'[services.presence] ok {data}')
except Exception as e:
print(f"Failed to publish to channel {channel_name}: {e}")
print(f'Failed to publish to channel {channel_name}: {e}')

View File

@@ -1,9 +1,11 @@
import logging
import redis.asyncio as aredis
from settings import REDIS_URL
import logging
logger = logging.getLogger("[services.redis] ")
logger = logging.getLogger('[services.redis] ')
logger.setLevel(logging.DEBUG)
@@ -23,7 +25,7 @@ class RedisCache:
async def execute(self, command, *args, **kwargs):
if self._client:
try:
logger.debug(f"{command} {args} {kwargs}")
logger.debug(f'{command} {args} {kwargs}')
r = await self._client.execute_command(command, *args, **kwargs)
logger.debug(type(r))
logger.debug(r)
@@ -51,6 +53,7 @@ class RedisCache:
return
await self._client.publish(channel, data)
redis = RedisCache()
__all__ = ["redis"]
__all__ = ['redis']
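
The module exposes a single redis = RedisCache() instance, which is what the resolvers above import. A minimal usage sketch of the two methods visible in this diff (the keys and channel name are illustrative, and it assumes the client connection was already opened elsewhere, e.g. at startup):

from services.rediscache import redis

async def demo(chat_id: str) -> None:
    # execute() forwards to redis-py's execute_command() and logs the command, args and reply
    await redis.execute('SET', f'chats/{chat_id}/next_message_id', '0')
    next_id = await redis.execute('GET', f'chats/{chat_id}/next_message_id')
    print(next_id)
    # publish() is what services.presence uses to push chat and message events
    await redis.publish(f'chat:{chat_id}', '{"action": "create", "payload": {}}')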

View File

@@ -1,5 +1,6 @@
from ariadne import MutationType, QueryType
query = QueryType()
mutation = MutationType()

View File

@@ -1,9 +1,10 @@
from os import environ
PORT = 80
REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1"
API_BASE = environ.get("API_BASE") or "https://core.discours.io/"
AUTH_URL = environ.get("AUTH_URL") or "https://auth.discours.io/"
MODE = environ.get("MODE") or "production"
SENTRY_DSN = environ.get("SENTRY_DSN")
DEV_SERVER_PID_FILE_NAME = "dev-server.pid"
REDIS_URL = environ.get('REDIS_URL') or 'redis://127.0.0.1'
API_BASE = environ.get('API_BASE') or 'https://core.discours.io/'
AUTH_URL = environ.get('AUTH_URL') or 'https://auth.discours.io/'
MODE = environ.get('MODE') or 'production'
SENTRY_DSN = environ.get('SENTRY_DSN')
DEV_SERVER_PID_FILE_NAME = 'dev-server.pid'