0.1.0-fixes
All checks were successful
deploy / deploy (push) Successful in 1m11s

This commit is contained in:
Untone 2023-12-22 12:09:03 +03:00
parent 5c6a680832
commit e22d5468ab
12 changed files with 340 additions and 171 deletions

View File

@ -2,15 +2,12 @@
### Что делает ### Что делает
- сохраняет тех, кому уведомления уже были отправлены (redis: authors-online)
- формирует дайджесты для остальных
- слушает Redis PubSub канал с обновлениями реакций - слушает Redis PubSub канал с обновлениями реакций
### Что НЕ делает ### Что НЕ делает
- не отправляет сообщения по SSE - не отправляет сообщения по SSE
- не определяет кому их отправлять
## Как разрабатывать локально ## Как разрабатывать локально

12
main.py
View File

@ -9,23 +9,23 @@ from sentry_sdk.integrations.strawberry import StrawberryIntegration
from starlette.applications import Starlette from starlette.applications import Starlette
from strawberry.asgi import GraphQL from strawberry.asgi import GraphQL
from resolvers.listener import reactions_worker from resolvers.listener import notifications_worker
from resolvers.schema import schema from resolvers.schema import schema
from services.rediscache import redis from services.rediscache import redis
from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN
async def start_up(): async def start_up():
await redis.connect()
task = asyncio.create_task(notifications_worker())
print(task)
if MODE == "dev": if MODE == "dev":
if exists(DEV_SERVER_PID_FILE_NAME): if exists(DEV_SERVER_PID_FILE_NAME):
await redis.connect()
else:
with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f: with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f:
f.write(str(os.getpid())) f.write(str(os.getpid()))
else: else:
await redis.connect()
notification_service_task = asyncio.create_task(reactions_worker())
print(f"[main.start_up] {notification_service_task}")
try: try:
import sentry_sdk import sentry_sdk

View File

@ -1,7 +1,6 @@
import time
from enum import Enum as Enumeration from enum import Enum as Enumeration
from sqlalchemy import JSON as JSONType from sqlalchemy import JSON as JSONType, func, cast
from sqlalchemy import Column, Enum, ForeignKey, Integer from sqlalchemy import Column, Enum, ForeignKey, Integer
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
@ -34,7 +33,7 @@ class NotificationSeen(Base):
class Notification(Base): class Notification(Base):
__tablename__ = "notification" __tablename__ = "notification"
created_at = Column(Integer, default=lambda: int(time.time())) created_at = Column(Integer, server_default=cast(func.current_timestamp(), Integer))
entity = Column(Enum(NotificationEntity), nullable=False) entity = Column(Enum(NotificationEntity), nullable=False)
action = Column(Enum(NotificationAction), nullable=False) action = Column(Enum(NotificationAction), nullable=False)
payload = Column(JSONType, nullable=True) payload = Column(JSONType, nullable=True)

View File

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "discoursio-notifier" name = "discoursio-notifier"
version = "0.0.3" version = "0.1.0"
description = "notifier server for discours.io" description = "notifier server for discours.io"
authors = ["discours.io devteam"] authors = ["discours.io devteam"]

View File

@ -1,13 +1,25 @@
from orm.notification import Notification from orm.notification import Notification
from resolvers.model import NotificationReaction, NotificationAuthor, NotificationShout
from services.db import local_session from services.db import local_session
from services.rediscache import redis from services.rediscache import redis
import asyncio
async def handle_reaction(notification: dict[str, str | int]): class ServiceMessage:
action: str
entity: str
payload: NotificationReaction | NotificationAuthor | NotificationShout
async def handle_notification(n: ServiceMessage, channel: str):
"""создаеёт новое хранимое уведомление""" """создаеёт новое хранимое уведомление"""
with local_session() as session: with local_session() as session:
try: try:
n = Notification(**notification) if channel.startswith("follower:"):
author_id = int(channel.split(":")[1])
if isinstance(n.payload, NotificationAuthor):
n.payload.following_id = author_id
n = Notification(action=n.action, entity=n.entity, payload=n.payload)
session.add(n) session.add(n)
session.commit() session.commit()
except Exception as e: except Exception as e:
@ -15,7 +27,15 @@ async def handle_reaction(notification: dict[str, str | int]):
print(f"[listener.handle_reaction] error: {str(e)}") print(f"[listener.handle_reaction] error: {str(e)}")
async def reactions_worker(): async def listen_task(pattern):
async for message in redis.listen("reaction"): async for message_data, channel in redis.listen(pattern):
if message: try:
await handle_reaction(message) notification_message = ServiceMessage(**message_data)
await handle_notification(notification_message, str(channel))
except Exception as e:
print(f"[listener.listen_task] Error processing notification: {str(e)}")
async def notifications_worker():
# Use asyncio.gather to run tasks concurrently
await asyncio.gather(listen_task("follower:*"), listen_task("reaction"), listen_task("shout"))

