From 393910be4cb8dd9c631d0c7b07487a692f450d13 Mon Sep 17 00:00:00 2001
From: Untone
Date: Sun, 18 Feb 2024 12:50:18 +0300
Subject: [PATCH] init

---
 .DS_Store              | Bin 0 -> 6148 bytes
 orm/author.py          |  45 +++++++++
 orm/collection.py      |  25 +++++
 orm/community.py       |  41 ++++++++
 orm/invite.py          |  27 ++++++
 orm/notification.py    |  41 ++++++++
 orm/reaction.py        |  43 +++++++++
 orm/shout.py           |  83 ++++++++++++++++
 orm/topic.py           |  26 +++++
 orm/user.py            |  30 ++++++
 pyproject.toml         |  24 +++++
 services/auth.py       | 113 ++++++++++++++++++++++
 services/core.py       | 153 +++++++++++++++++++++++++++++
 services/db.py         |  72 ++++++++++++++
 services/diff.py       |  47 +++++++++
 services/notify.py     |  45 +++++++++
 services/presence.py   |  24 +++++
 services/rediscache.py |  59 ++++++++++++
 services/search.py     | 184 +++++++++++++++++++++++++++++++++++
 services/sentry.py     |  33 +++++++
 services/unread.py     |  24 +++++
 services/viewed.py     | 213 +++++++++++++++++++++++++++++++++++++++++
 services/webhook.py    |  36 +++++++
 settings.py            |  18 ++++
 24 files changed, 1406 insertions(+)
 create mode 100644 .DS_Store
 create mode 100644 orm/author.py
 create mode 100644 orm/collection.py
 create mode 100644 orm/community.py
 create mode 100644 orm/invite.py
 create mode 100644 orm/notification.py
 create mode 100644 orm/reaction.py
 create mode 100644 orm/shout.py
 create mode 100644 orm/topic.py
 create mode 100644 orm/user.py
 create mode 100644 pyproject.toml
 create mode 100644 services/auth.py
 create mode 100644 services/core.py
 create mode 100644 services/db.py
 create mode 100644 services/diff.py
 create mode 100644 services/notify.py
 create mode 100644 services/presence.py
 create mode 100644 services/rediscache.py
 create mode 100644 services/search.py
 create mode 100644 services/sentry.py
 create mode 100644 services/unread.py
 create mode 100644 services/viewed.py
 create mode 100644 services/webhook.py
 create mode 100644 settings.py

diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..cae94efba517a6084a287682e8b7d3702c88ce8e
GIT binary patch
literal 6148
zcmeHKJ5Iwu5S>X<7$Kn)6qI`e(okZTSX?x$
z6g8dLnh&;}**X+XyJP*3!il4zkIsNI&}5*imwl=K=im4L%_P5a2AqLE#Q^v6aX!K=
zS#53IoYdL?y@ZNLT&1{4!6cPp#7ZeXfhK`H$OIS-D@9l!{v(iR@WC1QQ3k#M<-$xA

literal 0
HcmV?d00001

diff --git a/orm/author.py b/orm/author.py
new file mode 100644
index 0000000..49bf6d1
--- /dev/null
+++ b/orm/author.py
@@ -0,0 +1,45 @@
+import time
+
+from sqlalchemy import JSON, Boolean, Column, ForeignKey, Integer, String
+from sqlalchemy.orm import relationship
+
+from services.db import Base
+
+
+class AuthorRating(Base):
+    __tablename__ = 'author_rating'
+
+    id = None  # type: ignore
+    rater = Column(ForeignKey('author.id'), primary_key=True, index=True)
+    author = Column(ForeignKey('author.id'), primary_key=True, index=True)
+    plus = Column(Boolean)
+
+
+class AuthorFollower(Base):
+    __tablename__ = 'author_follower'
+
+    id = None  # type: ignore
+    follower = Column(ForeignKey('author.id'), primary_key=True, index=True)
+    author = Column(ForeignKey('author.id'), primary_key=True, index=True)
+    created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
+    auto = Column(Boolean, nullable=False, default=False)
+
+
+class Author(Base):
+    __tablename__ = 'author'
+
+    user = Column(String, unique=True)  # unbounded link with authorizer's User type
+
+    name = Column(String, nullable=True, comment='Display name')
+    slug = Column(String, unique=True, comment="Author's slug")
+    bio = Column(String, nullable=True, comment='Bio')  # status description
+    about = Column(String, nullable=True, comment='About')  # long and formatted
+    pic = Column(String, nullable=True, comment='Picture')
+    links = Column(JSON, nullable=True, comment='Links')
+
+    ratings = relationship(AuthorRating, foreign_keys=AuthorRating.author)
+
+    created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
+    last_seen = Column(Integer, nullable=False, default=lambda: int(time.time()))
+    updated_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
+    deleted_at = Column(Integer, nullable=True, comment='Deleted at')
diff --git a/orm/collection.py b/orm/collection.py
new file mode 100644
index 0000000..87592bc
--- /dev/null
+++ b/orm/collection.py
@@ -0,0 +1,25 @@
+import time
+
+from sqlalchemy import Column, ForeignKey, Integer, String
+
+from services.db import Base
+
+
+class ShoutCollection(Base):
+    __tablename__ = 'shout_collection'
+
+    id = None  # type: ignore
+    shout = Column(ForeignKey('shout.id'), primary_key=True)
+    collection = Column(ForeignKey('collection.id'), primary_key=True)
+
+
+class Collection(Base):
+    __tablename__ = 'collection'
+
+    slug = Column(String, unique=True)
+    title = Column(String, nullable=False, comment='Title')
+    body = Column(String, nullable=True, comment='Body')
+    pic = Column(String, nullable=True, comment='Picture')
+    created_at = Column(Integer, default=lambda: int(time.time()))
+    created_by = Column(ForeignKey('author.id'), comment='Created By')
+    published_at = Column(Integer, default=lambda: int(time.time()))
diff --git a/orm/community.py b/orm/community.py
new file mode 100644
index 0000000..ff156bd
--- /dev/null
+++ b/orm/community.py
@@ -0,0 +1,41 @@
+import time
+
+from sqlalchemy import Column, ForeignKey, Integer, String
+from sqlalchemy.orm import relationship
+
+from orm.author import Author
+from services.db import Base, local_session
+
+
+class CommunityAuthor(Base):
+    __tablename__ = 'community_author'
+
+    id = None  # type: ignore
+    author = Column(ForeignKey('author.id'), primary_key=True)
+    community = Column(ForeignKey('community.id'), primary_key=True)
+    joined_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
+    role = Column(String, nullable=False)
+
+
+class Community(Base):
+    __tablename__ = 'community'
+
+    name = Column(String, nullable=False)
+    slug = Column(String, nullable=False, unique=True)
+    desc = Column(String, nullable=False, default='')
+    pic = Column(String, nullable=False, default='')
+    created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
+
+    authors = relationship(lambda: Author, secondary=CommunityAuthor.__tablename__)
+
+    @staticmethod
+    def init_table():
+        with local_session('orm.community') as session:
+            d = session.query(Community).filter(Community.slug == 'discours').first()
+            if not d:
+                d = Community(name='Дискурс', slug='discours')
+                session.add(d)
+                session.commit()
+                print('[orm.community] created community %s' % d.slug)
+            Community.default_community = d
+            print('[orm.community] default community is %s' % d.slug)
diff --git a/orm/invite.py b/orm/invite.py
new file mode 100644
index 0000000..972df37
--- /dev/null
+++ b/orm/invite.py
@@ -0,0 +1,27 @@
+from enum import Enum as Enumeration
+
+from sqlalchemy import Column, ForeignKey, String
+from sqlalchemy.orm import relationship
+
+from orm.author import Author
+from orm.shout import Shout
+from services.db import Base
+
+
+class InviteStatus(Enumeration):
+    PENDING = 'PENDING'
+    ACCEPTED = 'ACCEPTED'
+    REJECTED = 'REJECTED'
+
+
+class Invite(Base):
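+    """Invitation of an author to a shout (e.g. to join as a co-author); status is one of InviteStatus."""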
+    __tablename__ = 'invite'
+
+    inviter_id = Column(ForeignKey('author.id'), nullable=False, index=True)
+    author_id = Column(ForeignKey('author.id'), nullable=False, index=True)
+    shout_id = Column(ForeignKey('shout.id'), nullable=False, index=True)
+    status = Column(String, default=InviteStatus.PENDING.value)
+
+    inviter = relationship(Author, foreign_keys=[inviter_id])
+    author = relationship(Author, foreign_keys=[author_id])
+    shout = relationship(Shout)
diff --git a/orm/notification.py b/orm/notification.py
new file mode 100644
index 0000000..2b09ea1
--- /dev/null
+++ b/orm/notification.py
@@ -0,0 +1,41 @@
+import time
+from enum import Enum as Enumeration
+
+from sqlalchemy import JSON, Column, ForeignKey, Integer, String
+from sqlalchemy.orm import relationship
+
+from orm.author import Author
+from services.db import Base
+
+
+class NotificationEntity(Enumeration):
+    REACTION = 'reaction'
+    SHOUT = 'shout'
+    FOLLOWER = 'follower'
+
+
+class NotificationAction(Enumeration):
+    CREATE = 'create'
+    UPDATE = 'update'
+    DELETE = 'delete'
+    SEEN = 'seen'
+    FOLLOW = 'follow'
+    UNFOLLOW = 'unfollow'
+
+
+class NotificationSeen(Base):
+    __tablename__ = 'notification_seen'
+
+    viewer = Column(ForeignKey('author.id'))
+    notification = Column(ForeignKey('notification.id'))
+
+
+class Notification(Base):
+    __tablename__ = 'notification'
+
+    # per-row creation time, like the other models (a static server_default
+    # would be frozen at import time)
+    created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
+    entity = Column(String, nullable=False)
+    action = Column(String, nullable=False)
+    payload = Column(JSON, nullable=True)
+
+    seen = relationship(lambda: Author, secondary='notification_seen')
diff --git a/orm/reaction.py b/orm/reaction.py
new file mode 100644
index 0000000..5ceb27a
--- /dev/null
+++ b/orm/reaction.py
@@ -0,0 +1,43 @@
+import time
+from enum import Enum as Enumeration
+
+from sqlalchemy import Column, ForeignKey, Integer, String
+
+from services.db import Base
+
+
+class ReactionKind(Enumeration):
+    # TYPE =  # rating diff
+
+    # editor mode
+    AGREE = 'AGREE'  # +1
+    DISAGREE = 'DISAGREE'  # -1
+    ASK = 'ASK'  # +0
+    PROPOSE = 'PROPOSE'  # +0
+    PROOF = 'PROOF'  # +1
+    DISPROOF = 'DISPROOF'  # -1
+    ACCEPT = 'ACCEPT'  # +1
+    REJECT = 'REJECT'  # -1
+
+    # public feed
+    QUOTE = 'QUOTE'  # +0 TODO: use to bookmark in collection
+    COMMENT = 'COMMENT'  # +0
+    LIKE = 'LIKE'  # +1
+    DISLIKE = 'DISLIKE'  # -1
+
+
+class Reaction(Base):
+    __tablename__ = 'reaction'
+
+    body = Column(String, default='', comment='Reaction Body')
+    created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
+    updated_at = Column(Integer, nullable=True, comment='Updated at')
+    deleted_at = Column(Integer, nullable=True, comment='Deleted at')
+    deleted_by = Column(ForeignKey('author.id'), nullable=True, index=True)
+    reply_to = Column(ForeignKey('reaction.id'), nullable=True)
+    quote = Column(String, nullable=True, comment='Original quoted text')
+    shout = Column(ForeignKey('shout.id'), nullable=False, index=True)
+    created_by = Column(ForeignKey('author.id'), nullable=False, index=True)
+    kind = Column(String, nullable=False, index=True)
+
+    oid = Column(String)
diff --git a/orm/shout.py b/orm/shout.py
new file mode 100644
index 0000000..2757bcb
--- /dev/null
+++ b/orm/shout.py
@@ -0,0 +1,83 @@
+import time
+
+from sqlalchemy import JSON, Boolean, Column, ForeignKey, Integer, String
+from sqlalchemy.orm import relationship
+
+from orm.author import Author
+from orm.community import Community
+from orm.reaction import Reaction
+from orm.topic import Topic
+from services.db import Base
+
+
+class ShoutTopic(Base):
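+    """Many-to-many link between shouts and topics; 'main' appears to mark the primary topic."""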
+    __tablename__ = 'shout_topic'
+
+    id = None  # type: ignore
+    shout = Column(ForeignKey('shout.id'), primary_key=True, index=True)
+    topic = Column(ForeignKey('topic.id'), primary_key=True, index=True)
+    main = Column(Boolean, nullable=True)
+
+
+class ShoutReactionsFollower(Base):
+    __tablename__ = 'shout_reactions_followers'
+
+    id = None  # type: ignore
+    follower = Column(ForeignKey('author.id'), primary_key=True, index=True)
+    shout = Column(ForeignKey('shout.id'), primary_key=True, index=True)
+    auto = Column(Boolean, nullable=False, default=False)
+    created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
+    deleted_at = Column(Integer, nullable=True)
+
+
+class ShoutAuthor(Base):
+    __tablename__ = 'shout_author'
+
+    id = None  # type: ignore
+    shout = Column(ForeignKey('shout.id'), primary_key=True, index=True)
+    author = Column(ForeignKey('author.id'), primary_key=True, index=True)
+    caption = Column(String, nullable=True, default='')
+
+
+class ShoutCommunity(Base):
+    __tablename__ = 'shout_community'
+
+    id = None  # type: ignore
+    shout = Column(ForeignKey('shout.id'), primary_key=True, index=True)
+    community = Column(ForeignKey('community.id'), primary_key=True, index=True)
+
+
+class Shout(Base):
+    __tablename__ = 'shout'
+
+    created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
+    updated_at = Column(Integer, nullable=True)
+    published_at = Column(Integer, nullable=True)
+    featured_at = Column(Integer, nullable=True)
+    deleted_at = Column(Integer, nullable=True)
+
+    created_by = Column(ForeignKey('author.id'), nullable=False)
+    updated_by = Column(ForeignKey('author.id'), nullable=True)
+    deleted_by = Column(ForeignKey('author.id'), nullable=True)
+
+    body = Column(String, nullable=False, comment='Body')
+    slug = Column(String, unique=True)
+    cover = Column(String, nullable=True, comment='Cover image url')
+    cover_caption = Column(String, nullable=True, comment='Cover image alt caption')
+    lead = Column(String, nullable=True)
+    description = Column(String, nullable=True)
+    title = Column(String, nullable=False)
+    subtitle = Column(String, nullable=True)
+    layout = Column(String, nullable=False, default='article')
+    media = Column(JSON, nullable=True)
+
+    authors = relationship(lambda: Author, secondary='shout_author')
+    topics = relationship(lambda: Topic, secondary='shout_topic')
+    communities = relationship(lambda: Community, secondary='shout_community')
+    reactions = relationship(lambda: Reaction)
+
+    lang = Column(String, nullable=False, default='ru', comment='Language')
+    version_of = Column(ForeignKey('shout.id'), nullable=True)
+    oid = Column(String, nullable=True)
+
+    seo = Column(String, nullable=True)  # JSON
diff --git a/orm/topic.py b/orm/topic.py
new file mode 100644
index 0000000..928b012
--- /dev/null
+++ b/orm/topic.py
@@ -0,0 +1,26 @@
+import time
+
+from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
+
+from services.db import Base
+
+
+class TopicFollower(Base):
+    __tablename__ = 'topic_followers'
+
+    id = None  # type: ignore
+    follower = Column(ForeignKey('author.id'), primary_key=True, index=True)
+    topic = Column(ForeignKey('topic.id'), primary_key=True, index=True)
+    created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
+    auto = Column(Boolean, nullable=False, default=False)
+
+
+class Topic(Base):
+    __tablename__ = 'topic'
+
+    slug = Column(String, unique=True)
+    title = Column(String, nullable=False, comment='Title')
+    body = Column(String, nullable=True, comment='Body')
+    pic = Column(String, nullable=True, comment='Picture')
+    community = Column(ForeignKey('community.id'), default=1)
+    oid = Column(String, nullable=True, comment='Old ID')
diff --git a/orm/user.py b/orm/user.py
new file mode 100644
index 0000000..6001b2e
--- /dev/null
+++ b/orm/user.py
@@ -0,0 +1,30 @@
+import time
+
+from sqlalchemy import Boolean, Column, Integer, String
+
+from services.db import Base
+
+
+class User(Base):
+    __tablename__ = 'authorizer_users'
+
+    id = Column(String, primary_key=True, unique=True, nullable=False, default=None)
+    key = Column(String)
+    email = Column(String, unique=True)
+    email_verified_at = Column(Integer)
+    family_name = Column(String)
+    gender = Column(String)
+    given_name = Column(String)
+    is_multi_factor_auth_enabled = Column(Boolean)
+    middle_name = Column(String)
+    nickname = Column(String)
+    password = Column(String)
+    phone_number = Column(String, unique=True)
+    phone_number_verified_at = Column(Integer)
+    # preferred_username = Column(String, nullable=False)
+    picture = Column(String)
+    revoked_timestamp = Column(Integer)
+    roles = Column(String, default='author, reader')
+    signup_methods = Column(String, default='magic_link_login')
+    created_at = Column(Integer, default=lambda: int(time.time()))
+    updated_at = Column(Integer, default=lambda: int(time.time()))
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..db44d85
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,24 @@
+[tool.poetry]
+name = "discoursio"
+version = "0.3.0"
+description = "shared code for discours.io"
+authors = ["discours.io devteam"]
+license = "MIT"
+
+[tool.poetry.dependencies]
+python = "^3.12"
+SQLAlchemy = "^2.0.22"
+psycopg2-binary = "^2.9.9"
+redis = {extras = ["hiredis"], version = "^5.0.1"}
+sentry-sdk = { version = "^1.4.1", extras = ["starlette", "aiohttp", "ariadne", "sqlalchemy"] }
+starlette = "^0.36.1"
+aiohttp = "^3.9.1"
+google-analytics-data = "^0.18.3"
+opensearch-py = "^2.4.2"
+
+[tool.poetry.group.dev.dependencies]
+ruff = "^0.2.1"
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
diff --git a/services/auth.py b/services/auth.py
new file mode 100644
index 0000000..0956762
--- /dev/null
+++ b/services/auth.py
@@ -0,0 +1,113 @@
+import logging
+from functools import wraps
+
+from aiohttp import ClientSession
+from starlette.exceptions import HTTPException
+
+from settings import ADMIN_SECRET, AUTH_URL
+
+
+logger = logging.getLogger('\t[services.auth]\t')
+logger.setLevel(logging.DEBUG)
+
+
+async def request_data(gql, headers=None):
+    if headers is None:
+        headers = {'Content-Type': 'application/json'}
+    try:
+        # Asynchronous HTTP request to the authentication server
+        async with ClientSession() as session:
+            async with session.post(AUTH_URL, json=gql, headers=headers) as response:
+                if response.status == 200:
+                    data = await response.json()
+                    errors = data.get('errors')
+                    if errors:
+                        logger.error(f'HTTP Errors: {errors}')
+                    else:
+                        return data
+    except Exception as e:
+        # Handling and logging exceptions during authentication check
+        logger.error(f'[services.auth] request_data error: {e}')
+    return None
+
+
+async def check_auth(req):
+    token = req.headers.get('Authorization')
+    user_id = ''
+    if token:
+        # Logging the authentication token
+        logger.debug(f'{token}')
+        query_name = 'validate_jwt_token'
+        operation = 'ValidateToken'
+        variables = {
+            'params': {
+                'token_type': 'access_token',
+                'token': token,
+            }
+        }
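+
+        # The ValidateJWTToken query below follows authorizer.dev's GraphQL schema
+        # (the auth backend implied by the 'authorizer_users' table in orm/user.py);
+        # only the 'sub' and 'allowed_roles' claims are read from the response.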
+        gql = {
+            'query': f'query {operation}($params: ValidateJWTTokenInput!) {{'
+            + f'{query_name}(params: $params) {{ is_valid claims }} '
+            + '}',
+            'variables': variables,
+            'operationName': operation,
+        }
+        data = await request_data(gql)
+        if data:
+            user_data = data.get('data', {}).get(query_name, {}).get('claims', {})
+            user_id = user_data.get('sub')
+            user_roles = user_data.get('allowed_roles')
+            return [user_id, user_roles]
+
+    if not user_id:
+        raise HTTPException(status_code=401, detail='Unauthorized')
+
+
+async def add_user_role(user_id):
+    logger.info(f'[services.auth] add author role for user_id: {user_id}')
+    query_name = '_update_user'
+    operation = 'UpdateUserRoles'
+    headers = {
+        'Content-Type': 'application/json',
+        'x-authorizer-admin-secret': ADMIN_SECRET,
+    }
+    variables = {'params': {'roles': 'author, reader', 'id': user_id}}
+    gql = {
+        'query': f'mutation {operation}($params: UpdateUserInput!) {{ {query_name}(params: $params) {{ id roles }} }}',
+        'variables': variables,
+        'operationName': operation,
+    }
+    data = await request_data(gql, headers)
+    if data:
+        user_id = data.get('data', {}).get(query_name, {}).get('id')
+        return user_id
+
+
+def login_required(f):
+    @wraps(f)
+    async def decorated_function(*args, **kwargs):
+        info = args[1]
+        context = info.context
+        req = context.get('request')
+        [user_id, user_roles] = (await check_auth(req)) or []
+        if user_id and user_roles:
+            logger.info(f' got {user_id} roles: {user_roles}')
+            context['user_id'] = user_id.strip()
+            context['roles'] = user_roles
+        return await f(*args, **kwargs)
+
+    return decorated_function
+
+
+def auth_request(f):
+    @wraps(f)
+    async def decorated_function(*args, **kwargs):
+        req = args[0]
+        [user_id, user_roles] = (await check_auth(req)) or []
+        if user_id:
+            req['user_id'] = user_id.strip()
+            req['roles'] = user_roles
+        return await f(*args, **kwargs)
+
+    return decorated_function
diff --git a/services/core.py b/services/core.py
new file mode 100644
index 0000000..9ef0798
--- /dev/null
+++ b/services/core.py
@@ -0,0 +1,153 @@
+import asyncio
+import logging
+from datetime import datetime, timedelta, timezone
+from typing import Any, List
+
+import aiohttp
+
+from models.member import ChatMember
+from settings import API_BASE
+
+
+logger = logging.getLogger('[services.core] ')
+logger.setLevel(logging.DEBUG)
+
+# default headers for requests to the core API
+headers = {'Content-Type': 'application/json'}
+
+
+# TODO: rewrite to orm usage
+
+
+async def _request_endpoint(query_name, body) -> Any:
+    async with aiohttp.ClientSession() as session:
+        async with session.post(API_BASE, headers=headers, json=body) as response:
+            print(f'[services.core] {query_name} HTTP Response {response.status} {await response.text()}')
+            if response.status == 200:
+                r = await response.json()
+                if r:
+                    return r.get('data', {}).get(query_name, {})
+            return []
+
+
+async def get_followed_shouts(author_id: int):
+    query_name = 'load_shouts_followed'
+    operation = 'GetFollowedShouts'
+
+    query = f"""query {operation}($author_id: Int!, $limit: Int, $offset: Int) {{
+        {query_name}(author_id: $author_id, limit: $limit, offset: $offset) {{ id slug title }}
+    }}"""
+
+    gql = {
+        'query': query,
+        'operationName': operation,
+        'variables': {'author_id': author_id, 'limit': 1000, 'offset': 0},  # FIXME: too big limit
+    }
+
+    return await _request_endpoint(query_name, gql)
+
+
+async def get_shout(shout_id):
+    query_name = 'get_shout'
+    operation = 'GetShout'
+
+    query = f"""query {operation}($slug: String, $shout_id: Int) {{
+        {query_name}(slug: $slug, shout_id: $shout_id) {{ id slug title authors {{ id slug name pic }} }}
+    }}"""
+
+    gql = {'query': query, 'operationName': operation, 'variables': {'slug': None, 'shout_id': shout_id}}
+
+    return await _request_endpoint(query_name, gql)
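+
+# Illustrative usage sketch (not part of the API; assumes a running core service
+# at API_BASE and an arbitrary shout id):
+#     shout = await get_shout(420)
+#     if shout:
+#         print(shout['slug'], [a['name'] for a in shout['authors']])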
+
+
+async def get_all_authors():
+    query_name = 'get_authors_all'
+
+    gql = {
+        'query': 'query { ' + query_name + '{ id slug pic name user } }',
+        'variables': None,
+    }
+
+    return await _request_endpoint(query_name, gql)
+
+
+async def get_author_by_user(user: str):
+    operation = 'GetAuthorId'
+    query_name = 'get_author_id'
+    gql = {
+        'query': f'query {operation}($user: String!) {{ {query_name}(user: $user){{ id }} }}',  # noqa E201, E202
+        'operationName': operation,
+        'variables': {'user': user.strip()},
+    }
+
+    return await _request_endpoint(query_name, gql)
+
+
+async def get_my_followed() -> List[ChatMember]:
+    query_name = 'get_my_followed'
+
+    gql = {
+        'query': 'query { ' + query_name + ' { authors { id slug pic name } } }',
+        'variables': None,
+    }
+
+    result = await _request_endpoint(query_name, gql)
+    return result.get('authors', []) if result else []
+
+
+class CacheStorage:
+    lock = asyncio.Lock()
+    period = 5 * 60  # every 5 mins
+    client = None
+    authors = []
+    authors_by_user = {}
+    authors_by_id = {}
+
+    @staticmethod
+    async def init():
+        """graphql client connection using permanent token"""
+        self = CacheStorage
+        async with self.lock:
+            task = asyncio.create_task(self.worker())
+            logger.info(task)
+
+    @staticmethod
+    async def update_authors():
+        self = CacheStorage
+        async with self.lock:
+            result = await get_all_authors()
+            logger.info(f'cache loaded {len(result)}')
+            if result:
+                CacheStorage.authors = result
+                for a in result:
+                    user_id = a.get('user')
+                    author_id = str(a.get('id'))
+                    self.authors_by_user[user_id] = a
+                    self.authors_by_id[author_id] = a
+
+    @staticmethod
+    async def worker():
+        """async task worker"""
+        failed = 0
+        self = CacheStorage
+        while True:
+            try:
+                logger.info(' - updating profiles data...')
+                await self.update_authors()
+                failed = 0
+            except Exception as er:
+                failed += 1
+                logger.error(f'{er} - update failed #{failed}, wait 10 seconds')
+                if failed > 3:
+                    logger.error(' - not trying to update anymore')
+                    import traceback
+
+                    traceback.print_exc()
+                    break
+            if failed == 0:
+                when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
+                t = format(when.astimezone().isoformat())
+                logger.info(' ⎩ next update: %s' % (t.split('T')[0] + ' ' + t.split('T')[1].split('.')[0]))
+                await asyncio.sleep(self.period)
+            else:
+                await asyncio.sleep(10)
+                logger.info(' - trying to update data again')
diff --git a/services/db.py b/services/db.py
new file mode 100644
index 0000000..5e8df1e
--- /dev/null
+++ b/services/db.py
@@ -0,0 +1,72 @@
+import logging
+import math
+import time
+from typing import Any, Callable, Dict, TypeVar
+
+from sqlalchemy import Column, Integer, create_engine, event
+from sqlalchemy.engine import Engine
+from sqlalchemy.orm import Session, declarative_base
+from sqlalchemy.sql.schema import Table
+
+from settings import DB_URL
+
+
+# Logging setup
+logging.basicConfig(level=logging.DEBUG)
+
+# Logger for the query profiler
+logger = logging.getLogger('sqlalchemy.profiler')
+
+
+@event.listens_for(Engine, 'before_cursor_execute')
+def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
+    conn.info.setdefault('query_start_time', []).append(time.time())
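+
+
+# The hook above pushes a start timestamp for each statement; the hook below pops
+# it to measure execution time and log slow queries.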
+@event.listens_for(Engine, 'after_cursor_execute')
+def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
+    total = time.time() - conn.info['query_start_time'].pop(-1)
+    total = math.floor(total * 10000) / 10000  # round down to 4 decimal places
+    if total > 25:
+        logger.debug(f'Long running query: {statement}, Execution Time: {total} s')
+
+
+engine = create_engine(DB_URL, echo=False, pool_size=10, max_overflow=20)
+T = TypeVar('T')
+REGISTRY: Dict[str, type] = {}
+
+
+def local_session(src=''):
+    return Session(bind=engine, expire_on_commit=False)
+
+
+class Base(declarative_base()):
+    __table__: Table
+    __tablename__: str
+    __new__: Callable
+    __init__: Callable
+    __allow_unmapped__ = True
+    __abstract__ = True
+    __table_args__ = {'extend_existing': True}
+
+    id = Column(Integer, primary_key=True)
+
+    def __init_subclass__(cls, **kwargs):
+        REGISTRY[cls.__name__] = cls
+        super().__init_subclass__(**kwargs)
+
+    def dict(self) -> Dict[str, Any]:
+        column_names = self.__table__.columns.keys()
+        if '_sa_instance_state' in column_names:
+            column_names.remove('_sa_instance_state')
+        try:
+            return {c: getattr(self, c) for c in column_names}
+        except Exception as e:
+            logger.error(f'Error occurred while converting object to dictionary: {e}')
+            return {}
+
+    def update(self, values: Dict[str, Any]) -> None:
+        for key, value in values.items():
+            if hasattr(self, key):
+                setattr(self, key, value)
diff --git a/services/diff.py b/services/diff.py
new file mode 100644
index 0000000..75a99fa
--- /dev/null
+++ b/services/diff.py
@@ -0,0 +1,47 @@
+import re
+from difflib import ndiff
+
+
+def get_diff(original, modified):
+    """
+    Get the difference between two strings using difflib.
+
+    Parameters:
+    - original: The original string.
+    - modified: The modified string.
+
+    Returns:
+    A list of differences.
+    """
+    diff = list(ndiff(original.split(), modified.split()))
+    return diff
+
+
+def apply_diff(original, diff):
+    """
+    Apply the difference to the original string.
+
+    Parameters:
+    - original: The original string.
+    - diff: The difference obtained from the get_diff function.
+
+    Returns:
+    The modified string.
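+
+    Example (illustrative, word-level round trip):
+
+        >>> d = get_diff('hello world', 'hello brave new world')
+        >>> apply_diff('hello world', d)
+        'hello brave new world'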
+    """
+    result = []
+    pattern = re.compile(r'^(\+|-) ')
+
+    for line in diff:
+        if line.startswith('? '):
+            # ndiff emits '? ' hint lines for intra-word changes; they carry no content
+            continue
+        match = pattern.match(line)
+        if match:
+            op = match.group(1)
+            content = line[2:]
+            if op == '+':
+                result.append(content)
+            elif op == '-':
+                # Ignore deleted words
+                pass
+        else:
+            # Unchanged entries carry a two-character '  ' prefix; strip it
+            result.append(line[2:])
+
+    return ' '.join(result)
diff --git a/services/notify.py b/services/notify.py
new file mode 100644
index 0000000..588c1ee
--- /dev/null
+++ b/services/notify.py
@@ -0,0 +1,45 @@
+import json
+
+from services.rediscache import redis
+
+
+async def notify_reaction(reaction, action: str = 'create'):
+    channel_name = 'reaction'
+    data = {'payload': reaction, 'action': action}
+    try:
+        await redis.publish(channel_name, json.dumps(data))
+    except Exception as e:
+        print(f'[services.notify] Failed to publish to channel {channel_name}: {e}')
+
+
+async def notify_shout(shout, action: str = 'update'):
+    channel_name = 'shout'
+    data = {'payload': shout, 'action': action}
+    try:
+        await redis.publish(channel_name, json.dumps(data))
+    except Exception as e:
+        print(f'[services.notify] Failed to publish to channel {channel_name}: {e}')
+
+
+async def notify_follower(follower: dict, author_id: int, action: str = 'follow'):
+    channel_name = f'follower:{author_id}'
+    try:
+        # Simplify dictionary before publishing
+        simplified_follower = {k: follower[k] for k in ['id', 'name', 'slug', 'pic']}
+
+        data = {'payload': simplified_follower, 'action': action}
+
+        # Convert data to JSON string
+        json_data = json.dumps(data)
+
+        # Ensure the data is not empty before publishing
+        if not json_data:
+            raise ValueError('Empty data to publish.')
+
+        # Use the 'await' keyword when publishing
+        await redis.publish(channel_name, json_data)
+
+    except Exception as e:
+        # Log the error and re-raise it
+        print(f'[services.notify] Failed to publish to channel {channel_name}: {e}')
+        raise
diff --git a/services/presence.py b/services/presence.py
new file mode 100644
index 0000000..175f9a6
--- /dev/null
+++ b/services/presence.py
@@ -0,0 +1,24 @@
+import json
+
+from models.chat import ChatUpdate, Message
+from services.rediscache import redis
+
+
+async def notify_message(message: Message, action='create'):
+    channel_name = f"message:{message['chat_id']}"
+    data = {'payload': message, 'action': action}
+    try:
+        await redis.publish(channel_name, json.dumps(data))
+        print(f'[services.presence] ok {data}')
+    except Exception as e:
+        print(f'Failed to publish to channel {channel_name}: {e}')
+
+
+async def notify_chat(chat: ChatUpdate, member_id: int, action='create'):
+    channel_name = f'chat:{member_id}'
+    data = {'payload': chat, 'action': action}
+    try:
+        await redis.publish(channel_name, json.dumps(data))
+        print(f'[services.presence] ok {data}')
+    except Exception as e:
+        print(f'Failed to publish to channel {channel_name}: {e}')
diff --git a/services/rediscache.py b/services/rediscache.py
new file mode 100644
index 0000000..129213a
--- /dev/null
+++ b/services/rediscache.py
@@ -0,0 +1,59 @@
+import logging
+
+import redis.asyncio as aredis
+
+from settings import REDIS_URL
+
+
+logger = logging.getLogger('[services.redis] ')
+logger.setLevel(logging.DEBUG)
+
+
+class RedisCache:
+    def __init__(self, uri=REDIS_URL):
+        self._uri: str = uri
+        self.pubsub_channels = []
+        self._client = None
+
+    async def connect(self):
+        self._client = aredis.Redis.from_url(self._uri, decode_responses=True)
+
+    async def disconnect(self):
+        if self._client:
+            await self._client.close()
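+
+    # Callers pass raw Redis commands, mirroring redis-cli argument order,
+    # e.g. (illustrative): await redis.execute('SETEX', key, ttl, payload)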
+    async def execute(self, command, *args, **kwargs):
+        if self._client:
+            try:
+                logger.debug(f'{command} {args} {kwargs}')
+                r = await self._client.execute_command(command, *args, **kwargs)
+                logger.debug(type(r))
+                logger.debug(r)
+                return r
+            except Exception as e:
+                logger.error(e)
+
+    async def subscribe(self, *channels):
+        if self._client:
+            async with self._client.pubsub() as pubsub:
+                for channel in channels:
+                    await pubsub.subscribe(channel)
+                    self.pubsub_channels.append(channel)
+
+    async def unsubscribe(self, *channels):
+        if not self._client:
+            return
+        async with self._client.pubsub() as pubsub:
+            for channel in channels:
+                await pubsub.unsubscribe(channel)
+                self.pubsub_channels.remove(channel)
+
+    async def publish(self, channel, data):
+        if not self._client:
+            return
+        await self._client.publish(channel, data)
+
+
+redis = RedisCache()
+
+__all__ = ['redis']
diff --git a/services/search.py b/services/search.py
new file mode 100644
index 0000000..87dd3c4
--- /dev/null
+++ b/services/search.py
@@ -0,0 +1,184 @@
+import json
+import logging
+import os
+from multiprocessing import Manager
+
+from opensearchpy import OpenSearch
+
+from services.rediscache import redis
+
+
+os_logger = logging.getLogger(name='opensearch')
+os_logger.setLevel(logging.INFO)
+logger = logging.getLogger('\t[services.search]\t')
+logger.setLevel(logging.DEBUG)
+
+ELASTIC_HOST = os.environ.get('ELASTIC_HOST', '').replace('https://', '')
+ELASTIC_USER = os.environ.get('ELASTIC_USER', '')
+ELASTIC_PASSWORD = os.environ.get('ELASTIC_PASSWORD', '')
+ELASTIC_PORT = os.environ.get('ELASTIC_PORT', 9200)
+ELASTIC_AUTH = f'{ELASTIC_USER}:{ELASTIC_PASSWORD}' if ELASTIC_USER else ''
+ELASTIC_URL = os.environ.get('ELASTIC_URL', f'https://{ELASTIC_AUTH}@{ELASTIC_HOST}:{ELASTIC_PORT}')
+REDIS_TTL = 86400  # 1 day in seconds
+
+
+index_settings = {
+    'settings': {
+        'index': {
+            'number_of_shards': 1,
+            'auto_expand_replicas': '0-all',
+        },
+        'analysis': {
+            'analyzer': {
+                'ru': {
+                    'tokenizer': 'standard',
+                    'filter': ['lowercase', 'ru_stop', 'ru_stemmer'],
+                }
+            },
+            'filter': {
+                'ru_stemmer': {
+                    'type': 'stemmer',
+                    'language': 'russian',
+                },
+                'ru_stop': {
+                    'type': 'stop',
+                    'stopwords': '_russian_',
+                },
+            },
+        },
+    },
+    'mappings': {
+        'properties': {
+            'body': {'type': 'text', 'analyzer': 'ru'},
+            'title': {'type': 'text', 'analyzer': 'ru'},
+            # 'author': {'type': 'text'},
+        }
+    },
+}
+
+expected_mapping = index_settings['mappings']
+
+
+class SearchService:
+    def __init__(self, index_name='search_index'):
+        self.index_name = index_name
+        self.manager = Manager()
+        self.client = None
+
+        # Use the multiprocessing manager to create the Lock and Value
+        self.lock = self.manager.Lock()
+        self.initialized_flag = self.manager.Value('i', 0)
+
+        # Only initialize the instance if it's not already initialized
+        if not self.initialized_flag.value and ELASTIC_HOST:
+            try:
+                self.client = OpenSearch(
+                    hosts=[{'host': ELASTIC_HOST, 'port': ELASTIC_PORT}],
+                    http_compress=True,
+                    http_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
+                    use_ssl=True,
+                    verify_certs=False,
+                    ssl_assert_hostname=False,
+                    ssl_show_warn=False,
+                    # ca_certs = ca_certs_path
+                )
+                logger.info(' OpenSearch client connected')
+                if self.lock.acquire(blocking=False):
+                    try:
+                        self.check_index()
+                    finally:
+                        self.lock.release()
+                else:
+                    logger.debug(' index check skipped')
+            except Exception as exc:
+                logger.error(f' {exc}')
+                self.client = None
+
+    def info(self):
+        if isinstance(self.client, OpenSearch):
+            logger.info(f' Search connected: {self.client.info()}')
+        else:
+            logger.info(' * Set the environment variables to connect to the search server')
+
+    def delete_index(self):
+        if self.client:
+            logger.debug(f' Deleting index {self.index_name}')
+            self.client.indices.delete(index=self.index_name, ignore_unavailable=True)
+
+    def create_index(self):
+        if self.client:
+            if self.lock.acquire(blocking=False):
+                try:
+                    logger.debug(f' Creating new index: {self.index_name}')
+                    self.client.indices.create(index=self.index_name, body=index_settings)
+                    self.client.indices.close(index=self.index_name)
+                    self.client.indices.open(index=self.index_name)
+                finally:
+                    self.lock.release()
+            else:
+                logger.debug(' ..')
+
+    def put_mapping(self):
+        if self.client:
+            logger.debug(f' Setting the mapping for index {self.index_name}')
+            self.client.indices.put_mapping(index=self.index_name, body=expected_mapping)
+
+    def check_index(self):
+        if self.client:
+            if not self.client.indices.exists(index=self.index_name):
+                self.create_index()
+                self.put_mapping()
+            else:
+                # Check if the mapping is correct, and recreate the index if needed
+                mapping = self.client.indices.get_mapping(index=self.index_name)
+                if mapping != expected_mapping:
+                    self.recreate_index()
+
+    def recreate_index(self):
+        if self.lock.acquire(blocking=False):
+            try:
+                self.delete_index()
+                self.check_index()
+            finally:
+                self.lock.release()
+        else:
+            logger.debug(' ..')
+
+    def index(self, shout):
+        if self.client:
+            id_ = str(shout.id)
+            logger.debug(f' Indexing post {id_}')
+            self.client.index(index=self.index_name, id=id_, body=shout.dict())
+
+    async def search(self, text, limit, offset):
+        logger.debug(f' Searching: {text}')
+        search_body = {
+            'query': {'match': {'_all': text}},
+        }
+        if self.client:
+            search_response = self.client.search(index=self.index_name, body=search_body, size=limit, from_=offset)
+            hits = search_response['hits']['hits']
+
+            results = [
+                {
+                    **hit['_source'],
+                    'score': hit['_score'],
+                }
+                for hit in hits
+            ]
+
+            # Use Redis as cache with TTL
+            redis_key = f'search:{text}'
+            await redis.execute('SETEX', redis_key, REDIS_TTL, json.dumps(results))
+            return results
+        return []
+
+
+search_service = SearchService()
+
+
+async def search_text(text: str, limit: int = 50, offset: int = 0):
+    payload = []
+    if search_service.client:
+        # Use the SearchService.search method
+        payload = await search_service.search(text, limit, offset)
+    return payload
diff --git a/services/sentry.py b/services/sentry.py
new file mode 100644
index 0000000..c210b1b
--- /dev/null
+++ b/services/sentry.py
@@ -0,0 +1,33 @@
+import sentry_sdk
+from sentry_sdk.integrations.aiohttp import AioHttpIntegration
+from sentry_sdk.integrations.ariadne import AriadneIntegration
+from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
+from sentry_sdk.integrations.starlette import StarletteIntegration
+
+from settings import SENTRY_DSN
+
+
+def start_sentry():
+    # sentry monitoring
+    try:
+        sentry_sdk.init(
+            SENTRY_DSN,
+            # Set traces_sample_rate to 1.0 to capture 100%
+            # of transactions for performance monitoring.
+            traces_sample_rate=1.0,
+            # Set profiles_sample_rate to 1.0 to profile 100%
+            # of sampled transactions.
+            # We recommend adjusting this value in production.
+            profiles_sample_rate=1.0,
+            enable_tracing=True,
+            integrations=[
+                StarletteIntegration(),
+                AriadneIntegration(),
+                SqlalchemyIntegration(),
+                # RedisIntegration(),
+                AioHttpIntegration(),
+            ],
+        )
+    except Exception as e:
+        print('[lib.services.sentry] init error')
+        print(e)
diff --git a/services/unread.py b/services/unread.py
new file mode 100644
index 0000000..e8b2a89
--- /dev/null
+++ b/services/unread.py
@@ -0,0 +1,24 @@
+import json
+
+from services.rediscache import redis
+
+
+async def get_unread_counter(chat_id: str, author_id: int) -> int:
+    r = await redis.execute('LLEN', f'chats/{chat_id}/unread/{author_id}')
+    if isinstance(r, str):
+        return int(r)
+    elif isinstance(r, int):
+        return r
+    else:
+        return 0
+
+
+async def get_total_unread_counter(author_id: int) -> int:
+    chats_set = await redis.execute('SMEMBERS', f'chats_by_author/{author_id}')
+    s = 0
+    if isinstance(chats_set, str):
+        chats_set = json.loads(chats_set)
+    if isinstance(chats_set, list):
+        for chat_id in chats_set:
+            s += await get_unread_counter(chat_id, author_id)
+    return s
diff --git a/services/viewed.py b/services/viewed.py
new file mode 100644
index 0000000..fb824a9
--- /dev/null
+++ b/services/viewed.py
@@ -0,0 +1,213 @@
+import asyncio
+import json
+import logging
+import os
+import time
+from datetime import datetime, timedelta, timezone
+from typing import Dict
+
+# Google Analytics
+from google.analytics.data_v1beta import BetaAnalyticsDataClient
+from google.analytics.data_v1beta.types import (
+    DateRange,
+    Dimension,
+    Metric,
+    RunReportRequest,
+)
+
+from orm.author import Author
+from orm.shout import Shout, ShoutAuthor, ShoutTopic
+from orm.topic import Topic
+from services.db import local_session
+
+
+# Logging setup
+logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger('\t[services.viewed]\t')
+logger.setLevel(logging.DEBUG)
+
+GOOGLE_KEYFILE_PATH = os.environ.get('GOOGLE_KEYFILE_PATH', '/dump/google-service.json')
+GOOGLE_PROPERTY_ID = os.environ.get('GOOGLE_PROPERTY_ID', '')
+VIEWS_FILEPATH = '/dump/views.json'
+
+
+class ViewedStorage:
+    lock = asyncio.Lock()
+    views_by_shout = {}
+    shouts_by_topic = {}
+    shouts_by_author = {}
+    views = None
+    period = 60 * 60  # every hour
+    analytics_client: BetaAnalyticsDataClient | None = None
+    auth_result = None
+    disabled = False
+    start_date = int(time.time())
+
+    @staticmethod
+    async def init():
+        """Connect to the Google Analytics client using authentication"""
+        self = ViewedStorage
+        async with self.lock:
+            os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', GOOGLE_KEYFILE_PATH)
+            if GOOGLE_KEYFILE_PATH and os.path.isfile(GOOGLE_KEYFILE_PATH):
+                # Using a default constructor instructs the client to use the credentials
+                # specified in the GOOGLE_APPLICATION_CREDENTIALS environment variable.
+                self.analytics_client = BetaAnalyticsDataClient()
+                logger.info(' * Google Analytics client authorized successfully')
+
+                # Load the precounted views from the JSON file
+                self.load_precounted_views()
+
+                if os.path.exists(VIEWS_FILEPATH):
+                    file_timestamp = os.path.getctime(VIEWS_FILEPATH)
+                    self.start_date = datetime.fromtimestamp(file_timestamp).strftime('%Y-%m-%d')
+                    now_date = datetime.now().strftime('%Y-%m-%d')
+
+                    if now_date == self.start_date:
+                        logger.info(' * The data is up to date!')
+                    else:
+                        logger.info(f' * Migration was performed on: {self.start_date}')
+
+                    # Start the background task
+                    asyncio.create_task(self.worker())
+                else:
+                    logger.info(' * Please add the Google Analytics key file')
+                    self.disabled = True
+
+    @staticmethod
+    def load_precounted_views():
+        """Load the precounted views from the JSON file"""
+        self = ViewedStorage
+        try:
+            with open(VIEWS_FILEPATH, 'r') as file:
+                precounted_views = json.load(file)
+                self.views_by_shout.update(precounted_views)
+                logger.info(f' * {len(precounted_views)} publications with views loaded successfully.')
+        except Exception as e:
+            logger.error(f'Error loading precounted views: {e}')
+
+    @staticmethod
+    async def update_pages():
+        """Query Google Analytics for all pages, sorted by view count"""
+        self = ViewedStorage
+        logger.info(' ⎧ Updating view data from Google Analytics ---')
+        if not self.disabled:
+            try:
+                start = time.time()
+                async with self.lock:
+                    if self.analytics_client:
+                        request = RunReportRequest(
+                            property=f'properties/{GOOGLE_PROPERTY_ID}',
+                            dimensions=[Dimension(name='pagePath')],
+                            metrics=[Metric(name='screenPageViews')],
+                            date_ranges=[DateRange(start_date=self.start_date, end_date='today')],
+                        )
+                        response = self.analytics_client.run_report(request)
+                        if response and isinstance(response.rows, list):
+                            slugs = set()
+                            for row in response.rows:
+                                print(
+                                    row.dimension_values[0].value,
+                                    row.metric_values[0].value,
+                                )
+                                # Extract page paths from the Google Analytics response
+                                if isinstance(row.dimension_values, list):
+                                    page_path = row.dimension_values[0].value
+                                    slug = page_path.split('discours.io/')[-1]
+                                    views_count = int(row.metric_values[0].value)
+
+                                    # Update the data in the storage
+                                    self.views_by_shout[slug] = self.views_by_shout.get(slug, 0)
+                                    self.views_by_shout[slug] += views_count
+                                    self.update_topics(slug)
+
+                                    # Record page paths for logging
+                                    slugs.add(slug)
+
+                            logger.info(f' ⎪ Pages collected: {len(slugs)}')
+
+                end = time.time()
+                logger.info(' ⎪ Updating the pages took %fs ' % (end - start))
+            except Exception as error:
+                logger.error(error)
+
+    @staticmethod
+    async def get_shout(shout_slug) -> int:
+        """Get view metrics for a shout by slug"""
+        self = ViewedStorage
+        async with self.lock:
+            return self.views_by_shout.get(shout_slug, 0)
+
+    @staticmethod
+    async def get_shout_media(shout_slug) -> Dict[str, int]:
+        """Get play metrics for a shout by slug"""
+        self = ViewedStorage
+        async with self.lock:
+            return self.views_by_shout.get(shout_slug, 0)
+
+    @staticmethod
+    async def get_topic(topic_slug) -> int:
+        """Get the total view count for a topic"""
+        self = ViewedStorage
+        topic_views = 0
+        async with self.lock:
+            for shout_slug in self.shouts_by_topic.get(topic_slug, []):
+                topic_views += self.views_by_shout.get(shout_slug, 0)
+        return topic_views
+
+    @staticmethod
+    async def get_author(author_slug) -> int:
+        """Get the total view count for an author"""
+        self = ViewedStorage
+        author_views = 0
+        async with self.lock:
+            for shout_slug in self.shouts_by_author.get(author_slug, []):
+                author_views += self.views_by_shout.get(shout_slug, 0)
+        return author_views
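+
+    # Illustrative read path (assumes init() has completed and the slug exists):
+    #     views = await ViewedStorage.get_shout('some-shout-slug')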
+
+    @staticmethod
+    def update_topics(shout_slug):
+        """Update topic and author counters for a shout slug"""
+        self = ViewedStorage
+        with local_session() as session:
+            # Helper function to avoid code duplication
+            def update_groups(dictionary, key, value):
+                dictionary[key] = list(set(dictionary.get(key, []) + [value]))
+
+            # Update topics and authors using the helper function
+            for [_shout_topic, topic] in (
+                session.query(ShoutTopic, Topic).join(Topic).join(Shout).where(Shout.slug == shout_slug).all()
+            ):
+                update_groups(self.shouts_by_topic, topic.slug, shout_slug)
+
+            for [_shout_author, author] in (
+                session.query(ShoutAuthor, Author).join(Author).join(Shout).where(Shout.slug == shout_slug).all()
+            ):
+                update_groups(self.shouts_by_author, author.slug, shout_slug)
+
+    @staticmethod
+    async def worker():
+        """Asynchronous update task"""
+        failed = 0
+        self = ViewedStorage
+        if self.disabled:
+            return
+
+        while True:
+            try:
+                await self.update_pages()
+                failed = 0
+            except Exception as _exc:
+                failed += 1
+                logger.info(' - Update failed #%d, waiting 10 seconds' % failed)
+                if failed > 3:
+                    logger.info(' - Not trying to update anymore')
+                    break
+            if failed == 0:
+                when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
+                t = format(when.astimezone().isoformat())
+                logger.info(' ⎩ Next update: %s' % (t.split('T')[0] + ' ' + t.split('T')[1].split('.')[0]))
+                await asyncio.sleep(self.period)
+            else:
+                await asyncio.sleep(10)
+                logger.info(' - Trying to update the data again')
diff --git a/services/webhook.py b/services/webhook.py
new file mode 100644
index 0000000..23c67ee
--- /dev/null
+++ b/services/webhook.py
@@ -0,0 +1,36 @@
+import os
+import re
+
+from starlette.endpoints import HTTPEndpoint
+from starlette.requests import Request
+from starlette.responses import JSONResponse
+
+from orm.author import Author
+from resolvers.author import create_author
+from services.db import local_session
+
+
+class WebhookEndpoint(HTTPEndpoint):
+    async def post(self, request: Request) -> JSONResponse:
+        try:
+            data = await request.json()
+            if data:
+                auth = request.headers.get('Authorization')
+                if auth:
+                    if auth == os.environ.get('WEBHOOK_SECRET'):
+                        user_id: str = data['user']['id']
+                        name: str = data['user']['given_name']
+                        slug: str = data['user']['email'].split('@')[0]
+                        slug = re.sub('[^0-9a-z]+', '-', slug.lower())
+                        with local_session() as session:
+                            author = session.query(Author).filter(Author.slug == slug).first()
+                            if author:
+                                # make the slug unique by appending a user id suffix
+                                slug = slug + '-' + user_id.split('-').pop()
+                            await create_author(user_id, slug, name)
+
+            return JSONResponse({'status': 'success'})
+        except Exception as e:
+            import traceback
+
+            traceback.print_exc()
+            return JSONResponse({'status': 'error', 'message': str(e)}, status_code=500)
diff --git a/settings.py b/settings.py
new file mode 100644
index 0000000..37ae76a
--- /dev/null
+++ b/settings.py
@@ -0,0 +1,18 @@
+import sys
+from os import environ
+
+
+PORT = 8080
+DB_URL = (
+    environ.get('DATABASE_URL', '').replace('postgres://', 'postgresql://')
+    or environ.get('DB_URL', '').replace('postgres://', 'postgresql://')
+    or 'postgresql://postgres@localhost:5432/discoursio'
+)
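+# 'postgres://' (still emitted by some hosting providers) is rewritten to
+# 'postgresql://' above, since SQLAlchemy 1.4+ dropped the old scheme alias.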
+REDIS_URL = environ.get('REDIS_URL') or 'redis://127.0.0.1'
+API_BASE = environ.get('API_BASE') or 'http://127.0.0.1:8001'
+AUTH_URL = environ.get('AUTH_URL') or 'http://127.0.0.1:8080/graphql'
+SENTRY_DSN = environ.get('SENTRY_DSN')
+DEV_SERVER_PID_FILE_NAME = 'dev-server.pid'
+MODE = 'development' if 'dev' in sys.argv else 'production'
+
+ADMIN_SECRET = environ.get('ADMIN_SECRET') or 'nothing'