This commit is contained in:
2024-02-18 12:50:18 +03:00
commit 393910be4c
24 changed files with 1406 additions and 0 deletions

113
services/auth.py Normal file
View File

@@ -0,0 +1,113 @@
import logging
from functools import wraps
from aiohttp import ClientSession
from starlette.exceptions import HTTPException
from settings import ADMIN_SECRET, AUTH_URL
logger = logging.getLogger('\t[services.auth]\t')
logger.setLevel(logging.DEBUG)
async def request_data(gql, headers=None):
if headers is None:
headers = {'Content-Type': 'application/json'}
try:
# Asynchronous HTTP request to the authentication server
async with ClientSession() as session:
async with session.post(AUTH_URL, json=gql, headers=headers) as response:
if response.status == 200:
data = await response.json()
errors = data.get('errors')
if errors:
logger.error(f'HTTP Errors: {errors}')
else:
return data
except Exception as e:
# Handling and logging exceptions during authentication check
logger.error(f'[services.auth] request_data error: {e}')
return None
async def check_auth(req):
token = req.headers.get('Authorization')
user_id = ''
if token:
# Logging the authentication token
logger.debug(f'{token}')
query_name = 'validate_jwt_token'
operation = 'ValidateToken'
variables = {
'params': {
'token_type': 'access_token',
'token': token,
}
}
gql = {
'query': f'query {operation}($params: ValidateJWTTokenInput!) {{'
+ f'{query_name}(params: $params) {{ is_valid claims }} '
+ '}',
'variables': variables,
'operationName': operation,
}
data = await request_data(gql)
if data:
user_data = data.get('data', {}).get(query_name, {}).get('claims', {})
user_id = user_data.get('sub')
user_roles = user_data.get('allowed_roles')
return [user_id, user_roles]
if not user_id:
raise HTTPException(status_code=401, detail='Unauthorized')
async def add_user_role(user_id):
logger.info(f'[services.auth] add author role for user_id: {user_id}')
query_name = '_update_user'
operation = 'UpdateUserRoles'
headers = {
'Content-Type': 'application/json',
'x-authorizer-admin-secret': ADMIN_SECRET,
}
variables = {'params': {'roles': 'author, reader', 'id': user_id}}
gql = {
'query': f'mutation {operation}($params: UpdateUserInput!) {{ {query_name}(params: $params) {{ id roles }} }}',
'variables': variables,
'operationName': operation,
}
data = await request_data(gql, headers)
if data:
user_id = data.get('data', {}).get(query_name, {}).get('id')
return user_id
def login_required(f):
@wraps(f)
async def decorated_function(*args, **kwargs):
info = args[1]
context = info.context
req = context.get('request')
[user_id, user_roles] = (await check_auth(req)) or []
if user_id and user_roles:
logger.info(f' got {user_id} roles: {user_roles}')
context['user_id'] = user_id.strip()
context['roles'] = user_roles
return await f(*args, **kwargs)
return decorated_function
def auth_request(f):
@wraps(f)
async def decorated_function(*args, **kwargs):
req = args[0]
[user_id, user_roles] = (await check_auth(req)) or []
if user_id:
req['user_id'] = user_id.strip()
req['roles'] = user_roles
return await f(*args, **kwargs)
return decorated_function

153
services/core.py Normal file
View File