153
resolvers/load.py Normal file
View File

@ -0,0 +1,153 @@
from services.db import local_session
from resolvers.model import (
NotificationReaction,
Notification as NotificationMessage,
NotificationGroup,
NotificationShout,
NotificationAuthor,
NotificationsResult,
)
from orm.notification import NotificationSeen
from typing import Dict
import time, json
import strawberry
from sqlalchemy.orm import aliased
from sqlalchemy import select, and_
async def get_notifications_grouped(
author_id: int, after: int = 0, limit: int = 10, offset: int = 0, mark_as_read=False
) -> Dict[str, NotificationGroup]:
"""
Retrieves notifications for a given author.
Args:
author_id (int): The ID of the author for whom notifications are retrieved.
session: Database connection session
after (int, optional): If provided, only notifications created after this timestamp will be considered.
limit (int, optional): The maximum number of notifications to retrieve.
Returns:
Dict[str, NotificationGroup]: A dictionary where keys are thread IDs and values are NotificationGroup objects.
This function queries the database to retrieve notifications for the specified author, considering optional filters.
The result is a dictionary where each key is a thread ID, and the corresponding value is a NotificationGroup
containing information about the notifications within that thread.
NotificationGroup structure:
{
entity: str, # Type of entity (e.g., 'reaction', 'shout', 'follower').
updated_at: int, # Timestamp of the latest update in the thread.
reactions: List[Reaction], # List of reactions within the thread.
authors: List[Author], # List of authors involved in the thread.
}
"""
NotificationSeenAlias = aliased(NotificationSeen)
query = select(NotificationMessage, NotificationSeenAlias.viewer.label("seen")).outerjoin(
NotificationSeen,
and_(NotificationSeen.viewer == author_id, NotificationSeen.notification == NotificationMessage.id),
)
if after:
query = query.filter(NotificationMessage.created_at > after)
query = query.group_by(NotificationSeen.notification)
notifications: Dict[str, NotificationGroup] = {}
counter = 0
with local_session() as session:
for n, seen in session.execute(query):
thread_id = ""
payload = json.loads(n.payload)
print(f"[resolvers.schema] {n.action} {n.entity}: {payload}")
if n.entity == "shout":
shout: NotificationShout = payload
thread_id += f"{shout.id}"
if n.action == "delete":
del notifications[thread_id]
elif n.action == "create":
print(f"[resolvers.schema] create shout: {shout}")
notification_group = NotificationGroup(
entity=n.entity,
shout=shout,
authors=shout.authors,
updated_at=shout.created_at,
reactions=[],
action="create",
)
# store group in result
notifications[thread_id] = notification_group
counter += 1
elif n.entity == "reaction":
reaction: NotificationReaction = payload
shout: NotificationShout = reaction.shout
thread_id += f"{reaction.shout}"
if reaction.kind == "LIKE" or reaction.kind == "DISLIKE":
# TODO: making published reaction vote announce
pass
elif reaction.kind == "COMMENT":
if reaction.reply_to:
thread_id += f"{'::' + str(reaction.reply_to)}"
notification_group: NotificationGroup | None = notifications.get(thread_id)
if notification_group:
notification_group.shout = shout
notification_group.authors.append(reaction.created_by)
if not notification_group.reactions:
notification_group.reactions = []
notification_group.reactions.append(reaction.id)
# store group in result
notifications[thread_id] = notification_group
counter += 1
else:
counter += 1
if counter > limit:
break
else:
# init notification group
reactions = []
reactions.append(reaction.id)
notification_group = NotificationGroup(
action=n.action,
entity=n.entity,
updated_at=reaction.created_at,
reactions=reactions,
shout=shout,
authors=[
reaction.created_by,
],
)
# store group in result
notifications[thread_id] = notification_group
elif n.entity == "follower":
thread_id = "followers"
follower: NotificationAuthor = payload
notification_group = notifications.get(thread_id)
if not notification_group:
notification_group = NotificationGroup(
authors=[follower],
updated_at=int(time.time()),
shout=None,
reactions=[],
entity="follower",
action="follow",
)
# store group in result
notifications[thread_id] = notification_group
counter += 1
if counter > limit:
break
return notifications
@strawberry.type
class Query:
@strawberry.field
async def load_notifications(self, info, after: int, limit: int = 50, offset: int = 0) -> NotificationsResult:
author_id = info.context.get("author_id")
notification_groups: Dict[str, NotificationGroup] = {}
if author_id:
# TODO: add total counter calculation
# TODO: add unread counter calculation
notification_groups = await get_notifications_grouped(author_id, after, limit, offset)
notifications = sorted(notification_groups.values(), key=lambda group: group.updated_at, reverse=True)
return NotificationsResult(notifications=notifications, total=0, unread=0, error=None)

