Compare commits

..

No commits in common. "main" and "dev" have entirely different histories.
main ... dev

12 changed files with 315 additions and 262 deletions

View File

@ -1,4 +1,4 @@
name: 'Deploy to v2' name: "Deploy to notifier"
on: [push] on: [push]
jobs: jobs:
@ -21,6 +21,6 @@ jobs:
- name: Push to dokku - name: Push to dokku
uses: dokku/github-action@master uses: dokku/github-action@master
with: with:
branch: 'main' branch: "main"
git_remote_url: 'ssh://dokku@v2.discours.io:22/notifier' git_remote_url: "ssh://dokku@v2.discours.io:22/notifier"
ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }} ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}

1
.gitignore vendored
View File

@ -6,4 +6,3 @@ __pycache__
poetry.lock poetry.lock
.venv .venv
.ruff_cache .ruff_cache
.pytest_cache

View File

@ -9,6 +9,7 @@ repos:
- id: trailing-whitespace - id: trailing-whitespace
- id: check-added-large-files - id: check-added-large-files
- id: detect-private-key - id: detect-private-key
- id: double-quote-string-fixer
- id: check-ast - id: check-ast
- id: check-merge-conflict - id: check-merge-conflict
@ -17,4 +18,3 @@ repos:
hooks: hooks:
- id: ruff - id: ruff
args: [--fix] args: [--fix]
- id: ruff-format

View File

@ -1,15 +1,22 @@
# Use an official Python runtime as a parent image
FROM python:slim FROM python:slim
# Set the working directory in the container to /app
WORKDIR /app WORKDIR /app
# Add metadata to the image to describe that the container is listening on port 80
EXPOSE 8000
# Copy the current directory contents into the container at /app
COPY . /app COPY . /app
RUN apt-get update && apt-get install -y git gcc curl postgresql && \ # Install any needed packages specified in pyproject.toml
RUN apt-get update && apt-get install -y gcc curl && \
curl -sSL https://install.python-poetry.org | python - && \ curl -sSL https://install.python-poetry.org | python - && \
echo "export PATH=$PATH:/root/.local/bin" >> ~/.bashrc && \ echo "export PATH=$PATH:/root/.local/bin" >> ~/.bashrc && \
. ~/.bashrc && \ . ~/.bashrc && \
poetry config virtualenvs.create false && \ poetry config virtualenvs.create false && \
poetry install --no-dev poetry install --no-dev
EXPOSE 8000 # Run server.py when the container launches
# Run server when the container launches
CMD python server.py CMD python server.py

View File

@ -1,6 +0,0 @@
import os
import sys
# Получаем путь к корневой директории проекта
root_path = os.path.abspath(os.path.dirname(__file__))
sys.path.append(root_path)

14
main.py
View File

@ -16,23 +16,23 @@ from services.rediscache import redis
from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN
logger = logging.getLogger("\t[main]\t") logger = logging.getLogger('\t[main]\t')
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
async def start_up(): async def start_up():
logger.info("[main] starting...") logger.info('[main] starting...')
await redis.connect() await redis.connect()
task = asyncio.create_task(notifications_worker()) task = asyncio.create_task(notifications_worker())
logger.info(task) logger.info(task)
if MODE == "dev": if MODE == 'dev':
if exists(DEV_SERVER_PID_FILE_NAME): if exists(DEV_SERVER_PID_FILE_NAME):
with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f: with open(DEV_SERVER_PID_FILE_NAME, 'w', encoding='utf-8') as f:
f.write(str(os.getpid())) f.write(str(os.getpid()))
else: else:
logger.info("[main] production mode") logger.info('[main] production mode')
try: try:
import sentry_sdk import sentry_sdk
@ -47,7 +47,7 @@ async def start_up():
], ],
) )
except Exception as e: except Exception as e:
logger.error("sentry init error", e) logger.error('sentry init error', e)
async def shutdown(): async def shutdown():
@ -55,4 +55,4 @@ async def shutdown():
app = Starlette(debug=True, on_startup=[start_up], on_shutdown=[shutdown]) app = Starlette(debug=True, on_startup=[start_up], on_shutdown=[shutdown])
app.mount("/", GraphQL(schema, debug=True)) app.mount('/', GraphQL(schema, graphiql=True, debug=True))