@@ -0,0 +1,153 @@
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import List
import requests
from models.member import ChatMember
from settings import API_BASE
logger = logging.getLogger('[services.core] ')
logger.setLevel(logging.DEBUG)
# TODO: rewrite to orm usage
async def _request_endpoint(query_name, body) -> Any:
async with aiohttp.ClientSession() as session:
async with session.post(API_BASE, headers=headers, json=body) as response:
print(f'[services.core] {query_name} HTTP Response {response.status} {await response.text()}')
if response.status == 200:
r = await response.json()
if r:
return r.get('data', {}).get(query_name, {})
return []
async def get_followed_shouts(author_id: int):
query_name = 'load_shouts_followed'
operation = 'GetFollowedShouts'
query = f"""query {operation}($author_id: Int!, limit: Int, offset: Int) {{
{query_name}(author_id: $author_id, limit: $limit, offset: $offset) {{ id slug title }}
}}"""
gql = {
'query': query,
'operationName': operation,
'variables': {'author_id': author_id, 'limit': 1000, 'offset': 0}, # FIXME: too big limit
}
return await _request_endpoint(query_name, gql)
async def get_shout(shout_id):
query_name = 'get_shout'
operation = 'GetShout'
query = f"""query {operation}($slug: String, $shout_id: Int) {{
{query_name}(slug: $slug, shout_id: $shout_id) {{ id slug title authors {{ id slug name pic }} }}
}}"""
gql = {'query': query, 'operationName': operation, 'variables': {'slug': None, 'shout_id': shout_id}}
return await _request_endpoint(query_name, gql)
def get_all_authors():
query_name = 'get_authors_all'
gql = {
'query': 'query { ' + query_name + '{ id slug pic name user } }',
'variables': None,
}
return _request_endpoint(query_name, gql)
def get_author_by_user(user: str):
operation = 'GetAuthorId'
query_name = 'get_author_id'
gql = {
'query': f'query {operation}($user: String!) {{ {query_name}(user: $user){{ id }} }}', # noqa E201, E202
'operationName': operation,
'variables': {'user': user.strip()},
}
return _request_endpoint(query_name, gql)
def get_my_followed() -> List[ChatMember]:
query_name = 'get_my_followed'
gql = {
'query': 'query { ' + query_name + ' { authors { id slug pic name } } }',
'variables': None,
}
result = _request_endpoint(query_name, gql)
return result.get('authors', [])
class CacheStorage:
lock = asyncio.Lock()
period = 5 * 60 # every 5 mins
client = None
authors = []
authors_by_user = {}
authors_by_id = {}
@staticmethod
async def init():
"""graphql client connection using permanent token"""
self = CacheStorage
async with self.lock:
task = asyncio.create_task(self.worker())
logger.info(task)
@staticmethod
async def update_authors():
self = CacheStorage
async with self.lock:
result = get_all_authors()
logger.info(f'cache loaded {len(result)}')
if result:
CacheStorage.authors = result
for a in result:
user_id = a.get('user')
author_id = str(a.get('id'))
self.authors_by_user[user_id] = a
self.authors_by_id[author_id] = a
@staticmethod
async def worker():
"""async task worker"""
failed = 0
self = CacheStorage
while True:
try:
logger.info(' - updating profiles data...')
await self.update_authors()
failed = 0
except Exception as er:
failed += 1
logger.error(f'{er} - update failed #{failed}, wait 10 seconds')
if failed > 3:
logger.error(' - not trying to update anymore')
import traceback
traceback.print_exc()
break
if failed == 0:
when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
t = format(when.astimezone().isoformat())
logger.info(' ⎩ next update: %s' % (t.split('T')[0] + ' ' + t.split('T')[1].split('.')[0]))
await asyncio.sleep(self.period)
else:
await asyncio.sleep(10)
logger.info(' - trying to update data again')

72
services/db.py Normal file
View File

@@ -0,0 +1,72 @@
import logging
import math
import time
from typing import Any, Callable, Dict, TypeVar
from sqlalchemy import Column, Integer, create_engine, event
from sqlalchemy.engine import Engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from sqlalchemy.sql.schema import Table
from settings import DB_URL
# Настройка журнала
logging.basicConfig(level=logging.DEBUG)
# Создание обработчика журнала для записи сообщений в файл
logger = logging.getLogger('sqlalchemy.profiler')
@event.listens_for(Engine, 'before_cursor_execute')
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
conn.info.setdefault('query_start_time', []).append(time.time())
@event.listens_for(Engine, 'after_cursor_execute')
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
total = time.time() - conn.info['query_start_time'].pop(-1)
total = math.floor(total * 10000) / 10000
if total > 25:
logger.debug(f'Long running query: {statement}, Execution Time: {total} s')
engine = create_engine(DB_URL, echo=False, pool_size=10, max_overflow=20)
T = TypeVar('T')
REGISTRY: Dict[str, type] = {}
Base = declarative_base()
def local_session(src=''):
return Session(bind=engine, expire_on_commit=False)
class Base(declarative_base()):
__table__: Table
__tablename__: str
__new__: Callable
__init__: Callable
__allow_unmapped__ = True
__abstract__ = True
__table_args__ = {'extend_existing': True}
id = Column(Integer, primary_key=True)
def __init_subclass__(cls, **kwargs):
REGISTRY[cls.__name__] = cls
def dict(self) -> Dict[str, Any]:
column_names = self.__table__.columns.keys()
if '_sa_instance_state' in column_names:
column_names.remove('_sa_instance_state')
try:
return {c: getattr(self, c) for c in column_names}
except Exception as e:
logger.error(f'Error occurred while converting object to dictionary: {e}')
return {}
def update(self, values: Dict[str, Any]) -> None:
for key, value in values.items():
if hasattr(self, key):
setattr(self, key, value)

