From 8ff19491704804aab56b253dad15ab3a85b9b6c2 Mon Sep 17 00:00:00 2001
From: Untone
Date: Mon, 29 Jan 2024 04:09:54 +0300
Subject: [PATCH] inner-search-2

---
 README.md          |  2 +-
 services/search.py | 66 ++++++++++++++++++++++++++-----------------------
 services/viewed.py |  6 ++++++
 3 files changed, 38 insertions(+), 36 deletions(-)

diff --git a/README.md b/README.md
index 0cf526b5..7afedac1 100644
--- a/README.md
+++ b/README.md
@@ -45,7 +45,7 @@ poetry run main.py
 ```
 
 ### search.py
 
-Returns cached results for user search queries from [our](https://search.discours.io) ElasticSearch server with a relevance `score`, merged with database queries; requested via the GraphQL API `load_shouts_search`.
+Returns cached results for user search queries from ElasticSearch with a relevance `score`, merged with database queries; requested via the GraphQL API `load_shouts_search`. Requires `ELASTIC_URL` to be set (it can also be assembled from individual components) and, optionally, recreates the index on startup if the `ELASTIC_REINDEX` variable is set.
 
 ### notify.py
diff --git a/services/search.py b/services/search.py
index ce77798a..16bcfea9 100644
--- a/services/search.py
+++ b/services/search.py
@@ -6,8 +6,8 @@ from typing import List
 
 from elasticsearch import Elasticsearch
 
-from orm.shout import Shout  # Adjust the import as needed
-from services.rediscache import redis  # Adjust the import as needed
+from orm.shout import Shout
+from services.rediscache import redis
 
 logger = logging.getLogger('[services.search] ')
 
@@ -18,10 +18,15 @@ ELASTIC_USER = os.environ.get('ELASTIC_USER', '')
 ELASTIC_PASSWORD = os.environ.get('ELASTIC_PASSWORD', '')
 ELASTIC_PORT = os.environ.get('ELASTIC_PORT', 9200)
 ELASTIC_AUTH = f'{ELASTIC_USER}:{ELASTIC_PASSWORD}' if ELASTIC_USER else ''
-ELASTIC_URL = f'https://{ELASTIC_AUTH}@{ELASTIC_HOST}:{ELASTIC_PORT}'
+ELASTIC_URL = os.environ.get('ELASTIC_URL', f'https://{ELASTIC_AUTH}@{ELASTIC_HOST}:{ELASTIC_PORT}')
+ELASTIC_REINDEX = os.environ.get('ELASTIC_REINDEX', '')
+REDIS_TTL = 86400  # 1 day in seconds
 
 
-class OpenSearchService:
+class SearchService:
+    lock = asyncio.Lock()
+    elastic = None
+
     def __init__(self, index_name, delete_index_on_startup):
         self.index_name = index_name
         self.delete_index_on_startup = delete_index_on_startup
@@ -32,6 +37,9 @@ class OpenSearchService:
 
         self.check_index()
 
+        if ELASTIC_REINDEX:
+            self.recreate_index()
+
     def delete_index(self):
         self.elasticsearch_client.indices.delete(index=self.index_name, ignore_unavailable=True)
 
@@ -63,10 +71,7 @@
             },
             'mappings': {
                 'properties': {
-                    'body': {
-                        'type': 'text',
-                        'analyzer': 'ru',
-                    },
+                    'body': {'type': 'text', 'analyzer': 'ru'},
                     'text': {'type': 'text'},
                     'author': {'type': 'text'},
                 }
@@ -80,10 +85,7 @@
     def put_mapping(self):
         mapping = {
             'properties': {
-                'body': {
-                    'type': 'text',
-                    'analyzer': 'ru',
-                },
+                'body': {'type': 'text', 'analyzer': 'ru'},
                 'text': {'type': 'text'},
                 'author': {'type': 'text'},
             }
@@ -97,20 +99,19 @@ class OpenSearchService:
             self.create_index()
             self.put_mapping()
 
+    def recreate_index(self):
+        self.delete_index()
+        self.check_index()
+
     def index_post(self, shout):
         id_ = str(shout.id)
         logger.debug(f'Indexing post id {id_}')
-
         self.elasticsearch_client.index(index=self.index_name, id=id_, body=shout)
 
     def search_post(self, query, limit, offset):
-        logger.debug(f'Search query = {query}, limit = {limit}')
+        logger.debug(f'query: {query}')
         search_body = {
-            'query': {
-                'match': {
-                    '_all': query,
-                }
-            }
+            'query': {'match': {'_all': query}},
         }
 
         search_response = self.elasticsearch_client.search(
@@ -126,18 +127,13 @@ class OpenSearchService:
             for hit in hits
         ]
 
-
-class SearchService:
-    lock = asyncio.Lock()
-    elastic = None
-
     @staticmethod
     async def init():
         self = SearchService
         async with self.lock:
             logging.info('Initializing SearchService')
             try:
-                self.elastic = OpenSearchService('shouts_index', False)
+                self.elastic = SearchService('shouts_index', False)
             except Exception as exc:
                 logger.error(exc)
 
@@ -146,16 +142,16 @@ class SearchService:
         payload = []
         self = SearchService
         try:
-            # TODO: add ttl for redis cached search results
-            cached = await redis.execute('GET', text)
-            if not cached:
-                async with self.lock:
-                    # Use OpenSearchService.search_post method
-                    payload = await self.elastic.search_post(text, limit, offset)
-                    # Use Redis as cache
-                    await redis.execute('SET', text, json.dumps(payload))
-            elif isinstance(cached, str):
-                payload = json.loads(cached)
+            # Use a key with a prefix to differentiate search results from other Redis data
+            redis_key = f'search:{text}'
+            cached = await redis.execute('GET', redis_key)
+            if isinstance(cached, str):
+                payload = json.loads(cached)
+            else:
+                # Use SearchService.search_post method
+                payload = await self.elastic.search_post(text, limit, offset)
+                # Use Redis as cache with TTL
+                await redis.execute('SETEX', redis_key, REDIS_TTL, json.dumps(payload))
         except Exception as e:
             logging.error(f'Error during search: {e}')
         return payload
diff --git a/services/viewed.py b/services/viewed.py
index dd5e5145..cabd596d 100644
--- a/services/viewed.py
+++ b/services/viewed.py
@@ -61,6 +61,12 @@ class ViewedStorage:
         if os.path.exists(VIEWS_FILEPATH):
             file_timestamp = os.path.getctime(VIEWS_FILEPATH)
             self.start_date = datetime.fromtimestamp(file_timestamp).strftime('%Y-%m-%d')
+            now_date = datetime.now().strftime('%Y-%m-%d')
+
+            if now_date == self.start_date:
+                logger.info(' * Data is up to date!')
+            else:
+                logger.info(f' * Last migration ran on {self.start_date}')
 
         # Start the background task
         asyncio.create_task(self.worker())