View File

@ -1,25 +1,113 @@
[tool.poetry] [tool.poetry]
name = "discoursio-notifier" name = "discoursio-notifier"
version = "0.3.0" version = "0.3.0"
description = "notifier service for discours.io" description = "notifier server for discours.io"
authors = ["Untone <anton.rewin@gmail.com>"] authors = ["discours.io devteam"]
license = "MIT"
readme = "README.md"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.12" python = "^3.12"
aiohttp = "^3.9.3" SQLAlchemy = "^2.0.22"
redis = {extras = ["hiredis"], version = "^5.0.1"}
strawberry-graphql = {extras = ["asgi", "debug-server"], version = "^0.219.2"}
granian = "^1.1.0"
sentry-sdk = {extras = ["strawberry"], version = "^1.40.4"}
strawberry-sqlalchemy-mapper = "^0.4.2"
psycopg2-binary = "^2.9.9" psycopg2-binary = "^2.9.9"
SQLAlchemy = "^2.0.27" redis = {extras = ["hiredis"], version = "^5.0.1"}
strawberry-graphql = {extras = ["asgi", "debug-server"], version = "^0.219.0" }
strawberry-sqlalchemy-mapper = "^0.4.2"
sentry-sdk = "^1.39.2"
aiohttp = "^3.9.1"
pre-commit = "^3.6.0"
granian = "^1.0.1"
discoursio-core = { git = "https://dev.discours.io/discours.io/core.git", branch = "feature/core" }
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
ruff = "^0.2.1" setuptools = "^69.0.2"
pyright = "^1.1.341"
pytest = "^7.4.2"
black = { version = "^23.12.0", python = ">=3.12" }
ruff = { version = "^0.1.15", python = ">=3.12" }
isort = "^5.13.2"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api" build-backend = "poetry.core.masonry.api"
[tool.setuptools.dynamic]
version = {attr = "notifier.__version__"}
readme = {file = "README.md"}
[tool.ruff]
line-length = 120
extend-select = [
# E and F are enabled by default
'B', # flake8-bugbear
'C4', # flake8-comprehensions
'C90', # mccabe
'I', # isort
'N', # pep8-naming
'Q', # flake8-quotes
'S', # flake8-bandit
'W', # pycodestyle
]
extend-ignore = [
'B008', # function calls in args defaults are fine
'B009', # getattr with constants is fine
'B034', # re.split won't confuse us
'B904', # rising without from is fine
'E501', # leave line length to black
'N818', # leave to us exceptions naming
'S101', # assert is fine
'E712', # allow == True
]
flake8-quotes = { inline-quotes = 'single', multiline-quotes = 'double' }
mccabe = { max-complexity = 13 }
target-version = "py312"
[tool.ruff.format]
quote-style = 'single'
[tool.black]
skip-string-normalization = true
[tool.ruff.isort]
combine-as-imports = true
lines-after-imports = 2
known-first-party = ['resolvers', 'services', 'orm', 'tests']
[tool.ruff.per-file-ignores]
'tests/**' = ['B018', 'S110', 'S501']
[tool.mypy]
python_version = "3.12"
warn_return_any = true
warn_unused_configs = true
ignore_missing_imports = true
exclude = ["nb"]
[tool.pytest.ini_options]
asyncio_mode = 'auto'
[tool.pyright]
venvPath = "."
venv = ".venv"
include = ["."]
useLibraryCodeForTypes = true
disableLanguageServices = false
disableOrganizeImports = false
reportMissingImports = false
reportMissingModuleSource = "warning"
reportImportCycles = "warning"
maxMemoryForLargeFile = 4096
pythonVersion = "3.12"
autoImportCompletions = true
useVirtualEnv = true
typeCheckingMode = "basic"
disableJediCompletion = false
disableCompletion = false
disableSnippetCompletion = false
disableGoToDefinition = false
disableRenaming = false
disableSignatureHelp = false
diagnostics = true
logLevel = "Information"
pluginSearchPaths = []
typings = {}
mergeTypeStubPackages = false