47
services/diff.py Normal file
View File

@@ -0,0 +1,47 @@
import re
from difflib import ndiff
def get_diff(original, modified):
"""
Get the difference between two strings using difflib.
Parameters:
- original: The original string.
- modified: The modified string.
Returns:
A list of differences.
"""
diff = list(ndiff(original.split(), modified.split()))
return diff
def apply_diff(original, diff):
"""
Apply the difference to the original string.
Parameters:
- original: The original string.
- diff: The difference obtained from get_diff function.
Returns:
The modified string.
"""
result = []
pattern = re.compile(r'^(\+|-) ')
for line in diff:
match = pattern.match(line)
if match:
op = match.group(1)
content = line[2:]
if op == '+':
result.append(content)
elif op == '-':
# Ignore deleted lines
pass
else:
result.append(line)
return ' '.join(result)

45
services/notify.py Normal file
View File

@@ -0,0 +1,45 @@
import json
from services.rediscache import redis
async def notify_reaction(reaction, action: str = 'create'):
channel_name = 'reaction'
data = {'payload': reaction, 'action': action}
try:
await redis.publish(channel_name, json.dumps(data))
except Exception as e:
print(f'[services.notify] Failed to publish to channel {channel_name}: {e}')
async def notify_shout(shout, action: str = 'update'):
channel_name = 'shout'
data = {'payload': shout, 'action': action}
try:
await redis.publish(channel_name, json.dumps(data))
except Exception as e:
print(f'[services.notify] Failed to publish to channel {channel_name}: {e}')
async def notify_follower(follower: dict, author_id: int, action: str = 'follow'):
channel_name = f'follower:{author_id}'
try:
# Simplify dictionary before publishing
simplified_follower = {k: follower[k] for k in ['id', 'name', 'slug', 'pic']}
data = {'payload': simplified_follower, 'action': action}
# Convert data to JSON string
json_data = json.dumps(data)
# Ensure the data is not empty before publishing
if not json_data:
raise ValueError('Empty data to publish.')
# Use the 'await' keyword when publishing
await redis.publish(channel_name, json_data)
except Exception as e:
# Log the error and re-raise it
print(f'[services.notify] Failed to publish to channel {channel_name}: {e}')
raise

24
services/presence.py Normal file
View File

@@ -0,0 +1,24 @@
import json
from models.chat import ChatUpdate, Message
from services.rediscache import redis
async def notify_message(message: Message, action='create'):
channel_name = f"message:{message['chat_id']}"
data = {'payload': message, 'action': action}
try:
await redis.publish(channel_name, json.dumps(data))
print(f'[services.presence] ok {data}')
except Exception as e:
print(f'Failed to publish to channel {channel_name}: {e}')
async def notify_chat(chat: ChatUpdate, member_id: int, action='create'):
channel_name = f'chat:{member_id}'
data = {'payload': chat, 'action': action}
try:
await redis.publish(channel_name, json.dumps(data))
print(f'[services.presence] ok {data}')
except Exception as e:
print(f'Failed to publish to channel {channel_name}: {e}')

59
services/rediscache.py Normal file
View File

