Compare commits


No commits in common. "main" and "dev" have entirely different histories.
main...dev

12 changed files with 315 additions and 262 deletions

(GitHub Actions deploy workflow)

@@ -1,4 +1,4 @@
name: 'Deploy to v2'
name: "Deploy to notifier"
on: [push]
jobs:
@@ -21,6 +21,6 @@ jobs:
- name: Push to dokku
uses: dokku/github-action@master
with:
branch: 'main'
git_remote_url: 'ssh://dokku@v2.discours.io:22/notifier'
branch: "main"
git_remote_url: "ssh://dokku@v2.discours.io:22/notifier"
ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}

.gitignore

@@ -6,4 +6,3 @@ __pycache__
poetry.lock
.venv
.ruff_cache
.pytest_cache

.pre-commit-config.yaml

@@ -9,6 +9,7 @@ repos:
- id: trailing-whitespace
- id: check-added-large-files
- id: detect-private-key
- id: double-quote-string-fixer
- id: check-ast
- id: check-merge-conflict
@@ -17,4 +18,3 @@ repos:
hooks:
- id: ruff
args: [--fix]
- id: ruff-format

Dockerfile

@@ -1,15 +1,22 @@
# Use an official Python runtime as a parent image
FROM python:slim
# Set the working directory in the container to /app
WORKDIR /app
# Add metadata to the image to describe that the container listens on port 8000
EXPOSE 8000
# Copy the current directory contents into the container at /app
COPY . /app
RUN apt-get update && apt-get install -y git gcc curl postgresql && \
# Install any needed packages specified in pyproject.toml
RUN apt-get update && apt-get install -y gcc curl && \
curl -sSL https://install.python-poetry.org | python - && \
echo "export PATH=$PATH:/root/.local/bin" >> ~/.bashrc && \
. ~/.bashrc && \
poetry config virtualenvs.create false && \
poetry install --no-dev
EXPOSE 8000
# Run server when the container launches
# Run server.py when the container launches
CMD python server.py

(deleted module: sys.path bootstrap)

@@ -1,6 +0,0 @@
import os
import sys
# Get the path to the project root directory
root_path = os.path.abspath(os.path.dirname(__file__))
sys.path.append(root_path)

main.py

@@ -16,23 +16,23 @@ from services.rediscache import redis
from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN
logger = logging.getLogger("\t[main]\t")
logger = logging.getLogger('\t[main]\t')
logger.setLevel(logging.DEBUG)
async def start_up():
logger.info("[main] starting...")
logger.info('[main] starting...')
await redis.connect()
task = asyncio.create_task(notifications_worker())
logger.info(task)
if MODE == "dev":
if MODE == 'dev':
if exists(DEV_SERVER_PID_FILE_NAME):
with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f:
with open(DEV_SERVER_PID_FILE_NAME, 'w', encoding='utf-8') as f:
f.write(str(os.getpid()))
else:
logger.info("[main] production mode")
logger.info('[main] production mode')
try:
import sentry_sdk
@@ -47,7 +47,7 @@ async def start_up():
],
)
except Exception as e:
logger.error("sentry init error", e)
logger.error('sentry init error', e)
async def shutdown():
@@ -55,4 +55,4 @@ async def shutdown():
app = Starlette(debug=True, on_startup=[start_up], on_shutdown=[shutdown])
app.mount("/", GraphQL(schema, debug=True))
app.mount('/', GraphQL(schema, graphiql=True, debug=True))

pyproject.toml

@@ -1,25 +1,113 @@
[tool.poetry]
name = "discoursio-notifier"
version = "0.3.0"
description = "notifier service for discours.io"
authors = ["Untone <anton.rewin@gmail.com>"]
license = "MIT"
readme = "README.md"
description = "notifier server for discours.io"
authors = ["discours.io devteam"]
[tool.poetry.dependencies]
python = "^3.12"
aiohttp = "^3.9.3"
redis = {extras = ["hiredis"], version = "^5.0.1"}
strawberry-graphql = {extras = ["asgi", "debug-server"], version = "^0.219.2"}
granian = "^1.1.0"
sentry-sdk = {extras = ["strawberry"], version = "^1.40.4"}
strawberry-sqlalchemy-mapper = "^0.4.2"
SQLAlchemy = "^2.0.22"
psycopg2-binary = "^2.9.9"
SQLAlchemy = "^2.0.27"
redis = {extras = ["hiredis"], version = "^5.0.1"}
strawberry-graphql = {extras = ["asgi", "debug-server"], version = "^0.219.0" }
strawberry-sqlalchemy-mapper = "^0.4.2"
sentry-sdk = "^1.39.2"
aiohttp = "^3.9.1"
pre-commit = "^3.6.0"
granian = "^1.0.1"
discoursio-core = { git = "https://dev.discours.io/discours.io/core.git", branch = "feature/core" }
[tool.poetry.group.dev.dependencies]
ruff = "^0.2.1"
setuptools = "^69.0.2"
pyright = "^1.1.341"
pytest = "^7.4.2"
black = { version = "^23.12.0", python = ">=3.12" }
ruff = { version = "^0.1.15", python = ">=3.12" }
isort = "^5.13.2"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.setuptools.dynamic]
version = {attr = "notifier.__version__"}
readme = {file = "README.md"}
[tool.ruff]
line-length = 120
extend-select = [
# E and F are enabled by default
'B', # flake8-bugbear
'C4', # flake8-comprehensions
'C90', # mccabe
'I', # isort
'N', # pep8-naming
'Q', # flake8-quotes
'S', # flake8-bandit
'W', # pycodestyle
]
extend-ignore = [
'B008', # function calls in argument defaults are fine
'B009', # getattr with constants is fine
'B034', # re.split won't confuse us
'B904', # raising without from is fine
'E501', # leave line length to black
'N818', # leave exception naming to us
'S101', # assert is fine
'E712', # allow == True
]
flake8-quotes = { inline-quotes = 'single', multiline-quotes = 'double' }
mccabe = { max-complexity = 13 }
target-version = "py312"
[tool.ruff.format]
quote-style = 'single'
[tool.black]
skip-string-normalization = true
[tool.ruff.isort]
combine-as-imports = true
lines-after-imports = 2
known-first-party = ['resolvers', 'services', 'orm', 'tests']
[tool.ruff.per-file-ignores]
'tests/**' = ['B018', 'S110', 'S501']
[tool.mypy]
python_version = "3.12"
warn_return_any = true
warn_unused_configs = true
ignore_missing_imports = true
exclude = ["nb"]
[tool.pytest.ini_options]
asyncio_mode = 'auto'
[tool.pyright]
venvPath = "."
venv = ".venv"
include = ["."]
useLibraryCodeForTypes = true
disableLanguageServices = false
disableOrganizeImports = false
reportMissingImports = false
reportMissingModuleSource = "warning"
reportImportCycles = "warning"
maxMemoryForLargeFile = 4096
pythonVersion = "3.12"
autoImportCompletions = true
useVirtualEnv = true
typeCheckingMode = "basic"
disableJediCompletion = false
disableCompletion = false
disableSnippetCompletion = false
disableGoToDefinition = false
disableRenaming = false
disableSignatureHelp = false
diagnostics = true
logLevel = "Information"
pluginSearchPaths = []
typings = {}
mergeTypeStubPackages = false

resolvers/schema.py

@@ -1,7 +1,7 @@
import json
import logging
import time
from typing import Dict, List, Tuple, Union
from typing import Dict, List
import strawberry
from sqlalchemy import and_, select
@@ -24,134 +24,13 @@ from resolvers.model import (
from services.db import local_session
logger = logging.getLogger("[resolvers.schema]")
logger = logging.getLogger('[resolvers.schema] ')
logger.setLevel(logging.DEBUG)
def query_notifications(
author_id: int, after: int = 0
) -> Tuple[int, int, List[Tuple[Notification, bool]]]:
notification_seen_alias = aliased(NotificationSeen)
query = select(
Notification, notification_seen_alias.viewer.label("seen")
).outerjoin(
NotificationSeen,
and_(
NotificationSeen.viewer == author_id,
NotificationSeen.notification == Notification.id,
),
)
if after:
query = query.filter(Notification.created_at > after)
query = query.group_by(NotificationSeen.notification, Notification.created_at)
with local_session() as session:
total = (
session.query(Notification)
.filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
)
)
.count()
)
unread = (
session.query(Notification)
.filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
not_(Notification.seen),
)
)
.count()
)
notifications_result = session.execute(query)
notifications = []
for n, seen in notifications_result:
notifications.append((n, seen))
return total, unread, notifications
def process_shout_notification(
notification: Notification, seen: bool
) -> Union[Tuple[str, NotificationGroup], None] | None:
if not isinstance(notification.payload, str) or not isinstance(
notification.entity, str
):
return
payload = json.loads(notification.payload)
shout: NotificationShout = payload
thread_id = str(shout.id)
group = NotificationGroup(
id=thread_id,
entity=notification.entity,
shout=shout,
authors=shout.authors,
updated_at=shout.created_at,
reactions=[],
action="create",
seen=seen,
)
return thread_id, group
def process_reaction_notification(
notification: Notification, seen: bool
) -> Union[Tuple[str, NotificationGroup], None] | None:
if (
not isinstance(notification, Notification)
or not isinstance(notification.payload, str)
or not isinstance(notification.entity, str)
):
return
payload = json.loads(notification.payload)
reaction: NotificationReaction = payload
shout: NotificationShout = reaction.shout
thread_id = str(reaction.shout)
if reaction.kind == "COMMENT" and reaction.reply_to:
thread_id += f"::{reaction.reply_to}"
group = NotificationGroup(
id=thread_id,
action=str(notification.action),
entity=notification.entity,
updated_at=reaction.created_at,
reactions=[reaction.id],
shout=shout,
authors=[reaction.created_by],
seen=seen,
)
return thread_id, group
def process_follower_notification(
notification: Notification, seen: bool
) -> Union[Tuple[str, NotificationGroup], None] | None:
if not isinstance(notification.payload, str):
return
payload = json.loads(notification.payload)
follower: NotificationAuthor = payload
thread_id = "followers"
group = NotificationGroup(
id=thread_id,
authors=[follower],
updated_at=int(time.time()),
shout=None,
reactions=[],
entity="follower",
action="follow",
seen=seen,
)
return thread_id, group
async def get_notifications_grouped(
author_id: int, after: int = 0, limit: int = 10
) -> Tuple[Dict[str, NotificationGroup], int, int]:
async def get_notifications_grouped( # noqa: C901
author_id: int, after: int = 0, limit: int = 10, offset: int = 0
):
"""
Retrieves notifications for a given author.
@@ -177,66 +56,163 @@ async def get_notifications_grouped(
authors: List[NotificationAuthor], # List of authors involved in the thread.
}
"""
total, unread, notifications = query_notifications(author_id, after)
groups_by_thread: Dict[str, NotificationGroup] = {}
seen_alias = aliased(NotificationSeen)
query = select(Notification, seen_alias.viewer.label('seen')).outerjoin(
NotificationSeen,
and_(
NotificationSeen.viewer == author_id,
NotificationSeen.notification == Notification.id,
),
)
if after:
query = query.filter(Notification.created_at > after)
query = query.group_by(NotificationSeen.notification, Notification.created_at)
groups_amount = 0
for notification, seen in notifications:
if groups_amount >= limit:
break
if str(notification.entity) == "shout" and str(notification.action) == "create":
result = process_shout_notification(notification, seen)
if result:
thread_id, group = result
unread = 0
total = 0
notifications_by_thread: Dict[str, List[Notification]] = {}
groups_by_thread: Dict[str, NotificationGroup] = {}
with local_session() as session:
total = (
session.query(Notification)
.filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
)
)
.count()
)
unread = (
session.query(Notification)
.filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
not_(Notification.seen),
)
)
.count()
)
notifications_result = session.execute(query)
for n, _seen in notifications_result:
thread_id = ''
payload = json.loads(n.payload)
logger.debug(f'[resolvers.schema] {n.action} {n.entity}: {payload}')
if n.entity == 'shout' and n.action == 'create':
shout: NotificationShout = payload
thread_id += f'{shout.id}'
logger.debug(f'create shout: {shout}')
group = groups_by_thread.get(thread_id) or NotificationGroup(
id=thread_id,
entity=n.entity,
shout=shout,
authors=shout.authors,
updated_at=shout.created_at,
reactions=[],
action='create',
seen=author_id in n.seen,
)
# store group in result
groups_by_thread[thread_id] = group
notifications = notifications_by_thread.get(thread_id, [])
if n not in notifications:
notifications.append(n)
notifications_by_thread[thread_id] = notifications
groups_amount += 1
elif n.entity == NotificationEntity.REACTION.value and n.action == NotificationAction.CREATE.value:
reaction: NotificationReaction = payload
shout: NotificationShout = reaction.shout
thread_id += f'{reaction.shout}'
if not bool(reaction.reply_to) and (reaction.kind == 'LIKE' or reaction.kind == 'DISLIKE'):
# TODO: announce published reaction votes
pass
elif reaction.kind == 'COMMENT':
if reaction.reply_to:
thread_id += f"{'::' + str(reaction.reply_to)}"
group: NotificationGroup | None = groups_by_thread.get(thread_id)
notifications: List[Notification] = notifications_by_thread.get(thread_id, [])
if group and notifications:
group.seen = False  # any unseen notification marks the whole group unseen
group.shout = shout
group.authors.append(reaction.created_by)
if not group.reactions:
group.reactions = []
group.reactions.append(reaction.id)
# store group in result
groups_by_thread[thread_id] = group
notifications = notifications_by_thread.get(thread_id, [])
if n not in notifications:
notifications.append(n)
notifications_by_thread[thread_id] = notifications
groups_amount += 1
else:
groups_amount += 1
if groups_amount > limit:
break
else:
# init notification group
reactions = [
reaction.id,
]
group = NotificationGroup(
id=thread_id,
action=n.action,
entity=n.entity,
updated_at=reaction.created_at,
reactions=reactions,
shout=shout,
authors=[
reaction.created_by,
],
seen=author_id in n.seen,
)
# store group in result
groups_by_thread[thread_id] = group
notifications = notifications_by_thread.get(thread_id, [])
if n not in notifications:
notifications.append(n)
notifications_by_thread[thread_id] = notifications
elif n.entity == 'follower':
thread_id = 'followers'
follower: NotificationAuthor = payload
group = groups_by_thread.get(thread_id) or NotificationGroup(
id=thread_id,
authors=[follower],
updated_at=int(time.time()),
shout=None,
reactions=[],
entity='follower',
action='follow',
seen=author_id in n.seen,
)
group.authors = [
follower,
]
group.updated_at = int(time.time())
# store group in result
groups_by_thread[thread_id] = group
notifications = notifications_by_thread.get(thread_id, [])
if n not in notifications:
notifications.append(n)
notifications_by_thread[thread_id] = notifications
groups_amount += 1
elif (
str(notification.entity) == NotificationEntity.REACTION.value
and str(notification.action) == NotificationAction.CREATE.value
):
result = process_reaction_notification(notification, seen)
if result:
thread_id, group = result
existing_group = groups_by_thread.get(thread_id)
if existing_group:
existing_group.seen = False
existing_group.shout = group.shout
existing_group.authors.append(group.authors[0])
if not existing_group.reactions:
existing_group.reactions = []
existing_group.reactions.extend(group.reactions or [])
groups_by_thread[thread_id] = existing_group
else:
groups_by_thread[thread_id] = group
groups_amount += 1
if groups_amount > limit:
break
elif str(notification.entity) == "follower":
result = process_follower_notification(notification, seen)
if result:
thread_id, group = result
groups_by_thread[thread_id] = group
groups_amount += 1
return groups_by_thread, unread, total
return groups_by_thread, notifications_by_thread, unread, total
@strawberry.type
class Query:
@strawberry.field
async def load_notifications(
self, info, after: int, limit: int = 50, offset: int = 0
) -> NotificationsResult:
author_id = info.context.get("author_id")
async def load_notifications(self, info, after: int, limit: int = 50, offset: int = 0) -> NotificationsResult:
author_id = info.context.get('author_id')
groups: Dict[str, NotificationGroup] = {}
if author_id:
groups, unread, total = await get_notifications_grouped(
author_id, after, limit
)
notifications = sorted(
groups.values(), key=lambda group: group.updated_at, reverse=True
)
return NotificationsResult(
notifications=notifications, total=total, unread=unread, error=None
)
return NotificationsResult(notifications=[], total=0, unread=0, error=None)
groups, notifications, total, unread = await get_notifications_grouped(author_id, after, limit, offset)
notifications = sorted(groups.values(), key=lambda group: group.updated_at, reverse=True)
return NotificationsResult(notifications=notifications, total=0, unread=0, error=None)
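
For orientation, the reworked load_notifications resolver can be exercised against the endpoint mounted in main.py. A minimal client sketch using aiohttp (already a project dependency); the URL, the token placeholder, and the camelCase field names (strawberry's default name conversion) are assumptions, not part of this diff:

import asyncio
from aiohttp import ClientSession

QUERY = '''
query {
  loadNotifications(after: 0, limit: 50, offset: 0) {
    total
    unread
    notifications { id entity action updatedAt seen }
  }
}
'''

async def main() -> None:
    async with ClientSession() as session:
        async with session.post(
            'http://127.0.0.1:8000/',  # where server.py serves main:app
            json={'query': QUERY},
            headers={'Authorization': '<access_token>'},  # read by check_auth
        ) as resp:
            print(await resp.json())

asyncio.run(main())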

server.py

@@ -1,15 +1,14 @@
from granian.constants import Interfaces
from granian.server import Granian
from settings import PORT
if __name__ == "__main__":
print("[server] started")
if __name__ == '__main__':
print('[server] started')
granian_instance = Granian(
"main:app",
address="0.0.0.0", # noqa S104
port=PORT,
'main:app',
address='0.0.0.0', # noqa S104
port=8000,
workers=2,
threads=2,
websockets=False,

services/auth.py

@@ -8,62 +8,53 @@ from services.db import local_session
from settings import AUTH_URL
logger = logging.getLogger("\t[services.auth]\t")
logger = logging.getLogger('\t[services.auth]\t')
logger.setLevel(logging.DEBUG)
async def check_auth(req) -> str | None:
token = req.headers.get("Authorization")
user_id = ""
token = req.headers.get('Authorization')
user_id = ''
if token:
query_name = "validate_jwt_token"
operation = "ValidateToken"
query_name = 'validate_jwt_token'
operation = 'ValidateToken'
headers = {
"Content-Type": "application/json",
'Content-Type': 'application/json',
}
variables = {
"params": {
"token_type": "access_token",
"token": token,
'params': {
'token_type': 'access_token',
'token': token,
}
}
gql = {
"query": f"query {operation}($params: ValidateJWTTokenInput!) {{ {query_name}(params: $params) {{ is_valid claims }} }}",
"variables": variables,
"operationName": operation,
'query': f'query {operation}($params: ValidateJWTTokenInput!) {{ {query_name}(params: $params) {{ is_valid claims }} }}',
'variables': variables,
'operationName': operation,
}
try:
# Asynchronous HTTP request to the authentication server
async with ClientSession() as session:
async with session.post(
AUTH_URL, json=gql, headers=headers
) as response:
logger.debug(
f"HTTP Response {response.status} {await response.text()}"
)
async with session.post(AUTH_URL, json=gql, headers=headers) as response:
print(f'[services.auth] HTTP Response {response.status} {await response.text()}')
if response.status == 200:
data = await response.json()
errors = data.get("errors")
errors = data.get('errors')
if errors:
logger.error(f"errors: {errors}")
print(f'errors: {errors}')
else:
user_id = (
data.get("data", {})
.get(query_name, {})
.get("claims", {})
.get("sub")
)
user_id = data.get('data', {}).get(query_name, {}).get('claims', {}).get('sub')
if user_id:
logger.info(f"got user_id: {user_id}")
print(f'[services.auth] got user_id: {user_id}')
return user_id
except Exception as e:
import traceback
traceback.print_exc()
# Handling and logging exceptions during authentication check
logger.error(f"Error {e}")
print(f'[services.auth] Error {e}')
return None
@@ -71,14 +62,14 @@ async def check_auth(req) -> str | None:
class LoginRequiredMiddleware(Extension):
async def on_request_start(self):
context = self.execution_context.context
req = context.get("request")
req = context.get('request')
user_id = await check_auth(req)
if user_id:
context["user_id"] = user_id.strip()
context['user_id'] = user_id.strip()
with local_session() as session:
author = session.query(Author).filter(Author.user == user_id).first()
if author:
context["author_id"] = author.id
context["user_id"] = user_id or None
context['author_id'] = author.id
context['user_id'] = user_id or None
self.execution_context.context = context
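
The compare does not show where LoginRequiredMiddleware is registered; with strawberry, an Extension subclass is normally passed when the schema is constructed. A sketch under that assumption, with module paths assumed:

import strawberry

from resolvers.schema import Query  # assumed module path for the Query type
from services.auth import LoginRequiredMiddleware

# Extensions wrap each operation; the on_request_start hook above runs
# before resolvers execute, populating author_id in the request context.
schema = strawberry.Schema(query=Query, extensions=[LoginRequiredMiddleware])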

services/db.py

@@ -3,7 +3,8 @@ from typing import Any, Callable, Dict, TypeVar
# from psycopg2.errors import UniqueViolation
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from sqlalchemy.sql.schema import Table
from settings import DB_URL

settings.py

@@ -1,16 +1,14 @@
from os import environ
PORT = 8000
PORT = 80
DB_URL = (
environ.get("DATABASE_URL", environ.get("DB_URL", "")).replace(
"postgres://", "postgresql://"
)
or "postgresql://postgres@localhost:5432/discoursio"
environ.get('DATABASE_URL', environ.get('DB_URL', '')).replace('postgres://', 'postgresql://')
or 'postgresql://postgres@localhost:5432/discoursio'
)
REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1"
API_BASE = environ.get("API_BASE") or "http://127.0.0.1:8001"
AUTH_URL = environ.get("AUTH_URL") or "http://127.0.0.1:8080/graphql"
MODE = environ.get("MODE") or "production"
SENTRY_DSN = environ.get("SENTRY_DSN")
DEV_SERVER_PID_FILE_NAME = "dev-server.pid"
REDIS_URL = environ.get('REDIS_URL') or 'redis://127.0.0.1'
API_BASE = environ.get('API_BASE') or 'https://core.discours.io'
AUTH_URL = environ.get('AUTH_URL') or 'https://auth.discours.io'
MODE = environ.get('MODE') or 'production'
SENTRY_DSN = environ.get('SENTRY_DSN')
DEV_SERVER_PID_FILE_NAME = 'dev-server.pid'
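
A note on the replace call in DB_URL: SQLAlchemy 1.4+ no longer accepts the legacy postgres:// scheme that some hosting providers still emit, so the URL is normalized before use. A minimal illustration of the same transformation:

# normalize a legacy-style database URL the way settings.py does
url = 'postgres://postgres@localhost:5432/discoursio'
print(url.replace('postgres://', 'postgresql://'))
# -> postgresql://postgres@localhost:5432/discoursio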