Compare commits
43 Commits
fix/sv-aut
...
ec9465ad40
Author | SHA1 | Date | |
---|---|---|---|
![]() |
ec9465ad40 | ||
![]() |
4d965fb27b | ||
![]() |
e382cc1ea5 | ||
83d61ca76d | |||
![]() |
106222b0e0 | ||
![]() |
c533241d1e | ||
![]() |
78326047bf | ||
![]() |
bc4ec79240 | ||
![]() |
a0db5707c4 | ||
![]() |
ecc443c3ad | ||
![]() |
9a02ca74ad | ||
![]() |
9ebb81cbd3 | ||
![]() |
0bc55977ac | ||
![]() |
ff3a4debce | ||
![]() |
ae85b32f69 | ||
![]() |
34a354e9e3 | ||
![]() |
e405fb527b | ||
![]() |
7f36f93d92 | ||
![]() |
f089a32394 | ||
![]() |
1fd623a660 | ||
![]() |
88012f1b8c | ||
![]() |
6e284640c0 | ||
![]() |
077cb46482 | ||
![]() |
60a13a9097 | ||
![]() |
316375bf18 | ||
![]() |
fb820f67fd | ||
![]() |
f1d9f4e036 | ||
![]() |
ebb67eb311 | ||
![]() |
50a8c24ead | ||
![]() |
eb4b9363ab | ||
![]() |
19c5028a0c | ||
![]() |
57e1e8e6bd | ||
![]() |
385057ffcd | ||
![]() |
90699768ff | ||
![]() |
ad0ca75aa9 | ||
![]() |
39242d5e6c | ||
![]() |
24cca7f2cb | ||
![]() |
a9c7ac49d6 | ||
![]() |
f249752db5 | ||
![]() |
c0b2116da2 | ||
![]() |
59e71c8144 | ||
![]() |
e6a416383d | ||
![]() |
d55448398d |
@@ -29,7 +29,16 @@ jobs:
|
||||
if: github.ref == 'refs/heads/dev'
|
||||
uses: dokku/github-action@master
|
||||
with:
|
||||
branch: 'dev'
|
||||
branch: 'main'
|
||||
force: true
|
||||
git_remote_url: 'ssh://dokku@v2.discours.io:22/core'
|
||||
ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}
|
||||
|
||||
- name: Push to dokku for staging branch
|
||||
if: github.ref == 'refs/heads/staging'
|
||||
uses: dokku/github-action@master
|
||||
with:
|
||||
branch: 'dev'
|
||||
git_remote_url: 'ssh://dokku@staging.discours.io:22/core'
|
||||
ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}
|
||||
git_push_flags: '--force'
|
6
.gitignore
vendored
6
.gitignore
vendored
@@ -128,6 +128,9 @@ dmypy.json
|
||||
.idea
|
||||
temp.*
|
||||
|
||||
# Debug
|
||||
DEBUG.log
|
||||
|
||||
discours.key
|
||||
discours.crt
|
||||
discours.pem
|
||||
@@ -161,4 +164,5 @@ views.json
|
||||
*.key
|
||||
*.crt
|
||||
*cache.json
|
||||
.cursor
|
||||
.cursor
|
||||
.devcontainer/
|
||||
|
48
main.py
48
main.py
@@ -17,7 +17,8 @@ from cache.revalidator import revalidation_manager
|
||||
from services.exception import ExceptionHandlerMiddleware
|
||||
from services.redis import redis
|
||||
from services.schema import create_all_tables, resolvers
|
||||
from services.search import search_service
|
||||
#from services.search import search_service
|
||||
from services.search import search_service, initialize_search_index
|
||||
from services.viewed import ViewedStorage
|
||||
from services.webhook import WebhookEndpoint, create_webhook_endpoint
|
||||
from settings import DEV_SERVER_PID_FILE_NAME, MODE
|
||||
@@ -34,24 +35,67 @@ async def start():
|
||||
f.write(str(os.getpid()))
|
||||
print(f"[main] process started in {MODE} mode")
|
||||
|
||||
async def check_search_service():
|
||||
"""Check if search service is available and log result"""
|
||||
info = await search_service.info()
|
||||
if info.get("status") in ["error", "unavailable"]:
|
||||
print(f"[WARNING] Search service unavailable: {info.get('message', 'unknown reason')}")
|
||||
else:
|
||||
print(f"[INFO] Search service is available: {info}")
|
||||
|
||||
|
||||
# indexing DB data
|
||||
# async def indexing():
|
||||
# from services.db import fetch_all_shouts
|
||||
# all_shouts = await fetch_all_shouts()
|
||||
# await initialize_search_index(all_shouts)
|
||||
async def lifespan(_app):
|
||||
try:
|
||||
print("[lifespan] Starting application initialization")
|
||||
create_all_tables()
|
||||
await asyncio.gather(
|
||||
redis.connect(),
|
||||
precache_data(),
|
||||
ViewedStorage.init(),
|
||||
create_webhook_endpoint(),
|
||||
search_service.info(),
|
||||
check_search_service(),
|
||||
start(),
|
||||
revalidation_manager.start(),
|
||||
)
|
||||
print("[lifespan] Basic initialization complete")
|
||||
|
||||
# Add a delay before starting the intensive search indexing
|
||||
print("[lifespan] Waiting for system stabilization before search indexing...")
|
||||
await asyncio.sleep(10) # 10-second delay to let the system stabilize
|
||||
|
||||
# Start search indexing as a background task with lower priority
|
||||
asyncio.create_task(initialize_search_index_background())
|
||||
|
||||
yield
|
||||
finally:
|
||||
print("[lifespan] Shutting down application services")
|
||||
tasks = [redis.disconnect(), ViewedStorage.stop(), revalidation_manager.stop()]
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
print("[lifespan] Shutdown complete")
|
||||
|
||||
# Initialize search index in the background
|
||||
async def initialize_search_index_background():
|
||||
"""Run search indexing as a background task with low priority"""
|
||||
try:
|
||||
print("[search] Starting background search indexing process")
|
||||
from services.db import fetch_all_shouts
|
||||
|
||||
# Get total count first (optional)
|
||||
all_shouts = await fetch_all_shouts()
|
||||
total_count = len(all_shouts) if all_shouts else 0
|
||||
print(f"[search] Fetched {total_count} shouts for background indexing")
|
||||
|
||||
# Start the indexing process with the fetched shouts
|
||||
print("[search] Beginning background search index initialization...")
|
||||
await initialize_search_index(all_shouts)
|
||||
print("[search] Background search index initialization complete")
|
||||
except Exception as e:
|
||||
print(f"[search] Error in background search indexing: {str(e)}")
|
||||
|
||||
# Создаем экземпляр GraphQL
|
||||
graphql_app = GraphQL(schema, debug=True)
|
||||
|
28
orm/shout.py
28
orm/shout.py
@@ -71,6 +71,34 @@ class ShoutAuthor(Base):
|
||||
class Shout(Base):
|
||||
"""
|
||||
Публикация в системе.
|
||||
|
||||
Attributes:
|
||||
body (str)
|
||||
slug (str)
|
||||
cover (str) : "Cover image url"
|
||||
cover_caption (str) : "Cover image alt caption"
|
||||
lead (str)
|
||||
title (str)
|
||||
subtitle (str)
|
||||
layout (str)
|
||||
media (dict)
|
||||
authors (list[Author])
|
||||
topics (list[Topic])
|
||||
reactions (list[Reaction])
|
||||
lang (str)
|
||||
version_of (int)
|
||||
oid (str)
|
||||
seo (str) : JSON
|
||||
draft (int)
|
||||
created_at (int)
|
||||
updated_at (int)
|
||||
published_at (int)
|
||||
featured_at (int)
|
||||
deleted_at (int)
|
||||
created_by (int)
|
||||
updated_by (int)
|
||||
deleted_by (int)
|
||||
community (int)
|
||||
"""
|
||||
|
||||
__tablename__ = "shout"
|
||||
|
@@ -13,6 +13,10 @@ starlette
|
||||
gql
|
||||
ariadne
|
||||
granian
|
||||
|
||||
# NLP and search
|
||||
httpx
|
||||
|
||||
orjson
|
||||
pydantic
|
||||
trafilatura
|
@@ -10,7 +10,7 @@ from orm.shout import Shout, ShoutAuthor, ShoutTopic
|
||||
from orm.topic import Topic
|
||||
from services.db import json_array_builder, json_builder, local_session
|
||||
from services.schema import query
|
||||
from services.search import search_text
|
||||
from services.search import search_text, get_search_count
|
||||
from services.viewed import ViewedStorage
|
||||
from utils.logger import root_logger as logger
|
||||
|
||||
@@ -187,12 +187,10 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
|
||||
"""
|
||||
shouts = []
|
||||
try:
|
||||
# logger.info(f"Starting get_shouts_with_links with limit={limit}, offset={offset}")
|
||||
q = q.limit(limit).offset(offset)
|
||||
|
||||
with local_session() as session:
|
||||
shouts_result = session.execute(q).all()
|
||||
# logger.info(f"Got {len(shouts_result) if shouts_result else 0} shouts from query")
|
||||
|
||||
if not shouts_result:
|
||||
logger.warning("No shouts found in query result")
|
||||
@@ -203,7 +201,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
|
||||
shout = None
|
||||
if hasattr(row, "Shout"):
|
||||
shout = row.Shout
|
||||
# logger.debug(f"Processing shout#{shout.id} at index {idx}")
|
||||
if shout:
|
||||
shout_id = int(f"{shout.id}")
|
||||
shout_dict = shout.dict()
|
||||
@@ -231,20 +228,16 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
|
||||
topics = None
|
||||
if has_field(info, "topics") and hasattr(row, "topics"):
|
||||
topics = orjson.loads(row.topics) if isinstance(row.topics, str) else row.topics
|
||||
# logger.debug(f"Shout#{shout_id} topics: {topics}")
|
||||
shout_dict["topics"] = topics
|
||||
|
||||
if has_field(info, "main_topic"):
|
||||
main_topic = None
|
||||
if hasattr(row, "main_topic"):
|
||||
# logger.debug(f"Raw main_topic for shout#{shout_id}: {row.main_topic}")
|
||||
main_topic = (
|
||||
orjson.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic
|
||||
)
|
||||
# logger.debug(f"Parsed main_topic for shout#{shout_id}: {main_topic}")
|
||||
|
||||
if not main_topic and topics and len(topics) > 0:
|
||||
# logger.info(f"No main_topic found for shout#{shout_id}, using first topic from list")
|
||||
main_topic = {
|
||||
"id": topics[0]["id"],
|
||||
"title": topics[0]["title"],
|
||||
@@ -252,10 +245,8 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
|
||||
"is_main": True,
|
||||
}
|
||||
elif not main_topic:
|
||||
logger.warning(f"No main_topic and no topics found for shout#{shout_id}")
|
||||
main_topic = {"id": 0, "title": "no topic", "slug": "notopic", "is_main": True}
|
||||
shout_dict["main_topic"] = main_topic
|
||||
# logger.debug(f"Final main_topic for shout#{shout_id}: {main_topic}")
|
||||
|
||||
if has_field(info, "authors") and hasattr(row, "authors"):
|
||||
shout_dict["authors"] = (
|
||||
@@ -282,7 +273,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
|
||||
logger.error(f"Fatal error in get_shouts_with_links: {e}", exc_info=True)
|
||||
raise
|
||||
finally:
|
||||
logger.info(f"Returning {len(shouts)} shouts from get_shouts_with_links")
|
||||
return shouts
|
||||
|
||||
|
||||
@@ -401,8 +391,17 @@ async def load_shouts_search(_, info, text, options):
|
||||
"""
|
||||
limit = options.get("limit", 10)
|
||||
offset = options.get("offset", 0)
|
||||
|
||||
if isinstance(text, str) and len(text) > 2:
|
||||
# Get search results with pagination
|
||||
results = await search_text(text, limit, offset)
|
||||
|
||||
# If no results, return empty list
|
||||
if not results:
|
||||
logger.info(f"No search results found for '{text}'")
|
||||
return []
|
||||
|
||||
# Extract IDs and scores
|
||||
scores = {}
|
||||
hits_ids = []
|
||||
for sr in results:
|
||||
@@ -412,22 +411,42 @@ async def load_shouts_search(_, info, text, options):
|
||||
scores[shout_id] = sr.get("score")
|
||||
hits_ids.append(shout_id)
|
||||
|
||||
q = (
|
||||
query_with_stat(info)
|
||||
if has_field(info, "stat")
|
||||
else select(Shout).filter(and_(Shout.published_at.is_not(None), Shout.deleted_at.is_(None)))
|
||||
)
|
||||
# Query DB for only the IDs in the current page
|
||||
q = query_with_stat(info)
|
||||
q = q.filter(Shout.id.in_(hits_ids))
|
||||
q = apply_filters(q, options)
|
||||
q = apply_sorting(q, options)
|
||||
shouts = get_shouts_with_links(info, q, limit, offset)
|
||||
q = apply_filters(q, options.get("filters", {}))
|
||||
|
||||
#
|
||||
shouts = get_shouts_with_links(info, q, len(hits_ids), 0)
|
||||
|
||||
# Add scores from search results
|
||||
for shout in shouts:
|
||||
shout.score = scores[f"{shout.id}"]
|
||||
shouts.sort(key=lambda x: x.score, reverse=True)
|
||||
shout_id = str(shout['id'])
|
||||
shout["score"] = scores.get(shout_id, 0)
|
||||
|
||||
# Re-sort by search score to maintain ranking
|
||||
shouts.sort(key=lambda x: scores.get(str(x['id']), 0), reverse=True)
|
||||
|
||||
return shouts
|
||||
return []
|
||||
|
||||
|
||||
@query.field("get_search_results_count")
|
||||
async def get_search_results_count(_, info, text):
|
||||
"""
|
||||
Returns the total count of search results for a search query.
|
||||
|
||||
:param _: Root query object (unused)
|
||||
:param info: GraphQL context information
|
||||
:param text: Search query text
|
||||
:return: Total count of results
|
||||
"""
|
||||
if isinstance(text, str) and len(text) > 2:
|
||||
count = await get_search_count(text)
|
||||
return {"count": count}
|
||||
return {"count": 0}
|
||||
|
||||
|
||||
@query.field("load_shouts_unrated")
|
||||
async def load_shouts_unrated(_, info, options):
|
||||
"""
|
||||
|
@@ -33,6 +33,7 @@ type Query {
|
||||
get_shout(slug: String, shout_id: Int): Shout
|
||||
load_shouts_by(options: LoadShoutsOptions): [Shout]
|
||||
load_shouts_search(text: String!, options: LoadShoutsOptions): [SearchResult]
|
||||
get_search_results_count(text: String!): CountResult!
|
||||
load_shouts_bookmarked(options: LoadShoutsOptions): [Shout]
|
||||
|
||||
# rating
|
||||
|
@@ -207,6 +207,7 @@ type CommonResult {
|
||||
}
|
||||
|
||||
type SearchResult {
|
||||
id: Int!
|
||||
slug: String!
|
||||
title: String!
|
||||
cover: String
|
||||
@@ -274,3 +275,7 @@ type MyRateComment {
|
||||
my_rate: ReactionKind
|
||||
}
|
||||
|
||||
type CountResult {
|
||||
count: Int!
|
||||
}
|
||||
|
||||
|
34
server.py
Normal file
34
server.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from granian.constants import Interfaces
|
||||
from granian.log import LogLevels
|
||||
from granian.server import Server
|
||||
|
||||
from settings import PORT
|
||||
from utils.logger import root_logger as logger
|
||||
|
||||
if __name__ == "__main__":
|
||||
logger.info("started")
|
||||
try:
|
||||
|
||||
granian_instance = Server(
|
||||
"main:app",
|
||||
address="0.0.0.0",
|
||||
port=PORT,
|
||||
interface=Interfaces.ASGI,
|
||||
workers=1,
|
||||
websockets=False,
|
||||
log_level=LogLevels.debug,
|
||||
backlog=2048,
|
||||
)
|
||||
|
||||
if "dev" in sys.argv:
|
||||
logger.info("dev mode, building ssl context")
|
||||
granian_instance.build_ssl_context(cert=Path("localhost.pem"), key=Path("localhost-key.pem"), password=None)
|
||||
granian_instance.serve()
|
||||
except Exception as error:
|
||||
logger.error(error, exc_info=True)
|
||||
raise
|
||||
finally:
|
||||
logger.info("stopped")
|
@@ -19,7 +19,7 @@ from sqlalchemy import (
|
||||
inspect,
|
||||
text,
|
||||
)
|
||||
from sqlalchemy.orm import Session, configure_mappers, declarative_base
|
||||
from sqlalchemy.orm import Session, configure_mappers, declarative_base, joinedload
|
||||
from sqlalchemy.sql.schema import Table
|
||||
|
||||
from settings import DB_URL
|
||||
@@ -259,3 +259,32 @@ def get_json_builder():
|
||||
|
||||
# Используем их в коде
|
||||
json_builder, json_array_builder, json_cast = get_json_builder()
|
||||
|
||||
# Fetch all shouts, with authors preloaded
|
||||
# This function is used for search indexing
|
||||
|
||||
async def fetch_all_shouts(session=None):
|
||||
"""Fetch all published shouts for search indexing with authors preloaded"""
|
||||
from orm.shout import Shout
|
||||
|
||||
close_session = False
|
||||
if session is None:
|
||||
session = local_session()
|
||||
close_session = True
|
||||
|
||||
try:
|
||||
# Fetch only published and non-deleted shouts with authors preloaded
|
||||
query = session.query(Shout).options(
|
||||
joinedload(Shout.authors)
|
||||
).filter(
|
||||
Shout.published_at.is_not(None),
|
||||
Shout.deleted_at.is_(None)
|
||||
)
|
||||
shouts = query.all()
|
||||
return shouts
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching shouts for search indexing: {e}")
|
||||
return []
|
||||
finally:
|
||||
if close_session:
|
||||
session.close()
|
@@ -2,231 +2,779 @@ import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import httpx
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import orjson
|
||||
from opensearchpy import OpenSearch
|
||||
|
||||
from services.redis import redis
|
||||
from utils.encoders import CustomJSONEncoder
|
||||
|
||||
# Set redis logging level to suppress DEBUG messages
|
||||
# Set up proper logging
|
||||
logger = logging.getLogger("search")
|
||||
logger.setLevel(logging.WARNING)
|
||||
logger.setLevel(logging.INFO) # Change to INFO to see more details
|
||||
|
||||
ELASTIC_HOST = os.environ.get("ELASTIC_HOST", "").replace("https://", "")
|
||||
ELASTIC_USER = os.environ.get("ELASTIC_USER", "")
|
||||
ELASTIC_PASSWORD = os.environ.get("ELASTIC_PASSWORD", "")
|
||||
ELASTIC_PORT = os.environ.get("ELASTIC_PORT", 9200)
|
||||
ELASTIC_URL = os.environ.get(
|
||||
"ELASTIC_URL",
|
||||
f"https://{ELASTIC_USER}:{ELASTIC_PASSWORD}@{ELASTIC_HOST}:{ELASTIC_PORT}",
|
||||
)
|
||||
REDIS_TTL = 86400 # 1 день в секундах
|
||||
# Configuration for search service
|
||||
SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"])
|
||||
TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "none")
|
||||
MAX_BATCH_SIZE = int(os.environ.get("SEARCH_MAX_BATCH_SIZE", "25"))
|
||||
|
||||
index_settings = {
|
||||
"settings": {
|
||||
"index": {"number_of_shards": 1, "auto_expand_replicas": "0-all"},
|
||||
"analysis": {
|
||||
"analyzer": {
|
||||
"ru": {
|
||||
"tokenizer": "standard",
|
||||
"filter": ["lowercase", "ru_stop", "ru_stemmer"],
|
||||
}
|
||||
},
|
||||
"filter": {
|
||||
"ru_stemmer": {"type": "stemmer", "language": "russian"},
|
||||
"ru_stop": {"type": "stop", "stopwords": "_russian_"},
|
||||
},
|
||||
},
|
||||
},
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"body": {"type": "text", "analyzer": "ru"},
|
||||
"title": {"type": "text", "analyzer": "ru"},
|
||||
"subtitle": {"type": "text", "analyzer": "ru"},
|
||||
"lead": {"type": "text", "analyzer": "ru"},
|
||||
"media": {"type": "text", "analyzer": "ru"},
|
||||
}
|
||||
},
|
||||
}
|
||||
# Search cache configuration
|
||||
SEARCH_CACHE_ENABLED = bool(os.environ.get("SEARCH_CACHE_ENABLED", "true").lower() in ["true", "1", "yes"])
|
||||
SEARCH_CACHE_TTL_SECONDS = int(os.environ.get("SEARCH_CACHE_TTL_SECONDS", "900")) # Default: 15 minutes
|
||||
SEARCH_MIN_SCORE = float(os.environ.get("SEARCH_MIN_SCORE", "0.1"))
|
||||
SEARCH_PREFETCH_SIZE = int(os.environ.get("SEARCH_PREFETCH_SIZE", "200"))
|
||||
SEARCH_USE_REDIS = bool(os.environ.get("SEARCH_USE_REDIS", "true").lower() in ["true", "1", "yes"])
|
||||
|
||||
expected_mapping = index_settings["mappings"]
|
||||
search_offset = 0
|
||||
|
||||
# Создание цикла событий
|
||||
search_loop = asyncio.get_event_loop()
|
||||
# Import Redis client if Redis caching is enabled
|
||||
if SEARCH_USE_REDIS:
|
||||
try:
|
||||
from services.redis import redis
|
||||
logger.info("Redis client imported for search caching")
|
||||
except ImportError:
|
||||
logger.warning("Redis client import failed, falling back to memory cache")
|
||||
SEARCH_USE_REDIS = False
|
||||
|
||||
# В начале файла добавим флаг
|
||||
SEARCH_ENABLED = bool(os.environ.get("ELASTIC_HOST", ""))
|
||||
|
||||
|
||||
def get_indices_stats():
|
||||
indices_stats = search_service.client.cat.indices(format="json")
|
||||
for index_info in indices_stats:
|
||||
index_name = index_info["index"]
|
||||
if not index_name.startswith("."):
|
||||
index_health = index_info["health"]
|
||||
index_status = index_info["status"]
|
||||
pri_shards = index_info["pri"]
|
||||
rep_shards = index_info["rep"]
|
||||
docs_count = index_info["docs.count"]
|
||||
docs_deleted = index_info["docs.deleted"]
|
||||
store_size = index_info["store.size"]
|
||||
pri_store_size = index_info["pri.store.size"]
|
||||
|
||||
logger.info(f"Index: {index_name}")
|
||||
logger.info(f"Health: {index_health}")
|
||||
logger.info(f"Status: {index_status}")
|
||||
logger.info(f"Primary Shards: {pri_shards}")
|
||||
logger.info(f"Replica Shards: {rep_shards}")
|
||||
logger.info(f"Documents Count: {docs_count}")
|
||||
logger.info(f"Deleted Documents: {docs_deleted}")
|
||||
logger.info(f"Store Size: {store_size}")
|
||||
logger.info(f"Primary Store Size: {pri_store_size}")
|
||||
class SearchCache:
|
||||
"""Cache for search results to enable efficient pagination"""
|
||||
|
||||
def __init__(self, ttl_seconds=SEARCH_CACHE_TTL_SECONDS, max_items=100):
|
||||
self.cache = {} # Maps search query to list of results
|
||||
self.last_accessed = {} # Maps search query to last access timestamp
|
||||
self.ttl = ttl_seconds
|
||||
self.max_items = max_items
|
||||
self._redis_prefix = "search_cache:"
|
||||
|
||||
async def store(self, query, results):
|
||||
"""Store search results for a query"""
|
||||
normalized_query = self._normalize_query(query)
|
||||
|
||||
if SEARCH_USE_REDIS:
|
||||
try:
|
||||
serialized_results = json.dumps(results)
|
||||
await redis.set(
|
||||
f"{self._redis_prefix}{normalized_query}",
|
||||
serialized_results,
|
||||
ex=self.ttl
|
||||
)
|
||||
logger.info(f"Stored {len(results)} search results for query '{query}' in Redis")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error storing search results in Redis: {e}")
|
||||
# Fall back to memory cache if Redis fails
|
||||
|
||||
# First cleanup if needed for memory cache
|
||||
if len(self.cache) >= self.max_items:
|
||||
self._cleanup()
|
||||
|
||||
# Store results and update timestamp
|
||||
self.cache[normalized_query] = results
|
||||
self.last_accessed[normalized_query] = time.time()
|
||||
logger.info(f"Cached {len(results)} search results for query '{query}' in memory")
|
||||
return True
|
||||
|
||||
async def get(self, query, limit=10, offset=0):
|
||||
"""Get paginated results for a query"""
|
||||
normalized_query = self._normalize_query(query)
|
||||
all_results = None
|
||||
|
||||
# Try to get from Redis first
|
||||
if SEARCH_USE_REDIS:
|
||||
try:
|
||||
cached_data = await redis.get(f"{self._redis_prefix}{normalized_query}")
|
||||
if cached_data:
|
||||
all_results = json.loads(cached_data)
|
||||
logger.info(f"Retrieved search results for '{query}' from Redis")
|
||||
except Exception as e:
|
||||
logger.error(f"Error retrieving search results from Redis: {e}")
|
||||
|
||||
# Fall back to memory cache if not in Redis
|
||||
if all_results is None and normalized_query in self.cache:
|
||||
all_results = self.cache[normalized_query]
|
||||
self.last_accessed[normalized_query] = time.time()
|
||||
logger.info(f"Retrieved search results for '{query}' from memory cache")
|
||||
|
||||
# If not found in any cache
|
||||
if all_results is None:
|
||||
logger.info(f"Cache miss for query '{query}'")
|
||||
return None
|
||||
|
||||
# Return paginated subset
|
||||
end_idx = min(offset + limit, len(all_results))
|
||||
if offset >= len(all_results):
|
||||
logger.warning(f"Requested offset {offset} exceeds result count {len(all_results)}")
|
||||
return []
|
||||
|
||||
logger.info(f"Cache hit for '{query}': serving {offset}:{end_idx} of {len(all_results)} results")
|
||||
return all_results[offset:end_idx]
|
||||
|
||||
async def has_query(self, query):
|
||||
"""Check if query exists in cache"""
|
||||
normalized_query = self._normalize_query(query)
|
||||
|
||||
# Check Redis first
|
||||
if SEARCH_USE_REDIS:
|
||||
try:
|
||||
exists = await redis.get(f"{self._redis_prefix}{normalized_query}")
|
||||
if exists:
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking Redis for query existence: {e}")
|
||||
|
||||
# Fall back to memory cache
|
||||
return normalized_query in self.cache
|
||||
|
||||
async def get_total_count(self, query):
|
||||
"""Get total count of results for a query"""
|
||||
normalized_query = self._normalize_query(query)
|
||||
|
||||
# Check Redis first
|
||||
if SEARCH_USE_REDIS:
|
||||
try:
|
||||
cached_data = await redis.get(f"{self._redis_prefix}{normalized_query}")
|
||||
if cached_data:
|
||||
all_results = json.loads(cached_data)
|
||||
return len(all_results)
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting result count from Redis: {e}")
|
||||
|
||||
# Fall back to memory cache
|
||||
if normalized_query in self.cache:
|
||||
return len(self.cache[normalized_query])
|
||||
|
||||
return 0
|
||||
|
||||
def _normalize_query(self, query):
|
||||
"""Normalize query string for cache key"""
|
||||
if not query:
|
||||
return ""
|
||||
# Simple normalization - lowercase and strip whitespace
|
||||
return query.lower().strip()
|
||||
|
||||
def _cleanup(self):
|
||||
"""Remove oldest entries if memory cache is full"""
|
||||
now = time.time()
|
||||
# First remove expired entries
|
||||
expired_keys = [
|
||||
key for key, last_access in self.last_accessed.items()
|
||||
if now - last_access > self.ttl
|
||||
]
|
||||
|
||||
for key in expired_keys:
|
||||
if key in self.cache:
|
||||
del self.cache[key]
|
||||
if key in self.last_accessed:
|
||||
del self.last_accessed[key]
|
||||
|
||||
logger.info(f"Cleaned up {len(expired_keys)} expired search cache entries")
|
||||
|
||||
# If still above max size, remove oldest entries
|
||||
if len(self.cache) >= self.max_items:
|
||||
# Sort by last access time
|
||||
sorted_items = sorted(self.last_accessed.items(), key=lambda x: x[1])
|
||||
# Remove oldest 20%
|
||||
remove_count = max(1, int(len(sorted_items) * 0.2))
|
||||
for key, _ in sorted_items[:remove_count]:
|
||||
if key in self.cache:
|
||||
del self.cache[key]
|
||||
if key in self.last_accessed:
|
||||
del self.last_accessed[key]
|
||||
logger.info(f"Removed {remove_count} oldest search cache entries")
|
||||
|
||||
class SearchService:
|
||||
def __init__(self, index_name="search_index"):
|
||||
logger.info("Инициализируем поиск...")
|
||||
self.index_name = index_name
|
||||
self.client = None
|
||||
self.lock = asyncio.Lock()
|
||||
|
||||
# Инициализация клиента OpenSearch только если поиск включен
|
||||
if SEARCH_ENABLED:
|
||||
try:
|
||||
self.client = OpenSearch(
|
||||
hosts=[{"host": ELASTIC_HOST, "port": ELASTIC_PORT}],
|
||||
http_compress=True,
|
||||
http_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
|
||||
use_ssl=True,
|
||||
verify_certs=False,
|
||||
ssl_assert_hostname=False,
|
||||
ssl_show_warn=False,
|
||||
)
|
||||
logger.info("Клиент OpenSearch.org подключен")
|
||||
search_loop.create_task(self.check_index())
|
||||
except Exception as exc:
|
||||
logger.warning(f"Поиск отключен из-за ошибки подключения: {exc}")
|
||||
self.client = None
|
||||
else:
|
||||
logger.info("Поиск отключен (ELASTIC_HOST не установлен)")
|
||||
|
||||
def __init__(self):
|
||||
logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}")
|
||||
self.available = SEARCH_ENABLED
|
||||
# Use different timeout settings for indexing and search requests
|
||||
self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL)
|
||||
self.index_client = httpx.AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL)
|
||||
# Initialize search cache
|
||||
self.cache = SearchCache() if SEARCH_CACHE_ENABLED else None
|
||||
|
||||
if not self.available:
|
||||
logger.info("Search disabled (SEARCH_ENABLED = False)")
|
||||
|
||||
if SEARCH_CACHE_ENABLED:
|
||||
cache_location = "Redis" if SEARCH_USE_REDIS else "Memory"
|
||||
logger.info(f"Search caching enabled using {cache_location} cache with TTL={SEARCH_CACHE_TTL_SECONDS}s")
|
||||
logger.info(f"Minimum score filter: {SEARCH_MIN_SCORE}, prefetch size: {SEARCH_PREFETCH_SIZE}")
|
||||
|
||||
async def info(self):
|
||||
if not SEARCH_ENABLED:
|
||||
"""Return information about search service"""
|
||||
if not self.available:
|
||||
return {"status": "disabled"}
|
||||
|
||||
try:
|
||||
return get_indices_stats()
|
||||
response = await self.client.get("/info")
|
||||
response.raise_for_status()
|
||||
result = response.json()
|
||||
logger.info(f"Search service info: {result}")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get search info: {e}")
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def delete_index(self):
|
||||
if self.client:
|
||||
logger.warning(f"[!!!] Удаляем индекс {self.index_name}")
|
||||
self.client.indices.delete(index=self.index_name, ignore_unavailable=True)
|
||||
|
||||
def create_index(self):
|
||||
if self.client:
|
||||
logger.info(f"Создается индекс: {self.index_name}")
|
||||
self.client.indices.create(index=self.index_name, body=index_settings)
|
||||
logger.info(f"Индекс {self.index_name} создан")
|
||||
|
||||
async def check_index(self):
|
||||
if self.client:
|
||||
logger.info(f"Проверяем индекс {self.index_name}...")
|
||||
if not self.client.indices.exists(index=self.index_name):
|
||||
self.create_index()
|
||||
self.client.indices.put_mapping(index=self.index_name, body=expected_mapping)
|
||||
else:
|
||||
logger.info(f"Найден существующий индекс {self.index_name}")
|
||||
# Проверка и обновление структуры индекса, если необходимо
|
||||
result = self.client.indices.get_mapping(index=self.index_name)
|
||||
if isinstance(result, str):
|
||||
result = orjson.loads(result)
|
||||
if isinstance(result, dict):
|
||||
mapping = result.get(self.index_name, {}).get("mappings")
|
||||
logger.info(f"Найдена структура индексации: {mapping['properties'].keys()}")
|
||||
expected_keys = expected_mapping["properties"].keys()
|
||||
if mapping and mapping["properties"].keys() != expected_keys:
|
||||
logger.info(f"Ожидаемая структура индексации: {expected_mapping}")
|
||||
logger.warning("[!!!] Требуется переиндексация всех данных")
|
||||
self.delete_index()
|
||||
self.client = None
|
||||
else:
|
||||
logger.error("клиент не инициализован, невозможно проверить индекс")
|
||||
|
||||
def index(self, shout):
|
||||
if not SEARCH_ENABLED:
|
||||
return
|
||||
|
||||
if self.client:
|
||||
logger.info(f"Индексируем пост {shout.id}")
|
||||
index_body = {
|
||||
"body": shout.body,
|
||||
"title": shout.title,
|
||||
"subtitle": shout.subtitle,
|
||||
"lead": shout.lead,
|
||||
"media": shout.media,
|
||||
|
||||
def is_ready(self):
|
||||
"""Check if service is available"""
|
||||
return self.available
|
||||
|
||||
|
||||
async def verify_docs(self, doc_ids):
|
||||
"""Verify which documents exist in the search index across all content types"""
|
||||
if not self.available:
|
||||
return {"status": "disabled"}
|
||||
|
||||
try:
|
||||
logger.info(f"Verifying {len(doc_ids)} documents in search index")
|
||||
response = await self.client.post(
|
||||
"/verify-docs",
|
||||
json={"doc_ids": doc_ids},
|
||||
timeout=60.0 # Longer timeout for potentially large ID lists
|
||||
)
|
||||
response.raise_for_status()
|
||||
result = response.json()
|
||||
|
||||
# Process the more detailed response format
|
||||
bodies_missing = set(result.get("bodies", {}).get("missing", []))
|
||||
titles_missing = set(result.get("titles", {}).get("missing", []))
|
||||
|
||||
# Combine missing IDs from both bodies and titles
|
||||
# A document is considered missing if it's missing from either index
|
||||
all_missing = list(bodies_missing.union(titles_missing))
|
||||
|
||||
# Log summary of verification results
|
||||
bodies_missing_count = len(bodies_missing)
|
||||
titles_missing_count = len(titles_missing)
|
||||
total_missing_count = len(all_missing)
|
||||
|
||||
logger.info(f"Document verification complete: {bodies_missing_count} bodies missing, {titles_missing_count} titles missing")
|
||||
logger.info(f"Total unique missing documents: {total_missing_count} out of {len(doc_ids)} total")
|
||||
|
||||
# Return in a backwards-compatible format plus the detailed breakdown
|
||||
return {
|
||||
"missing": all_missing,
|
||||
"details": {
|
||||
"bodies_missing": list(bodies_missing),
|
||||
"titles_missing": list(titles_missing),
|
||||
"bodies_missing_count": bodies_missing_count,
|
||||
"titles_missing_count": titles_missing_count
|
||||
}
|
||||
}
|
||||
asyncio.create_task(self.perform_index(shout, index_body))
|
||||
except Exception as e:
|
||||
logger.error(f"Document verification error: {e}")
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
async def perform_index(self, shout, index_body):
|
||||
if self.client:
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
self.client.index(index=self.index_name, id=str(shout.id), body=index_body), timeout=40.0
|
||||
|
||||
def index(self, shout):
|
||||
"""Index a single document"""
|
||||
if not self.available:
|
||||
return
|
||||
logger.info(f"Indexing post {shout.id}")
|
||||
# Start in background to not block
|
||||
asyncio.create_task(self.perform_index(shout))
|
||||
|
||||
async def perform_index(self, shout):
|
||||
"""Index a single document across multiple endpoints"""
|
||||
if not self.available:
|
||||
return
|
||||
|
||||
try:
|
||||
logger.info(f"Indexing document {shout.id} to individual endpoints")
|
||||
indexing_tasks = []
|
||||
|
||||
# 1. Index title if available
|
||||
if hasattr(shout, 'title') and shout.title and isinstance(shout.title, str):
|
||||
title_doc = {
|
||||
"id": str(shout.id),
|
||||
"title": shout.title.strip()
|
||||
}
|
||||
indexing_tasks.append(
|
||||
self.index_client.post("/index-title", json=title_doc)
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"Indexing timeout for shout {shout.id}")
|
||||
|
||||
# 2. Index body content (subtitle, lead, body)
|
||||
body_text_parts = []
|
||||
for field_name in ['subtitle', 'lead', 'body']:
|
||||
field_value = getattr(shout, field_name, None)
|
||||
if field_value and isinstance(field_value, str) and field_value.strip():
|
||||
body_text_parts.append(field_value.strip())
|
||||
|
||||
# Process media content if available
|
||||
media = getattr(shout, 'media', None)
|
||||
if media:
|
||||
if isinstance(media, str):
|
||||
try:
|
||||
media_json = json.loads(media)
|
||||
if isinstance(media_json, dict):
|
||||
if 'title' in media_json:
|
||||
body_text_parts.append(media_json['title'])
|
||||
if 'body' in media_json:
|
||||
body_text_parts.append(media_json['body'])
|
||||
except json.JSONDecodeError:
|
||||
body_text_parts.append(media)
|
||||
elif isinstance(media, dict):
|
||||
if 'title' in media:
|
||||
body_text_parts.append(media['title'])
|
||||
if 'body' in media:
|
||||
body_text_parts.append(media['body'])
|
||||
|
||||
if body_text_parts:
|
||||
body_text = " ".join(body_text_parts)
|
||||
# Truncate if too long
|
||||
MAX_TEXT_LENGTH = 4000
|
||||
if len(body_text) > MAX_TEXT_LENGTH:
|
||||
body_text = body_text[:MAX_TEXT_LENGTH]
|
||||
|
||||
body_doc = {
|
||||
"id": str(shout.id),
|
||||
"body": body_text
|
||||
}
|
||||
indexing_tasks.append(
|
||||
self.index_client.post("/index-body", json=body_doc)
|
||||
)
|
||||
|
||||
# 3. Index authors
|
||||
authors = getattr(shout, 'authors', [])
|
||||
for author in authors:
|
||||
author_id = str(getattr(author, 'id', 0))
|
||||
if not author_id or author_id == '0':
|
||||
continue
|
||||
|
||||
name = getattr(author, 'name', '')
|
||||
|
||||
# Combine bio and about fields
|
||||
bio_parts = []
|
||||
bio = getattr(author, 'bio', '')
|
||||
if bio and isinstance(bio, str):
|
||||
bio_parts.append(bio.strip())
|
||||
|
||||
about = getattr(author, 'about', '')
|
||||
if about and isinstance(about, str):
|
||||
bio_parts.append(about.strip())
|
||||
|
||||
combined_bio = " ".join(bio_parts)
|
||||
|
||||
if name:
|
||||
author_doc = {
|
||||
"id": author_id,
|
||||
"name": name,
|
||||
"bio": combined_bio
|
||||
}
|
||||
indexing_tasks.append(
|
||||
self.index_client.post("/index-author", json=author_doc)
|
||||
)
|
||||
|
||||
# Run all indexing tasks in parallel
|
||||
if indexing_tasks:
|
||||
responses = await asyncio.gather(*indexing_tasks, return_exceptions=True)
|
||||
|
||||
# Check for errors in responses
|
||||
for i, response in enumerate(responses):
|
||||
if isinstance(response, Exception):
|
||||
logger.error(f"Error in indexing task {i}: {response}")
|
||||
elif hasattr(response, 'status_code') and response.status_code >= 400:
|
||||
logger.error(f"Error response in indexing task {i}: {response.status_code}, {await response.text()}")
|
||||
|
||||
logger.info(f"Document {shout.id} indexed across {len(indexing_tasks)} endpoints")
|
||||
else:
|
||||
logger.warning(f"No content to index for shout {shout.id}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Indexing error for shout {shout.id}: {e}")
|
||||
|
||||
async def bulk_index(self, shouts):
|
||||
"""Index multiple documents across three separate endpoints"""
|
||||
if not self.available or not shouts:
|
||||
logger.warning(f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}")
|
||||
return
|
||||
|
||||
start_time = time.time()
|
||||
logger.info(f"Starting multi-endpoint bulk indexing of {len(shouts)} documents")
|
||||
|
||||
# Prepare documents for different endpoints
|
||||
title_docs = []
|
||||
body_docs = []
|
||||
author_docs = {} # Use dict to prevent duplicate authors
|
||||
|
||||
total_skipped = 0
|
||||
|
||||
for shout in shouts:
|
||||
try:
|
||||
# 1. Process title documents
|
||||
if hasattr(shout, 'title') and shout.title and isinstance(shout.title, str):
|
||||
title_docs.append({
|
||||
"id": str(shout.id),
|
||||
"title": shout.title.strip()
|
||||
})
|
||||
|
||||
# 2. Process body documents (subtitle, lead, body)
|
||||
body_text_parts = []
|
||||
for field_name in ['subtitle', 'lead', 'body']:
|
||||
field_value = getattr(shout, field_name, None)
|
||||
if field_value and isinstance(field_value, str) and field_value.strip():
|
||||
body_text_parts.append(field_value.strip())
|
||||
|
||||
# Process media content if available
|
||||
media = getattr(shout, 'media', None)
|
||||
if media:
|
||||
if isinstance(media, str):
|
||||
try:
|
||||
media_json = json.loads(media)
|
||||
if isinstance(media_json, dict):
|
||||
if 'title' in media_json:
|
||||
body_text_parts.append(media_json['title'])
|
||||
if 'body' in media_json:
|
||||
body_text_parts.append(media_json['body'])
|
||||
except json.JSONDecodeError:
|
||||
body_text_parts.append(media)
|
||||
elif isinstance(media, dict):
|
||||
if 'title' in media:
|
||||
body_text_parts.append(media['title'])
|
||||
if 'body' in media:
|
||||
body_text_parts.append(media['body'])
|
||||
|
||||
# Only add body document if we have body text
|
||||
if body_text_parts:
|
||||
body_text = " ".join(body_text_parts)
|
||||
# Truncate if too long
|
||||
MAX_TEXT_LENGTH = 4000
|
||||
if len(body_text) > MAX_TEXT_LENGTH:
|
||||
body_text = body_text[:MAX_TEXT_LENGTH]
|
||||
|
||||
body_docs.append({
|
||||
"id": str(shout.id),
|
||||
"body": body_text
|
||||
})
|
||||
|
||||
# 3. Process authors if available
|
||||
authors = getattr(shout, 'authors', [])
|
||||
for author in authors:
|
||||
author_id = str(getattr(author, 'id', 0))
|
||||
if not author_id or author_id == '0':
|
||||
continue
|
||||
|
||||
# Skip if we've already processed this author
|
||||
if author_id in author_docs:
|
||||
continue
|
||||
|
||||
name = getattr(author, 'name', '')
|
||||
|
||||
# Combine bio and about fields
|
||||
bio_parts = []
|
||||
bio = getattr(author, 'bio', '')
|
||||
if bio and isinstance(bio, str):
|
||||
bio_parts.append(bio.strip())
|
||||
|
||||
about = getattr(author, 'about', '')
|
||||
if about and isinstance(about, str):
|
||||
bio_parts.append(about.strip())
|
||||
|
||||
combined_bio = " ".join(bio_parts)
|
||||
|
||||
# Only add if we have author data
|
||||
if name:
|
||||
author_docs[author_id] = {
|
||||
"id": author_id,
|
||||
"name": name,
|
||||
"bio": combined_bio
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Indexing error for shout {shout.id}: {e}")
|
||||
logger.error(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing: {e}")
|
||||
total_skipped += 1
|
||||
|
||||
# Convert author dict to list
|
||||
author_docs_list = list(author_docs.values())
|
||||
|
||||
# Process each endpoint in parallel
|
||||
indexing_tasks = [
|
||||
self._index_endpoint(title_docs, "/bulk-index-titles", "title"),
|
||||
self._index_endpoint(body_docs, "/bulk-index-bodies", "body"),
|
||||
self._index_endpoint(author_docs_list, "/bulk-index-authors", "author")
|
||||
]
|
||||
|
||||
await asyncio.gather(*indexing_tasks)
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
logger.info(
|
||||
f"Multi-endpoint indexing completed in {elapsed:.2f}s: "
|
||||
f"{len(title_docs)} titles, {len(body_docs)} bodies, {len(author_docs_list)} authors, "
|
||||
f"{total_skipped} shouts skipped"
|
||||
)
|
||||
|
||||
async def _index_endpoint(self, documents, endpoint, doc_type):
|
||||
"""Process and index documents to a specific endpoint"""
|
||||
if not documents:
|
||||
logger.info(f"No {doc_type} documents to index")
|
||||
return
|
||||
|
||||
logger.info(f"Indexing {len(documents)} {doc_type} documents")
|
||||
|
||||
# Categorize documents by size
|
||||
small_docs, medium_docs, large_docs = self._categorize_by_size(documents, doc_type)
|
||||
|
||||
# Process each category with appropriate batch sizes
|
||||
batch_sizes = {
|
||||
"small": min(MAX_BATCH_SIZE, 15),
|
||||
"medium": min(MAX_BATCH_SIZE, 10),
|
||||
"large": min(MAX_BATCH_SIZE, 3)
|
||||
}
|
||||
|
||||
for category, docs in [("small", small_docs), ("medium", medium_docs), ("large", large_docs)]:
|
||||
if docs:
|
||||
batch_size = batch_sizes[category]
|
||||
await self._process_batches(docs, batch_size, endpoint, f"{doc_type}-{category}")
|
||||
|
||||
def _categorize_by_size(self, documents, doc_type):
|
||||
"""Categorize documents by size for optimized batch processing"""
|
||||
small_docs = []
|
||||
medium_docs = []
|
||||
large_docs = []
|
||||
|
||||
for doc in documents:
|
||||
# Extract relevant text based on document type
|
||||
if doc_type == "title":
|
||||
text = doc.get("title", "")
|
||||
elif doc_type == "body":
|
||||
text = doc.get("body", "")
|
||||
else: # author
|
||||
# For authors, consider both name and bio length
|
||||
text = doc.get("name", "") + " " + doc.get("bio", "")
|
||||
|
||||
text_len = len(text)
|
||||
|
||||
if text_len > 5000:
|
||||
large_docs.append(doc)
|
||||
elif text_len > 2000:
|
||||
medium_docs.append(doc)
|
||||
else:
|
||||
small_docs.append(doc)
|
||||
|
||||
logger.info(f"{doc_type.capitalize()} documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large")
|
||||
return small_docs, medium_docs, large_docs
|
||||
|
||||
async def _process_batches(self, documents, batch_size, endpoint, batch_prefix):
|
||||
"""Process document batches with retry logic"""
|
||||
for i in range(0, len(documents), batch_size):
|
||||
batch = documents[i:i+batch_size]
|
||||
batch_id = f"{batch_prefix}-{i//batch_size + 1}"
|
||||
|
||||
retry_count = 0
|
||||
max_retries = 3
|
||||
success = False
|
||||
|
||||
while not success and retry_count < max_retries:
|
||||
try:
|
||||
logger.info(f"Sending batch {batch_id} ({len(batch)} docs) to {endpoint}")
|
||||
response = await self.index_client.post(
|
||||
endpoint,
|
||||
json=batch,
|
||||
timeout=90.0
|
||||
)
|
||||
|
||||
if response.status_code == 422:
|
||||
error_detail = response.json()
|
||||
logger.error(f"Validation error from search service for batch {batch_id}: {self._truncate_error_detail(error_detail)}")
|
||||
break
|
||||
|
||||
response.raise_for_status()
|
||||
success = True
|
||||
logger.info(f"Successfully indexed batch {batch_id}")
|
||||
|
||||
except Exception as e:
|
||||
retry_count += 1
|
||||
if retry_count >= max_retries:
|
||||
if len(batch) > 1:
|
||||
mid = len(batch) // 2
|
||||
logger.warning(f"Splitting batch {batch_id} into smaller batches for retry")
|
||||
await self._process_batches(batch[:mid], batch_size // 2, endpoint, f"{batch_prefix}-{i//batch_size}-A")
|
||||
await self._process_batches(batch[mid:], batch_size // 2, endpoint, f"{batch_prefix}-{i//batch_size}-B")
|
||||
else:
|
||||
logger.error(f"Failed to index single document in batch {batch_id} after {max_retries} attempts: {str(e)}")
|
||||
break
|
||||
|
||||
wait_time = (2 ** retry_count) + (random.random() * 0.5)
|
||||
logger.warning(f"Retrying batch {batch_id} in {wait_time:.1f}s... (attempt {retry_count+1}/{max_retries})")
|
||||
await asyncio.sleep(wait_time)
|
||||
|
||||
def _truncate_error_detail(self, error_detail):
|
||||
"""Truncate error details for logging"""
|
||||
truncated_detail = error_detail.copy() if isinstance(error_detail, dict) else error_detail
|
||||
|
||||
if isinstance(truncated_detail, dict) and 'detail' in truncated_detail and isinstance(truncated_detail['detail'], list):
|
||||
for i, item in enumerate(truncated_detail['detail']):
|
||||
if isinstance(item, dict) and 'input' in item:
|
||||
if isinstance(item['input'], dict) and any(k in item['input'] for k in ['documents', 'text']):
|
||||
if 'documents' in item['input'] and isinstance(item['input']['documents'], list):
|
||||
for j, doc in enumerate(item['input']['documents']):
|
||||
if 'text' in doc and isinstance(doc['text'], str) and len(doc['text']) > 100:
|
||||
item['input']['documents'][j]['text'] = f"{doc['text'][:100]}... [truncated, total {len(doc['text'])} chars]"
|
||||
|
||||
if 'text' in item['input'] and isinstance(item['input']['text'], str) and len(item['input']['text']) > 100:
|
||||
item['input']['text'] = f"{item['input']['text'][:100]}... [truncated, total {len(item['input']['text'])} chars]"
|
||||
|
||||
return truncated_detail
|
||||
|
||||
async def search(self, text, limit, offset):
|
||||
if not SEARCH_ENABLED:
|
||||
"""Search documents"""
|
||||
if not self.available:
|
||||
return []
|
||||
|
||||
if not isinstance(text, str) or not text.strip():
|
||||
return []
|
||||
|
||||
logger.info(f"Searching for: '{text}' (limit={limit}, offset={offset})")
|
||||
|
||||
# Check if we can serve from cache
|
||||
if SEARCH_CACHE_ENABLED:
|
||||
has_cache = await self.cache.has_query(text)
|
||||
if has_cache:
|
||||
cached_results = await self.cache.get(text, limit, offset)
|
||||
if cached_results is not None:
|
||||
return cached_results
|
||||
|
||||
# Not in cache or cache disabled, perform new search
|
||||
try:
|
||||
search_limit = limit
|
||||
search_offset = offset
|
||||
|
||||
logger.info(f"Ищем: {text} {offset}+{limit}")
|
||||
search_body = {
|
||||
"query": {"multi_match": {"query": text, "fields": ["title", "lead", "subtitle", "body", "media"]}}
|
||||
}
|
||||
|
||||
if self.client:
|
||||
search_response = self.client.search(
|
||||
index=self.index_name,
|
||||
body=search_body,
|
||||
size=limit,
|
||||
from_=offset,
|
||||
_source=False,
|
||||
_source_excludes=["title", "body", "subtitle", "media", "lead", "_index"],
|
||||
if SEARCH_CACHE_ENABLED:
|
||||
search_limit = SEARCH_PREFETCH_SIZE
|
||||
search_offset = 0
|
||||
else:
|
||||
search_limit = limit
|
||||
search_offset = offset
|
||||
|
||||
response = await self.client.post(
|
||||
"/search",
|
||||
json={"text": text, "limit": search_limit, "offset": search_offset}
|
||||
)
|
||||
hits = search_response["hits"]["hits"]
|
||||
results = [{"id": hit["_id"], "score": hit["_score"]} for hit in hits]
|
||||
|
||||
# если результаты не пустые
|
||||
if results:
|
||||
# Кэширование в Redis с TTL
|
||||
redis_key = f"search:{text}:{offset}+{limit}"
|
||||
await redis.execute(
|
||||
"SETEX",
|
||||
redis_key,
|
||||
REDIS_TTL,
|
||||
json.dumps(results, cls=CustomJSONEncoder),
|
||||
)
|
||||
return results
|
||||
return []
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
result = response.json()
|
||||
|
||||
formatted_results = result.get("results", [])
|
||||
|
||||
valid_results = []
|
||||
for item in formatted_results:
|
||||
doc_id = item.get("id")
|
||||
if doc_id and doc_id.isdigit():
|
||||
valid_results.append(item)
|
||||
|
||||
if len(valid_results) != len(formatted_results):
|
||||
formatted_results = valid_results
|
||||
|
||||
if SEARCH_MIN_SCORE > 0:
|
||||
initial_count = len(formatted_results)
|
||||
formatted_results = [r for r in formatted_results if r.get("score", 0) >= SEARCH_MIN_SCORE]
|
||||
|
||||
if SEARCH_CACHE_ENABLED:
|
||||
await self.cache.store(text, formatted_results)
|
||||
end_idx = offset + limit
|
||||
page_results = formatted_results[offset:end_idx]
|
||||
return page_results
|
||||
|
||||
return formatted_results
|
||||
except Exception as e:
|
||||
logger.error(f"Search error for '{text}': {e}", exc_info=True)
|
||||
return []
|
||||
|
||||
async def check_index_status(self):
|
||||
"""Get detailed statistics about the search index health"""
|
||||
if not self.available:
|
||||
return {"status": "disabled"}
|
||||
|
||||
try:
|
||||
response = await self.client.get("/index-status")
|
||||
response.raise_for_status()
|
||||
result = response.json()
|
||||
|
||||
if result.get("consistency", {}).get("status") != "ok":
|
||||
null_count = result.get("consistency", {}).get("null_embeddings_count", 0)
|
||||
if null_count > 0:
|
||||
logger.warning(f"Found {null_count} documents with NULL embeddings")
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to check index status: {e}")
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
# Create the search service singleton
|
||||
search_service = SearchService()
|
||||
|
||||
|
||||
# API-compatible function to perform a search
|
||||
async def search_text(text: str, limit: int = 50, offset: int = 0):
|
||||
payload = []
|
||||
if search_service.client:
|
||||
# Использование метода search_post из OpenSearchService
|
||||
if search_service.available:
|
||||
payload = await search_service.search(text, limit, offset)
|
||||
return payload
|
||||
|
||||
async def get_search_count(text: str):
|
||||
"""Get total count of results for a query without fetching all results"""
|
||||
if search_service.available and SEARCH_CACHE_ENABLED:
|
||||
if await search_service.cache.has_query(text):
|
||||
return await search_service.cache.get_total_count(text)
|
||||
results = await search_text(text, SEARCH_PREFETCH_SIZE, 0)
|
||||
return len(results)
|
||||
|
||||
# Проверить что URL корректный
|
||||
OPENSEARCH_URL = os.getenv("OPENSEARCH_URL", "rc1a-3n5pi3bhuj9gieel.mdb.yandexcloud.net")
|
||||
async def initialize_search_index(shouts_data):
|
||||
"""Initialize search index with existing data during application startup"""
|
||||
if not SEARCH_ENABLED:
|
||||
return
|
||||
|
||||
if not shouts_data:
|
||||
return
|
||||
|
||||
info = await search_service.info()
|
||||
if info.get("status") in ["error", "unavailable", "disabled"]:
|
||||
return
|
||||
|
||||
index_stats = info.get("index_stats", {})
|
||||
indexed_doc_count = index_stats.get("total_count", 0)
|
||||
|
||||
index_status = await search_service.check_index_status()
|
||||
if index_status.get("status") == "inconsistent":
|
||||
problem_ids = index_status.get("consistency", {}).get("null_embeddings_sample", [])
|
||||
|
||||
if problem_ids:
|
||||
problem_docs = [shout for shout in shouts_data if str(shout.id) in problem_ids]
|
||||
if problem_docs:
|
||||
await search_service.bulk_index(problem_docs)
|
||||
|
||||
db_ids = [str(shout.id) for shout in shouts_data]
|
||||
|
||||
try:
|
||||
numeric_ids = [int(sid) for sid in db_ids if sid.isdigit()]
|
||||
if numeric_ids:
|
||||
min_id = min(numeric_ids)
|
||||
max_id = max(numeric_ids)
|
||||
id_range = max_id - min_id + 1
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
if abs(indexed_doc_count - len(shouts_data)) > 10:
|
||||
doc_ids = [str(shout.id) for shout in shouts_data]
|
||||
|
||||
verification = await search_service.verify_docs(doc_ids)
|
||||
|
||||
if verification.get("status") == "error":
|
||||
return
|
||||
|
||||
missing_ids = verification.get("missing", [])
|
||||
if missing_ids:
|
||||
missing_docs = [shout for shout in shouts_data if str(shout.id) in missing_ids]
|
||||
await search_service.bulk_index(missing_docs)
|
||||
else:
|
||||
pass
|
||||
|
||||
try:
|
||||
test_query = "test"
|
||||
test_results = await search_text(test_query, 5)
|
||||
|
||||
if test_results:
|
||||
categories = set()
|
||||
for result in test_results:
|
||||
result_id = result.get("id")
|
||||
matching_shouts = [s for s in shouts_data if str(s.id) == result_id]
|
||||
if matching_shouts and hasattr(matching_shouts[0], 'category'):
|
||||
categories.add(getattr(matching_shouts[0], 'category', 'unknown'))
|
||||
except Exception as e:
|
||||
pass
|
||||
|
Reference in New Issue
Block a user