From 2c2932caeb6f6c39c1bf2b3a5a913a3811265023 Mon Sep 17 00:00:00 2001
From: Untone
Date: Mon, 29 Jan 2024 03:27:30 +0300
Subject: [PATCH] inner-search

---
 main.py             |   9 ++-
 pyproject.toml      |   1 +
 resolvers/editor.py |   8 +++
 resolvers/reader.py |  50 ++++-----------
 services/search.py  | 153 +++++++++++++++++++++++++++++++++++++++-----
 services/viewed.py  |   2 +-
 6 files changed, 166 insertions(+), 57 deletions(-)

diff --git a/main.py b/main.py
index 3771d5fe..8300b1e7 100644
--- a/main.py
+++ b/main.py
@@ -16,6 +16,7 @@
 from starlette.routing import Route
 from resolvers.webhook import WebhookEndpoint
 from services.rediscache import redis
 from services.schema import resolvers
+from services.search import SearchService
 from services.viewed import ViewedStorage
 from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN
 
@@ -33,6 +34,9 @@ async def start_up():
     # start viewed service
     await ViewedStorage.init()
 
+    # start search service
+    await SearchService.init()
+
     if MODE == 'development':
         # pid file management
         if not exists(DEV_SERVER_PID_FILE_NAME):
@@ -62,5 +66,8 @@ async def shutdown():
     await redis.disconnect()
 
 
-routes = [Route('/', GraphQL(schema, debug=True)), Route('/new-author', WebhookEndpoint)]
+routes = [
+    Route('/', GraphQL(schema, debug=True)),
+    Route('/new-author', WebhookEndpoint),
+]
 app = Starlette(routes=routes, debug=True, on_startup=[start_up], on_shutdown=[shutdown])
diff --git a/pyproject.toml b/pyproject.toml
index 90a3d3bf..5119f4f1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,6 +19,7 @@ aiohttp = "^3.9.1"
 pre-commit = "^3.6.0"
 granian = "^1.0.1"
 google-analytics-data = "^0.18.3"
+elasticsearch = "^8.12.0"
 
 [tool.poetry.group.dev.dependencies]
 setuptools = "^69.0.2"
diff --git a/resolvers/editor.py b/resolvers/editor.py
index 3804e2a7..a7276acb 100644
--- a/resolvers/editor.py
+++ b/resolvers/editor.py
@@ -11,6 +11,7 @@ from services.auth import login_required
 from services.db import local_session
 from services.notify import notify_shout
 from services.schema import mutation, query
+from services.search import SearchService
 
 
 @query.field('get_shouts_drafts')
@@ -79,7 +80,11 @@ async def create_shout(_, info, inp):
 
         reactions_follow(author.id, shout.id, True)
 
+        # notifier
         await notify_shout(shout_dict, 'create')
+
+        # search service indexing
+        SearchService.elastic.index_post(shout)
 
         return {'shout': shout_dict}
 
@@ -186,6 +191,9 @@ async def update_shout(  # noqa: C901
 
         if not publish:
             await notify_shout(shout_dict, 'update')
 
+    # search service indexing
+    SearchService.elastic.index_post(shout)
+
     return {'shout': shout_dict}
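
Note: both editor.py hunks call SearchService.elastic.index_post(shout) unconditionally, while init() in services/search.py below leaves SearchService.elastic as None when the Elasticsearch connection fails, so create_shout and update_shout would then raise AttributeError. A minimal defensive sketch, not part of this patch (the helper name is hypothetical and it assumes logging is imported):

    # hypothetical helper; keeps a missing search backend from breaking the mutations
    def index_shout_safely(shout):
        if SearchService.elastic is not None:
            try:
                SearchService.elastic.index_post(shout)
            except Exception as exc:
                logging.error(f'[services.search] indexing failed: {exc}')
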
diff --git a/resolvers/reader.py b/resolvers/reader.py
index 4cd64bbf..b3d10402 100644
--- a/resolvers/reader.py
+++ b/resolvers/reader.py
@@ -82,9 +82,7 @@ async def get_shout(_, _info, slug=None, shout_id=None):
                 'rating': int(likes_stat or 0) - int(dislikes_stat or 0),
             }
 
-            for author_caption in (
-                session.query(ShoutAuthor).join(Shout).where(Shout.slug == slug)
-            ):
+            for author_caption in session.query(ShoutAuthor).join(Shout).where(Shout.slug == slug):
                 for author in shout.authors:
                     if author.id == author_caption.author:
                         author.caption = author_caption.caption
@@ -105,9 +103,7 @@
                 shout.main_topic = main_topic[0]
             return shout
     except Exception:
-        raise HTTPException(
-            status_code=404, detail=f'shout {slug or shout_id} not found'
-        )
+        raise HTTPException(status_code=404, detail=f'shout {slug or shout_id} not found')
@@ -153,9 +149,7 @@ async def load_shouts_by(_, _info, options):
     # order
     order_by = options.get('order_by', Shout.published_at)
-    query_order_by = (
-        desc(order_by) if options.get('order_by_desc', True) else asc(order_by)
-    )
+    query_order_by = desc(order_by) if options.get('order_by_desc', True) else asc(order_by)
     q = q.order_by(nulls_last(query_order_by))
 
     # limit offset
@@ -248,20 +242,15 @@ async def load_shouts_feed(_, info, options):
     with local_session() as session:
         reader = session.query(Author).filter(Author.user == user_id).first()
         if reader:
-            reader_followed_authors = select(AuthorFollower.author).where(
-                AuthorFollower.follower == reader.id
-            )
-            reader_followed_topics = select(TopicFollower.topic).where(
-                TopicFollower.follower == reader.id
-            )
+            reader_followed_authors = select(AuthorFollower.author).where(AuthorFollower.follower == reader.id)
+            reader_followed_topics = select(TopicFollower.topic).where(TopicFollower.follower == reader.id)
 
             subquery = (
                 select(Shout.id)
                 .where(Shout.id == ShoutAuthor.shout)
                 .where(Shout.id == ShoutTopic.shout)
                 .where(
-                    (ShoutAuthor.author.in_(reader_followed_authors))
-                    | (ShoutTopic.topic.in_(reader_followed_topics))
+                    (ShoutAuthor.author.in_(reader_followed_authors)) | (ShoutTopic.topic.in_(reader_followed_topics))
                 )
             )
@@ -286,24 +275,15 @@
         order_by = options.get('order_by', Shout.published_at)
 
-        query_order_by = (
-            desc(order_by) if options.get('order_by_desc', True) else asc(order_by)
-        )
+        query_order_by = desc(order_by) if options.get('order_by_desc', True) else asc(order_by)
         offset = options.get('offset', 0)
         limit = options.get('limit', 10)
 
-        q = (
-            q.group_by(Shout.id)
-            .order_by(nulls_last(query_order_by))
-            .limit(limit)
-            .offset(offset)
-        )
+        q = q.group_by(Shout.id).order_by(nulls_last(query_order_by)).limit(limit).offset(offset)
 
         # print(q.compile(compile_kwargs={"literal_binds": True}))
 
-        for [shout, reacted_stat, commented_stat, _last_comment] in session.execute(
-            q
-        ).unique():
+        for [shout, reacted_stat, commented_stat, _last_comment] in session.execute(q).unique():
             main_topic = (
                 session.query(Topic.slug)
                 .join(
@@ -342,9 +322,7 @@ async def load_shouts_search(_, _info, text, limit=50, offset=0):
             select(
                 [
                     Shout,
-                    literal_column(
-                        f"({results_dict.get(Shout.slug, {}).get('score', 0)})"
-                    ).label('score'),
+                    literal_column(f"({results_dict.get(Shout.slug, {}).get('score', 0)})").label('score'),
                 ]
             )
             .select_from(Shout)
@@ -394,9 +372,7 @@ async def load_shouts_unrated(_, info, limit: int = 50, offset: int = 0):
                 and_(
                     Reaction.shout == Shout.id,
                     Reaction.replyTo.is_(None),
-                    Reaction.kind.in_(
-                        [ReactionKind.LIKE.value, ReactionKind.DISLIKE.value]
-                    ),
+                    Reaction.kind.in_([ReactionKind.LIKE.value, ReactionKind.DISLIKE.value]),
                 ),
             )
             .outerjoin(Author, Author.user == bindparam('user_id'))
@@ -465,9 +441,7 @@ async def load_shouts_random_top(_, _info, options):
 
     aliased_reaction = aliased(Reaction)
 
-    subquery = (
-        select(Shout.id).outerjoin(aliased_reaction).where(Shout.deleted_at.is_(None))
-    )
+    subquery = select(Shout.id).outerjoin(aliased_reaction).where(Shout.deleted_at.is_(None))
 
     subquery = apply_filters(subquery, options.get('filters', {}))
     subquery = subquery.group_by(Shout.id).order_by(
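
Note: load_shouts_search above still ranks rows through a results_dict produced by the previous search backend; this patch does not rewire that resolver. A sketch of how it could consume the new service instead (hypothetical, not part of the diff; it assumes indexed documents keep the slug column, which holds if index_post below indexes all Shout columns):

    # inside load_shouts_search, before building the SQL query
    results = await SearchService.search(text, limit, offset)
    results_dict = {res['slug']: res for res in results if res.get('slug')}
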
diff --git a/services/search.py b/services/search.py
index 0db6ede6..ce77798a 100644
--- a/services/search.py
+++ b/services/search.py
@@ -1,42 +1,161 @@
 import asyncio
 import json
 import logging
+import os
 from typing import List
 
-import aiohttp
+from elasticsearch import Elasticsearch
 
 from orm.shout import Shout  # Adjust the import as needed
 from services.rediscache import redis  # Adjust the import as needed
 
+logger = logging.getLogger('[services.search] ')
+logger.setLevel(logging.DEBUG)
+
+ELASTIC_HOST = os.environ.get('ELASTIC_HOST', 'localhost').replace('https://', '').replace('http://', '')
+ELASTIC_USER = os.environ.get('ELASTIC_USER', '')
+ELASTIC_PASSWORD = os.environ.get('ELASTIC_PASSWORD', '')
+ELASTIC_PORT = os.environ.get('ELASTIC_PORT', 9200)
+ELASTIC_AUTH = f'{ELASTIC_USER}:{ELASTIC_PASSWORD}' if ELASTIC_USER else ''
+ELASTIC_URL = f'https://{ELASTIC_AUTH}@{ELASTIC_HOST}:{ELASTIC_PORT}'
+
+
+class OpenSearchService:
+    def __init__(self, index_name, delete_index_on_startup):
+        self.index_name = index_name
+        self.delete_index_on_startup = delete_index_on_startup
+        self.elasticsearch_client = Elasticsearch(f'{ELASTIC_URL}')
+
+        if self.delete_index_on_startup:
+            self.delete_index()
+
+        self.check_index()
+
+    def delete_index(self):
+        self.elasticsearch_client.indices.delete(index=self.index_name, ignore_unavailable=True)
+
+    def create_index(self):
+        index_settings = {
+            'settings': {
+                'index': {
+                    'number_of_shards': 1,
+                    'auto_expand_replicas': '0-all',
+                },
+                'analysis': {
+                    'analyzer': {
+                        'ru': {
+                            'tokenizer': 'standard',
+                            'filter': ['lowercase', 'ru_stop', 'ru_stemmer'],
+                        }
+                    },
+                    'filter': {
+                        'ru_stemmer': {
+                            'type': 'stemmer',
+                            'language': 'russian',
+                        },
+                        'ru_stop': {
+                            'type': 'stop',
+                            'stopwords': '_russian_',
+                        },
+                    },
+                },
+            },
+            'mappings': {
+                'properties': {
+                    'body': {
+                        'type': 'text',
+                        'analyzer': 'ru',
+                    },
+                    'text': {'type': 'text'},
+                    'author': {'type': 'text'},
+                }
+            },
+        }
+
+        self.elasticsearch_client.indices.create(index=self.index_name, body=index_settings)
+        self.elasticsearch_client.indices.close(index=self.index_name)
+        self.elasticsearch_client.indices.open(index=self.index_name)
+
+    def put_mapping(self):
+        mapping = {
+            'properties': {
+                'body': {
+                    'type': 'text',
+                    'analyzer': 'ru',
+                },
+                'text': {'type': 'text'},
+                'author': {'type': 'text'},
+            }
+        }
+
+        self.elasticsearch_client.indices.put_mapping(index=self.index_name, body=mapping)
+
+    def check_index(self):
+        if not self.elasticsearch_client.indices.exists(index=self.index_name):
+            logger.debug(f'Creating {self.index_name} index')
+            self.create_index()
+            self.put_mapping()
+
+    def index_post(self, shout):
+        id_ = str(shout.id)
+        logger.debug(f'Indexing post id {id_}')
+
+        # the ORM instance itself is not JSON-serializable; index a plain
+        # dict of its column values instead
+        doc = {column.name: getattr(shout, column.name) for column in shout.__table__.columns}
+        self.elasticsearch_client.index(index=self.index_name, id=id_, body=doc)
+
+    def search_post(self, query, limit, offset):
+        logger.debug(f'Search query = {query}, limit = {limit}')
+        # the _all meta-field no longer exists in recent Elasticsearch,
+        # so query the mapped text fields explicitly
+        search_body = {
+            'query': {
+                'multi_match': {
+                    'query': query,
+                    'fields': ['body', 'text', 'author'],
+                }
+            }
+        }
+
+        search_response = self.elasticsearch_client.search(
+            index=self.index_name, body=search_body, size=limit, from_=offset
+        )
+        hits = search_response['hits']['hits']
+
+        return [
+            {
+                **hit['_source'],
+                'score': hit['_score'],
+            }
+            for hit in hits
+        ]
+
+
 class SearchService:
     lock = asyncio.Lock()
+    elastic = None
 
     @staticmethod
-    async def init(session):
-        async with SearchService.lock:
-            logging.info('[services.search] Initializing SearchService')
+    async def init():
+        self = SearchService
+        async with self.lock:
+            logging.info('Initializing SearchService')
+            try:
+                self.elastic = OpenSearchService('shouts_index', False)
+            except Exception as exc:
+                logger.error(exc)
 
     @staticmethod
     async def search(text: str, limit: int = 50, offset: int = 0) -> List[Shout]:
         payload = []
+        self = SearchService
         try:
             # TODO: add ttl for redis cached search results
             cached = await redis.execute('GET', text)
             if not cached:
-                async with SearchService.lock:
-                    # Use aiohttp to send a request to ElasticSearch
-                    async with aiohttp.ClientSession() as session:
-                        search_url = f'https://search.discours.io/search?q={text}'
-                        async with session.get(search_url) as response:
-                            if response.status == 200:
-                                payload = await response.json()
-                                await redis.execute('SET', text, json.dumps(payload))  # use redis as cache
-                            else:
-                                logging.error(f'[services.search] response: {response.status} {await response.text()}')
+                async with self.lock:
+                    # search_post is synchronous, so it must not be awaited
+                    payload = self.elastic.search_post(text, limit, offset)
+                    # use Redis as cache; note the key is the bare query text,
+                    # so limit/offset do not vary the cached entry
+                    await redis.execute('SET', text, json.dumps(payload))
             elif isinstance(cached, str):
                 payload = json.loads(cached)
         except Exception as e:
-            logging.error(f'[services.search] Error during search: {e}')
-
-        return payload[offset : offset + limit]
+            logging.error(f'Error during search: {e}')
+        return payload
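
Note: search_post flattens Elasticsearch hits into plain dicts, merging each hit's _source with its relevance score. Assuming index_post indexes the Shout columns as sketched above, one returned entry would look roughly like this (field values illustrative):

    # shape of a single search_post result
    {'slug': 'some-shout', 'body': '...', 'score': 1.42}
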
diff --git a/services/viewed.py b/services/viewed.py
index 75436b2c..dd5e5145 100644
--- a/services/viewed.py
+++ b/services/viewed.py
@@ -49,7 +49,7 @@ class ViewedStorage:
         self = ViewedStorage
         async with self.lock:
             os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', GOOGLE_KEYFILE_PATH)
-            if GOOGLE_KEYFILE_PATH:
+            if GOOGLE_KEYFILE_PATH and os.path.isfile(GOOGLE_KEYFILE_PATH):
                 # Using a default constructor instructs the client to use the credentials
                 # specified in GOOGLE_APPLICATION_CREDENTIALS environment variable.
                 self.analytics_client = BetaAnalyticsDataClient()
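
Note: a quick way to exercise the new service outside the Starlette app, e.g. from a one-off script. This is a sketch, not part of the patch; it assumes reachable Elasticsearch and Redis instances and the ELASTIC_* environment variables above, and the query string is illustrative:

    import asyncio

    from services.search import SearchService

    async def main():
        await SearchService.init()  # connects and ensures shouts_index exists
        # the first call hits Elasticsearch; the result is then cached in Redis
        print(await SearchService.search('культура', limit=5))

    asyncio.run(main())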