72
resolvers/model.py Normal file
View File

@ -0,0 +1,72 @@
import strawberry
from typing import List, Optional
from strawberry_sqlalchemy_mapper import StrawberrySQLAlchemyMapper
from orm.notification import Notification as NotificationMessage
strawberry_sqlalchemy_mapper = StrawberrySQLAlchemyMapper()
@strawberry_sqlalchemy_mapper.type(NotificationMessage)
class Notification:
id: int
action: str # create update delete join follow etc.
entity: str # REACTION SHOUT FOLLOWER
created_at: int
payload: str # JSON data
seen: List[int] # NOTE: adds author_id when seen
# TODO: add recipient defining field
@strawberry.type
class NotificationSeenResult:
error: str | None
@strawberry.type
class NotificationAuthor:
id: int
slug: str
name: str
pic: str
following_id: Optional[int]
@strawberry.type
class NotificationShout:
id: int
slug: str
title: str
created_at: int
authors: List[NotificationAuthor]
@strawberry.type
class NotificationReaction:
id: int
kind: str
shout: NotificationShout
reply_to: int
created_by: NotificationAuthor
created_at: int
@strawberry.type
class NotificationGroup:
authors: List[NotificationAuthor]
updated_at: int
entity: str
action: Optional[str]
shout: Optional[NotificationShout]
reactions: Optional[List[int]]
# latest reaction.created_at for reactions-updates
# no timestamp for followers-updates
# latest shout.created_at for shouts-updates
# you are invited in authors list
@strawberry.type
class NotificationsResult:
notifications: List[NotificationGroup]
unread: int
total: int
error: Optional[str]

View File