View File

@ -1,7 +1,7 @@
import json import json
import logging import logging
import time import time
from typing import Dict, List, Tuple, Union from typing import Dict, List
import strawberry import strawberry
from sqlalchemy import and_, select from sqlalchemy import and_, select
@ -24,134 +24,13 @@ from resolvers.model import (
from services.db import local_session from services.db import local_session
logger = logging.getLogger("[resolvers.schema]") logger = logging.getLogger('[resolvers.schema] ')
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
def query_notifications( async def get_notifications_grouped( # noqa: C901
author_id: int, after: int = 0 author_id: int, after: int = 0, limit: int = 10, offset: int = 0
) -> Tuple[int, int, List[Tuple[Notification, bool]]]: ):
notification_seen_alias = aliased(NotificationSeen)
query = select(
Notification, notification_seen_alias.viewer.label("seen")
).outerjoin(
NotificationSeen,
and_(
NotificationSeen.viewer == author_id,
NotificationSeen.notification == Notification.id,
),
)
if after:
query = query.filter(Notification.created_at > after)
query = query.group_by(NotificationSeen.notification, Notification.created_at)
with local_session() as session:
total = (
session.query(Notification)
.filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
)
)
.count()
)
unread = (
session.query(Notification)
.filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
not_(Notification.seen),
)
)
.count()
)
notifications_result = session.execute(query)
notifications = []
for n, seen in notifications_result:
notifications.append((n, seen))
return total, unread, notifications
def process_shout_notification(
notification: Notification, seen: bool
) -> Union[Tuple[str, NotificationGroup], None] | None:
if not isinstance(notification.payload, str) or not isinstance(
notification.entity, str
):
return
payload = json.loads(notification.payload)
shout: NotificationShout = payload
thread_id = str(shout.id)
group = NotificationGroup(
id=thread_id,
entity=notification.entity,
shout=shout,
authors=shout.authors,
updated_at=shout.created_at,
reactions=[],
action="create",
seen=seen,
)
return thread_id, group
def process_reaction_notification(
notification: Notification, seen: bool
) -> Union[Tuple[str, NotificationGroup], None] | None:
if (
not isinstance(notification, Notification)
or not isinstance(notification.payload, str)
or not isinstance(notification.entity, str)
):
return
payload = json.loads(notification.payload)
reaction: NotificationReaction = payload
shout: NotificationShout = reaction.shout
thread_id = str(reaction.shout)
if reaction.kind == "COMMENT" and reaction.reply_to:
thread_id += f"::{reaction.reply_to}"
group = NotificationGroup(
id=thread_id,
action=str(notification.action),
entity=notification.entity,
updated_at=reaction.created_at,
reactions=[reaction.id],
shout=shout,
authors=[reaction.created_by],
seen=seen,
)
return thread_id, group
def process_follower_notification(
notification: Notification, seen: bool
) -> Union[Tuple[str, NotificationGroup], None] | None:
if not isinstance(notification.payload, str):
return
payload = json.loads(notification.payload)
follower: NotificationAuthor = payload
thread_id = "followers"
group = NotificationGroup(
id=thread_id,
authors=[follower],
updated_at=int(time.time()),
shout=None,
reactions=[],
entity="follower",
action="follow",
seen=seen,
)
return thread_id, group
async def get_notifications_grouped(
author_id: int, after: int = 0, limit: int = 10
) -> Tuple[Dict[str, NotificationGroup], int, int]:
""" """
Retrieves notifications for a given author. Retrieves notifications for a given author.
@ -177,66 +56,163 @@ async def get_notifications_grouped(
authors: List[NotificationAuthor], # List of authors involved in the thread. authors: List[NotificationAuthor], # List of authors involved in the thread.
} }
""" """
total, unread, notifications = query_notifications(author_id, after) seen_alias = aliased(NotificationSeen)
groups_by_thread: Dict[str, NotificationGroup] = {} query = select(Notification, seen_alias.viewer.label('seen')).outerjoin(
groups_amount = 0 NotificationSeen,
and_(
NotificationSeen.viewer == author_id,
NotificationSeen.notification == Notification.id,
),
)
if after:
query = query.filter(Notification.created_at > after)
query = query.group_by(NotificationSeen.notification, Notification.created_at)
for notification, seen in notifications: groups_amount = 0
if groups_amount >= limit: unread = 0
total = 0
notifications_by_thread: Dict[str, List[Notification]] = {}
groups_by_thread: Dict[str, NotificationGroup] = {}
with local_session() as session:
total = (
session.query(Notification)
.filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
)
)
.count()
)
unread = (
session.query(Notification)
.filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
not_(Notification.seen),
)
)
.count()
)
notifications_result = session.execute(query)
for n, _seen in notifications_result:
thread_id = ''
payload = json.loads(n.payload)
logger.debug(f'[resolvers.schema] {n.action} {n.entity}: {payload}')
if n.entity == 'shout' and n.action == 'create':
shout: NotificationShout = payload
thread_id += f'{shout.id}'
logger.debug(f'create shout: {shout}')
group = groups_by_thread.get(thread_id) or NotificationGroup(
id=thread_id,
entity=n.entity,
shout=shout,
authors=shout.authors,
updated_at=shout.created_at,
reactions=[],
action='create',
seen=author_id in n.seen,
)
# store group in result
groups_by_thread[thread_id] = group
notifications = notifications_by_thread.get(thread_id, [])
if n not in notifications:
notifications.append(n)
notifications_by_thread[thread_id] = notifications
groups_amount += 1
elif n.entity == NotificationEntity.REACTION.value and n.action == NotificationAction.CREATE.value:
reaction: NotificationReaction = payload
shout: NotificationShout = reaction.shout
thread_id += f'{reaction.shout}'
if not bool(reaction.reply_to) and (reaction.kind == 'LIKE' or reaction.kind == 'DISLIKE'):
# TODO: making published reaction vote announce
pass
elif reaction.kind == 'COMMENT':
if reaction.reply_to:
thread_id += f"{'::' + str(reaction.reply_to)}"
group: NotificationGroup | None = groups_by_thread.get(thread_id)
notifications: List[Notification] = notifications_by_thread.get(thread_id, [])
if group and notifications:
group.seen = False # any not seen notification make it false
group.shout = shout
group.authors.append(reaction.created_by)
if not group.reactions:
group.reactions = []
group.reactions.append(reaction.id)
# store group in result
groups_by_thread[thread_id] = group
notifications = notifications_by_thread.get(thread_id, [])
if n not in notifications:
notifications.append(n)
notifications_by_thread[thread_id] = notifications
groups_amount += 1
else:
groups_amount += 1
if groups_amount > limit:
break
else:
# init notification group
reactions = [
reaction.id,
]
group = NotificationGroup(
id=thread_id,
action=n.action,
entity=n.entity,
updated_at=reaction.created_at,
reactions=reactions,
shout=shout,
authors=[
reaction.created_by,
],
seen=author_id in n.seen,
)
# store group in result
groups_by_thread[thread_id] = group
notifications = notifications_by_thread.get(thread_id, [])
if n not in notifications:
notifications.append(n)
notifications_by_thread[thread_id] = notifications
elif n.entity == 'follower':
thread_id = 'followers'
follower: NotificationAuthor = payload
group = groups_by_thread.get(thread_id) or NotificationGroup(
id=thread_id,
authors=[follower],
updated_at=int(time.time()),
shout=None,
reactions=[],
entity='follower',
action='follow',
seen=author_id in n.seen,
)
group.authors = [
follower,
]
group.updated_at = int(time.time())
# store group in result
groups_by_thread[thread_id] = group
notifications = notifications_by_thread.get(thread_id, [])
if n not in notifications:
notifications.append(n)
notifications_by_thread[thread_id] = notifications
groups_amount += 1
if groups_amount > limit:
break break
if str(notification.entity) == "shout" and str(notification.action) == "create": return groups_by_thread, notifications_by_thread, unread, total
result = process_shout_notification(notification, seen)
if result:
thread_id, group = result
groups_by_thread[thread_id] = group
groups_amount += 1
elif (
str(notification.entity) == NotificationEntity.REACTION.value
and str(notification.action) == NotificationAction.CREATE.value
):
result = process_reaction_notification(notification, seen)
if result:
thread_id, group = result
existing_group = groups_by_thread.get(thread_id)
if existing_group:
existing_group.seen = False
existing_group.shout = group.shout
existing_group.authors.append(group.authors[0])
if not existing_group.reactions:
existing_group.reactions = []
existing_group.reactions.extend(group.reactions or [])
groups_by_thread[thread_id] = existing_group
else:
groups_by_thread[thread_id] = group
groups_amount += 1
elif str(notification.entity) == "follower":
result = process_follower_notification(notification, seen)
if result:
thread_id, group = result
groups_by_thread[thread_id] = group
groups_amount += 1
return groups_by_thread, unread, total
@strawberry.type @strawberry.type
class Query: class Query:
@strawberry.field @strawberry.field
async def load_notifications( async def load_notifications(self, info, after: int, limit: int = 50, offset: int = 0) -> NotificationsResult:
self, info, after: int, limit: int = 50, offset: int = 0 author_id = info.context.get('author_id')
) -> NotificationsResult: groups: Dict[str, NotificationGroup] = {}
author_id = info.context.get("author_id")
if author_id: if author_id:
groups, unread, total = await get_notifications_grouped( groups, notifications, total, unread = await get_notifications_grouped(author_id, after, limit, offset)
author_id, after, limit notifications = sorted(groups.values(), key=lambda group: group.updated_at, reverse=True)
) return NotificationsResult(notifications=notifications, total=0, unread=0, error=None)
notifications = sorted(
groups.values(), key=lambda group: group.updated_at, reverse=True
)
return NotificationsResult(
notifications=notifications, total=total, unread=unread, error=None
)
return NotificationsResult(notifications=[], total=0, unread=0, error=None)

