Compare commits
4500ac1d9e ... dev
126 Commits

SHA1:
412036521d
5ad7ff8265
0a2ca2d231
e7b71aae7c
b003aa8083
954acf00e5
f5425d3599
76186df2b0
3c219bfa69
41af1a7349
6a450a84c1
c96e8afc45
12c5d2677d
63cf0b9fee
38c0a4e3ee
97aed41143
1a730a9eab
fe069696d3
f2726633cd
5c4f73d2ca
595fa945cf
12602ac57c
d292be591e
a774108b18
dbf1d8880d
2b3aa43faf
3a30f8aa62
dcdd7e16af
c8f65ca0c9
106f1bfbde
601330246d
ba193c2aae
23e8310fb4
2248afe3cd
b1ace7d82c
7c53dc0c42
ab1e426a17
899cb05c1b
a4c1663cc7
70079c38ad
152d5a4e99
8435c8e6b5
2df2f6accd
87cd8de5ab
fce949603e
e0e1e88882
7c8b58d613
91025d453f
751d91562e
cccaf16817
a83ef022a3
6e8c084816
636b5446a2
bd745407bf
cbd77122a5
8901ded662
9b03e625f4
80fed4049a
339b6c6457
c8d46cf863
74d80f1447
2d66870443
e7b9d419c4
ce02ce0130
e81eabd0d0
9635bb8a7c
1bacbc93ee
4dd419bdf9
01e8a07ae7
0188140224
2253bcf956
2380e88168
2d588c549e
2658cd323b
c90e68d3ae
fa79164ca8
4c119abbea
92791efa9c
6c7f269206
b141c26e80
a0d111c50d
95a237f349
531eb1f028
85a40eba83
afe96fc909
8c1f52f99b
d75f31072c
e2bcffdc4c
6f478a1d16
c3333c41a7
65eaa7b6cb
be7bf90f0b
c3a6ecd3ae
b2040099a8
9d1a4e90c9
93c1727be3
83fba058ab
ef0f5168e1
876087c528
284a69085f
b1b7bf4dc2
f15f14ecd9
6023cfb4c2
5c0621dc9b
f19ff47e99
2db82eddfd
b3c01abd37
35e68097d8
15bbe8eb9d
0091acd0ec
eb6ccdc481
f20f5adedf
bc8ed964ec
3b08d5d6c7
65a6d534c6
40ac53d32d
a11ee74fc3
a402af4590
844f32f204
3cccf97198
bc8a07e619
517de93ccd
bb48a8ef11
cc02711ab3
ab3c271a6b
905d03b9ed
@@ -14,9 +14,22 @@ jobs:
        id: repo_name
        run: echo "::set-output name=repo::$(echo ${GITHUB_REPOSITORY##*/})"

      - name: Push to dokku
      - name: Get Branch Name
        id: branch_name
        run: echo "::set-output name=branch::$(echo ${GITHUB_REF##*/})"

      - name: Push to dokku for main branch
        if: github.ref == 'refs/heads/main'
        uses: dokku/github-action@master
        with:
          branch: 'main'
          git_remote_url: 'ssh://dokku@staging.discours.io:22/${{ steps.repo_name.outputs.repo }}'
          git_remote_url: 'ssh://dokku@v2.discours.io:22/inbox'
          ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}

      - name: Push to dokku for staging branch
        if: github.ref == 'refs/heads/dev'
        uses: dokku/github-action@master
        with:
          branch: 'main'
          git_remote_url: 'ssh://dokku@staging.discours.io:22/inbox'
          ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}
.gitignore (vendored, 1 line changed)
@@ -5,3 +5,4 @@ __pycache__
.vscode
poetry.lock
.venv
.ruff_cache
.pre-commit-config.yaml (new file, 20 lines)
@@ -0,0 +1,20 @@
fail_fast: true

repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: check-yaml
      - id: check-toml
      - id: end-of-file-fixer
      - id: trailing-whitespace
      - id: check-added-large-files
      - id: detect-private-key
      - id: check-ast
      - id: check-merge-conflict

  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.3.5
    hooks:
      - id: ruff
        args: [--fix]
@@ -1,11 +1,47 @@
[0.3.2]
- added custom logger
- auth logic synced with core
- added httpx
- aiohttp and requests removed
- core adapter loads data from redis now
- authors cache storage removed

[0.3.1]
- glitchtip connect

[0.3.0]
- versions updates
- startup refactoring
- auth update
- pre-commit fix

[0.2.21]
- replace uvicorn with granian
- pre-commit hook installed
- app.json with healthchecks used

[0.2.20]
- added logger
- typing revision

[0.2.19]
- fix: stripping user_id

[0.2.18]
- services: auth updated
- services: core updated

[0.2.17]
- httpx -> aiohttp
- sentry integrations

[0.2.16]
- resolvers: snake case queries and mutations
- resolvers: fix auth context usage with string user_id

[0.2.15]
- chore: schema service removed

@@ -23,34 +59,41 @@
- uses official redis[hiredis] async
- datetime scalar removed

[0.2.12]
- sigil is back for test

[0.2.11]
- does not need sigil with schema stitching

[0.2.10]
- middlewares removed
- orm removed
- added core api connector
- nginx.conf.sigil fixed

[0.2.9]
- starlette is back
- auth middleware
- create first chat with member with id = 1 if smembers chats_by_author/author_id is empty

[0.2.8]
- sse moved out to the presence service
- bugfixes
- pydantic removed as unused

[0.2.7]
- search messages fix
- context with author_id fix
- redis pubsub new_message event announce
- sse new_message events broadcast

[0.2.6]
- authors / members / users terms revision
- auth service connection
Dockerfile (24 lines changed)
@@ -1,22 +1,12 @@
# Use an official Python runtime as a parent image
FROM python:slim

# Set the working directory in the container to /app
FROM python:alpine3.18
WORKDIR /app

# Add metadata to the image to describe that the container is listening on port 80
EXPOSE 80

# Copy the current directory contents into the container at /app
COPY . /app

# Install any needed packages specified in pyproject.toml
RUN apt-get update && apt-get install -y gcc curl && \
    curl -sSL https://install.python-poetry.org | python - && \
    echo "export PATH=$PATH:/root/.local/bin" >> ~/.bashrc && \
    . ~/.bashrc && \
    poetry config virtualenvs.create false && \
    poetry install --no-dev
RUN apk update && apk add --no-cache git gcc curl
RUN curl -sSL https://install.python-poetry.org | python
ENV PATH="${PATH}:/root/.local/bin"
RUN poetry config virtualenvs.create false && poetry install --without dev --no-root

EXPOSE 8000

# Run server.py when the container launches
CMD ["python", "server.py"]
__init__.py (new file, 6 lines)
@@ -0,0 +1,6 @@
import os
import sys

# Get the path to the project root directory
root_path = os.path.abspath(os.path.dirname(__file__))
sys.path.append(root_path)
app.json (new file, 13 lines)
@@ -0,0 +1,13 @@
{
  "healthchecks": {
    "web": [
      {
        "type": "startup",
        "name": "web check",
        "description": "Checking if the app responds to the GET /",
        "path": "/",
        "attempts": 3
      }
    ]
  }
}
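For reference, dokku's startup healthcheck above amounts to probing GET / until it answers. A minimal equivalent check with httpx (the base URL and function name are illustrative, not part of this diff):

import httpx

def web_check(base_url: str = "http://localhost:8000", attempts: int = 3) -> bool:
    # Mirrors the "web check" above: try GET / a few times before giving up
    for _ in range(attempts):
        try:
            if httpx.get(base_url + "/").status_code < 500:
                return True
        except httpx.TransportError:
            continue
    return False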
inbox.graphql
@@ -1,9 +1,3 @@
# duffok 2023-12-01
# add sdl to _Service
type _Service {
  sdl: String
}

enum MessageStatus {
  NEW
  UPDATED
@@ -70,9 +64,9 @@ type Query {
  # inbox
  load_chats(limit: Int, offset: Int): ChatResult! # your chats
  load_messages_by(by: MessagesBy!, limit: Int, offset: Int): ChatResult!
  load_recipients(limit: Int, offset: Int): ChatResult!
  search_recipients(query: String!, limit: Int, offset: Int): ChatResult!
  search_messages(by: MessagesBy!, limit: Int, offset: Int): ChatResult!
  # _service: _Service!
}

type Message {
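These query fields are served by the Ariadne app mounted at /. A minimal client sketch for one of them (the endpoint URL and raw-token Authorization header are assumptions based on services/auth.py, not part of this diff):

import httpx

LOAD_CHATS = "query LoadChats($limit: Int, $offset: Int) { load_chats(limit: $limit, offset: $offset) { error } }"

def load_chats(endpoint: str = "http://localhost:8000/", token: str = "") -> dict:
    # POST a GraphQL document; the token is forwarded to the auth service as-is
    resp = httpx.post(
        endpoint,
        json={"query": LOAD_CHATS, "variables": {"limit": 50, "offset": 0}},
        headers={"Authorization": token, "Content-Type": "application/json"},
    )
    resp.raise_for_status()
    return resp.json()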
main.py (59 lines changed)
@@ -4,49 +4,38 @@ from os.path import exists

from ariadne import load_schema_from_path, make_executable_schema
from ariadne.asgi import GraphQL
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.ariadne import AriadneIntegration
from sentry_sdk.integrations.redis import RedisIntegration
from starlette.applications import Starlette
from starlette.routing import Route

from services.schema import resolvers
from services.logger import root_logger as logger
from services.rediscache import redis
from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN, MODE
from services.schema import resolvers
from services.sentry import start_sentry
from settings import DEV_SERVER_PID_FILE_NAME, MODE

import_module("resolvers")
schema = make_executable_schema(load_schema_from_path("inbox.graphql"), resolvers)  # type: ignore
schema = make_executable_schema(load_schema_from_path("inbox.graphql"), resolvers)


async def start_up():
    if MODE == "dev":
        if exists(DEV_SERVER_PID_FILE_NAME):
            await redis.connect()
            return
        else:
async def start():
    if MODE == "development":
        if not exists(DEV_SERVER_PID_FILE_NAME):
            # pid file management
            with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f:
                f.write(str(os.getpid()))
    else:
        await redis.connect()
    try:
        import sentry_sdk
    logger.info(f"process started in {MODE} mode")

        sentry_sdk.init(
            SENTRY_DSN,
            enable_tracing=True,
            integrations=[
                AriadneIntegration(),
                RedisIntegration(),
                AioHttpIntegration(),

# main starlette app object with ariadne mounted in root
app = Starlette(
    on_startup=[
        redis.connect,
        start_sentry,
        start,
    ],
)
    except Exception as e:
        print("[sentry] init error")
        print(e)


async def shutdown():
    await redis.disconnect()


app = Starlette(debug=True, on_startup=[start_up], on_shutdown=[shutdown])
app.mount("/", GraphQL(schema, debug=True))
    on_shutdown=[redis.disconnect],
    debug=True,
    routes=[
        Route("/", GraphQL(schema, debug=True)),
    ],
)
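Since the app is now assembled as a plain ASGI application with explicit routes, it can be smoke-tested in-process. A hedged sketch using httpx's ASGI transport (run from the repo root so inbox.graphql resolves; note this transport does not run the lifespan hooks, so redis.connect is skipped and only schema-level queries are exercised):

import asyncio

import httpx

from main import app  # the Starlette app assembled above

async def smoke():
    # Drive the ASGI app directly, without a running server
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        resp = await client.post("/", json={"query": "{ __typename }"})
        print(resp.status_code, resp.json())

asyncio.run(smoke())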
models/chat.py
@@ -1,4 +1,4 @@
from typing import TypedDict, Optional, List
from typing import List, Optional, TypedDict

from models.member import ChatMember
from models.message import Message
@@ -25,7 +25,7 @@ class ChatPayload(TypedDict):
    created_by: int
    description: Optional[str]
    messages: Optional[List[Message]]
    unread: Optional[List[int]]
    unread: Optional[int]  # counter


class ChatUpdate(TypedDict):
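With unread now an integer counter rather than a list of message ids, a payload is built roughly like this (the trimmed class name and field values are illustrative):

from typing import Optional, TypedDict

class ChatPayloadSketch(TypedDict):
    # trimmed to the fields this hunk touches
    created_by: int
    description: Optional[str]
    unread: Optional[int]  # now a counter, previously Optional[List[int]]

chat: ChatPayloadSketch = {"created_by": 1, "description": None, "unread": 3}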
models/member.py
@@ -1,4 +1,4 @@
from typing import TypedDict, Optional
from typing import Optional, TypedDict


class ChatMember(TypedDict):

models/message.py
@@ -1,4 +1,4 @@
from typing import TypedDict, Optional
from typing import Optional, TypedDict


class Message(TypedDict):
nginx.conf.sigil
@@ -9,10 +9,7 @@
{{ $cors_headers_get := "if ($request_method = 'GET') { add_header 'Access-Control-Allow-Origin' '$allow_origin' always; add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS' always; add_header 'Access-Control-Allow-Headers' 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,Authorization' always; add_header 'Access-Control-Expose-Headers' 'Content-Length,Content-Range' always; add_header 'Access-Control-Allow-Credentials' 'true' always; }" }}

# Mapping for allowed origins
map $http_origin $allow_origin {
    ~^https?:\/\/((.*\.)?localhost(:\d+)?|discoursio-webapp(-(.*))?\.vercel\.app|(.*\.)?discours\.io)$ $http_origin;
    default "";
}
# is moved in Global nginx config file /etc/nginx/nginx.conf

# Server block setup
{{ range $port_map := .PROXY_PORT_MAP | split " " }}
@@ -54,23 +51,6 @@ server {
        {{ $cors_headers_get }}
    }

    # Custom location block for /connect
    location /connect/ {
        proxy_pass http://presence-8080/;
        add_header 'Cache-Control' 'no-cache';
        add_header 'Content-Type' 'text/event-stream';
        add_header 'Connection' 'keep-alive';
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout 36000s;
        {{ $proxy_settings }}
        {{ $cors_headers_options }}
        {{ $cors_headers_post }}
        {{ $cors_headers_get }}
    }

    # Error pages

    error_page 400 401 402 403 405 406 407 408 409 410 411 412 413 414 415 416 417 418 420 422 423 424 426 428 429 431 444 449 450 451 /400-error.html;
@@ -97,7 +77,8 @@ server {
        internal;
    }

    include /home/dokku/gateway/nginx.conf.d/*.conf;
    # include /home/dokku/gateway/nginx.conf.d/*.conf;
    include {{ $.DOKKU_ROOT }}/{{ $.APP }}/nginx.conf.d/*.conf;
}
{{ end }}
pyproject.toml
@@ -1,71 +1,25 @@
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "discoursio-inbox"
version = "0.2.15"
version = "0.3.2"
description = "Inbox server for discours.io"
authors = ["Tony Rewin <anton.rewin@gmail.com>"]

[tool.poetry.dependencies]
python = "^3.12"
sentry-sdk = "^1.32.0"
redis = { extras = ["hiredis"], version = "^5.0.1" }
ariadne = "^0.21"
starlette = "^0.32"
uvicorn = "^0.24"
sentry-sdk = "^1.44.1"
ariadne = "^0.23.0"
starlette = "^0.37.2"
itsdangerous = "^2.1.2"
aiohttp = "^3.9.1"
requests = "^2.31.0"
granian = "^1.2.1"
colorlog = "^6.8.2"
httpx = "^0.27.0"
redis = {version = "^5.0.3", extras = ["async"]}

[tool.poetry.dev-dependencies]
pytest = "^7.4.2"
black = { version = "^23.9.1", python = ">=3.12" }
ruff = { version = "^0.1.0", python = ">=3.12" }
[tool.poetry.group.dev.dependencies]
pre-commit = "^3.6.0"
ruff = "^0.3.5"

[tool.black]
line-length = 120
target-version = ['py312']
include = '\.pyi?$'
exclude = '''

(
  /(
      \.eggs  # exclude a few common directories in the
    | \.git   # root of the project
    | \.hg
    | \.mypy_cache
    | \.tox
    | \.venv
    | _build
    | buck-out
    | build
    | dist
  )/
  | foo.py  # also separately exclude a file named foo.py in
            # the root of the project
)
'''

[tool.isort]
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
use_parentheses = true
ensure_newline_before_comments = true
line_length = 120

[tool.ruff]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
select = ["E4", "E7", "E9", "F"]
ignore = []
line-length = 120
target-version = "py312"

[tool.pyright]
venvPath = "."
venv = ".venv"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
resolvers/__init__.py
@@ -1,12 +1,12 @@
from resolvers.chats import create_chat, delete_chat, update_chat
from resolvers.load import load_chats, load_messages_by, load_recipients
from resolvers.load import load_chats, load_messages_by
from resolvers.messages import (
    create_message,
    delete_message,
    update_message,
    mark_as_read,
    update_message,
)
from resolvers.search import search_recipients, search_messages
from resolvers.search import search_messages, search_recipients

__all__ = [
    # inbox
@@ -19,7 +19,6 @@ __all__ = [
    "delete_message",
    "update_message",
    "mark_as_read",
    "load_recipients",
    "search_recipients",
    "search_messages",
]
resolvers/chats.py
@@ -1,12 +1,16 @@
import json
import uuid
import logging
import time
import uuid

from models.chat import Chat, ChatUpdate
from services.auth import login_required
from services.presence import notify_chat
from services.rediscache import redis
from services.schema import mutation
from models.chat import Chat, ChatUpdate
from services.presence import notify_chat

logger = logging.getLogger("[resolvers.chats] ")
logger.setLevel(logging.DEBUG)


@mutation.field("update_chat")
@@ -20,12 +24,13 @@ async def update_chat(_, info, chat_new: ChatUpdate):
    :param chat_new: dict with chat data
    :return: Result { error chat }
    """
    logger.info("update_chat")
    author_id = info.context["author_id"]
    chat_id = chat_new["id"]
    chat_str = await redis.execute("GET", f"chats/{chat_id}")
    if not chat_str:
        return {"error": "chat not exist"}
    else:
    elif isinstance(chat_str, str):
        chat: Chat = json.loads(chat_str)
        if author_id in chat["admins"]:
            chat.update(
@@ -48,24 +53,26 @@ async def update_chat(_, info, chat_new: ChatUpdate):
@mutation.field("create_chat")
@login_required
async def create_chat(_, info, title="", members=None):
    if members is None:
        members = []
    logger.info("create_chat")
    members = members or []
    author_id = info.context["author_id"]
    print("create_chat members: %r" % members)
    chat: Chat
    if author_id:
        if author_id not in members:
            members.append(int(author_id))

        # NOTE: private chats has no title
        # reuse private chat created before if exists
        if len(members) == 2 and title == "":
            chatset1 = set((await redis.execute("SMEMBERS", f"chats_by_author/{members[0]}")) or [])
            chatset2 = set((await redis.execute("SMEMBERS", f"chats_by_author/{members[1]}")) or [])
            chatset1 = await redis.execute("SMEMBERS", f"chats_by_author/{members[0]}")
            chatset2 = await redis.execute("SMEMBERS", f"chats_by_author/{members[1]}")
            if isinstance(chatset1, set) and isinstance(chatset2, set):
                for c in chatset1.intersection(chatset2):
                    chat_data = await redis.execute("GET", f"chats/{c}")
                    if chat_data:
                        chat = json.loads(chat_data)
                    chat_result = await redis.execute("GET", f"chats/{c}")
                    if chat_result:
                        chat = json.loads(chat_result)
                        if chat["title"] == "":
                            print("[inbox] createChat found old chat")
                            logger.info("[inbox] createChat found old chat")
                            return {"chat": chat, "error": "existed"}

        chat_id = str(uuid.uuid4())
@@ -90,20 +97,20 @@ async def create_chat(_, info, title="", members=None):
        await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0))

        return {"error": None, "chat": chat}
    return {"error": "no chat was created"}


@mutation.field("delete_chat")
@login_required
async def delete_chat(_, info, chat_id: str):
    logger.info("delete_chat")
    author_id = info.context["author_id"]

    chat_str = await redis.execute("GET", f"chats/{chat_id}")
    if chat_str:
    if isinstance(chat_str, str):
        chat: Chat = json.loads(chat_str)
        if author_id in chat["admins"]:
            await redis.execute("DEL", f"chats/{chat_id}")
            await redis.execute("SREM", f"chats_by_author/{author_id}", chat_id)
            for member_id in chat["members"]:
                await notify_chat(chat, member_id, "delete")
    else:
        return {"error": "chat not exist"}
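The create_chat change above reuses an untitled two-member chat when both members already share one; the lookup reduces to a set intersection over per-author chat-id sets. A standalone sketch of the idea (key names follow this diff; the function name is illustrative):

import json

async def find_existing_private_chat(redis, member_a: int, member_b: int):
    # chats_by_author/<id> holds the set of chat ids each author belongs to
    set_a = set(await redis.execute("SMEMBERS", f"chats_by_author/{member_a}") or [])
    set_b = set(await redis.execute("SMEMBERS", f"chats_by_author/{member_b}") or [])
    for chat_id in set_a & set_b:
        raw = await redis.execute("GET", f"chats/{chat_id}")
        if isinstance(raw, str):
            chat = json.loads(raw)
            if chat.get("title") == "":  # private chats have no title
                return chat
    return None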
resolvers/load.py
@@ -1,47 +1,63 @@
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional, Union

from models.chat import ChatPayload
from resolvers.chats import create_chat
from services.auth import login_required
from services.core import get_my_followed, get_all_authors
from services.core import get_author_by_id
from services.rediscache import redis
from services.schema import query
from models.chat import Message, ChatPayload
from models.member import ChatMember
from resolvers.chats import create_chat

logger = logging.getLogger("[resolvers.load] ")
logger.setLevel(logging.DEBUG)


async def get_unread_counter(chat_id: str, member_id: int) -> int:
    unread = await redis.execute("LLEN", f"chats/{chat_id}/unread/{member_id}")
    return unread or 0
    if isinstance(unread, int):
        return unread
    else:
        return 0


# NOTE: not an API handler
async def load_messages(
    chat_id: str, limit: int = 5, offset: int = 0, ids: Optional[List[int]] = None
) -> List[Message | None]:
):
    """load :limit messages for :chat_id with :offset"""
    logger.info("load_messages")
    messages = []
    try:
        message_ids = [] + (ids or [])
        if limit:
            mids = (await redis.lrange(f"chats/{chat_id}/message_ids", offset, offset + limit)) or []
            message_ids += mids
            mids = await redis.execute(
                "LRANGE", f"chats/{chat_id}/message_ids", offset, offset + limit
            )
            if isinstance(mids, list):
                message_ids.extend(mids)
        if message_ids:
            message_keys = [f"chats/{chat_id}/messages/{mid}" for mid in message_ids]
            messages = (await redis.mget(*message_keys)) or []
            messages = [json.loads(m) if isinstance(m, str) else m for m in messages]
            messages = await redis.execute("MGET", *message_keys)
            if isinstance(messages, list):
                messages = [
                    json.loads(m) if isinstance(m, str) else m for m in messages
                ]
            replies = []
            for m in messages:
                if m:
                    reply_to = m.get("replyTo")
                    reply_to = m.get("reply_to")
                    if reply_to:
                        reply_to = int(reply_to)
                        if reply_to not in message_ids:
                            replies.append(reply_to)
            if replies:
                messages += await load_messages(chat_id, offset, limit, replies)
    except Exception:
                more_messages = await load_messages(chat_id, offset, limit, replies)
                if isinstance(more_messages, list):
                    messages.extend(more_messages)
    except Exception as ex:
        logger.error(ex)
        import traceback

        traceback.print_exc()
@@ -50,37 +66,75 @@ async def load_messages(

@query.field("load_chats")
@login_required
async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Union[List[Dict[str, Any]], None]]:
async def load_chats(
    _, info, limit: int = 50, offset: int = 0
) -> Dict[str, Union[List[Dict[str, Any]], None]]:
    """load :limit chats of current user with :offset"""
    logger.info("load_chats")
    author_id = info.context["author_id"]
    cids = (await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or []
    members_online = (await redis.execute("SMEMBERS", "authors-online")) or []
    cids = list(cids)[offset : (offset + limit)]
    chats = []
    try:
        if author_id:
            logger.debug(f"got author {author_id}")
            cids = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
            logger.debug(f"got cids {cids}")
            members_online = (
                await redis.execute("SMEMBERS", "authors-online")
            ) or []  # to show online status
            logger.debug(f"members online: {members_online}")
            if isinstance(cids, set):
                # TODO: add sort by chat.created_at with in-memory caching chats service
                cids = list(cids)[offset : (offset + limit)]
                lock = asyncio.Lock()
                if len(cids) == 0:
                    print(f"[resolvers.load] no chats for user with id={author_id}")
                    r = await create_chat(None, info, members=[2])  # member with id = 2 is discours
                    print(f"[resolvers.load] created chat: {r['chat_id']}")
                    cids.append(r["chat"]["id"])
                    all_authors: List[ChatMember] = await get_all_authors()
                    authors = {a["id"]: a for a in all_authors}
                    logger.debug(f"no chats for user with id={author_id}")
                    r = await create_chat(
                        None, info, members=[2]
                    )  # member with id = 2 is discours
                    if (
                        isinstance(r, dict)
                        and "chat" in r
                        and isinstance(r["chat"], dict)
                    ):
                        chat_id = r["chat"].get("id")
                        if chat_id:
                            logger.debug(f"created chat: {chat_id}")
                            cids.append(chat_id)

                logger.debug(f"getting data for {len(cids)} user's chats")
                for cid in cids:
                    async with lock:
                        chat_str = await redis.execute("GET", f"chats/{cid}")
                        print(f"[resolvers.load] redis GET by {cid}: {chat_str}")
                        if chat_str:
                        if isinstance(chat_str, str):
                            logger.debug(f"redis GET by {cid}: {chat_str}")
                            c: ChatPayload = json.loads(chat_str)
                            c["messages"] = await load_messages(cid, 5, 0)
                            c["messages"] = (await load_messages(cid, 5, 0)) or []
                            c["unread"] = await get_unread_counter(cid, author_id)
                            member_ids = c["members"].copy()
                            c["members"] = []
                            for member_id in member_ids:
                                a = authors.get(member_id)
                                if isinstance(member_id, int):
                                    a = await get_author_by_id(int(member_id))
                                    if a:
                                        a["online"] = a.get("id") in members_online
                                        c["members"].append(a)
                                    else:
                                        logger.error(
                                            f"cant find author by id {member_id}"
                                        )
                                elif (
                                    "members" in member_id
                                    and member_id not in c["members"]
                                ):
                                    c["members"].append(member_id)
                            chats.append(c)
                        else:
                            logger.error(f"cant find chat by id {cid}")
    except Exception:
        import traceback

        traceback.print_exc()
    return {"chats": chats, "error": None}


@@ -88,45 +142,33 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Uni
@login_required
async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0):
    """load :limit messages of :chat_id with :offset"""
    logger.info("load_messages_by")
    author_id = info.context["author_id"]
    user_chats = (await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id))) or []
    user_chats = [c for c in user_chats]
    if user_chats:
    author_chats = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
    try:
        if isinstance(author_chats, set):
            author_chats = list(author_chats)
            messages = []
            by_chat = by.get("chat")
            if by_chat in user_chats:
            if by_chat in author_chats:
                chat = await redis.execute("GET", f"chats/{by_chat}")
                if not chat:
                    return {"messages": [], "error": "chat not exist"}
                # everyone's messages in filtered chat
                messages = await load_messages(by_chat, limit, offset)
                if isinstance(messages, list):
                    sorted_messages = [m for m in messages if m and m.get("created_at")]
                    return {
                        "messages": sorted(
                            [m for m in messages if m.get("created_at")],
                            sorted_messages,
                            key=lambda m: m.get("created_at"),
                        ),
                        "error": None,
                    }
            else:
                return {"error": "Cannot access messages of this chat"}

    except Exception as exc:
        logger.error(exc)
        import traceback

@query.field("load_recipients")
async def load_recipients(_, _info, limit=50, offset=0):
    """load possible chat participants"""
    onliners = (await redis.execute("SMEMBERS", "authors-online")) or []
    r = []
    all_authors: List[ChatMember] = await get_all_authors()
    my_followings: List[ChatMember] = await get_my_followed()
    if all_authors:
        if len(my_followings) < limit:
            my_followings = my_followings + all_authors[0 : limit - len(my_followings)]
        for a in my_followings:
            a["online"] = a["id"] in onliners
            r.append(a)

    # NOTE: maybe sort members here

    print(f"[resolvers.load] loadRecipients found {len(r)} members")

    return {"members": r, "error": None}
        traceback.print_exc()
    return {"error": "Cannot get messages of this chat"}
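Unread counts in this module are derived from per-member Redis lists: each new message id is LPUSHed onto chats/<chat_id>/unread/<member_id> and mark_as_read LREMs it, so the counter is just the list length. A minimal standalone sketch (assuming a redis.asyncio client rather than the project's wrapper):

import redis.asyncio as aredis

async def unread_counter(client: aredis.Redis, chat_id: str, member_id: int) -> int:
    # LLEN of the per-member unread list; a missing key means zero
    return int(await client.llen(f"chats/{chat_id}/unread/{member_id}") or 0)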
resolvers/messages.py
@@ -1,11 +1,15 @@
import json
import logging
import time

from models.chat import Message
from services.auth import login_required
from services.presence import notify_message
from services.rediscache import redis
from services.schema import mutation
from models.chat import Message

logger = logging.getLogger("[resolvers.messages] ")
logger.setLevel(logging.DEBUG)


@mutation.field("create_message")
@@ -16,20 +20,22 @@ async def create_message(_, info, chat_id: str, body: str, reply_to=None):

    # Fetch the chat data from Redis
    chat_data = await redis.execute("GET", f"chats/{chat_id}")
    print(f"[resolvers.messages] debug chat data: {chat_data}")
    logger.debug(f"chat data: {chat_data}")

    # If there is no chat data, return an error
    if not chat_data:
        return {"error": "chat is not exist"}
    else:
    elif isinstance(chat_data, str):
        # Parse the chat data from a JSON string into a dict
        chat_dict = json.loads(chat_data)
        print(chat_dict)
        chat_id = chat_dict["id"]

        # Get the next message id
        message_id = await redis.execute("GET", f"chats/{chat_dict['id']}/next_message_id")
        message_id = await redis.execute(
            "GET", f"chats/{chat_dict['id']}/next_message_id"
        )
        if isinstance(message_id, str) or isinstance(message_id, int):
            message_id = int(message_id) if message_id else 0
            chat_id = chat_dict["id"]
            # Create the new message
            new_message: Message = {
                "chat_id": chat_id,
@@ -38,6 +44,7 @@ async def create_message(_, info, chat_id: str, body: str, reply_to=None):
                "body": body,
                "created_at": int(time.time()),
                "updated_at": None,
                "reply_to": None,
            }

            # If this is a reply, record it on the message
@@ -49,7 +56,7 @@ async def create_message(_, info, chat_id: str, body: str, reply_to=None):

            # Write the updated chat data back to Redis
            await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat_dict))
            print(f"[inbox] creating message {new_message}")
            logger.debug(f"creating message {new_message}")

            # Store the new message in Redis
            await redis.execute(
@@ -59,21 +66,30 @@ async def create_message(_, info, chat_id: str, body: str, reply_to=None):
            )

            # Append the new message id to the chat's message id list
            await redis.execute("LPUSH", f"chats/{chat_id}/message_ids", str(message_id))
            await redis.execute(
                "LPUSH", f"chats/{chat_id}/message_ids", str(message_id)
            )

            # Bump the next message id
            await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(message_id + 1))
            await redis.execute(
                "SET", f"chats/{chat_id}/next_message_id", str(message_id + 1)
            )

            # Add the new message to every chat member's unread list
            members = chat_dict["members"]
            for member_id in members:
                await redis.execute("LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id))
                await redis.execute(
                    "LPUSH",
                    f"chats/{chat_dict['id']}/unread/{member_id}",
                    str(message_id),
                )

            # Announce the new message
            new_message["chat_id"] = chat_id
            await notify_message(new_message, "create")

            return {"message": new_message, "error": None}
    return {"error": "cannot create message"}


@mutation.field("update_message")
@@ -92,9 +108,7 @@ async def update_message(_, info, message):

    if message_id:
        message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}")
        if not message:
            return {"error": "message not exist"}

        if isinstance(message, str):
            message = json.loads(message)
            if message["created_by"] != author_id:
                return {"error": "access denied"}
@@ -103,15 +117,17 @@ async def update_message(_, info, message):
            message["body"] = body
            message["updated_at"] = int(time.time())

            await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message))
            await redis.execute(
                "SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message)
            )

            # Send the notification
            message["chat_id"] = chat_id
            await notify_message(message, "update")

            return {"message": message, "error": None}
    else:
        return {"message": message, "error": "cannot update, no message_id"}

    return {"message": message, "error": "cannot update"}


@mutation.field("delete_message")
@@ -120,23 +136,26 @@ async def delete_message(_, info, chat_id: str, message_id: int):
    author_id = info.context["author_id"]

    chat_str = await redis.execute("GET", f"chats/{chat_id}")
    if not chat_str:
        return {"error": "chat not exist"}
    if isinstance(chat_str, str):
        chat = json.loads(chat_str)

        message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}")
        if not message_data:
            return {"error": "message not exist"}
        message_data = await redis.execute(
            "GET", f"chats/{chat_id}/messages/{str(message_id)}"
        )
        if isinstance(message_data, str):
            message: Message = json.loads(message_data)
            if message["author"] != author_id:
            if message["created_by"] != author_id:
                return {"error": "access denied"}

            await redis.execute("LREM", f"chats/{chat_id}/message_ids", 0, str(message_id))
            await redis.execute(
                "LREM", f"chats/{chat_id}/message_ids", 0, str(message_id)
            )
            await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}")

            members = chat["members"]
            for member_id in members:
                await redis.execute("LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id))
                await redis.execute(
                    "LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id)
                )

            message["chat_id"] = chat_id
            await notify_message(message, "delete")
@@ -150,19 +169,20 @@ async def mark_as_read(_, info, chat_id: str, message_id: int):
    author_id = info.context["author_id"]

    chat_str = await redis.execute("GET", f"chats/{chat_id}")
    if not chat_str:
        return {"error": "chat not exist"}

    if isinstance(chat_str, str):
        chat = json.loads(chat_str)
        members = set(chat["members"])
        if author_id not in members:
            return {"error": "access denied"}

        await redis.execute("LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id))
        await redis.execute(
            "LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id)
        )

        message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}")
        if not message_data:
            return {"error": "message not exist"}
        message_data = await redis.execute(
            "GET", f"chats/{chat_id}/messages/{str(message_id)}"
        )
        if isinstance(message_data, str):
            message: Message = json.loads(message_data)

            await notify_message(message, "seen")
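Taken together, the message mutations above touch a small, fixed set of Redis keys per chat. A reference sketch of that layout as it reads from this diff (the helper function is illustrative, not project code):

# Redis key layout used by the inbox resolvers:
#   chats/{chat_id}                      -> JSON-encoded chat dict
#   chats/{chat_id}/next_message_id     -> integer counter, bumped per message
#   chats/{chat_id}/messages/{mid}      -> JSON-encoded message
#   chats/{chat_id}/message_ids         -> list of message ids (LPUSH/LREM)
#   chats/{chat_id}/unread/{member_id}  -> per-member unread list (LLEN = counter)
#   chats_by_author/{author_id}         -> set of chat ids (SMEMBERS/SREM)
def message_keys(chat_id: str, mid: int) -> dict:
    # Collect the keys touched when message `mid` is created, updated, or deleted
    return {
        "message": f"chats/{chat_id}/messages/{mid}",
        "ids": f"chats/{chat_id}/message_ids",
        "counter": f"chats/{chat_id}/next_message_id",
    }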
resolvers/search.py
@@ -1,10 +1,9 @@
import json
from datetime import datetime, timezone, timedelta
from typing import Dict, Union, List, Any
import time
from typing import Any, Dict, List, Union

from resolvers.load import load_messages
from services.auth import login_required
from services.core import get_all_authors
from services.core import get_author_by_id
from services.rediscache import redis
from services.schema import query

@@ -12,27 +11,24 @@ from services.schema import query
@query.field("search_recipients")
@login_required
async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0):
    result = []
    result = set()

    # TODO: maybe redis scan?

    author_id = info.context["author_id"]

    existed_chats = await redis.execute("SMEMBERS", f"/chats_by_author/{author_id}")
    authors = await get_all_authors()
    members = {a["id"]: a for a in authors}
    if existed_chats:
        for chat_id in list(json.loads(existed_chats))[offset : (offset + limit)]:
            members_ids = await redis.execute("GET", f"/chats/{chat_id}/members")
    if isinstance(existed_chats, set):
        chats_list = list(existed_chats)
        for chat_id in chats_list[offset : (offset + limit)]:
            members_ids = await redis.execute("SMEMBERS", f"/chats/{chat_id}/members")
            if isinstance(members_ids, set):
                for member_id in members_ids:
                    author = members.get(member_id)
                    author = await get_author_by_id(member_id)
                    if author:
                        if author["name"].startswith(text):
                            if author not in result:
                                result.append(author)
                            result.add(author)

    more_amount = limit - len(result)
    if more_amount > 0:
        result += authors[0:more_amount]
    return {"members": list(result), "error": None}


@@ -41,15 +37,12 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0
async def search_messages(
    _, info, by: Dict[str, Union[str, int]], limit: int, offset: int
) -> Dict[str, Union[List[Dict[str, Any]], None]]:
    messages_set = set()
    author_id = info.context["author_id"]
    lookup_chats = set((await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or [])
    messages_set = set([])

    by_member = by.get("author")
    body_like = by.get("body")
    days_ago = by.get("days")

    lookup_chats = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
    if isinstance(lookup_chats, set):
        # pre-filter lookup chats
        by_member = by.get("author")
        if by_member:
            lookup_chats = filter(
                lambda ca: by_member in ca["members"],
@@ -59,21 +52,21 @@ async def search_messages(
        # load the messages from lookup chats
        for c in lookup_chats:
            chat_id = c.decode()
            mmm = await load_messages(chat_id, limit, offset)
            fltr = None
            now = int(time.time())
            if by_member:
                mmm = list(filter(lambda mx: mx["author"] == by_member, mmm))
            if body_like:
                mmm = list(filter(lambda mx: body_like in mx["body"], mmm))
                fltr = lambda mx: mx and mx["created_by"] == by_member  # noqa E731
            body_like = by.get("body") or ""
            if isinstance(body_like, str):
                fltr = lambda mx: mx and body_like in mx["body"]  # noqa E731
            days_ago = int(by.get("days") or "0")
            if days_ago:
                mmm = list(
                    filter(
                        lambda msg: int(datetime.now(tz=timezone.utc)) - int(msg["created_at"])
                        < int(timedelta(days=days_ago)),
                        mmm,
                    )
                )
                ts = days_ago * 24 * 60 * 60
                fltr = lambda mx: mx and now - mx["created_by"] < ts  # noqa E731
            if fltr:
                mmm = await load_messages(chat_id, limit, offset)
                if isinstance(mmm, list):
                    mmm = list(filter(fltr, mmm))
                    messages_set |= set(mmm)

            messages_set.union(set(mmm))

    messages_sorted = sorted(list(messages_set))
    return {"messages": messages_sorted, "error": None}
    return {"messages": sorted(messages_set), "error": None}
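The days filter above compares an epoch delta against days * 24 * 60 * 60 seconds. A standalone sketch of that window check, assuming the field being compared is the message's integer created_at epoch (the same field the other predicates in this diff sort and filter on):

import time

def within_days(message: dict, days: int) -> bool:
    # True when the message was created inside the last `days` days
    window_seconds = days * 24 * 60 * 60
    return time.time() - int(message.get("created_at", 0)) < window_seconds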
server.py (68 lines changed)
@@ -1,59 +1,17 @@
import sys

import uvicorn
from uvicorn.main import logger
from granian.constants import Interfaces
from granian.server import Granian

from settings import PORT

log_settings = {
    "version": 1,
    "disable_existing_loggers": True,
    "formatters": {
        "default": {
            "()": "uvicorn.logging.DefaultFormatter",
            "fmt": "%(levelprefix)s %(message)s",
            "use_colors": None,
        },
        "access": {
            "()": "uvicorn.logging.AccessFormatter",
            "fmt": '%(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s',
        },
    },
    "handlers": {
        "default": {
            "formatter": "default",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stderr",
        },
        "access": {
            "formatter": "access",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        "uvicorn": {"handlers": ["default"], "level": "INFO"},
        "uvicorn.error": {"level": "INFO", "handlers": ["default"], "propagate": True},
        "uvicorn.access": {"handlers": ["access"], "level": "INFO", "propagate": False},
    },
}

local_headers = [
    ("Access-Control-Allow-Methods", "GET, POST, OPTIONS, HEAD"),
    ("Access-Control-Allow-Origin", "https://localhost:3000"),
    (
        "Access-Control-Allow-Headers",
        "DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,Authorization",
    ),
    ("Access-Control-Expose-Headers", "Content-Length,Content-Range"),
    ("Access-Control-Allow-Credentials", "true"),
]


def exception_handler(_et, exc, _tb):
    logger.error(..., exc_info=(type(exc), exc, exc.__traceback__))


if __name__ == "__main__":
    sys.excepthook = exception_handler
    uvicorn.run("main:app", host="0.0.0.0", port=PORT, proxy_headers=True, server_header=True)
    print("[server] starting...")

    granian_instance = Granian(
        "main:app",
        address="0.0.0.0",  # noqa S104
        port=PORT,
        threads=2,
        websockets=False,
        interface=Interfaces.ASGI,
    )
    granian_instance.serve()
services/auth.py
@@ -1,36 +1,57 @@
from functools import wraps
import aiohttp

from services.core import get_author
import httpx
from starlette.exceptions import HTTPException

from services.core import get_author_by_user
from services.logger import root_logger as logger
from settings import AUTH_URL


async def request_data(gql, headers=None):
    if headers is None:
        headers = {"Content-Type": "application/json"}
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(AUTH_URL, json=gql, headers=headers)
            if response.status_code == 200:
                data = response.json()
                errors = data.get("errors")
                if errors:
                    logger.error(f"HTTP Errors: {errors}")
                else:
                    return data
    except Exception as e:
        # Handling and logging exceptions during authentication check
        logger.error(f"request_data error: {e}")
    return None


async def check_auth(req):
    token = req.headers.get("Authorization")
    headers = {"Authorization": token, "Content-Type": "application/json"}  # "Bearer " + removed
    print(f"[services.auth] checking auth token: {token}")

    query_name = "session"
    query_type = "query"
    operation = "GetUserId"
    user_id = ""
    user_roles = []
    if token:
        # Logging the authentication token
        logger.debug(f"{token}")
        query_name = "validate_jwt_token"
        operation = "ValidateToken"
        variables = {"params": {"token_type": "access_token", "token": token}}

        gql = {
            "query": query_type + " " + operation + " { " + query_name + " { user { id } } }",
            "query": f"query {operation}($params: ValidateJWTTokenInput!) {{"
            + f"{query_name}(params: $params) {{ is_valid claims }} "
            + "}",
            "variables": variables,
            "operationName": operation,
            "variables": None,
        }

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30.0)) as session:
            async with session.post(AUTH_URL, headers=headers, json=gql) as response:
                print(f"[services.auth] {AUTH_URL} response: {response.status}")
                if response.status != 200:
                    return False, None
                r = await response.json()
                if r:
                    user_id = r.get("data", {}).get(query_name, {}).get("user", {}).get("id", None)
                    is_authenticated = user_id is not None
                    return is_authenticated, user_id
        return False, None
        data = await request_data(gql)
        if data:
            logger.debug(data)
            user_data = data.get("data", {}).get(query_name, {}).get("claims", {})
            user_id = user_data.get("sub", "")
            user_roles = user_data.get("allowed_roles", [])
    return user_id, user_roles


def login_required(f):
@@ -39,15 +60,31 @@ def login_required(f):
        info = args[1]
        context = info.context
        req = context.get("request")
        is_authenticated, user_id = await check_auth(req)
        if not is_authenticated:
            raise Exception("You are not logged in")
        else:
            # Add author_id and user_id to the context
            context["author_id"] = await get_author(user_id)
            context["user_id"] = user_id

            # If the user is authenticated, run the resolver
        user_id, user_roles = await check_auth(req)
        if user_id and isinstance(user_id, str):
            context["user_id"] = user_id.strip()
            author = await get_author_by_user(user_id)
            if author and "id" in author:
                context["author_id"] = author["id"]
            return await f(*args, **kwargs)
        else:
            raise HTTPException(status_code=401, detail="Unauthorized")

    return decorated_function


def auth_request(f):
    @wraps(f)
    async def decorated_function(*args, **kwargs):
        req = args[0]
        user_id, user_roles = await check_auth(req)
        if user_id and isinstance(user_id, str):
            user_id = user_id.strip()
            author = await get_author_by_user(user_id)
            if author and "id" in author:
                req["author_id"] = author["id"]
            return await f(*args, **kwargs)
        else:
            raise HTTPException(status_code=401, detail="Unauthorized")

    return decorated_function
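In use, login_required wraps Ariadne resolvers so that user_id and author_id are populated in the context before the body runs, and a 401 is raised otherwise. A minimal sketch (the resolver name is illustrative and would need a matching field in inbox.graphql):

from services.auth import login_required
from services.schema import query

@query.field("whoami")
@login_required
async def whoami(_, info):
    # author_id / user_id were injected into the context by login_required
    return {"author_id": info.context.get("author_id"), "error": None}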
services/core.py (108 lines changed)
@@ -1,89 +1,51 @@
import aiohttp
from settings import API_BASE
from typing import List, Any
from models.member import ChatMember
import json
import logging

headers = {"Content-Type": "application/json"}
from services.logger import root_logger as logger
from services.rediscache import redis

logger.setLevel(logging.DEBUG)


async def _request_endpoint(query_name, body) -> Any:
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(API_BASE, headers=headers, json=body) as response:
                print(f"[services.core] {query_name}: [{response.status}] {len(await response.text())} bytes")
                if response.status != 200:
                    return []
                r = await response.json()
                if r:
                    return r.get("data", {}).get(query_name, {})
                else:
                    raise Exception("json response error")
        except Exception:
            import traceback
async def get_all_authors():
    authors = []
    redis_key = "user:*"

            traceback.print_exc()
    result = await redis.execute("GET", redis_key)
    if isinstance(result, str):
        authors = json.loads(result)

    return authors


async def get_all_authors() -> List[ChatMember]:
    query_name = "authorsAll"
    query_type = "query"
    operation = "AuthorsAll"
    query_fields = "id slug pic name"
async def get_author_by_user(user: str):
    author = None
    redis_key = f"user:{user}"

    gql = {
        "query": query_type + " " + operation + " { " + query_name + " { " + query_fields + " } " + " }",
        "operationName": operation,
        "variables": None,
    }
    result = await redis.execute("GET", redis_key)
    if isinstance(result, str):
        author = json.loads(result)

    return _request_endpoint(query_name, gql)
    return author


async def get_my_followed() -> List[ChatMember]:
    query_name = "get_my_followed"
    query_type = "query"
    operation = "GetMyFollowed"
    query_fields = "id slug pic name"
async def get_author_by_id(author_id: int):
    author = None
    redis_key = f"author:{author_id}"

    gql = {
        "query": query_type + " " + operation + " { " + query_name + " { authors {" + query_fields + "} } " + " }",
        "operationName": operation,
        "variables": None,
    }
    result = await redis.execute("GET", redis_key)
    if isinstance(result, str):
        author = json.loads(result)

    async with AsyncClient() as client:
        try:
            response = await client.post(API_BASE, headers=headers, json=gql)
            print(f"[services.core] {query_name}: [{response.status_code}] {len(response.text)} bytes")
            if response.status_code != 200:
                return []
            r = response.json()
            if r:
                return r.get("data", {}).get(query_name, {}).get("authors", [])
        except Exception:
            import traceback

            traceback.print_exc()
    return []
    return author


async def get_author(author_id: int = None, slug: str = "", user: str = ""):
    query_name = "get_author(author_id: $author_id, slug: $slug, user: $user)"
    query_type = "query"
    operation = "GetAuthor($author_id: Int, $slug: String, $user: String)"
    query_fields = "id slug pic name"
    vars = {}
    if author_id:
        vars["author_id"] = author_id
    elif slug:
        vars["slug"] = slug
    elif user:
        vars["user"] = user
async def get_author_followed(author_id: int):
    authors = []
    redis_key = f"author:{author_id}:follows-authors"

    gql = {
        "query": query_type + " " + operation + " { " + query_name + " { " + query_fields + "} " + " }",
        "operationName": operation,
        "variables": None if vars == {} else vars,
    }
    result = await redis.execute("GET", redis_key)
    if isinstance(result, str):
        authors = json.loads(result)

    return await _request_endpoint(query_name, gql)
    return authors
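After this change the core adapter no longer calls the API at all; it only reads JSON blobs the core service is expected to have cached in Redis under user:<user>, author:<id> and author:<id>:follows-authors. A hedged sketch of what a seeded entry might look like (all values illustrative; the field set is inferred from the old "id slug pic name" query fields):

import asyncio
import json

import redis.asyncio as aredis

async def seed_author_cache():
    client = aredis.Redis.from_url("redis://127.0.0.1")
    author = {"id": 2, "slug": "discours", "pic": "", "name": "Discours"}
    # these keys mirror what get_author_by_id / get_author_by_user read
    await client.set(f"author:{author['id']}", json.dumps(author))
    await client.set("user:some-user-id", json.dumps(author))
    await client.aclose()

asyncio.run(seed_author_cache())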
services/logger.py (new file, 81 lines)
@@ -0,0 +1,81 @@
import logging

import colorlog

# Define the color scheme
color_scheme = {
    "DEBUG": "light_black",
    "INFO": "green",
    "WARNING": "yellow",
    "ERROR": "red",
    "CRITICAL": "red,bg_white",
}

# Define secondary log colors
secondary_colors = {
    "log_name": {"DEBUG": "blue"},
    "asctime": {"DEBUG": "cyan"},
    "process": {"DEBUG": "purple"},
    "module": {"DEBUG": "light_black,bg_blue"},
    "funcName": {"DEBUG": "light_white,bg_blue"},  # Add this line
}

# Define the log format string
fmt_string = "%(log_color)s%(levelname)s: %(log_color)s[%(module)s.%(funcName)s]%(reset)s %(white)s%(message)s"

# Define formatting configuration
fmt_config = {
    "log_colors": color_scheme,
    "secondary_log_colors": secondary_colors,
    "style": "%",
    "reset": True,
}


class MultilineColoredFormatter(colorlog.ColoredFormatter):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.log_colors = kwargs.pop("log_colors", {})
        self.secondary_log_colors = kwargs.pop("secondary_log_colors", {})

    def format(self, record):
        message = record.getMessage()
        if "\n" in message:
            lines = message.split("\n")
            first_line = lines[0]
            record.message = first_line
            formatted_first_line = super().format(record)
            formatted_lines = [formatted_first_line]
            for line in lines[1:]:
                formatted_lines.append(line)
            return "\n".join(formatted_lines)
        else:
            return super().format(record)


# Create a MultilineColoredFormatter object for colorized logging
formatter = MultilineColoredFormatter(fmt_string, **fmt_config)

# Create a stream handler for logging output
stream = logging.StreamHandler()
stream.setFormatter(formatter)


def get_colorful_logger(name="main"):
    # Create and configure the logger
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(stream)

    return logger


# Set up the root logger with the same formatting
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(stream)

ignore_logs = ["_trace", "httpx", "_client", "_trace.atrace", "aiohttp", "_client"]
for lgr in ignore_logs:
    loggr = logging.getLogger(lgr)
    loggr.setLevel(logging.INFO)
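Modules across this diff then share the one configured root logger; typical usage, as seen in the other files:

from services.logger import root_logger as logger

logger.debug("redis GET chats/...")   # grey, per color_scheme above
logger.info("process started in dev mode")
logger.error("cant find chat by id ...")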
services/presence.py
@@ -1,7 +1,8 @@
import json

from models.chat import ChatUpdate, Message
from services.logger import root_logger as logger
from services.rediscache import redis
from models.chat import Message, ChatUpdate


async def notify_message(message: Message, action="create"):
@@ -9,9 +10,9 @@ async def notify_message(message: Message, action="create"):
    data = {"payload": message, "action": action}
    try:
        await redis.publish(channel_name, json.dumps(data))
        print(f"[services.presence] ok {data}")
        logger.info(f"ok {data}")
    except Exception as e:
        print(f"Failed to publish to channel {channel_name}: {e}")
        logger.error(f"Failed to publish to channel {channel_name}: {e}")


async def notify_chat(chat: ChatUpdate, member_id: int, action="create"):
@@ -19,6 +20,6 @@ async def notify_chat(chat: ChatUpdate, member_id: int, action="create"):
    data = {"payload": chat, "action": action}
    try:
        await redis.publish(channel_name, json.dumps(data))
        print(f"[services.presence] ok {data}")
        logger.info(f"ok {data}")
    except Exception as e:
        print(f"Failed to publish to channel {channel_name}: {e}")
        logger.error(f"Failed to publish to channel {channel_name}: {e}")
services/rediscache.py
@@ -1,6 +1,12 @@
import logging

import redis.asyncio as aredis

from services.logger import root_logger as logger
from settings import REDIS_URL

logger.setLevel(logging.DEBUG)


class RedisCache:
    def __init__(self, uri=REDIS_URL):
@@ -18,12 +24,13 @@ class RedisCache:
    async def execute(self, command, *args, **kwargs):
        if self._client:
            try:
                print("[redis] " + command + " " + " ".join(args))
                logger.debug(f"{command} {args} {kwargs}")
                r = await self._client.execute_command(command, *args, **kwargs)
                logger.debug(type(r))
                logger.debug(r)
                return r
            except Exception as e:
                print(f"[redis] error: {e}")
                return None
                logger.error(e)

    async def subscribe(self, *channels):
        if self._client:
@@ -45,16 +52,6 @@ class RedisCache:
            return
        await self._client.publish(channel, data)

    async def lrange(self, key, start, stop):
        if self._client:
            print(f"[redis] LRANGE {key} {start} {stop}")
            return await self._client.lrange(key, start, stop)

    async def mget(self, key, *keys):
        if self._client:
            print(f"[redis] MGET {key} {keys}")
            return await self._client.mget(key, *keys)


redis = RedisCache()
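With the dedicated lrange and mget helpers removed, every caller now goes through the generic execute wrapper, passing the Redis command by name. Usage as seen across the resolvers in this diff:

from services.rediscache import redis

async def demo(chat_id: str):
    # any Redis command by name; the wrapper logs and returns the raw reply
    await redis.execute("SET", f"chats/{chat_id}", "{}")
    raw = await redis.execute("GET", f"chats/{chat_id}")
    mids = await redis.execute("LRANGE", f"chats/{chat_id}/message_ids", 0, 5)
    return raw, mids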
services/schema.py
@@ -1,16 +1,16 @@
from ariadne import QueryType, MutationType
from ariadne import MutationType, QueryType

query = QueryType()
mutation = MutationType()

# duffok was here 2023-12-1
# This is a query resolver for Apollo Federation
@query.field("_service")
def resolve_service(*_):
    # Load the full SDL from your SDL file
    with open("inbox.graphql", "r") as file:
        full_sdl = file.read()

    return {"sdl": full_sdl}
# @query.field("_service")
# def resolve_service(*_):
#     # Load the full SDL from your SDL file
#     with open("inbox.graphql", "r") as file:
#         full_sdl = file.read()
#
#     return {"sdl": full_sdl}

resolvers = [query, mutation]
services/sentry.py (new file, 30 lines)
@@ -0,0 +1,30 @@
import sentry_sdk
from sentry_sdk.integrations.ariadne import AriadneIntegration
from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.integrations.starlette import StarletteIntegration

from services.logger import root_logger as logger
from settings import GLITCHTIP_DSN


def start_sentry():
    # sentry monitoring
    try:
        sentry_sdk.init(
            GLITCHTIP_DSN,
            # Set traces_sample_rate to 1.0 to capture 100%
            # of transactions for performance monitoring.
            traces_sample_rate=1.0,
            # Set profiles_sample_rate to 1.0 to profile 100%
            # of sampled transactions.
            # We recommend adjusting this value in production.
            profiles_sample_rate=1.0,
            enable_tracing=True,
            integrations=[
                StarletteIntegration(),
                AriadneIntegration(),
                RedisIntegration(),
            ],
        )
    except Exception as e:
        logger.error(e)
settings.py
@@ -1,9 +1,9 @@
from os import environ

PORT = 80
PORT = 8000
REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1"
API_BASE = environ.get("API_BASE") or "https://v2.discours.io/"
AUTH_URL = environ.get("AUTH_URL") or "https://v2.discours.io/"
API_BASE = environ.get("API_BASE") or "http://127.0.0.1:8001/"
AUTH_URL = environ.get("AUTH_URL") or "http://127.0.0.1:8080/graphql/"
MODE = environ.get("MODE") or "production"
SENTRY_DSN = environ.get("SENTRY_DSN")
GLITCHTIP_DSN = environ.get("GLITCHTIP_DSN")
DEV_SERVER_PID_FILE_NAME = "dev-server.pid"