Compare commits


17 Commits

Author · SHA1 · Message · CI · Date

Stepan Vladovskiy · 32b47372bb · with change config for poetry not install inbox like package · deploy (push): successful in 53s · 2025-02-12 20:02:12 -03:00
Stepan Vladovskiy · 2173da2cad · feat: sv nginx conf sigil without map function, becasue it is moved to global nginx config · deploy (push): failing after 38s · 2025-01-29 16:31:41 -03:00
76186df2b0 · fmt · deploy (push): successful in 53s · 2024-04-19 14:28:57 +03:00
3c219bfa69 · author-cache-removed · 2024-04-19 14:28:21 +03:00
41af1a7349 · fmt · deploy (push): successful in 53s · 2024-04-19 11:50:50 +03:00
6a450a84c1 · logger-fix-getchat-fix · 2024-04-19 11:50:25 +03:00
c96e8afc45 · logfix · deploy (push): successful in 1m10s · 2024-04-19 11:41:02 +03:00
12c5d2677d · async-fix · deploy (push): successful in 49s · 2024-04-19 11:22:57 +03:00
63cf0b9fee · less-code · deploy (push): successful in 48s · 2024-04-19 11:01:43 +03:00
38c0a4e3ee · auth-fix · deploy (push): successful in 47s · 2024-04-19 10:57:54 +03:00
97aed41143 · typo-fix · deploy (push): successful in 47s · 2024-04-19 10:48:51 +03:00
1a730a9eab · core-adapter-upgrade · deploy (push): failing after 57s · 2024-04-19 10:47:16 +03:00
fe069696d3 · auth-update · deploy (push): failing after 1m5s · 2024-04-18 14:28:10 +03:00
f2726633cd · depfix · deploy (push): failing after 58s · 2024-04-18 13:50:32 +03:00
5c4f73d2ca · logger+fmt+isort-2 · deploy (push): failing after 1m6s · 2024-04-18 13:48:29 +03:00
595fa945cf · Merge branch 'main' of https://dev.discours.io/discours.io/inbox · 2024-04-18 13:47:31 +03:00
12602ac57c · logger+fmt+isort · 2024-04-18 13:47:01 +03:00
19 changed files with 247 additions and 223 deletions

CHANGELOG.txt
View File

@@ -1,3 +1,10 @@
+[0.3.2]
+- added custom logger
+- auth logix synced with core
+- added httpx
+- aiohttp and requests removed
+- core adapter loads data from redis now
+
 [0.3.1]
 - glitchtip connect

Dockerfile
View File

@@ -5,7 +5,7 @@
 COPY . /app
 RUN apk update && apk add --no-cache git gcc curl
 RUN curl -sSL https://install.python-poetry.org | python
 ENV PATH="${PATH}:/root/.local/bin"
-RUN poetry config virtualenvs.create false && poetry install --no-dev
+RUN poetry config virtualenvs.create false && poetry install --without dev --no-root
 EXPOSE 8000
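This matches the commit message "with change config for poetry not install inbox like package": Poetry 1.2+ replaced the deprecated --no-dev flag with --without dev, and --no-root additionally skips installing the project itself (the inbox package) into the image, so only its dependencies end up in the layer.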

main.py
View File

@@ -3,11 +3,11 @@ from importlib import import_module
 from os.path import exists

 from ariadne import load_schema_from_path, make_executable_schema
-from starlette.applications import Starlette
 from ariadne.asgi import GraphQL
+from starlette.applications import Starlette
 from starlette.routing import Route

+from services.logger import root_logger as logger
 from services.rediscache import redis
 from services.schema import resolvers
 from services.sentry import start_sentry
@@ -23,7 +23,8 @@ async def start():
     # pid file management
     with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f:
         f.write(str(os.getpid()))
-    print(f"[main] process started in {MODE} mode")
+    logger.info(f"process started in {MODE} mode")

 # main starlette app object with ariadne mounted in root
 app = Starlette(

nginx.conf.sigil
View File

@@ -9,10 +9,7 @@
 {{ $cors_headers_get := "if ($request_method = 'GET') { add_header 'Access-Control-Allow-Origin' '$allow_origin' always; add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS' always; add_header 'Access-Control-Allow-Headers' 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,Authorization' always; add_header 'Access-Control-Expose-Headers' 'Content-Length,Content-Range' always; add_header 'Access-Control-Allow-Credentials' 'true' always; }" }}

-# Mapping for allowed origins
-map $http_origin $allow_origin {
-    ~^https?:\/\/((.*\.)?localhost(:\d+)?|discoursio-webapp(-(.*))?\.vercel\.app|(.*\.)?discours\.io)$ $http_origin;
-    default "";
-}
+# is moved to globL NGINX

 # Server block setup
 {{ range $port_map := .PROXY_PORT_MAP | split " " }}
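As the commit message says, the $http_origin to $allow_origin mapping now lives in the global nginx config. nginx's map directive is only valid at http{} scope, and keeping it in every per-app template duplicates the definition; the CORS header snippets above still reference $allow_origin and now assume the shared global config defines it.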

pyproject.toml
View File

@@ -1,19 +1,20 @@
 [tool.poetry]
 name = "discoursio-inbox"
-version = "0.3.1"
+version = "0.3.2"
 description = "Inbox server for discours.io"
 authors = ["Tony Rewin <anton.rewin@gmail.com>"]

 [tool.poetry.dependencies]
 python = "^3.12"
 sentry-sdk = "^1.44.1"
-redis = "^5.0.3"
 ariadne = "^0.23.0"
 starlette = "^0.37.2"
 itsdangerous = "^2.1.2"
-aiohttp = "^3.9.3"
-requests = "^2.31.0"
 granian = "^1.2.1"
 colorlog = "^6.8.2"
+httpx = "^0.27.0"
+redis = {version = "^5.0.3", extras = ["async"]}

 [tool.poetry.group.dev.dependencies]
 pre-commit = "^3.6.0"
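The dependency swap mirrors the changelog: httpx replaces aiohttp and requests for outbound HTTP, and redis gains the async extra used by services/rediscache.py. A minimal sketch of the async client this enables (the URL and keys are illustrative):

```python
import asyncio

import redis.asyncio as aredis  # same import style as services/rediscache.py


async def main():
    client = aredis.from_url("redis://127.0.0.1")  # default REDIS_URL from settings.py
    await client.set("ping", "pong")
    print(await client.get("ping"))


asyncio.run(main())
```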

resolvers/__init__.py
View File

@@ -8,7 +8,6 @@ from resolvers.messages import (
 )
 from resolvers.search import search_messages, search_recipients

 __all__ = [
     # inbox
     "load_chats",

resolvers/chats.py
View File

@@ -9,7 +9,6 @@ from services.presence import notify_chat
 from services.rediscache import redis
 from services.schema import mutation

-logger = logging.getLogger("[resolvers.chats] ")
-logger.setLevel(logging.DEBUG)
@@ -67,8 +66,11 @@ async def create_chat(_, info, title="", members=None):
     if len(members) == 2 and title == "":
         chatset1 = await redis.execute("SMEMBERS", f"chats_by_author/{members[0]}")
         chatset2 = await redis.execute("SMEMBERS", f"chats_by_author/{members[1]}")
+        if isinstance(chatset1, set) and isinstance(chatset2, set):
             for c in chatset1.intersection(chatset2):
-                chat = await redis.execute("GET", f"chats/{c}")
+                chat_result = await redis.execute("GET", f"chats/{c}")
+                if chat_result:
+                    chat = json.loads(chat_result)
                 if chat["title"] == "":
                     logger.info("[inbox] createChat found old chat")
                     return {"chat": chat, "error": "existed"}

resolvers/load.py
View File

@@ -6,11 +6,10 @@ from typing import Any, Dict, List, Optional, Union
 from models.chat import ChatPayload
 from resolvers.chats import create_chat
 from services.auth import login_required
-from services.core import CacheStorage
+from services.core import get_author_by_id
 from services.rediscache import redis
 from services.schema import query

-logger = logging.getLogger("[resolvers.load] ")
-logger.setLevel(logging.DEBUG)
@@ -76,13 +75,13 @@ async def load_chats(
     chats = []
     try:
         if author_id:
-            logger.debug("got author", author_id)
+            logger.debug(f"got author {author_id}")
             cids = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
-            logger.debug("got cids", cids)
+            logger.debug(f"got cids {cids}")
             members_online = (
                 await redis.execute("SMEMBERS", "authors-online")
             ) or []  # to show online status
-            logger.debug("members online", members_online)
+            logger.debug(f"members online: {members_online}")
             if isinstance(cids, set):
                 # TODO: add sort by chat.created_at with in-memory caching chats service
                 cids = list(cids)[offset : (offset + limit)]
@@ -92,8 +91,15 @@
                 r = await create_chat(
                     None, info, members=[2]
                 )  # member with id = 2 is discours
-                logger.debug(f"created chat: {r['chat_id']}")
-                cids.append(r["chat"]["id"])
+                if (
+                    isinstance(r, dict)
+                    and "chat" in r
+                    and isinstance(r["chat"], dict)
+                ):
+                    chat_id = r["chat"].get("id")
+                    if chat_id:
+                        logger.debug(f"created chat: {chat_id}")
+                        cids.append(chat_id)
             logger.debug(f"getting data for {len(cids)} user's chats")
             for cid in cids:
@@ -107,12 +113,20 @@
                     member_ids = c["members"].copy()
                     c["members"] = []
                     for member_id in member_ids:
-                        a = CacheStorage.authors_by_id.get(str(member_id))
+                        if isinstance(member_id, int):
+                            a = await get_author_by_id(int(member_id))
                             if a:
                                 a["online"] = a.get("id") in members_online
                                 c["members"].append(a)
                             else:
-                                logger.error(f"cant find author by id {member_id}")
+                                logger.error(
+                                    f"cant find author by id {member_id}"
+                                )
+                        elif (
+                            "members" in member_id
+                            and member_id not in c["members"]
+                        ):
+                            c["members"].append(member_id)
                     chats.append(c)
                 else:
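The create_chat result is no longer trusted blindly; only a well-shaped dict yields a chat id. Distilled into a self-contained helper (extract_chat_id is an illustrative name, not from the repo):

```python
from typing import Any, Optional


def extract_chat_id(r: Any) -> Optional[int]:
    # accept only {"chat": {"id": ...}}-shaped results
    if isinstance(r, dict) and isinstance(r.get("chat"), dict):
        return r["chat"].get("id")
    return None


assert extract_chat_id({"chat": {"id": 7}}) == 7
assert extract_chat_id({"error": "boom"}) is None
```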

resolvers/messages.py
View File

@@ -8,7 +8,6 @@ from services.presence import notify_message
 from services.rediscache import redis
 from services.schema import mutation

-logger = logging.getLogger("[resolvers.messages] ")
-logger.setLevel(logging.DEBUG)

resolvers/search.py
View File

@@ -3,7 +3,7 @@ from typing import Any, Dict, List, Union
 from resolvers.load import load_messages
 from services.auth import login_required
-from services.core import CacheStorage
+from services.core import get_author_by_id
 from services.rediscache import redis
 from services.schema import query
@@ -24,14 +24,11 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0
     members_ids = await redis.execute("SMEMBERS", f"/chats/{chat_id}/members")
     if isinstance(members_ids, set):
         for member_id in members_ids:
-            author = CacheStorage.authors_by_id.get(str(member_id))
+            author = await get_author_by_id(member_id)
             if author:
                 if author["name"].startswith(text):
                     result.add(author)
-    more_amount = limit - len(result)
-    if more_amount > 0:
-        result.update(CacheStorage.authors[0:more_amount])
     return {"members": list(result), "error": None}

server.py
View File

@@ -3,7 +3,6 @@ from granian.server import Granian
 from settings import PORT

 if __name__ == "__main__":
-    print("[server] starting...")

services/auth.py
View File

@@ -1,67 +1,57 @@
-import logging
 from functools import wraps

-from aiohttp import ClientSession
+import httpx
 from starlette.exceptions import HTTPException

 from services.core import get_author_by_user
+from services.logger import root_logger as logger
 from settings import AUTH_URL

-logger = logging.getLogger("[services.auth] ")
-logger.setLevel(logging.DEBUG)

+async def request_data(gql, headers=None):
+    if headers is None:
+        headers = {"Content-Type": "application/json"}
+    try:
+        async with httpx.AsyncClient() as client:
+            response = await client.post(AUTH_URL, json=gql, headers=headers)
+            if response.status_code == 200:
+                data = response.json()
+                errors = data.get("errors")
+                if errors:
+                    logger.error(f"HTTP Errors: {errors}")
+                else:
+                    return data
+    except Exception as e:
+        # Handling and logging exceptions during authentication check
+        logger.error(f"request_data error: {e}")
+    return None

 async def check_auth(req):
     logger.debug("checking auth...")
-    user_id = ""
-    try:
     token = req.headers.get("Authorization")
+    user_id = ""
+    user_roles = []
     if token:
         # Logging the authentication token
         logger.debug(f"{token}")
         query_name = "validate_jwt_token"
         operation = "ValidateToken"
-        headers = {
-            "Content-Type": "application/json",
-        }
-        variables = {
-            "params": {
-                "token_type": "access_token",
-                "token": token,
-            }
-        }
+        variables = {"params": {"token_type": "access_token", "token": token}}
         gql = {
-            "query": f"query {operation}($params: ValidateJWTTokenInput!) {{ {query_name}(params: $params) {{ is_valid claims }} }}",
+            "query": f"query {operation}($params: ValidateJWTTokenInput!) {{"
+            + f"{query_name}(params: $params) {{ is_valid claims }} "
+            + "}",
             "variables": variables,
             "operationName": operation,
         }
-        # Asynchronous HTTP request to the authentication server
-        async with ClientSession() as session:
-            async with session.post(
-                AUTH_URL, json=gql, headers=headers
-            ) as response:
-                if response.status == 200:
-                    data = await response.json()
-                    errors = data.get("errors")
-                    if errors:
-                        logger.error(f"{errors}")
-                    else:
-                        user_id = (
-                            data.get("data", {})
-                            .get(query_name, {})
-                            .get("claims", {})
-                            .get("sub")
-                        )
-                        logger.info(f"got user_id: {user_id}")
-                        return user_id
-    except Exception as e:
-        # Handling and logging exceptions during authentication check
-        logger.error(e)
-    if not user_id:
-        raise HTTPException(status_code=401, detail="Unauthorized")
+        data = await request_data(gql)
+        if data:
+            logger.debug(data)
+            user_data = data.get("data", {}).get(query_name, {}).get("claims", {})
+            user_id = user_data.get("sub", "")
+            user_roles = user_data.get("allowed_roles", [])
+    return user_id, user_roles
def login_required(f):
@@ -70,15 +60,31 @@ def login_required(f):
         info = args[1]
         context = info.context
         req = context.get("request")
-        user_id = await check_auth(req)
-        if user_id:
+        user_id, user_roles = await check_auth(req)
+        if user_id and isinstance(user_id, str):
             context["user_id"] = user_id.strip()
-            author = get_author_by_user(user_id)
+            author = await get_author_by_user(user_id)
             if author and "id" in author:
                 context["author_id"] = author["id"]
             else:
                 logger.debug(author)
                 HTTPException(status_code=401, detail="Unauthorized")
             return await f(*args, **kwargs)
+        else:
+            raise HTTPException(status_code=401, detail="Unauthorized")

     return decorated_function

+def auth_request(f):
+    @wraps(f)
+    async def decorated_function(*args, **kwargs):
+        req = args[0]
+        user_id, user_roles = await check_auth(req)
+        if user_id and isinstance(user_id, str):
+            user_id = user_id.strip()
+            author = await get_author_by_user(user_id)
+            if author and "id" in author:
+                req["author_id"] = author["id"]
+            return await f(*args, **kwargs)
+        else:
+            raise HTTPException(status_code=401, detail="Unauthorized")
+
+    return decorated_function
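check_auth now returns a (user_id, user_roles) tuple instead of raising from inside the HTTP call, and login_required raises the 401 itself when the id is missing. A hypothetical resolver showing what the decorator provides (the resolver body is illustrative):

```python
from services.auth import login_required


@login_required
async def my_resolver(_, info, **kwargs):
    # populated by login_required before the body runs
    user_id = info.context["user_id"]
    author_id = info.context.get("author_id")
    return {"user_id": user_id, "author_id": author_id, "error": None}
```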

services/core.py
View File

@@ -1,128 +1,51 @@
-import asyncio
-import logging
-from datetime import datetime, timedelta, timezone
-from typing import List
-
-import requests
-
-from models.member import ChatMember
-from settings import API_BASE
-
-logger = logging.getLogger("[services.core] ")
-logger.setLevel(logging.DEBUG)
-
-def _request_endpoint(query_name, body) -> dict:
-    logger.debug(f"requesting {query_name}...")
-    response = requests.post(
-        API_BASE, headers={"Content-Type": "application/json"}, json=body, timeout=30.0
-    )
-    if response.status_code == 200:
-        try:
-            r = response.json()
-            result = r.get("data", {}).get(query_name, {})
-            if result:
-                logger.info(f"entries amount in result: {len(result)} ")
-            return result
-        except ValueError as e:
-            logger.error(f"Error decoding JSON response: {e}")
-    return {}
-
-def get_all_authors():
-    query_name = "get_authors_all"
-    gql = {
-        "query": "query { " + query_name + "{ id slug pic name user } }",
-        "variables": None,
-    }
-    return _request_endpoint(query_name, gql)
-
-def get_author_by_user(user: str):
-    operation = "GetAuthorId"
-    query_name = "get_author_id"
-    gql = {
-        "query": f"query {operation}($user: String!) {{ {query_name}(user: $user){{ id }} }}",  # noqa E201, E202
-        "operationName": operation,
-        "variables": {"user": user.strip()},
-    }
-    return _request_endpoint(query_name, gql)
-
-def get_my_followed() -> List[ChatMember]:
-    query_name = "get_my_followed"
-    gql = {
-        "query": "query { " + query_name + " { authors { id slug pic name } } }",
-        "variables": None,
-    }
-    result = _request_endpoint(query_name, gql)
-    return result.get("authors", [])
-
-class CacheStorage:
-    lock = asyncio.Lock()
-    period = 5 * 60  # every 5 mins
-    client = None
-    authors = []
-    authors_by_user = {}
-    authors_by_id = {}
-
-    @staticmethod
-    async def init():
-        """graphql client connection using permanent token"""
-        self = CacheStorage
-        async with self.lock:
-            task = asyncio.create_task(self.worker())
-            logger.info(task)
-
-    @staticmethod
-    async def update_authors():
-        self = CacheStorage
-        async with self.lock:
-            result = get_all_authors()
-            logger.info(f"cache loaded {len(result)}")
-            if result:
-                CacheStorage.authors = result
-                for a in result:
-                    user_id = a.get("user")
-                    author_id = str(a.get("id"))
-                    self.authors_by_user[user_id] = a
-                    self.authors_by_id[author_id] = a
-
-    @staticmethod
-    async def worker():
-        """async task worker"""
-        failed = 0
-        self = CacheStorage
-        while True:
-            try:
-                logger.info(" - updating profiles data...")
-                await self.update_authors()
-                failed = 0
-            except Exception as er:
-                failed += 1
-                logger.error(f"{er} - update failed #{failed}, wait 10 seconds")
-                if failed > 3:
-                    logger.error(" - not trying to update anymore")
-                    import traceback
-                    traceback.print_exc()
-                    break
-            if failed == 0:
-                when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
-                t = format(when.astimezone().isoformat())
-                logger.info(
-                    " ⎩ next update: %s"
-                    % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])
-                )
-                await asyncio.sleep(self.period)
-            else:
-                await asyncio.sleep(10)
-                logger.info(" - trying to update data again")
+import json
+
+from services.logger import root_logger as logger
+from services.rediscache import redis
+
+async def get_all_authors():
+    authors = []
+    redis_key = "user:*"
+    result = await redis.execute("GET", redis_key)
+    if isinstance(result, str):
+        authors = json.loads(result)
+    return authors
+
+async def get_author_by_user(user: str):
+    author = None
+    redis_key = f"user:{user}"
+    result = await redis.execute("GET", redis_key)
+    if isinstance(result, str):
+        author = json.loads(result)
+    return author
+
+async def get_author_by_id(author_id: int):
+    author = None
+    redis_key = f"author:{author_id}"
+    result = await redis.execute("GET", redis_key)
+    if isinstance(result, str):
+        author = json.loads(result)
+    return author
+
+async def get_author_followed(author_id: int):
+    authors = []
+    redis_key = f"author:{author_id}:follows-authors"
+    result = await redis.execute("GET", redis_key)
+    if isinstance(result, str):
+        authors = json.loads(result)
+    return authors
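This is the "core adapter loads data from redis now" item from the changelog: the periodic CacheStorage worker and its requests-based GraphQL calls are gone, and every author lookup is an awaited redis GET of a JSON value the core service is expected to keep warm. A usage sketch (resolve_member is an illustrative name):

```python
from services.core import get_author_by_id


async def resolve_member(member_id: int):
    author = await get_author_by_id(member_id)  # reads the "author:{id}" key
    if author is None:
        # key absent: core has not cached this author (yet)
        return None
    return author
```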

services/logger.py (new file, 81 lines)
View File

@@ -0,0 +1,81 @@
+import logging
+
+import colorlog
+
+# Define the color scheme
+color_scheme = {
+    "DEBUG": "light_black",
+    "INFO": "green",
+    "WARNING": "yellow",
+    "ERROR": "red",
+    "CRITICAL": "red,bg_white",
+}
+
+# Define secondary log colors
+secondary_colors = {
+    "log_name": {"DEBUG": "blue"},
+    "asctime": {"DEBUG": "cyan"},
+    "process": {"DEBUG": "purple"},
+    "module": {"DEBUG": "light_black,bg_blue"},
+    "funcName": {"DEBUG": "light_white,bg_blue"},  # Add this line
+}
+
+# Define the log format string
+fmt_string = "%(log_color)s%(levelname)s: %(log_color)s[%(module)s.%(funcName)s]%(reset)s %(white)s%(message)s"
+
+# Define formatting configuration
+fmt_config = {
+    "log_colors": color_scheme,
+    "secondary_log_colors": secondary_colors,
+    "style": "%",
+    "reset": True,
+}
+
+class MultilineColoredFormatter(colorlog.ColoredFormatter):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.log_colors = kwargs.pop("log_colors", {})
+        self.secondary_log_colors = kwargs.pop("secondary_log_colors", {})
+
+    def format(self, record):
+        message = record.getMessage()
+        if "\n" in message:
+            lines = message.split("\n")
+            first_line = lines[0]
+            record.message = first_line
+            formatted_first_line = super().format(record)
+            formatted_lines = [formatted_first_line]
+            for line in lines[1:]:
+                formatted_lines.append(line)
+            return "\n".join(formatted_lines)
+        else:
+            return super().format(record)
+
+# Create a MultilineColoredFormatter object for colorized logging
+formatter = MultilineColoredFormatter(fmt_string, **fmt_config)
+
+# Create a stream handler for logging output
+stream = logging.StreamHandler()
+stream.setFormatter(formatter)
+
+def get_colorful_logger(name="main"):
+    # Create and configure the logger
+    logger = logging.getLogger(name)
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(stream)
+    return logger
+
+# Set up the root logger with the same formatting
+root_logger = logging.getLogger()
+root_logger.setLevel(logging.DEBUG)
+root_logger.addHandler(stream)
+
+ignore_logs = ["_trace", "httpx", "_client", "_trace.atrace", "aiohttp", "_client"]
+for lgr in ignore_logs:
+    loggr = logging.getLogger(lgr)
+    loggr.setLevel(logging.INFO)
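Everything else in this compare imports this module as "from services.logger import root_logger as logger". A quick sketch of what that gives you:

```python
from services.logger import root_logger as logger

logger.debug("verbose detail")   # light_black per color_scheme
logger.info("process started")   # green
logger.error("publish failed")   # red
```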

services/presence.py
View File

@@ -1,6 +1,7 @@
 import json

 from models.chat import ChatUpdate, Message
+from services.logger import root_logger as logger
 from services.rediscache import redis
@@ -9,9 +10,9 @@ async def notify_message(message: Message, action="create"):
     data = {"payload": message, "action": action}
     try:
         await redis.publish(channel_name, json.dumps(data))
-        print(f"[services.presence] ok {data}")
+        logger.info(f"ok {data}")
     except Exception as e:
-        print(f"Failed to publish to channel {channel_name}: {e}")
+        logger.error(f"Failed to publish to channel {channel_name}: {e}")

 async def notify_chat(chat: ChatUpdate, member_id: int, action="create"):
@@ -19,6 +20,6 @@ async def notify_chat(chat: ChatUpdate, member_id: int, action="create"):
     data = {"payload": chat, "action": action}
     try:
         await redis.publish(channel_name, json.dumps(data))
-        print(f"[services.presence] ok {data}")
+        logger.info(f"ok {data}")
     except Exception as e:
-        print(f"Failed to publish to channel {channel_name}: {e}")
+        logger.error(f"Failed to publish to channel {channel_name}: {e}")

services/rediscache.py
View File

@@ -2,10 +2,9 @@ import logging
 import redis.asyncio as aredis

+from services.logger import root_logger as logger
 from settings import REDIS_URL

-logger = logging.getLogger("[services.redis] ")
-logger.setLevel(logging.DEBUG)

services/schema.py
View File

@@ -1,6 +1,5 @@
 from ariadne import MutationType, QueryType

 query = QueryType()
 mutation = MutationType()

services/sentry.py
View File

@@ -3,6 +3,7 @@ from sentry_sdk.integrations.ariadne import AriadneIntegration
 from sentry_sdk.integrations.redis import RedisIntegration
 from sentry_sdk.integrations.starlette import StarletteIntegration
+from services.logger import root_logger as logger
 from settings import GLITCHTIP_DSN
@@ -26,5 +27,4 @@ def start_sentry():
             ],
         )
     except Exception as e:
-        print("[services.sentry] init error")
-        print(e)
+        logger.error(e)

settings.py
View File

@@ -1,6 +1,5 @@
 from os import environ

 PORT = 8000
 REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1"
 API_BASE = environ.get("API_BASE") or "http://127.0.0.1:8001/"