This commit is contained in:
parent
b9d602eedf
commit
2f2fa346ed
4
main.py
4
main.py
|
@ -70,6 +70,4 @@ routes = [
|
||||||
Route('/', GraphQL(schema, debug=True)),
|
Route('/', GraphQL(schema, debug=True)),
|
||||||
Route('/new-author', WebhookEndpoint),
|
Route('/new-author', WebhookEndpoint),
|
||||||
]
|
]
|
||||||
app = Starlette(
|
app = Starlette(routes=routes, debug=True, on_startup=[start_up], on_shutdown=[shutdown])
|
||||||
routes=routes, debug=True, on_startup=[start_up], on_shutdown=[shutdown]
|
|
||||||
)
|
|
||||||
|
|
|
@ -73,11 +73,7 @@ async def create_shout(_, info, inp):
|
||||||
sa = ShoutAuthor(shout=shout.id, author=author.id)
|
sa = ShoutAuthor(shout=shout.id, author=author.id)
|
||||||
session.add(sa)
|
session.add(sa)
|
||||||
|
|
||||||
topics = (
|
topics = session.query(Topic).filter(Topic.slug.in_(inp.get('topics', []))).all()
|
||||||
session.query(Topic)
|
|
||||||
.filter(Topic.slug.in_(inp.get('topics', [])))
|
|
||||||
.all()
|
|
||||||
)
|
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
t = ShoutTopic(topic=topic.id, shout=shout.id)
|
t = ShoutTopic(topic=topic.id, shout=shout.id)
|
||||||
session.add(t)
|
session.add(t)
|
||||||
|
@ -118,9 +114,7 @@ async def update_shout( # noqa: C901
|
||||||
topics_input = shout_input['topics']
|
topics_input = shout_input['topics']
|
||||||
del shout_input['topics']
|
del shout_input['topics']
|
||||||
new_topics_to_link = []
|
new_topics_to_link = []
|
||||||
new_topics = [
|
new_topics = [topic_input for topic_input in topics_input if topic_input['id'] < 0]
|
||||||
topic_input for topic_input in topics_input if topic_input['id'] < 0
|
|
||||||
]
|
|
||||||
for new_topic in new_topics:
|
for new_topic in new_topics:
|
||||||
del new_topic['id']
|
del new_topic['id']
|
||||||
created_new_topic = Topic(**new_topic)
|
created_new_topic = Topic(**new_topic)
|
||||||
|
@ -129,31 +123,21 @@ async def update_shout( # noqa: C901
|
||||||
if len(new_topics) > 0:
|
if len(new_topics) > 0:
|
||||||
session.commit()
|
session.commit()
|
||||||
for new_topic_to_link in new_topics_to_link:
|
for new_topic_to_link in new_topics_to_link:
|
||||||
created_unlinked_topic = ShoutTopic(
|
created_unlinked_topic = ShoutTopic(shout=shout.id, topic=new_topic_to_link.id)
|
||||||
shout=shout.id, topic=new_topic_to_link.id
|
|
||||||
)
|
|
||||||
session.add(created_unlinked_topic)
|
session.add(created_unlinked_topic)
|
||||||
existing_topics_input = [
|
existing_topics_input = [topic_input for topic_input in topics_input if topic_input.get('id', 0) > 0]
|
||||||
topic_input
|
|
||||||
for topic_input in topics_input
|
|
||||||
if topic_input.get('id', 0) > 0
|
|
||||||
]
|
|
||||||
existing_topic_to_link_ids = [
|
existing_topic_to_link_ids = [
|
||||||
existing_topic_input['id']
|
existing_topic_input['id']
|
||||||
for existing_topic_input in existing_topics_input
|
for existing_topic_input in existing_topics_input
|
||||||
if existing_topic_input['id']
|
if existing_topic_input['id'] not in [topic.id for topic in shout.topics]
|
||||||
not in [topic.id for topic in shout.topics]
|
|
||||||
]
|
]
|
||||||
for existing_topic_to_link_id in existing_topic_to_link_ids:
|
for existing_topic_to_link_id in existing_topic_to_link_ids:
|
||||||
created_unlinked_topic = ShoutTopic(
|
created_unlinked_topic = ShoutTopic(shout=shout.id, topic=existing_topic_to_link_id)
|
||||||
shout=shout.id, topic=existing_topic_to_link_id
|
|
||||||
)
|
|
||||||
session.add(created_unlinked_topic)
|
session.add(created_unlinked_topic)
|
||||||
topic_to_unlink_ids = [
|
topic_to_unlink_ids = [
|
||||||
topic.id
|
topic.id
|
||||||
for topic in shout.topics
|
for topic in shout.topics
|
||||||
if topic.id
|
if topic.id not in [topic_input['id'] for topic_input in existing_topics_input]
|
||||||
not in [topic_input['id'] for topic_input in existing_topics_input]
|
|
||||||
]
|
]
|
||||||
shout_topics_to_remove = session.query(ShoutTopic).filter(
|
shout_topics_to_remove = session.query(ShoutTopic).filter(
|
||||||
and_(
|
and_(
|
||||||
|
@ -165,9 +149,7 @@ async def update_shout( # noqa: C901
|
||||||
session.delete(shout_topic_to_remove)
|
session.delete(shout_topic_to_remove)
|
||||||
|
|
||||||
# Replace datetime with Unix timestamp
|
# Replace datetime with Unix timestamp
|
||||||
shout_input[
|
shout_input['updated_at'] = current_time # Set updated_at as Unix timestamp
|
||||||
'updated_at'
|
|
||||||
] = current_time # Set updated_at as Unix timestamp
|
|
||||||
Shout.update(shout, shout_input)
|
Shout.update(shout, shout_input)
|
||||||
session.add(shout)
|
session.add(shout)
|
||||||
|
|
||||||
|
@ -175,16 +157,10 @@ async def update_shout( # noqa: C901
|
||||||
if 'main_topic' in shout_input:
|
if 'main_topic' in shout_input:
|
||||||
old_main_topic = (
|
old_main_topic = (
|
||||||
session.query(ShoutTopic)
|
session.query(ShoutTopic)
|
||||||
.filter(
|
.filter(and_(ShoutTopic.shout == shout.id, ShoutTopic.main == True))
|
||||||
and_(ShoutTopic.shout == shout.id, ShoutTopic.main == True)
|
|
||||||
)
|
|
||||||
.first()
|
|
||||||
)
|
|
||||||
main_topic = (
|
|
||||||
session.query(Topic)
|
|
||||||
.filter(Topic.slug == shout_input['main_topic'])
|
|
||||||
.first()
|
.first()
|
||||||
)
|
)
|
||||||
|
main_topic = session.query(Topic).filter(Topic.slug == shout_input['main_topic']).first()
|
||||||
if isinstance(main_topic, Topic):
|
if isinstance(main_topic, Topic):
|
||||||
new_main_topic = (
|
new_main_topic = (
|
||||||
session.query(ShoutTopic)
|
session.query(ShoutTopic)
|
||||||
|
@ -211,10 +187,7 @@ async def update_shout( # noqa: C901
|
||||||
|
|
||||||
if not publish:
|
if not publish:
|
||||||
await notify_shout(shout_dict, 'update')
|
await notify_shout(shout_dict, 'update')
|
||||||
if (
|
if shout.visibility is ShoutVisibility.COMMUNITY.value or shout.visibility is ShoutVisibility.PUBLIC.value:
|
||||||
shout.visibility is ShoutVisibility.COMMUNITY.value
|
|
||||||
or shout.visibility is ShoutVisibility.PUBLIC.value
|
|
||||||
):
|
|
||||||
# search service indexing
|
# search service indexing
|
||||||
search_service.index(shout)
|
search_service.index(shout)
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from sqlalchemy import bindparam, distinct, literal_column, or_
|
from sqlalchemy import bindparam, distinct, or_
|
||||||
from sqlalchemy.orm import aliased, joinedload, selectinload
|
from sqlalchemy.orm import aliased, joinedload, selectinload
|
||||||
from sqlalchemy.sql.expression import and_, asc, case, desc, func, nulls_last, select
|
from sqlalchemy.sql.expression import and_, asc, case, desc, func, nulls_last, select
|
||||||
from starlette.exceptions import HTTPException
|
from starlette.exceptions import HTTPException
|
||||||
|
@ -312,49 +312,7 @@ async def load_shouts_feed(_, info, options):
|
||||||
@query.field('load_shouts_search')
|
@query.field('load_shouts_search')
|
||||||
async def load_shouts_search(_, _info, text, limit=50, offset=0):
|
async def load_shouts_search(_, _info, text, limit=50, offset=0):
|
||||||
if text and len(text) > 2:
|
if text and len(text) > 2:
|
||||||
results = await search_text(text, limit, offset)
|
return await search_text(text, limit, offset)
|
||||||
results_dict = {r['slug']: r for r in results}
|
|
||||||
found_keys = list(results_dict.keys())
|
|
||||||
|
|
||||||
with local_session() as session:
|
|
||||||
# Create a subquery with the synthetic 'score' column
|
|
||||||
subquery = (
|
|
||||||
select(
|
|
||||||
[
|
|
||||||
Shout,
|
|
||||||
literal_column(f"({results_dict.get(Shout.slug, {}).get('score', 0)})").label('score'),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
.select_from(Shout)
|
|
||||||
.join(ShoutTopic, Shout.id == ShoutTopic.shout)
|
|
||||||
.options(
|
|
||||||
joinedload(Shout.authors),
|
|
||||||
joinedload(Shout.topics),
|
|
||||||
)
|
|
||||||
.filter(
|
|
||||||
and_(
|
|
||||||
Shout.deleted_at.is_(None),
|
|
||||||
Shout.slug.in_(found_keys),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
).alias('scored_shouts')
|
|
||||||
|
|
||||||
# Use the subquery in the main query
|
|
||||||
q = (
|
|
||||||
select([subquery.c.Shout, subquery.c.score])
|
|
||||||
.order_by(desc(subquery.c.score))
|
|
||||||
.limit(limit)
|
|
||||||
.offset(offset)
|
|
||||||
)
|
|
||||||
|
|
||||||
results = session.execute(q).all()
|
|
||||||
logger.debug(f'search found {len(results)} results')
|
|
||||||
|
|
||||||
# Directly build the shouts_data list within the loop
|
|
||||||
shouts_data = [shout.Shout.dict() for score, shout in results]
|
|
||||||
|
|
||||||
return shouts_data
|
|
||||||
|
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -11,16 +11,12 @@ from services.rediscache import redis
|
||||||
logger = logging.getLogger('\t[services.search]\t')
|
logger = logging.getLogger('\t[services.search]\t')
|
||||||
logger.setLevel(logging.DEBUG)
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
ELASTIC_HOST = (
|
ELASTIC_HOST = os.environ.get('ELASTIC_HOST', '').replace('https://', '').replace('http://', '')
|
||||||
os.environ.get('ELASTIC_HOST', '').replace('https://', '').replace('http://', '')
|
|
||||||
)
|
|
||||||
ELASTIC_USER = os.environ.get('ELASTIC_USER', '')
|
ELASTIC_USER = os.environ.get('ELASTIC_USER', '')
|
||||||
ELASTIC_PASSWORD = os.environ.get('ELASTIC_PASSWORD', '')
|
ELASTIC_PASSWORD = os.environ.get('ELASTIC_PASSWORD', '')
|
||||||
ELASTIC_PORT = os.environ.get('ELASTIC_PORT', 9200)
|
ELASTIC_PORT = os.environ.get('ELASTIC_PORT', 9200)
|
||||||
ELASTIC_AUTH = f'{ELASTIC_USER}:{ELASTIC_PASSWORD}' if ELASTIC_USER else ''
|
ELASTIC_AUTH = f'{ELASTIC_USER}:{ELASTIC_PASSWORD}' if ELASTIC_USER else ''
|
||||||
ELASTIC_URL = os.environ.get(
|
ELASTIC_URL = os.environ.get('ELASTIC_URL', f'https://{ELASTIC_AUTH}@{ELASTIC_HOST}:{ELASTIC_PORT}')
|
||||||
'ELASTIC_URL', f'https://{ELASTIC_AUTH}@{ELASTIC_HOST}:{ELASTIC_PORT}'
|
|
||||||
)
|
|
||||||
REDIS_TTL = 86400 # 1 day in seconds
|
REDIS_TTL = 86400 # 1 day in seconds
|
||||||
|
|
||||||
|
|
||||||
|
@ -102,9 +98,7 @@ class SearchService:
|
||||||
if self.lock.acquire(blocking=False):
|
if self.lock.acquire(blocking=False):
|
||||||
try:
|
try:
|
||||||
logger.debug(f' Создаём новый индекс: {self.index_name} ')
|
logger.debug(f' Создаём новый индекс: {self.index_name} ')
|
||||||
self.client.indices.create(
|
self.client.indices.create(index=self.index_name, body=index_settings)
|
||||||
index=self.index_name, body=index_settings
|
|
||||||
)
|
|
||||||
self.client.indices.close(index=self.index_name)
|
self.client.indices.close(index=self.index_name)
|
||||||
self.client.indices.open(index=self.index_name)
|
self.client.indices.open(index=self.index_name)
|
||||||
finally:
|
finally:
|
||||||
|
@ -146,9 +140,7 @@ class SearchService:
|
||||||
def recreate_index(self):
|
def recreate_index(self):
|
||||||
if self.lock.acquire(blocking=False):
|
if self.lock.acquire(blocking=False):
|
||||||
try:
|
try:
|
||||||
logger.debug(
|
logger.debug(f' Удаляем индекс {self.index_name} из-за неправильной структуры данных')
|
||||||
f' Удаляем индекс {self.index_name} из-за неправильной структуры данных'
|
|
||||||
)
|
|
||||||
self.delete_index()
|
self.delete_index()
|
||||||
self.check_index()
|
self.check_index()
|
||||||
finally:
|
finally:
|
||||||
|
@ -168,9 +160,7 @@ class SearchService:
|
||||||
'query': {'match': {'_all': query}},
|
'query': {'match': {'_all': query}},
|
||||||
}
|
}
|
||||||
if self.client:
|
if self.client:
|
||||||
search_response = self.client.search(
|
search_response = self.client.search(index=self.index_name, body=search_body, size=limit, from_=offset)
|
||||||
index=self.index_name, body=search_body, size=limit, from_=offset
|
|
||||||
)
|
|
||||||
hits = search_response['hits']['hits']
|
hits = search_response['hits']['hits']
|
||||||
|
|
||||||
return [
|
return [
|
||||||
|
|
Loading…
Reference in New Issue
Block a user