@ -1,145 +1,9 @@
import logging
from typing import List
from sqlalchemy.schema import Column
import strawberry import strawberry
from sqlalchemy import and_, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import aliased
from strawberry.schema.config import StrawberryConfig from strawberry.schema.config import StrawberryConfig
from strawberry_sqlalchemy_mapper import StrawberrySQLAlchemyMapper
from orm.author import Author
from orm.notification import Notification as NotificationMessage
from orm.notification import NotificationSeen
from services.auth import LoginRequiredMiddleware from services.auth import LoginRequiredMiddleware
from services.db import local_session from resolvers.load import Query
from resolvers.seen import Mutation
strawberry_sqlalchemy_mapper = StrawberrySQLAlchemyMapper()
# Инициализация логгера
logger = logging.getLogger(__name__)
@strawberry_sqlalchemy_mapper.type(NotificationMessage)
class Notification:
id: int
action: str # create update delete join follow etc.
entity: str # REACTION SHOUT
created_at: int
payload: str # JSON data
seen: List[int]
@strawberry.type
class NotificationSeenResult:
error: str | None
@strawberry.type
class NotificationsResult:
notifications: List[Notification]
unread: int
total: int
def get_notifications(author_id: int, session, after: int | Column[int], limit: int = 9999, offset: int = 0) -> List[Notification]:
NotificationSeenAlias = aliased(NotificationSeen)
query = (
select(NotificationMessage, NotificationSeenAlias.viewer.label("seen"))
.outerjoin(
NotificationSeen,
and_(NotificationSeen.viewer == author_id, NotificationSeen.notification == NotificationMessage.id),
)
.filter(NotificationMessage.created_at > after)
.group_by(NotificationSeen.notification)
)
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
notifications = []
for n, seen in session.execute(query):
ntf = Notification(
id=n.id,
payload=n.payload,
entity=n.entity,
action=n.action,
created_at=n.created_at,
seen=seen,
)
if ntf:
notifications.append(ntf)
return notifications
@strawberry.type
class Query:
@strawberry.field
async def load_notifications(self, info, after: int, limit: int = 50, offset: int = 0) -> NotificationsResult:
author_id = info.context.get("author_id")
with local_session() as session:
try:
if author_id:
notifications = get_notifications(author_id, session, after, limit, offset)
if notifications and len(notifications) > 0:
nr = NotificationsResult(
notifications=notifications,
unread=sum(1 for n in notifications if author_id in n.seen),
total=session.query(NotificationMessage).count(),
)
return nr
except Exception as ex:
import traceback
traceback.print_exc()
logger.error(f"[load_notifications] Ошибка при выполнении запроса к базе данных: {ex}")
return NotificationsResult(notifications=[], total=0, unread=0)
@strawberry.type
class Mutation:
@strawberry.mutation
async def mark_notification_as_read(self, info, notification_id: int) -> NotificationSeenResult:
author_id = info.context.get("author_id")
if author_id:
with local_session() as session:
try:
ns = NotificationSeen(notification=notification_id, viewer=author_id)
session.add(ns)
session.commit()
except SQLAlchemyError as e:
session.rollback()
logger.error(
f"[mark_notification_as_read] Ошибка при обновлении статуса прочтения уведомления: {str(e)}"
)
return NotificationSeenResult(error="cant mark as read")
return NotificationSeenResult(error=None)
@strawberry.mutation
async def mark_all_notifications_as_read(self, info) -> NotificationSeenResult:
author_id = info.context.get("author_id")
if author_id:
with local_session() as session:
try:
author = session.query(Author).filter(Author.id == author_id).first()
if author:
after = author.last_seen
nslist = get_notifications(author_id, session, after)
for n in nslist:
if author_id not in n.seen:
ns = NotificationSeen(viewer=author_id, notification=n.id)
session.add(ns)
session.commit()
except SQLAlchemyError as e:
session.rollback()
logger.error(
f"[mark_all_notifications_as_read] Ошибка обновления статуса прочтения всех уведомлений: {e}"
)
return NotificationSeenResult(error="cant mark as read")
return NotificationSeenResult(error=None)
schema = strawberry.Schema( schema = strawberry.Schema(
query=Query, mutation=Mutation, config=StrawberryConfig(auto_camel_case=False), extensions=[LoginRequiredMiddleware] query=Query, mutation=Mutation, config=StrawberryConfig(auto_camel_case=False), extensions=[LoginRequiredMiddleware]

45
resolvers/seen.py Normal file
View File

@ -0,0 +1,45 @@
from orm.notification import NotificationSeen
from services.db import local_session
from resolvers.model import NotificationSeenResult
from resolvers.load import get_notifications_grouped
import strawberry
import logging
from sqlalchemy.exc import SQLAlchemyError
logger = logging.getLogger(__name__)
@strawberry.type
class Mutation:
@strawberry.mutation
async def mark_notification_as_read(self, info, notification_id: int) -> NotificationSeenResult:
author_id = info.context.get("author_id")
if author_id:
with local_session() as session:
try:
ns = NotificationSeen(notification=notification_id, viewer=author_id)
session.add(ns)
session.commit()
except SQLAlchemyError as e:
session.rollback()
logger.error(
f"[mark_notification_as_read] Ошибка при обновлении статуса прочтения уведомления: {str(e)}"
)
return NotificationSeenResult(error="cant mark as read")
return NotificationSeenResult(error=None)
@strawberry.mutation
async def mark_all_notifications_as_read(self, info, limit: int = 10, offset: int = 0) -> NotificationSeenResult:
# TODO: use latest loaded notification_id as input offset parameter
ngroups = {}
error = None
try:
author_id = info.context.get("author_id")
if author_id:
ngroups = get_notifications_grouped(author_id, limit, offset, mark_as_read=True)
except Exception as e:
print(e)
error = "cant mark as read"
return NotificationSeenResult(error=error)

View File

@ -1,4 +1,3 @@
from functools import wraps
from aiohttp import ClientSession from aiohttp import ClientSession
from starlette.exceptions import HTTPException from starlette.exceptions import HTTPException
from strawberry.extensions import Extension from strawberry.extensions import Extension
@ -7,12 +6,13 @@ from settings import AUTH_URL
from services.db import local_session from services.db import local_session
from orm.author import Author from orm.author import Author
async def check_auth(req) -> str | None: async def check_auth(req) -> str | None:
token = req.headers.get("Authorization") token = req.headers.get("Authorization")
user_id = "" user_id = ""
if token: if token:
# Logging the authentication token # Logging the authentication token
print(f"[services.auth] checking auth token: {token}") # print(f"[services.auth] checking auth token: {token}")
query_name = "validate_jwt_token" query_name = "validate_jwt_token"
operation = "ValidateToken" operation = "ValidateToken"
headers = { headers = {
@ -42,11 +42,16 @@ async def check_auth(req) -> str | None:
print(f"[services.auth] errors: {errors}") print(f"[services.auth] errors: {errors}")
else: else:
user_id = data.get("data", {}).get(query_name, {}).get("claims", {}).get("sub") user_id = data.get("data", {}).get(query_name, {}).get("claims", {}).get("sub")
return user_id if user_id:
print(f"[services.auth] got user_id: {user_id}")
return user_id
except Exception as e: except Exception as e:
# Handling and logging exceptions during authentication check # Handling and logging exceptions during authentication check
print(f"[services.auth] {e}") print(f"[services.auth] {e}")
if not user_id:
raise HTTPException(status_code=401, detail="Unathorized")
class LoginRequiredMiddleware(Extension): class LoginRequiredMiddleware(Extension):
async def on_request_start(self): async def on_request_start(self):
@ -60,3 +65,5 @@ class LoginRequiredMiddleware(Extension):
if author: if author:
context["author_id"] = author.id context["author_id"] = author.id
context["user_id"] = user_id or None context["user_id"] = user_id or None
self.execution_context.context = context

View File

@ -1,4 +1,4 @@
from typing import Any, List from typing import Any
import aiohttp import aiohttp
@ -11,7 +11,7 @@ api_base = API_BASE or "https://core.discours.io"
async def _request_endpoint(query_name, body) -> Any: async def _request_endpoint(query_name, body) -> Any:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.post(API_BASE, headers=headers, json=body) as response: async with session.post(API_BASE, headers=headers, json=body) as response:
print(f"[services.core] {query_name} response: <{response.status}> {await response.text()}") print(f"[services.core] {query_name} HTTP Response {response.status} {await response.text()}")
if response.status == 200: if response.status == 200:
r = await response.json() r = await response.json()
if r: if r:
@ -27,10 +27,23 @@ async def get_followed_shouts(author_id: int):
{query_name}(author_id: $author_id, limit: $limit, offset: $offset) {{ id slug title }} {query_name}(author_id: $author_id, limit: $limit, offset: $offset) {{ id slug title }}
}}""" }}"""
body = { gql = {
"query": query, "query": query,
"operationName": operation, "operationName": operation,
"variables": {"author_id": author_id, "limit": 1000, "offset": 0}, # FIXME: too big limit "variables": {"author_id": author_id, "limit": 1000, "offset": 0}, # FIXME: too big limit
} }
return await _request_endpoint(query_name, body) return await _request_endpoint(query_name, gql)
async def get_shout(shout_id):
query_name = "get_shout"
operation = "GetShout"
query = f"""query {operation}($slug: String, $shout_id: Int) {{
{query_name}(slug: $slug, shout_id: $shout_id) {{ id slug title authors {{ id slug name pic }} }}
}}"""
gql = {"query": query, "operationName": operation, "variables": {"slug": None, "shout_id": shout_id}}
return await _request_endpoint(query_name, gql)

View File

@ -1,4 +1,3 @@
import asyncio
import json import json
import redis.asyncio as aredis import redis.asyncio as aredis
@ -49,19 +48,19 @@ class RedisCache:
return return
await self._client.publish(channel, data) await self._client.publish(channel, data)
async def listen(self, channel): async def listen(self, pattern):
if self._client: if self._client:
pubsub = self._client.pubsub() pubsub = self._client.pubsub()
await pubsub.subscribe(channel) await pubsub.psubscribe(pattern)
while True: while True:
message = await pubsub.get_message() message = await pubsub.get_message()
if message and isinstance(message["data"], (str, bytes, bytearray)): if message and isinstance(message["data"], (str, bytes, bytearray)):
print(f"[services.rediscache] msg: {message}")
try: try:
yield json.loads(message["data"]) yield json.loads(message["data"]), message.get("channel")
except json.JSONDecodeError as e: except Exception as e:
print(f"Error decoding JSON: {e}") print(f"[servoces.rediscache] Error: {e}")
await asyncio.sleep(0.1)
redis = RedisCache() redis = RedisCache()