View File

@ -1,15 +1,14 @@
from granian.constants import Interfaces from granian.constants import Interfaces
from granian.server import Granian from granian.server import Granian
from settings import PORT
if __name__ == "__main__": if __name__ == '__main__':
print("[server] started") print('[server] started')
granian_instance = Granian( granian_instance = Granian(
"main:app", 'main:app',
address="0.0.0.0", # noqa S104 address='0.0.0.0', # noqa S104
port=PORT, port=8000,
workers=2, workers=2,
threads=2, threads=2,
websockets=False, websockets=False,

View File

@ -8,62 +8,53 @@ from services.db import local_session
from settings import AUTH_URL from settings import AUTH_URL
logger = logging.getLogger("\t[services.auth]\t") logger = logging.getLogger('\t[services.auth]\t')
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
async def check_auth(req) -> str | None: async def check_auth(req) -> str | None:
token = req.headers.get("Authorization") token = req.headers.get('Authorization')
user_id = "" user_id = ''
if token: if token:
query_name = "validate_jwt_token" query_name = 'validate_jwt_token'
operation = "ValidateToken" operation = 'ValidateToken'
headers = { headers = {
"Content-Type": "application/json", 'Content-Type': 'application/json',
} }
variables = { variables = {
"params": { 'params': {
"token_type": "access_token", 'token_type': 'access_token',
"token": token, 'token': token,
} }
} }
gql = { gql = {
"query": f"query {operation}($params: ValidateJWTTokenInput!) {{ {query_name}(params: $params) {{ is_valid claims }} }}", 'query': f'query {operation}($params: ValidateJWTTokenInput!) {{ {query_name}(params: $params) {{ is_valid claims }} }}',
"variables": variables, 'variables': variables,
"operationName": operation, 'operationName': operation,
} }
try: try:
# Asynchronous HTTP request to the authentication server # Asynchronous HTTP request to the authentication server
async with ClientSession() as session: async with ClientSession() as session:
async with session.post( async with session.post(AUTH_URL, json=gql, headers=headers) as response:
AUTH_URL, json=gql, headers=headers print(f'[services.auth] HTTP Response {response.status} {await response.text()}')
) as response:
logger.debug(
f"HTTP Response {response.status} {await response.text()}"
)
if response.status == 200: if response.status == 200:
data = await response.json() data = await response.json()
errors = data.get("errors") errors = data.get('errors')
if errors: if errors:
logger.error(f"errors: {errors}") print(f'errors: {errors}')
else: else:
user_id = ( user_id = data.get('data', {}).get(query_name, {}).get('claims', {}).get('sub')
data.get("data", {})
.get(query_name, {})
.get("claims", {})
.get("sub")
)
if user_id: if user_id:
logger.info(f"got user_id: {user_id}") print(f'[services.auth] got user_id: {user_id}')
return user_id return user_id
except Exception as e: except Exception as e:
import traceback import traceback
traceback.print_exc() traceback.print_exc()
# Handling and logging exceptions during authentication check # Handling and logging exceptions during authentication check
logger.error(f"Error {e}") print(f'[services.auth] Error {e}')
return None return None
@ -71,14 +62,14 @@ async def check_auth(req) -> str | None:
class LoginRequiredMiddleware(Extension): class LoginRequiredMiddleware(Extension):
async def on_request_start(self): async def on_request_start(self):
context = self.execution_context.context context = self.execution_context.context
req = context.get("request") req = context.get('request')
user_id = await check_auth(req) user_id = await check_auth(req)
if user_id: if user_id:
context["user_id"] = user_id.strip() context['user_id'] = user_id.strip()
with local_session() as session: with local_session() as session:
author = session.query(Author).filter(Author.user == user_id).first() author = session.query(Author).filter(Author.user == user_id).first()
if author: if author:
context["author_id"] = author.id context['author_id'] = author.id
context["user_id"] = user_id or None context['user_id'] = user_id or None
self.execution_context.context = context self.execution_context.context = context

View File

@ -3,7 +3,8 @@ from typing import Any, Callable, Dict, TypeVar
# from psycopg2.errors import UniqueViolation # from psycopg2.errors import UniqueViolation
from sqlalchemy import Column, Integer, create_engine from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from sqlalchemy.sql.schema import Table from sqlalchemy.sql.schema import Table
from settings import DB_URL from settings import DB_URL

View File

@ -1,16 +1,14 @@
from os import environ from os import environ
PORT = 8000 PORT = 80
DB_URL = ( DB_URL = (
environ.get("DATABASE_URL", environ.get("DB_URL", "")).replace( environ.get('DATABASE_URL', environ.get('DB_URL', '')).replace('postgres://', 'postgresql://')
"postgres://", "postgresql://" or 'postgresql://postgres@localhost:5432/discoursio'
)
or "postgresql://postgres@localhost:5432/discoursio"
) )
REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1" REDIS_URL = environ.get('REDIS_URL') or 'redis://127.0.0.1'
API_BASE = environ.get("API_BASE") or "http://127.0.0.1:8001" API_BASE = environ.get('API_BASE') or 'https://core.discours.io'
AUTH_URL = environ.get("AUTH_URL") or "http://127.0.0.1:8080/graphql" AUTH_URL = environ.get('AUTH_URL') or 'https://auth.discours.io'
MODE = environ.get("MODE") or "production" MODE = environ.get('MODE') or 'production'
SENTRY_DSN = environ.get("SENTRY_DSN") SENTRY_DSN = environ.get('SENTRY_DSN')
DEV_SERVER_PID_FILE_NAME = "dev-server.pid" DEV_SERVER_PID_FILE_NAME = 'dev-server.pid'