@@ -0,0 +1,59 @@
import logging
import redis.asyncio as aredis
from settings import REDIS_URL
logger = logging.getLogger('[services.redis] ')
logger.setLevel(logging.DEBUG)
class RedisCache:
def __init__(self, uri=REDIS_URL):
self._uri: str = uri
self.pubsub_channels = []
self._client = None
async def connect(self):
self._client = aredis.Redis.from_url(self._uri, decode_responses=True)
async def disconnect(self):
if self._client:
await self._client.close()
async def execute(self, command, *args, **kwargs):
if self._client:
try:
logger.debug(f'{command} {args} {kwargs}')
r = await self._client.execute_command(command, *args, **kwargs)
logger.debug(type(r))
logger.debug(r)
return r
except Exception as e:
logger.error(e)
async def subscribe(self, *channels):
if self._client:
async with self._client.pubsub() as pubsub:
for channel in channels:
await pubsub.subscribe(channel)
self.pubsub_channels.append(channel)
async def unsubscribe(self, *channels):
if not self._client:
return
async with self._client.pubsub() as pubsub:
for channel in channels:
await pubsub.unsubscribe(channel)
self.pubsub_channels.remove(channel)
async def publish(self, channel, data):
if not self._client:
return
await self._client.publish(channel, data)
redis = RedisCache()
__all__ = ['redis']

184
services/search.py Normal file
View File

@@ -0,0 +1,184 @@
import json
import logging
import os
from multiprocessing import Manager
from opensearchpy import OpenSearch
from services.rediscache import redis
os_logger = logging.getLogger(name='opensearch')
os_logger.setLevel(logging.INFO)
logger = logging.getLogger('\t[services.search]\t')
logger.setLevel(logging.DEBUG)
ELASTIC_HOST = os.environ.get('ELASTIC_HOST', '').replace('https://', '')
ELASTIC_USER = os.environ.get('ELASTIC_USER', '')
ELASTIC_PASSWORD = os.environ.get('ELASTIC_PASSWORD', '')
ELASTIC_PORT = os.environ.get('ELASTIC_PORT', 9200)
ELASTIC_AUTH = f'{ELASTIC_USER}:{ELASTIC_PASSWORD}' if ELASTIC_USER else ''
ELASTIC_URL = os.environ.get('ELASTIC_URL', f'https://{ELASTIC_AUTH}@{ELASTIC_HOST}:{ELASTIC_PORT}')
REDIS_TTL = 86400 # 1 day in seconds
index_settings = {
'settings': {
'index': {
'number_of_shards': 1,
'auto_expand_replicas': '0-all',
},
'analysis': {
'analyzer': {
'ru': {
'tokenizer': 'standard',
'filter': ['lowercase', 'ru_stop', 'ru_stemmer'],
}
},
'filter': {
'ru_stemmer': {
'type': 'stemmer',
'language': 'russian',
},
'ru_stop': {
'type': 'stop',
'stopwords': '_russian_',
},
},
},
},
'mappings': {
'properties': {
'body': {'type': 'text', 'analyzer': 'ru'},
'title': {'type': 'text', 'analyzer': 'ru'},
# 'author': {'type': 'text'},
}
},
}
expected_mapping = index_settings['mappings']
class SearchService:
def __init__(self, index_name='search_index'):
self.index_name = index_name
self.manager = Manager()
self.client = None
# Используем менеджер для создания Lock и Value
self.lock = self.manager.Lock()
self.initialized_flag = self.manager.Value('i', 0)
# Only initialize the instance if it's not already initialized
if not self.initialized_flag.value and ELASTIC_HOST:
try:
self.client = OpenSearch(
hosts=[{'host': ELASTIC_HOST, 'port': ELASTIC_PORT}],
http_compress=True,
http_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
use_ssl=True,
verify_certs=False,
ssl_assert_hostname=False,
ssl_show_warn=False,
# ca_certs = ca_certs_path
)
logger.info(' Клиент OpenSearch.org подключен')
if self.lock.acquire(blocking=False):
try:
self.check_index()
finally:
self.lock.release()
else:
logger.debug(' проверка пропущена')
except Exception as exc:
logger.error(f' {exc}')
self.client = None
def info(self):
if isinstance(self.client, OpenSearch):
logger.info(f' Поиск подключен: {self.client.info()}')
else:
logger.info(' * Задайте переменные среды для подключения к серверу поиска')
def delete_index(self):
if self.client:
logger.debug(f' Удаляем индекс {self.index_name}')
self.client.indices.delete(index=self.index_name, ignore_unavailable=True)
def create_index(self):
if self.client:
if self.lock.acquire(blocking=False):
try:
logger.debug(f' Создаём новый индекс: {self.index_name} ')
self.client.indices.create(index=self.index_name, body=index_settings)
self.client.indices.close(index=self.index_name)
self.client.indices.open(index=self.index_name)
finally:
self.lock.release()
else:
logger.debug(' ..')
def put_mapping(self):
if self.client:
logger.debug(f' Разметка индекации {self.index_name}')
self.client.indices.put_mapping(index=self.index_name, body=expected_mapping)
def check_index(self):
if self.client:
if not self.client.indices.exists(index=self.index_name):
self.create_index()
self.put_mapping()
else:
# Check if the mapping is correct, and recreate the index if needed
mapping = self.client.indices.get_mapping(index=self.index_name)
if mapping != expected_mapping:
self.recreate_index()
def recreate_index(self):
if self.lock.acquire(blocking=False):
try:
self.delete_index()
self.check_index()
finally:
self.lock.release()
else:
logger.debug(' ..')
def index(self, shout):
if self.client:
id_ = str(shout.id)
logger.debug(f' Индексируем пост {id_}')
self.client.index(index=self.index_name, id=id_, body=shout.dict())
async def search(self, text, limit, offset):
logger.debug(f' Ищем: {text}')
search_body = {
'query': {'match': {'_all': text}},
}
if self.client:
search_response = self.client.search(index=self.index_name, body=search_body, size=limit, from_=offset)
hits = search_response['hits']['hits']
results = [
{
**hit['_source'],
'score': hit['_score'],
}
for hit in hits
]
# Use Redis as cache with TTL
redis_key = f'search:{text}'
await redis.execute('SETEX', redis_key, REDIS_TTL, json.dumps(results))
return []
search_service = SearchService()
async def search_text(text: str, limit: int = 50, offset: int = 0):
payload = []
if search_service.client:
# Use OpenSearchService.search_post method
payload = await search_service.search(text, limit, offset)
return payload

