diff --git a/.gitea/workflows/main.yml b/.gitea/workflows/main.yml index 18730b95..f65ae48a 100644 --- a/.gitea/workflows/main.yml +++ b/.gitea/workflows/main.yml @@ -29,7 +29,16 @@ jobs: if: github.ref == 'refs/heads/dev' uses: dokku/github-action@master with: - branch: 'dev' + branch: 'main' force: true git_remote_url: 'ssh://dokku@v2.discours.io:22/core' ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }} + + - name: Push to dokku for staging branch + if: github.ref == 'refs/heads/staging' + uses: dokku/github-action@master + with: + branch: 'dev' + git_remote_url: 'ssh://dokku@staging.discours.io:22/core' + ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }} + git_push_flags: '--force' \ No newline at end of file diff --git a/.gitignore b/.gitignore index fe42d48f..8257ff78 100644 --- a/.gitignore +++ b/.gitignore @@ -128,6 +128,9 @@ dmypy.json .idea temp.* +# Debug +DEBUG.log + discours.key discours.crt discours.pem @@ -162,5 +165,6 @@ views.json *.crt *cache.json .cursor +.devcontainer/ -node_modules/ \ No newline at end of file +node_modules/ diff --git a/Dockerfile b/Dockerfile index b481d544..dd41fae3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,6 +3,7 @@ FROM python:slim RUN apt-get update && apt-get install -y \ postgresql-client \ curl \ + build-essential \ && rm -rf /var/lib/apt/lists/* WORKDIR /app diff --git a/main.py b/main.py index ed09f7ef..05f33c16 100644 --- a/main.py +++ b/main.py @@ -21,7 +21,7 @@ from cache.revalidator import revalidation_manager from services.exception import ExceptionHandlerMiddleware from services.redis import redis from services.schema import create_all_tables, resolvers -from services.search import search_service +from services.search import search_service, initialize_search_index from utils.logger import root_logger as logger from auth.internal import InternalAuthentication @@ -46,6 +46,15 @@ DIST_DIR = join(os.path.dirname(__file__), "dist") # Директория дл INDEX_HTML = join(os.path.dirname(__file__), "index.html") +async def check_search_service(): + """Check if search service is available and log result""" + info = await search_service.info() + if info.get("status") in ["error", "unavailable"]: + print(f"[WARNING] Search service unavailable: {info.get('message', 'unknown reason')}") + else: + print(f"[INFO] Search service is available: {info}") + + async def index_handler(request: Request): """ Раздача основного HTML файла diff --git a/orm/shout.py b/orm/shout.py index 5d445184..5934d6cb 100644 --- a/orm/shout.py +++ b/orm/shout.py @@ -71,6 +71,34 @@ class ShoutAuthor(Base): class Shout(Base): """ Публикация в системе. 
+ + Attributes: + body (str) + slug (str) + cover (str) : "Cover image url" + cover_caption (str) : "Cover image alt caption" + lead (str) + title (str) + subtitle (str) + layout (str) + media (dict) + authors (list[Author]) + topics (list[Topic]) + reactions (list[Reaction]) + lang (str) + version_of (int) + oid (str) + seo (str) : JSON + draft (int) + created_at (int) + updated_at (int) + published_at (int) + featured_at (int) + deleted_at (int) + created_by (int) + updated_by (int) + deleted_by (int) + community (int) """ __tablename__ = "shout" diff --git a/requirements.txt b/requirements.txt index f8578f04..89423048 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,6 +12,10 @@ starlette gql ariadne granian + +# NLP and search +httpx + orjson pydantic trafilatura \ No newline at end of file diff --git a/resolvers/__init__.py b/resolvers/__init__.py index c4113feb..55a6ce7b 100644 --- a/resolvers/__init__.py +++ b/resolvers/__init__.py @@ -8,6 +8,7 @@ from resolvers.author import ( # search_authors, get_author_id, get_authors_all, load_authors_by, + load_authors_search, update_author, ) from resolvers.community import get_communities_all, get_community @@ -97,6 +98,7 @@ __all__ = [ "get_author_follows_authors", "get_authors_all", "load_authors_by", + "load_authors_search", "update_author", # "search_authors", diff --git a/resolvers/author.py b/resolvers/author.py index d3ec1c26..ceb05d0f 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -20,6 +20,7 @@ from services.auth import login_required from services.db import local_session from services.redis import redis from services.schema import mutation, query +from services.search import search_service from utils.logger import root_logger as logger DEFAULT_COMMUNITIES = [1] @@ -358,6 +359,46 @@ async def load_authors_by(_, info, by, limit, offset): return await get_authors_with_stats(limit, offset, by, current_user_id, is_admin) +@query.field("load_authors_search") +async def load_authors_search(_, info, text: str, limit: int = 10, offset: int = 0): + """ + Resolver for searching authors by text. Works with the txtai search endpoint.
+ Args: + text: Search text + limit: Maximum number of authors to return + offset: Offset for pagination + Returns: + list: List of authors matching the search criteria + """ + + # Get author IDs from search engine (already sorted by relevance) + search_results = await search_service.search_authors(text, limit, offset) + + if not search_results: + return [] + + author_ids = [result.get("id") for result in search_results if result.get("id")] + if not author_ids: + return [] + + # Fetch full author objects from DB + with local_session() as session: + # Simple query to get authors by IDs - no need for stats here + authors_query = select(Author).filter(Author.id.in_(author_ids)) + db_authors = session.execute(authors_query).scalars().all() + + if not db_authors: + return [] + + # Create a dictionary for quick lookup + authors_dict = {str(author.id): author for author in db_authors} + + # Keep the order from search results (maintains the relevance sorting) + ordered_authors = [authors_dict[author_id] for author_id in author_ids if author_id in authors_dict] + + return ordered_authors + + def get_author_id_from(slug="", user=None, author_id=None): try: author_id = None diff --git a/schema/query.graphql b/schema/query.graphql index 2a4d10cf..31c29f77 100644 --- a/schema/query.graphql +++ b/schema/query.graphql @@ -4,7 +4,7 @@ type Query { get_author_id(user: String!): Author get_authors_all: [Author] load_authors_by(by: AuthorsBy!, limit: Int, offset: Int): [Author] - # search_authors(what: String!): [Author] + load_authors_search(text: String!, limit: Int, offset: Int): [Author!] # Search for authors by name or bio # Auth queries logout: AuthResult! @@ -41,6 +41,7 @@ type Query { get_shout(slug: String, shout_id: Int): Shout load_shouts_by(options: LoadShoutsOptions): [Shout] load_shouts_search(text: String!, options: LoadShoutsOptions): [SearchResult] + get_search_results_count(text: String!): CountResult! load_shouts_bookmarked(options: LoadShoutsOptions): [Shout] # rating diff --git a/schema/type.graphql b/schema/type.graphql index ebf513fe..71f2aebe 100644 --- a/schema/type.graphql +++ b/schema/type.graphql @@ -214,6 +214,7 @@ type CommonResult { } type SearchResult { + id: Int! slug: String! title: String! cover: String @@ -317,3 +318,7 @@ type RolesInfo { permissions: [Permission!]! } +type CountResult { + count: Int! 
+} + diff --git a/services/db.py b/services/db.py index 17981644..844ee891 100644 --- a/services/db.py +++ b/services/db.py @@ -19,7 +19,7 @@ from sqlalchemy import ( inspect, text, ) -from sqlalchemy.orm import Session, configure_mappers, declarative_base +from sqlalchemy.orm import Session, configure_mappers, declarative_base, joinedload from sqlalchemy.sql.schema import Table from settings import DB_URL @@ -298,3 +298,32 @@ def get_json_builder(): # Используем их в коде json_builder, json_array_builder, json_cast = get_json_builder() + +# Fetch all shouts, with authors preloaded +# This function is used for search indexing + +async def fetch_all_shouts(session=None): + """Fetch all published shouts for search indexing with authors preloaded""" + from orm.shout import Shout + + close_session = False + if session is None: + session = local_session() + close_session = True + + try: + # Fetch only published and non-deleted shouts with authors preloaded + query = session.query(Shout).options( + joinedload(Shout.authors) + ).filter( + Shout.published_at.is_not(None), + Shout.deleted_at.is_(None) + ) + shouts = query.all() + return shouts + except Exception as e: + logger.error(f"Error fetching shouts for search indexing: {e}") + return [] + finally: + if close_session: + session.close() \ No newline at end of file diff --git a/services/search.py b/services/search.py index adf62789..f83e4050 100644 --- a/services/search.py +++ b/services/search.py @@ -2,251 +2,949 @@ import asyncio import json import logging import os -from typing import List +import httpx +import time +import random +from collections import defaultdict +from datetime import datetime, timedelta -import orjson -from opensearchpy import OpenSearch - -from orm.shout import Shout -from services.redis import redis -from utils.encoders import CustomJSONEncoder - -# Set redis logging level to suppress DEBUG messages +# Set up proper logging logger = logging.getLogger("search") -logger.setLevel(logging.WARNING) +logger.setLevel(logging.INFO) # Change to INFO to see more details +# Disable noise HTTP client logging +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("httpcore").setLevel(logging.WARNING) -ELASTIC_HOST = os.environ.get("ELASTIC_HOST", "").replace("https://", "") -ELASTIC_USER = os.environ.get("ELASTIC_USER", "") -ELASTIC_PASSWORD = os.environ.get("ELASTIC_PASSWORD", "") -ELASTIC_PORT = os.environ.get("ELASTIC_PORT", 9200) -ELASTIC_URL = os.environ.get( - "ELASTIC_URL", - f"https://{ELASTIC_USER}:{ELASTIC_PASSWORD}@{ELASTIC_HOST}:{ELASTIC_PORT}", +# Configuration for search service +SEARCH_ENABLED = bool( + os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"] ) -REDIS_TTL = 86400 # 1 день в секундах +TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "none") +MAX_BATCH_SIZE = int(os.environ.get("SEARCH_MAX_BATCH_SIZE", "25")) -index_settings = { - "settings": { - "index": {"number_of_shards": 1, "auto_expand_replicas": "0-all"}, - "analysis": { - "analyzer": { - "ru": { - "tokenizer": "standard", - "filter": ["lowercase", "ru_stop", "ru_stemmer"], - } - }, - "filter": { - "ru_stemmer": {"type": "stemmer", "language": "russian"}, - "ru_stop": {"type": "stop", "stopwords": "_russian_"}, - }, - }, - }, - "mappings": { - "properties": { - "body": {"type": "text", "analyzer": "ru"}, - "title": {"type": "text", "analyzer": "ru"}, - "subtitle": {"type": "text", "analyzer": "ru"}, - "lead": {"type": "text", "analyzer": "ru"}, - "media": {"type": "text", "analyzer": "ru"}, - } - }, -} 
+# Search cache configuration +SEARCH_CACHE_ENABLED = bool( + os.environ.get("SEARCH_CACHE_ENABLED", "true").lower() in ["true", "1", "yes"] +) +SEARCH_CACHE_TTL_SECONDS = int( + os.environ.get("SEARCH_CACHE_TTL_SECONDS", "300") +) # Default: 15 minutes +SEARCH_PREFETCH_SIZE = int(os.environ.get("SEARCH_PREFETCH_SIZE", "200")) +SEARCH_USE_REDIS = bool( + os.environ.get("SEARCH_USE_REDIS", "true").lower() in ["true", "1", "yes"] +) -expected_mapping = index_settings["mappings"] +search_offset = 0 -# Создание цикла событий -search_loop = asyncio.get_event_loop() +# Import Redis client if Redis caching is enabled +if SEARCH_USE_REDIS: + try: + from services.redis import redis -# В начале файла добавим флаг -SEARCH_ENABLED = bool(os.environ.get("ELASTIC_HOST", "")) + logger.info("Redis client imported for search caching") + except ImportError: + logger.warning("Redis client import failed, falling back to memory cache") + SEARCH_USE_REDIS = False -def get_indices_stats(): - indices_stats = search_service.client.cat.indices(format="json") - for index_info in indices_stats: - index_name = index_info["index"] - if not index_name.startswith("."): - index_health = index_info["health"] - index_status = index_info["status"] - pri_shards = index_info["pri"] - rep_shards = index_info["rep"] - docs_count = index_info["docs.count"] - docs_deleted = index_info["docs.deleted"] - store_size = index_info["store.size"] - pri_store_size = index_info["pri.store.size"] +class SearchCache: + """Cache for search results to enable efficient pagination""" - logger.info(f"Index: {index_name}") - logger.info(f"Health: {index_health}") - logger.info(f"Status: {index_status}") - logger.info(f"Primary Shards: {pri_shards}") - logger.info(f"Replica Shards: {rep_shards}") - logger.info(f"Documents Count: {docs_count}") - logger.info(f"Deleted Documents: {docs_deleted}") - logger.info(f"Store Size: {store_size}") - logger.info(f"Primary Store Size: {pri_store_size}") + def __init__(self, ttl_seconds=SEARCH_CACHE_TTL_SECONDS, max_items=100): + self.cache = {} # Maps search query to list of results + self.last_accessed = {} # Maps search query to last access timestamp + self.ttl = ttl_seconds + self.max_items = max_items + self._redis_prefix = "search_cache:" + + async def store(self, query, results): + """Store search results for a query""" + normalized_query = self._normalize_query(query) + + if SEARCH_USE_REDIS: + try: + serialized_results = json.dumps(results) + await redis.set( + f"{self._redis_prefix}{normalized_query}", + serialized_results, + ex=self.ttl, + ) + logger.info( + f"Stored {len(results)} search results for query '{query}' in Redis" + ) + return True + except Exception as e: + logger.error(f"Error storing search results in Redis: {e}") + # Fall back to memory cache if Redis fails + + # First cleanup if needed for memory cache + if len(self.cache) >= self.max_items: + self._cleanup() + + # Store results and update timestamp + self.cache[normalized_query] = results + self.last_accessed[normalized_query] = time.time() + logger.info( + f"Cached {len(results)} search results for query '{query}' in memory" + ) + return True + + async def get(self, query, limit=10, offset=0): + """Get paginated results for a query""" + normalized_query = self._normalize_query(query) + all_results = None + + # Try to get from Redis first + if SEARCH_USE_REDIS: + try: + cached_data = await redis.get(f"{self._redis_prefix}{normalized_query}") + if cached_data: + all_results = json.loads(cached_data) + logger.info(f"Retrieved search 
results for '{query}' from Redis") + except Exception as e: + logger.error(f"Error retrieving search results from Redis: {e}") + + # Fall back to memory cache if not in Redis + if all_results is None and normalized_query in self.cache: + all_results = self.cache[normalized_query] + self.last_accessed[normalized_query] = time.time() + logger.info(f"Retrieved search results for '{query}' from memory cache") + + # If not found in any cache + if all_results is None: + logger.info(f"Cache miss for query '{query}'") + return None + + # Return paginated subset + end_idx = min(offset + limit, len(all_results)) + if offset >= len(all_results): + logger.warning( + f"Requested offset {offset} exceeds result count {len(all_results)}" + ) + return [] + + logger.info( + f"Cache hit for '{query}': serving {offset}:{end_idx} of {len(all_results)} results" + ) + return all_results[offset:end_idx] + + async def has_query(self, query): + """Check if query exists in cache""" + normalized_query = self._normalize_query(query) + + # Check Redis first + if SEARCH_USE_REDIS: + try: + exists = await redis.get(f"{self._redis_prefix}{normalized_query}") + if exists: + return True + except Exception as e: + logger.error(f"Error checking Redis for query existence: {e}") + + # Fall back to memory cache + return normalized_query in self.cache + + async def get_total_count(self, query): + """Get total count of results for a query""" + normalized_query = self._normalize_query(query) + + # Check Redis first + if SEARCH_USE_REDIS: + try: + cached_data = await redis.get(f"{self._redis_prefix}{normalized_query}") + if cached_data: + all_results = json.loads(cached_data) + return len(all_results) + except Exception as e: + logger.error(f"Error getting result count from Redis: {e}") + + # Fall back to memory cache + if normalized_query in self.cache: + return len(self.cache[normalized_query]) + + return 0 + + def _normalize_query(self, query): + """Normalize query string for cache key""" + if not query: + return "" + # Simple normalization - lowercase and strip whitespace + return query.lower().strip() + + def _cleanup(self): + """Remove oldest entries if memory cache is full""" + now = time.time() + # First remove expired entries + expired_keys = [ + key + for key, last_access in self.last_accessed.items() + if now - last_access > self.ttl + ] + + for key in expired_keys: + if key in self.cache: + del self.cache[key] + if key in self.last_accessed: + del self.last_accessed[key] + + logger.info(f"Cleaned up {len(expired_keys)} expired search cache entries") + + # If still above max size, remove oldest entries + if len(self.cache) >= self.max_items: + # Sort by last access time + sorted_items = sorted(self.last_accessed.items(), key=lambda x: x[1]) + # Remove oldest 20% + remove_count = max(1, int(len(sorted_items) * 0.2)) + for key, _ in sorted_items[:remove_count]: + if key in self.cache: + del self.cache[key] + if key in self.last_accessed: + del self.last_accessed[key] + logger.info(f"Removed {remove_count} oldest search cache entries") class SearchService: - def __init__(self, index_name="search_index"): - logger.info("Инициализируем поиск...") - self.index_name = index_name - self.client = None - self.lock = asyncio.Lock() + def __init__(self): + logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}") + self.available = SEARCH_ENABLED + # Use different timeout settings for indexing and search requests + self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL) + self.index_client = 
httpx.AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL) + # Initialize search cache + self.cache = SearchCache() if SEARCH_CACHE_ENABLED else None - # Инициализация клиента OpenSearch только если поиск включен - if SEARCH_ENABLED: - try: - self.client = OpenSearch( - hosts=[{"host": ELASTIC_HOST, "port": ELASTIC_PORT}], - http_compress=True, - http_auth=(ELASTIC_USER, ELASTIC_PASSWORD), - use_ssl=True, - verify_certs=False, - ssl_assert_hostname=False, - ssl_show_warn=False, - ) - logger.info("Клиент OpenSearch.org подключен") - search_loop.create_task(self.check_index()) - except Exception as exc: - logger.warning(f"Поиск отключен из-за ошибки подключения: {exc}") - self.client = None - else: - logger.info("Поиск отключен (ELASTIC_HOST не установлен)") + if not self.available: + logger.info("Search disabled (SEARCH_ENABLED = False)") + + if SEARCH_CACHE_ENABLED: + cache_location = "Redis" if SEARCH_USE_REDIS else "Memory" + logger.info( + f"Search caching enabled using {cache_location} cache with TTL={SEARCH_CACHE_TTL_SECONDS}s" + ) async def info(self): - if not SEARCH_ENABLED: + """Return information about search service""" + if not self.available: return {"status": "disabled"} - try: - return get_indices_stats() + response = await self.client.get("/info") + response.raise_for_status() + result = response.json() + logger.info(f"Search service info: {result}") + return result except Exception as e: logger.error(f"Failed to get search info: {e}") return {"status": "error", "message": str(e)} - def delete_index(self): - if self.client: - logger.warning(f"[!!!] Удаляем индекс {self.index_name}") - self.client.indices.delete(index=self.index_name, ignore_unavailable=True) + def is_ready(self): + """Check if service is available""" + return self.available - def create_index(self): - if self.client: - logger.info(f"Создается индекс: {self.index_name}") - self.client.indices.create(index=self.index_name, body=index_settings) - logger.info(f"Индекс {self.index_name} создан") + async def verify_docs(self, doc_ids): + """Verify which documents exist in the search index across all content types""" + if not self.available: + return {"status": "disabled"} - async def check_index(self): - if self.client: - logger.info(f"Проверяем индекс {self.index_name}...") - if not self.client.indices.exists(index=self.index_name): - self.create_index() - self.client.indices.put_mapping(index=self.index_name, body=expected_mapping) - else: - logger.info(f"Найден существующий индекс {self.index_name}") - # Проверка и обновление структуры индекса, если необходимо - result = self.client.indices.get_mapping(index=self.index_name) - if isinstance(result, str): - result = orjson.loads(result) - if isinstance(result, dict): - mapping = result.get(self.index_name, {}).get("mappings") - logger.info(f"Найдена структура индексации: {mapping['properties'].keys()}") - expected_keys = expected_mapping["properties"].keys() - if mapping and mapping["properties"].keys() != expected_keys: - logger.info(f"Ожидаемая структура индексации: {expected_mapping}") - logger.warning("[!!!] 
Требуется переиндексация всех данных") - self.delete_index() - self.client = None - else: - logger.error("клиент не инициализован, невозможно проверить индекс") + try: + logger.info(f"Verifying {len(doc_ids)} documents in search index") + response = await self.client.post( + "/verify-docs", + json={"doc_ids": doc_ids}, + timeout=60.0, # Longer timeout for potentially large ID lists + ) + response.raise_for_status() + result = response.json() - def index_shouts(self, shouts: List[Shout]): - if not SEARCH_ENABLED: - return + # Process the more detailed response format + bodies_missing = set(result.get("bodies", {}).get("missing", [])) + titles_missing = set(result.get("titles", {}).get("missing", [])) - if self.client: - for shout in shouts: - self.index(shout) - - def index(self, shout: Shout): - return self.index_shout(shout) + # Combine missing IDs from both bodies and titles + # A document is considered missing if it's missing from either index + all_missing = list(bodies_missing.union(titles_missing)) - def index_shout(self, shout: Shout): - if not SEARCH_ENABLED: - return + # Log summary of verification results + bodies_missing_count = len(bodies_missing) + titles_missing_count = len(titles_missing) + total_missing_count = len(all_missing) - if self.client: - logger.info(f"Индексируем пост {shout.id}") - index_body = { - "body": shout.body, - "title": shout.title, - "subtitle": shout.subtitle, - "lead": shout.lead, - "media": shout.media, + logger.info( + f"Document verification complete: {bodies_missing_count} bodies missing, {titles_missing_count} titles missing" + ) + logger.info( + f"Total unique missing documents: {total_missing_count} out of {len(doc_ids)} total" + ) + + # Return in a backwards-compatible format plus the detailed breakdown + return { + "missing": all_missing, + "details": { + "bodies_missing": list(bodies_missing), + "titles_missing": list(titles_missing), + "bodies_missing_count": bodies_missing_count, + "titles_missing_count": titles_missing_count, + }, } - asyncio.create_task(self.perform_index(shout, index_body)) + except Exception as e: + logger.error(f"Document verification error: {e}") + return {"status": "error", "message": str(e)} - def close(self): - if self.client: - self.client.close() + def index(self, shout): + """Index a single document""" + if not self.available: + return + logger.info(f"Indexing post {shout.id}") + # Start in background to not block + asyncio.create_task(self.perform_index(shout)) - async def perform_index(self, shout, index_body): - if self.client: - try: - await asyncio.wait_for( - self.client.index(index=self.index_name, id=str(shout.id), body=index_body), - timeout=40.0, + async def perform_index(self, shout): + """Index a single document across multiple endpoints""" + if not self.available: + return + + try: + logger.info(f"Indexing document {shout.id} to individual endpoints") + indexing_tasks = [] + + # 1. Index title if available + if hasattr(shout, "title") and shout.title and isinstance(shout.title, str): + title_doc = {"id": str(shout.id), "title": shout.title.strip()} + indexing_tasks.append( + self.index_client.post("/index-title", json=title_doc) ) - except asyncio.TimeoutError: - logger.error(f"Indexing timeout for shout {shout.id}") + + # 2. 
Index body content (subtitle, lead, body) + body_text_parts = [] + for field_name in ["subtitle", "lead", "body"]: + field_value = getattr(shout, field_name, None) + if field_value and isinstance(field_value, str) and field_value.strip(): + body_text_parts.append(field_value.strip()) + + # Process media content if available + media = getattr(shout, "media", None) + if media: + if isinstance(media, str): + try: + media_json = json.loads(media) + if isinstance(media_json, dict): + if "title" in media_json: + body_text_parts.append(media_json["title"]) + if "body" in media_json: + body_text_parts.append(media_json["body"]) + except json.JSONDecodeError: + body_text_parts.append(media) + elif isinstance(media, dict): + if "title" in media: + body_text_parts.append(media["title"]) + if "body" in media: + body_text_parts.append(media["body"]) + + if body_text_parts: + body_text = " ".join(body_text_parts) + # Truncate if too long + MAX_TEXT_LENGTH = 4000 + if len(body_text) > MAX_TEXT_LENGTH: + body_text = body_text[:MAX_TEXT_LENGTH] + + body_doc = {"id": str(shout.id), "body": body_text} + indexing_tasks.append( + self.index_client.post("/index-body", json=body_doc) + ) + + # 3. Index authors + authors = getattr(shout, "authors", []) + for author in authors: + author_id = str(getattr(author, "id", 0)) + if not author_id or author_id == "0": + continue + + name = getattr(author, "name", "") + + # Combine bio and about fields + bio_parts = [] + bio = getattr(author, "bio", "") + if bio and isinstance(bio, str): + bio_parts.append(bio.strip()) + + about = getattr(author, "about", "") + if about and isinstance(about, str): + bio_parts.append(about.strip()) + + combined_bio = " ".join(bio_parts) + + if name: + author_doc = {"id": author_id, "name": name, "bio": combined_bio} + indexing_tasks.append( + self.index_client.post("/index-author", json=author_doc) + ) + + # Run all indexing tasks in parallel + if indexing_tasks: + responses = await asyncio.gather( + *indexing_tasks, return_exceptions=True + ) + + # Check for errors in responses + for i, response in enumerate(responses): + if isinstance(response, Exception): + logger.error(f"Error in indexing task {i}: {response}") + elif ( + hasattr(response, "status_code") and response.status_code >= 400 + ): + logger.error( + f"Error response in indexing task {i}: {response.status_code}, {response.text}" + ) + + logger.info( + f"Document {shout.id} indexed across {len(indexing_tasks)} endpoints" + ) + else: + logger.warning(f"No content to index for shout {shout.id}") + + except Exception as e: + logger.error(f"Indexing error for shout {shout.id}: {e}") + + async def bulk_index(self, shouts): + """Index multiple documents across three separate endpoints""" + if not self.available or not shouts: + logger.warning( + f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}" + ) + return + + start_time = time.time() + logger.info(f"Starting multi-endpoint bulk indexing of {len(shouts)} documents") + + # Prepare documents for different endpoints + title_docs = [] + body_docs = [] + author_docs = {} # Use dict to prevent duplicate authors + + total_skipped = 0 + + for shout in shouts: + try: + # 1. Process title documents + if ( + hasattr(shout, "title") + and shout.title + and isinstance(shout.title, str) + ): + title_docs.append( + {"id": str(shout.id), "title": shout.title.strip()} + ) + + # 2. 
Process body documents (subtitle, lead, body) + body_text_parts = [] + for field_name in ["subtitle", "lead", "body"]: + field_value = getattr(shout, field_name, None) + if ( + field_value + and isinstance(field_value, str) + and field_value.strip() + ): + body_text_parts.append(field_value.strip()) + + # Process media content if available + media = getattr(shout, "media", None) + if media: + if isinstance(media, str): + try: + media_json = json.loads(media) + if isinstance(media_json, dict): + if "title" in media_json: + body_text_parts.append(media_json["title"]) + if "body" in media_json: + body_text_parts.append(media_json["body"]) + except json.JSONDecodeError: + body_text_parts.append(media) + elif isinstance(media, dict): + if "title" in media: + body_text_parts.append(media["title"]) + if "body" in media: + body_text_parts.append(media["body"]) + + # Only add body document if we have body text + if body_text_parts: + body_text = " ".join(body_text_parts) + # Truncate if too long + MAX_TEXT_LENGTH = 4000 + if len(body_text) > MAX_TEXT_LENGTH: + body_text = body_text[:MAX_TEXT_LENGTH] + + body_docs.append({"id": str(shout.id), "body": body_text}) + + # 3. Process authors if available + authors = getattr(shout, "authors", []) + for author in authors: + author_id = str(getattr(author, "id", 0)) + if not author_id or author_id == "0": + continue + + # Skip if we've already processed this author + if author_id in author_docs: + continue + + name = getattr(author, "name", "") + + # Combine bio and about fields + bio_parts = [] + bio = getattr(author, "bio", "") + if bio and isinstance(bio, str): + bio_parts.append(bio.strip()) + + about = getattr(author, "about", "") + if about and isinstance(about, str): + bio_parts.append(about.strip()) + + combined_bio = " ".join(bio_parts) + + # Only add if we have author data + if name: + author_docs[author_id] = { + "id": author_id, + "name": name, + "bio": combined_bio, + } + except Exception as e: - logger.error(f"Indexing error for shout {shout.id}: {e}") + logger.error( + f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing: {e}" + ) + total_skipped += 1 - async def search(self, text, limit, offset): - if not SEARCH_ENABLED: - return [] + # Convert author dict to list + author_docs_list = list(author_docs.values()) - logger.info(f"Ищем: {text} {offset}+{limit}") - search_body = { - "query": { - "multi_match": {"query": text, "fields": ["title", "lead", "subtitle", "body", "media"]} - } + # Log indexing started message + logger.info("indexing started...") + + # Process each endpoint in parallel + indexing_tasks = [ + self._index_endpoint(title_docs, "/bulk-index-titles", "title"), + self._index_endpoint(body_docs, "/bulk-index-bodies", "body"), + self._index_endpoint(author_docs_list, "/bulk-index-authors", "author"), + ] + + await asyncio.gather(*indexing_tasks) + + elapsed = time.time() - start_time + logger.info( + f"Multi-endpoint indexing completed in {elapsed:.2f}s: " + f"{len(title_docs)} titles, {len(body_docs)} bodies, {len(author_docs_list)} authors, " + f"{total_skipped} shouts skipped" + ) + + async def _index_endpoint(self, documents, endpoint, doc_type): + """Process and index documents to a specific endpoint""" + if not documents: + logger.info(f"No {doc_type} documents to index") + return + + logger.info(f"Indexing {len(documents)} {doc_type} documents") + + # Categorize documents by size + small_docs, medium_docs, large_docs = self._categorize_by_size( + documents, doc_type + ) + + # Process each category with 
appropriate batch sizes + batch_sizes = { + "small": min(MAX_BATCH_SIZE, 15), + "medium": min(MAX_BATCH_SIZE, 10), + "large": min(MAX_BATCH_SIZE, 3), } - if self.client: - search_response = self.client.search( - index=self.index_name, - body=search_body, - size=limit, - from_=offset, - _source=False, - _source_excludes=["title", "body", "subtitle", "media", "lead", "_index"], - ) - hits = search_response["hits"]["hits"] - results = [{"id": hit["_id"], "score": hit["_score"]} for hit in hits] - - # если результаты не пустые - if results: - # Кэширование в Redis с TTL - redis_key = f"search:{text}:{offset}+{limit}" - await redis.execute( - "SETEX", - redis_key, - REDIS_TTL, - json.dumps(results, cls=CustomJSONEncoder), + for category, docs in [ + ("small", small_docs), + ("medium", medium_docs), + ("large", large_docs), + ]: + if docs: + batch_size = batch_sizes[category] + await self._process_batches( + docs, batch_size, endpoint, f"{doc_type}-{category}" ) - return results - return [] + + def _categorize_by_size(self, documents, doc_type): + """Categorize documents by size for optimized batch processing""" + small_docs = [] + medium_docs = [] + large_docs = [] + + for doc in documents: + # Extract relevant text based on document type + if doc_type == "title": + text = doc.get("title", "") + elif doc_type == "body": + text = doc.get("body", "") + else: # author + # For authors, consider both name and bio length + text = doc.get("name", "") + " " + doc.get("bio", "") + + text_len = len(text) + + if text_len > 5000: + large_docs.append(doc) + elif text_len > 2000: + medium_docs.append(doc) + else: + small_docs.append(doc) + + logger.info( + f"{doc_type.capitalize()} documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large" + ) + return small_docs, medium_docs, large_docs + + async def _process_batches(self, documents, batch_size, endpoint, batch_prefix): + """Process document batches with retry logic""" + for i in range(0, len(documents), batch_size): + batch = documents[i : i + batch_size] + batch_id = f"{batch_prefix}-{i//batch_size + 1}" + + retry_count = 0 + max_retries = 3 + success = False + + while not success and retry_count < max_retries: + try: + response = await self.index_client.post( + endpoint, json=batch, timeout=90.0 + ) + + if response.status_code == 422: + error_detail = response.json() + logger.error( + f"Validation error from search service for batch {batch_id}: {self._truncate_error_detail(error_detail)}" + ) + break + + response.raise_for_status() + success = True + + except Exception as e: + retry_count += 1 + if retry_count >= max_retries: + if len(batch) > 1: + mid = len(batch) // 2 + await self._process_batches( + batch[:mid], + batch_size // 2, + endpoint, + f"{batch_prefix}-{i//batch_size}-A", + ) + await self._process_batches( + batch[mid:], + batch_size // 2, + endpoint, + f"{batch_prefix}-{i//batch_size}-B", + ) + else: + logger.error( + f"Failed to index single document in batch {batch_id} after {max_retries} attempts: {str(e)}" + ) + break + + wait_time = (2**retry_count) + (random.random() * 0.5) + await asyncio.sleep(wait_time) + + def _truncate_error_detail(self, error_detail): + """Truncate error details for logging""" + truncated_detail = ( + error_detail.copy() if isinstance(error_detail, dict) else error_detail + ) + + if ( + isinstance(truncated_detail, dict) + and "detail" in truncated_detail + and isinstance(truncated_detail["detail"], list) + ): + for i, item in enumerate(truncated_detail["detail"]): + if 
isinstance(item, dict) and "input" in item: + if isinstance(item["input"], dict) and any( + k in item["input"] for k in ["documents", "text"] + ): + if "documents" in item["input"] and isinstance( + item["input"]["documents"], list + ): + for j, doc in enumerate(item["input"]["documents"]): + if ( + "text" in doc + and isinstance(doc["text"], str) + and len(doc["text"]) > 100 + ): + item["input"]["documents"][j][ + "text" + ] = f"{doc['text'][:100]}... [truncated, total {len(doc['text'])} chars]" + + if ( + "text" in item["input"] + and isinstance(item["input"]["text"], str) + and len(item["input"]["text"]) > 100 + ): + item["input"][ + "text" + ] = f"{item['input']['text'][:100]}... [truncated, total {len(item['input']['text'])} chars]" + + return truncated_detail + + async def search(self, text, limit, offset): + """Search documents""" + if not self.available: + return [] + + if not isinstance(text, str) or not text.strip(): + return [] + + # Check if we can serve from cache + if SEARCH_CACHE_ENABLED: + has_cache = await self.cache.has_query(text) + if has_cache: + cached_results = await self.cache.get(text, limit, offset) + if cached_results is not None: + return cached_results + + # Not in cache or cache disabled, perform new search + try: + search_limit = limit + + if SEARCH_CACHE_ENABLED: + search_limit = SEARCH_PREFETCH_SIZE + else: + search_limit = limit + + logger.info(f"Searching for: '{text}' (limit={limit}, offset={offset}, search_limit={search_limit})") + + response = await self.client.post( + "/search-combined", + json={"text": text, "limit": search_limit}, + ) + response.raise_for_status() + result = response.json() + formatted_results = result.get("results", []) + + # filter out non‑numeric IDs + valid_results = [r for r in formatted_results if r.get("id", "").isdigit()] + if len(valid_results) != len(formatted_results): + formatted_results = valid_results + + if len(valid_results) != len(formatted_results): + formatted_results = valid_results + + if SEARCH_CACHE_ENABLED: + # Store the full prefetch batch, then page it + await self.cache.store(text, formatted_results) + return await self.cache.get(text, limit, offset) + + return formatted_results + except Exception as e: + logger.error(f"Search error for '{text}': {e}", exc_info=True) + return [] + + async def search_authors(self, text, limit=10, offset=0): + """Search only for authors using the specialized endpoint""" + if not self.available or not text.strip(): + return [] + + cache_key = f"author:{text}" + + # Check if we can serve from cache + if SEARCH_CACHE_ENABLED: + has_cache = await self.cache.has_query(cache_key) + if has_cache: + cached_results = await self.cache.get(cache_key, limit, offset) + if cached_results is not None: + return cached_results + + # Not in cache or cache disabled, perform new search + try: + search_limit = limit + + if SEARCH_CACHE_ENABLED: + search_limit = SEARCH_PREFETCH_SIZE + else: + search_limit = limit + + logger.info( + f"Searching authors for: '{text}' (limit={limit}, offset={offset}, search_limit={search_limit})" + ) + response = await self.client.post( + "/search-author", json={"text": text, "limit": search_limit} + ) + response.raise_for_status() + + result = response.json() + author_results = result.get("results", []) + + # Filter out any invalid results if necessary + valid_results = [r for r in author_results if r.get("id", "").isdigit()] + if len(valid_results) != len(author_results): + author_results = valid_results + + if SEARCH_CACHE_ENABLED: + # Store the full prefetch 
batch, then page it + await self.cache.store(cache_key, author_results) + return await self.cache.get(cache_key, limit, offset) + + return author_results[offset : offset + limit] + + except Exception as e: + logger.error(f"Error searching authors for '{text}': {e}") + return [] + + async def check_index_status(self): + """Get detailed statistics about the search index health""" + if not self.available: + return {"status": "disabled"} + + try: + response = await self.client.get("/index-status") + response.raise_for_status() + result = response.json() + + if result.get("consistency", {}).get("status") != "ok": + null_count = result.get("consistency", {}).get( + "null_embeddings_count", 0 + ) + if null_count > 0: + logger.warning(f"Found {null_count} documents with NULL embeddings") + + return result + except Exception as e: + logger.error(f"Failed to check index status: {e}") + return {"status": "error", "message": str(e)} +# Create the search service singleton search_service = SearchService() +# API-compatible function to perform a search -async def search_text(text: str, limit: int = 50, offset: int = 0): + +async def search_text(text: str, limit: int = 200, offset: int = 0): payload = [] - if search_service.client: - # Использование метода search_post из OpenSearchService + if search_service.available: payload = await search_service.search(text, limit, offset) return payload -# Проверить что URL корректный -OPENSEARCH_URL = os.getenv("OPENSEARCH_URL", "rc1a-3n5pi3bhuj9gieel.mdb.yandexcloud.net") +async def search_author_text(text: str, limit: int = 10, offset: int = 0): + """Search authors API helper function""" + if search_service.available: + return await search_service.search_authors(text, limit, offset) + return [] + + +async def get_search_count(text: str): + """Get count of title search results""" + if not search_service.available: + return 0 + + if SEARCH_CACHE_ENABLED and await search_service.cache.has_query(text): + return await search_service.cache.get_total_count(text) + + # If not found in cache, fetch from endpoint + return len(await search_text(text, SEARCH_PREFETCH_SIZE, 0)) + + +async def get_author_search_count(text: str): + """Get count of author search results""" + if not search_service.available: + return 0 + + if SEARCH_CACHE_ENABLED: + cache_key = f"author:{text}" + if await search_service.cache.has_query(cache_key): + return await search_service.cache.get_total_count(cache_key) + + # If not found in cache, fetch from endpoint + return len(await search_author_text(text, SEARCH_PREFETCH_SIZE, 0)) + + +async def initialize_search_index(shouts_data): + """Initialize search index with existing data during application startup""" + if not SEARCH_ENABLED: + return + + if not shouts_data: + return + + info = await search_service.info() + if info.get("status") in ["error", "unavailable", "disabled"]: + return + + index_stats = info.get("index_stats", {}) + indexed_doc_count = index_stats.get("total_count", 0) + + index_status = await search_service.check_index_status() + if index_status.get("status") == "inconsistent": + problem_ids = index_status.get("consistency", {}).get( + "null_embeddings_sample", [] + ) + + if problem_ids: + problem_docs = [ + shout for shout in shouts_data if str(shout.id) in problem_ids + ] + if problem_docs: + await search_service.bulk_index(problem_docs) + + # Only consider shouts with body content for body verification + def has_body_content(shout): + for field in ["subtitle", "lead", "body"]: + if ( + getattr(shout, field, None) + and 
isinstance(getattr(shout, field, None), str) + and getattr(shout, field).strip() + ): + return True + media = getattr(shout, "media", None) + if media: + if isinstance(media, str): + try: + media_json = json.loads(media) + if isinstance(media_json, dict) and ( + media_json.get("title") or media_json.get("body") + ): + return True + except Exception: + return True + elif isinstance(media, dict): + if media.get("title") or media.get("body"): + return True + return False + + shouts_with_body = [shout for shout in shouts_data if has_body_content(shout)] + body_ids = [str(shout.id) for shout in shouts_with_body] + + if abs(indexed_doc_count - len(shouts_data)) > 10: + doc_ids = [str(shout.id) for shout in shouts_data] + verification = await search_service.verify_docs(doc_ids) + if verification.get("status") == "error": + return + # Only reindex missing docs that actually have body content + missing_ids = [ + mid for mid in verification.get("missing", []) if mid in body_ids + ] + if missing_ids: + missing_docs = [ + shout for shout in shouts_with_body if str(shout.id) in missing_ids + ] + await search_service.bulk_index(missing_docs) + else: + pass + + try: + test_query = "test" + # Use body search since that's most likely to return results + test_results = await search_text(test_query, 5) + + if test_results: + categories = set() + for result in test_results: + result_id = result.get("id") + matching_shouts = [s for s in shouts_data if str(s.id) == result_id] + if matching_shouts and hasattr(matching_shouts[0], "category"): + categories.add(getattr(matching_shouts[0], "category", "unknown")) + except Exception as e: + pass
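Reviewer note (not part of the patch): a minimal usage sketch of the new search helpers in services/search.py, assuming SEARCH_ENABLED is true and a txtai service is reachable at TXTAI_SERVICE_URL. The helper names (search_text, get_search_count, search_author_text) are taken from this diff; the standalone script wrapper, the sample query, and the assumption that the new get_search_results_count query resolves via get_search_count are illustrative only, and the exact fields of each result dict depend on what the txtai service returns.

```python
# Illustrative sketch only; not part of this patch.
# Assumes SEARCH_ENABLED=true and a txtai service reachable at TXTAI_SERVICE_URL.
import asyncio

from services.search import get_search_count, search_author_text, search_text


async def demo(query: str = "пример") -> None:
    # Rough total for the query; the new get_search_results_count query
    # presumably maps to this helper (its resolver is not shown in this diff).
    total = await get_search_count(query)
    print(f"~{total} results for {query!r}")

    # First page of combined title/body results; ids come back as strings.
    first_page = await search_text(query, limit=10, offset=0)
    print([hit.get("id") for hit in first_page])

    # Author-only search, the same call that backs the load_authors_search resolver.
    authors = await search_author_text(query, limit=5, offset=0)
    print([author.get("id") for author in authors])


if __name__ == "__main__":
    asyncio.run(demo())
```

With SEARCH_CACHE_ENABLED on, repeated calls for the same query at higher offsets are paged out of the prefetched cache (SEARCH_PREFETCH_SIZE results stored in Redis or memory) rather than re-querying the txtai service.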