33
services/sentry.py Normal file
View File

@@ -0,0 +1,33 @@
import sentry_sdk
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.ariadne import AriadneIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.starlette import StarletteIntegration
from settings import SENTRY_DSN
def start_sentry():
# sentry monitoring
try:
sentry_sdk.init(
SENTRY_DSN,
# Set traces_sample_rate to 1.0 to capture 100%
# of transactions for performance monitoring.
traces_sample_rate=1.0,
# Set profiles_sample_rate to 1.0 to profile 100%
# of sampled transactions.
# We recommend adjusting this value in production.
profiles_sample_rate=1.0,
enable_tracing=True,
integrations=[
StarletteIntegration(),
AriadneIntegration(),
SqlalchemyIntegration(),
# RedisIntegration(),
AioHttpIntegration()
]
)
except Exception as e:
print('[lib.services.sentry] init error')
print(e)

24
services/unread.py Normal file
View File

@@ -0,0 +1,24 @@
import json
from services.rediscache import redis
async def get_unread_counter(chat_id: str, author_id: int) -> int:
r = await redis.execute('LLEN', f'chats/{chat_id}/unread/{author_id}')
if isinstance(r, str):
return int(r)
elif isinstance(r, int):
return r
else:
return 0
async def get_total_unread_counter(author_id: int) -> int:
chats_set = await redis.execute('SMEMBERS', f'chats_by_author/{author_id}')
s = 0
if isinstance(chats_set, str):
chats_set = json.loads(chats_set)
if isinstance(chats_set, list):
for chat_id in chats_set:
s += await get_unread_counter(chat_id, author_id)
return s

213
services/viewed.py Normal file
View File

@@ -0,0 +1,213 @@
import asyncio
import json
import logging
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Dict
# ga
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import (
DateRange,
Dimension,
Metric,
RunReportRequest,
)
from orm.author import Author
from orm.shout import Shout, ShoutAuthor, ShoutTopic
from orm.topic import Topic
from services.db import local_session
# Настройка журналирования
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('\t[services.viewed]\t')
logger.setLevel(logging.DEBUG)
GOOGLE_KEYFILE_PATH = os.environ.get('GOOGLE_KEYFILE_PATH', '/dump/google-service.json')
GOOGLE_PROPERTY_ID = os.environ.get('GOOGLE_PROPERTY_ID', '')
VIEWS_FILEPATH = '/dump/views.json'
class ViewedStorage:
lock = asyncio.Lock()
views_by_shout = {}
shouts_by_topic = {}
shouts_by_author = {}
views = None
period = 60 * 60 # каждый час
analytics_client: BetaAnalyticsDataClient | None = None
auth_result = None
disabled = False
start_date = int(time.time())
@staticmethod
async def init():
"""Подключение к клиенту Google Analytics с использованием аутентификации"""
self = ViewedStorage
async with self.lock:
os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', GOOGLE_KEYFILE_PATH)
if GOOGLE_KEYFILE_PATH and os.path.isfile(GOOGLE_KEYFILE_PATH):
# Using a default constructor instructs the client to use the credentials
# specified in GOOGLE_APPLICATION_CREDENTIALS environment variable.
self.analytics_client = BetaAnalyticsDataClient()
logger.info(' * Клиент Google Analytics успешно авторизован')
# Загрузка предварительно подсчитанных просмотров из файла JSON
self.load_precounted_views()
if os.path.exists(VIEWS_FILEPATH):
file_timestamp = os.path.getctime(VIEWS_FILEPATH)
self.start_date = datetime.fromtimestamp(file_timestamp).strftime('%Y-%m-%d')
now_date = datetime.now().strftime('%Y-%m-%d')
if now_date == self.start_date:
logger.info(' * Данные актуализованы!')
else:
logger.info(f' * Миграция проводилась: {self.start_date}')
# Запуск фоновой задачи
asyncio.create_task(self.worker())
else:
logger.info(' * Пожалуйста, добавьте ключевой файл Google Analytics')
self.disabled = True
@staticmethod
def load_precounted_views():
"""Загрузка предварительно подсчитанных просмотров из файла JSON"""
self = ViewedStorage
try:
with open(VIEWS_FILEPATH, 'r') as file:
precounted_views = json.load(file)
self.views_by_shout.update(precounted_views)
logger.info(f' * {len(precounted_views)} публикаций с просмотрами успешно загружены.')
except Exception as e:
logger.error(f'Ошибка загрузки предварительно подсчитанных просмотров: {e}')
@staticmethod
async def update_pages():
"""Запрос всех страниц от Google Analytics, отсортированных по количеству просмотров"""
self = ViewedStorage
logger.info(' ⎧ Обновление данных просмотров от Google Analytics ---')
if not self.disabled:
try:
start = time.time()
async with self.lock:
if self.analytics_client:
request = RunReportRequest(
property=f'properties/{GOOGLE_PROPERTY_ID}',
dimensions=[Dimension(name='pagePath')],
metrics=[Metric(name='screenPageViews')],
date_ranges=[DateRange(start_date=self.start_date, end_date='today')],
)
response = self.analytics_client.run_report(request)
if response and isinstance(response.rows, list):
slugs = set()
for row in response.rows:
print(
row.dimension_values[0].value,
row.metric_values[0].value,
)
# Извлечение путей страниц из ответа Google Analytics
if isinstance(row.dimension_values, list):
page_path = row.dimension_values[0].value
slug = page_path.split('discours.io/')[-1]
views_count = int(row.metric_values[0].value)
# Обновление данных в хранилище
self.views_by_shout[slug] = self.views_by_shout.get(slug, 0)
self.views_by_shout[slug] += views_count
self.update_topics(slug)
# Запись путей страниц для логирования
slugs.add(slug)
logger.info(f' ⎪ Собрано страниц: {len(slugs)} ')
end = time.time()
logger.info(' ⎪ Обновление страниц заняло %fs ' % (end - start))
except Exception as error:
logger.error(error)
@staticmethod
async def get_shout(shout_slug) -> int:
"""Получение метрики просмотров shout по slug"""
self = ViewedStorage
async with self.lock:
return self.views_by_shout.get(shout_slug, 0)
@staticmethod
async def get_shout_media(shout_slug) -> Dict[str, int]:
"""Получение метрики воспроизведения shout по slug"""
self = ViewedStorage
async with self.lock:
return self.views_by_shout.get(shout_slug, 0)
@staticmethod
async def get_topic(topic_slug) -> int:
"""Получение суммарного значения просмотров темы"""
self = ViewedStorage
topic_views = 0
async with self.lock:
for shout_slug in self.shouts_by_topic.get(topic_slug, []):
topic_views += self.views_by_shout.get(shout_slug, 0)
return topic_views
@staticmethod
async def get_author(author_slug) -> int:
"""Получение суммарного значения просмотров автора"""
self = ViewedStorage
author_views = 0
async with self.lock:
for shout_slug in self.shouts_by_author.get(author_slug, []):
author_views += self.views_by_shout.get(shout_slug, 0)
return author_views
@staticmethod
def update_topics(shout_slug):
"""Обновление счетчиков темы по slug shout"""
self = ViewedStorage
with local_session() as session:
# Определение вспомогательной функции для избежания повторения кода
def update_groups(dictionary, key, value):
dictionary[key] = list(set(dictionary.get(key, []) + [value]))
# Обновление тем и авторов с использованием вспомогательной функции
for [_shout_topic, topic] in (
session.query(ShoutTopic, Topic).join(Topic).join(Shout).where(Shout.slug == shout_slug).all()
):
update_groups(self.shouts_by_topic, topic.slug, shout_slug)
for [_shout_topic, author] in (
session.query(ShoutAuthor, Author).join(Author).join(Shout).where(Shout.slug == shout_slug).all()
):
update_groups(self.shouts_by_author, author.slug, shout_slug)
@staticmethod
async def worker():
"""Асинхронная задача обновления"""
failed = 0
self = ViewedStorage
if self.disabled:
return
while True:
try:
await self.update_pages()
failed = 0
except Exception as _exc:
failed += 1
logger.info(' - Обновление не удалось #%d, ожидание 10 секунд' % failed)
if failed > 3:
logger.info(' - Больше не пытаемся обновить')
break
if failed == 0:
when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
t = format(when.astimezone().isoformat())
logger.info(' ⎩ Следующее обновление: %s' % (t.split('T')[0] + ' ' + t.split('T')[1].split('.')[0]))
await asyncio.sleep(self.period)
else:
await asyncio.sleep(10)
logger.info(' - Попытка снова обновить данные')

36
services/webhook.py Normal file
View File

@@ -0,0 +1,36 @@
import os
import re
from starlette.endpoints import HTTPEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse
from orm.author import Author
from resolvers.author import create_author
from services.db import local_session
class WebhookEndpoint(HTTPEndpoint):
async def post(self, request: Request) -> JSONResponse:
try:
data = await request.json()
if data:
auth = request.headers.get('Authorization')
if auth:
if auth == os.environ.get('WEBHOOK_SECRET'):
user_id: str = data['user']['id']
name: str = data['user']['given_name']
slug: str = data['user']['email'].split('@')[0]
slug: str = re.sub('[^0-9a-z]+', '-', slug.lower())
with local_session() as session:
author = session.query(Author).filter(Author.slug == slug).first()
if author:
slug = slug + '-' + user_id.split('-').pop()
await create_author(user_id, slug, name)
return JSONResponse({'status': 'success'})
except Exception as e:
import traceback
traceback.print_exc()
return JSONResponse({'status': 'error', 'message': str(e)}, status_code=500)