Compare commits: e382cc1ea5 ... 11654dba68

7 Commits:
- 11654dba68
- ec9465ad40
- 4d965fb27b
- aaa6022a53
- d6ada44c7f
- 243f836f0a
- 536c094e72
app/resolvers/draft.py (new file, 1 line)

@@ -0,0 +1 @@
orm/shout.py (28 lines changed)

@@ -71,6 +71,34 @@ class ShoutAuthor(Base):

 class Shout(Base):
     """
     Publication in the system.
+
+    Attributes:
+        body (str)
+        slug (str)
+        cover (str): "Cover image url"
+        cover_caption (str): "Cover image alt caption"
+        lead (str)
+        title (str)
+        subtitle (str)
+        layout (str)
+        media (dict)
+        authors (list[Author])
+        topics (list[Topic])
+        reactions (list[Reaction])
+        lang (str)
+        version_of (int)
+        oid (str)
+        seo (str): JSON
+        draft (int)
+        created_at (int)
+        updated_at (int)
+        published_at (int)
+        featured_at (int)
+        deleted_at (int)
+        created_by (int)
+        updated_by (int)
+        deleted_by (int)
+        community (int)
     """

     __tablename__ = "shout"
@@ -115,11 +115,7 @@ async def create_draft(_, info, draft_input):
         # Remove id from input if present since it's auto-generated
         if "id" in draft_input:
             del draft_input["id"]

-        if "seo" not in draft_input and not draft_input["seo"]:
-            body_teaser = draft_input.get("body", "")[:300].split("\n")[:-1].join("\n")
-            draft_input["seo"] = draft_input.get("lead", body_teaser)
-
         # Add the current creation time
         draft_input["created_at"] = int(time.time())
@@ -131,6 +127,11 @@ async def create_draft(_, info, draft_input):
         logger.error(f"Failed to create draft: {e}", exc_info=True)
         return {"error": f"Failed to create draft: {str(e)}"}


+def generate_teaser(body, limit=300):
+    body_text = trafilatura.extract(body, include_comments=False, include_tables=False)
+    body_teaser = ". ".join(body_text[:limit].split(". ")[:-1])
+    return body_teaser
+
+
 @mutation.field("update_draft")
 @login_required
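As a side note, here is a minimal sketch (not part of the patch) of the sentence-trimming idea behind the new generate_teaser helper: cut the extracted text at the character limit, then drop the trailing partial sentence. The sample string is invented for illustration.

```python
# Teaser trimming: keep only the complete sentences that fit within the limit.
text = "First sentence. Second sentence. Third sentence that will be cut off somewhere."
limit = 40
teaser = ". ".join(text[:limit].split(". ")[:-1])
print(teaser)  # -> "First sentence. Second sentence"
```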
@@ -165,21 +166,51 @@ async def update_draft(_, info, draft_id: int, draft_input):
         if not draft:
             return {"error": "Draft not found"}

+        # Generate SEO description if not provided and not already set
         if "seo" not in draft_input and not draft.seo:
-            body_src = draft_input["body"] if "body" in draft_input else draft.body
-            body_text = trafilatura.extract(body_src)
-            lead_src = draft_input["lead"] if "lead" in draft_input else draft.lead
-            lead_text = trafilatura.extract(lead_src)
-            body_teaser = body_text[:300].split(". ")[:-1].join(".\n")
-            draft_input["seo"] = lead_text or body_teaser
+            body_src = draft_input.get("body") if "body" in draft_input else draft.body
+            lead_src = draft_input.get("lead") if "lead" in draft_input else draft.lead
+
+            body_text = None
+            if body_src:
+                try:
+                    # Extract text, excluding comments and tables
+                    body_text = trafilatura.extract(body_src, include_comments=False, include_tables=False)
+                except Exception as e:
+                    logger.warning(f"Trafilatura failed to extract body text for draft {draft_id}: {e}")
+
+            lead_text = None
+            if lead_src:
+                try:
+                    # Extract text from lead
+                    lead_text = trafilatura.extract(lead_src, include_comments=False, include_tables=False)
+                except Exception as e:
+                    logger.warning(f"Trafilatura failed to extract lead text for draft {draft_id}: {e}")
+
+            # Generate body teaser only if body_text was successfully extracted
+            body_teaser = generate_teaser(body_text, 300) if body_text else ""
+
+            # Prioritize lead_text for SEO, fallback to body_teaser. Ensure it's a string.
+            generated_seo = lead_text if lead_text else body_teaser
+            draft_input["seo"] = generated_seo if generated_seo else ""
+
+        # Update the draft object with new data from draft_input
+        # Assuming Draft.update is a helper that iterates keys or similar.
+        # A more standard SQLAlchemy approach would be:
+        # for key, value in draft_input.items():
+        #     if hasattr(draft, key):
+        #         setattr(draft, key, value)
+        # But we stick to the existing pattern for now.
         Draft.update(draft, draft_input)
-        # Set updated_at and updated_by from the authenticated user
+
+        # Set updated timestamp and author
         current_time = int(time.time())
         draft.updated_at = current_time
-        draft.updated_by = author_id
+        draft.updated_by = author_id  # Assuming author_id is correctly fetched context

         session.commit()
+
+        # Invalidate cache related to this draft if necessary (consider adding)
+        # await invalidate_draft_cache(draft_id)
         return {"draft": draft}
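For readers following the SEO fallback above, here is a hedged sketch of the same guarded-extraction pattern in isolation. build_seo is a hypothetical helper invented for this example; it assumes trafilatura is installed and that extract() may return None or raise.

```python
import logging

import trafilatura

logger = logging.getLogger(__name__)


def build_seo(body_src, lead_src, draft_id=0):
    """Hypothetical helper mirroring update_draft's SEO fallback order."""
    body_text = None
    if body_src:
        try:
            body_text = trafilatura.extract(body_src, include_comments=False, include_tables=False)
        except Exception as e:
            logger.warning(f"Trafilatura failed to extract body text for draft {draft_id}: {e}")

    lead_text = None
    if lead_src:
        try:
            lead_text = trafilatura.extract(lead_src, include_comments=False, include_tables=False)
        except Exception as e:
            logger.warning(f"Trafilatura failed to extract lead text for draft {draft_id}: {e}")

    # Prefer the lead, fall back to a teaser built from the body, always return a string.
    body_teaser = ". ".join(body_text[:300].split(". ")[:-1]) if body_text else ""
    return lead_text or body_teaser or ""
```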
@@ -19,7 +19,7 @@ from sqlalchemy import (
     inspect,
     text,
 )
-from sqlalchemy.orm import Session, configure_mappers, declarative_base
+from sqlalchemy.orm import Session, configure_mappers, declarative_base, joinedload
 from sqlalchemy.sql.schema import Table

 from settings import DB_URL
@@ -260,8 +260,11 @@ def get_json_builder():
 # Use them in the code
 json_builder, json_array_builder, json_cast = get_json_builder()

+# Fetch all shouts, with authors preloaded
+# This function is used for search indexing
 async def fetch_all_shouts(session=None):
-    """Fetch all published shouts for search indexing"""
+    """Fetch all published shouts for search indexing with authors preloaded"""
     from orm.shout import Shout

     close_session = False
@@ -270,8 +273,10 @@ async def fetch_all_shouts(session=None):
         close_session = True

     try:
-        # Fetch only published and non-deleted shouts
-        query = session.query(Shout).filter(
+        # Fetch only published and non-deleted shouts with authors preloaded
+        query = session.query(Shout).options(
+            joinedload(Shout.authors)
+        ).filter(
             Shout.published_at.is_not(None),
             Shout.deleted_at.is_(None)
         )
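A hedged sketch of what the joinedload change buys: the authors relationship is loaded in the same query, so the indexing loop never triggers a per-row lazy load. The standalone helper name is an assumption; the model and filters follow the diff.

```python
from sqlalchemy.orm import Session, joinedload

from orm.shout import Shout


def load_published_shouts(session: Session):
    """Illustrative only: eager-load authors while filtering published, non-deleted shouts."""
    query = (
        session.query(Shout)
        .options(joinedload(Shout.authors))   # authors fetched via JOIN, no N+1 lazy loads
        .filter(
            Shout.published_at.is_not(None),  # only published
            Shout.deleted_at.is_(None),       # and not soft-deleted
        )
    )
    return query.all()
```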
@@ -216,8 +216,9 @@ class SearchService:
         """Check if service is available"""
         return self.available

+
     async def verify_docs(self, doc_ids):
-        """Verify which documents exist in the search index"""
+        """Verify which documents exist in the search index across all content types"""
         if not self.available:
             return {"status": "disabled"}
@@ -231,14 +232,36 @@ class SearchService:
             response.raise_for_status()
             result = response.json()

-            # Log summary of verification results
-            missing_count = len(result.get("missing", []))
-            logger.info(f"Document verification complete: {missing_count} missing out of {len(doc_ids)} total")
-
-            return result
+            # Process the more detailed response format
+            bodies_missing = set(result.get("bodies", {}).get("missing", []))
+            titles_missing = set(result.get("titles", {}).get("missing", []))
+
+            # Combine missing IDs from both bodies and titles
+            # A document is considered missing if it's missing from either index
+            all_missing = list(bodies_missing.union(titles_missing))
+
+            # Log summary of verification results
+            bodies_missing_count = len(bodies_missing)
+            titles_missing_count = len(titles_missing)
+            total_missing_count = len(all_missing)
+
+            logger.info(f"Document verification complete: {bodies_missing_count} bodies missing, {titles_missing_count} titles missing")
+            logger.info(f"Total unique missing documents: {total_missing_count} out of {len(doc_ids)} total")
+
+            # Return in a backwards-compatible format plus the detailed breakdown
+            return {
+                "missing": all_missing,
+                "details": {
+                    "bodies_missing": list(bodies_missing),
+                    "titles_missing": list(titles_missing),
+                    "bodies_missing_count": bodies_missing_count,
+                    "titles_missing_count": titles_missing_count
+                }
+            }
         except Exception as e:
             logger.error(f"Document verification error: {e}")
             return {"status": "error", "message": str(e)}

     def index(self, shout):
         """Index a single document"""
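A tiny, self-contained illustration of the merge rule verify_docs now applies: a document counts as missing if it is absent from either the bodies or the titles index, while the flat "missing" key stays backwards compatible. The response dict is a made-up example of the service payload.

```python
# Made-up verification payload with per-type "missing" lists.
result = {
    "bodies": {"missing": ["12", "15"]},
    "titles": {"missing": ["15", "42"]},
}

bodies_missing = set(result.get("bodies", {}).get("missing", []))
titles_missing = set(result.get("titles", {}).get("missing", []))

# Union: missing from either index means missing overall.
all_missing = list(bodies_missing.union(titles_missing))
print(sorted(all_missing))  # -> ['12', '15', '42']
```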
@@ -249,68 +272,147 @@ class SearchService:
         asyncio.create_task(self.perform_index(shout))

     async def perform_index(self, shout):
-        """Actually perform the indexing operation"""
+        """Index a single document across multiple endpoints"""
         if not self.available:
             return

         try:
-            # Combine all text fields
-            text = " ".join(filter(None, [
-                shout.title or "",
-                shout.subtitle or "",
-                shout.lead or "",
-                shout.body or "",
-                shout.media or ""
-            ]))
-
-            if not text.strip():
-                logger.warning(f"No text content to index for shout {shout.id}")
-                return
-
-            logger.info(f"Indexing document: ID={shout.id}, Text length={len(text)}")
-
-            # Send to txtai service
-            response = await self.client.post(
-                "/index",
-                json={"id": str(shout.id), "text": text}
-            )
-            response.raise_for_status()
-            result = response.json()
-            logger.info(f"Post {shout.id} successfully indexed: {result}")
+            logger.info(f"Indexing document {shout.id} to individual endpoints")
+            indexing_tasks = []
+
+            # 1. Index title if available
+            if hasattr(shout, 'title') and shout.title and isinstance(shout.title, str):
+                title_doc = {
+                    "id": str(shout.id),
+                    "title": shout.title.strip()
+                }
+                indexing_tasks.append(
+                    self.index_client.post("/index-title", json=title_doc)
+                )
+
+            # 2. Index body content (subtitle, lead, body)
+            body_text_parts = []
+            for field_name in ['subtitle', 'lead', 'body']:
+                field_value = getattr(shout, field_name, None)
+                if field_value and isinstance(field_value, str) and field_value.strip():
+                    body_text_parts.append(field_value.strip())
+
+            # Process media content if available
+            media = getattr(shout, 'media', None)
+            if media:
+                if isinstance(media, str):
+                    try:
+                        media_json = json.loads(media)
+                        if isinstance(media_json, dict):
+                            if 'title' in media_json:
+                                body_text_parts.append(media_json['title'])
+                            if 'body' in media_json:
+                                body_text_parts.append(media_json['body'])
+                    except json.JSONDecodeError:
+                        body_text_parts.append(media)
+                elif isinstance(media, dict):
+                    if 'title' in media:
+                        body_text_parts.append(media['title'])
+                    if 'body' in media:
+                        body_text_parts.append(media['body'])
+
+            if body_text_parts:
+                body_text = " ".join(body_text_parts)
+                # Truncate if too long
+                MAX_TEXT_LENGTH = 4000
+                if len(body_text) > MAX_TEXT_LENGTH:
+                    body_text = body_text[:MAX_TEXT_LENGTH]
+
+                body_doc = {
+                    "id": str(shout.id),
+                    "body": body_text
+                }
+                indexing_tasks.append(
+                    self.index_client.post("/index-body", json=body_doc)
+                )
+
+            # 3. Index authors
+            authors = getattr(shout, 'authors', [])
+            for author in authors:
+                author_id = str(getattr(author, 'id', 0))
+                if not author_id or author_id == '0':
+                    continue
+
+                name = getattr(author, 'name', '')
+
+                # Combine bio and about fields
+                bio_parts = []
+                bio = getattr(author, 'bio', '')
+                if bio and isinstance(bio, str):
+                    bio_parts.append(bio.strip())
+
+                about = getattr(author, 'about', '')
+                if about and isinstance(about, str):
+                    bio_parts.append(about.strip())
+
+                combined_bio = " ".join(bio_parts)
+
+                if name:
+                    author_doc = {
+                        "id": author_id,
+                        "name": name,
+                        "bio": combined_bio
+                    }
+                    indexing_tasks.append(
+                        self.index_client.post("/index-author", json=author_doc)
+                    )
+
+            # Run all indexing tasks in parallel
+            if indexing_tasks:
+                responses = await asyncio.gather(*indexing_tasks, return_exceptions=True)
+
+                # Check for errors in responses
+                for i, response in enumerate(responses):
+                    if isinstance(response, Exception):
+                        logger.error(f"Error in indexing task {i}: {response}")
+                    elif hasattr(response, 'status_code') and response.status_code >= 400:
+                        logger.error(f"Error response in indexing task {i}: {response.status_code}, {await response.text()}")
+
+                logger.info(f"Document {shout.id} indexed across {len(indexing_tasks)} endpoints")
+            else:
+                logger.warning(f"No content to index for shout {shout.id}")
+
         except Exception as e:
             logger.error(f"Indexing error for shout {shout.id}: {e}")

     async def bulk_index(self, shouts):
-        """Index multiple documents at once with adaptive batch sizing"""
+        """Index multiple documents across three separate endpoints"""
         if not self.available or not shouts:
             logger.warning(f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}")
             return

         start_time = time.time()
-        logger.info(f"Starting bulk indexing of {len(shouts)} documents")
-
-        MAX_TEXT_LENGTH = 4000  # Maximum text length to send in a single request
-        max_batch_size = MAX_BATCH_SIZE
-        total_indexed = 0
+        logger.info(f"Starting multi-endpoint bulk indexing of {len(shouts)} documents")
+
+        # Prepare documents for different endpoints
+        title_docs = []
+        body_docs = []
+        author_docs = {}  # Use dict to prevent duplicate authors
+
         total_skipped = 0
-        total_truncated = 0
-        total_retries = 0
-
-        # Group documents by size to process smaller documents in larger batches
-        small_docs = []
-        medium_docs = []
-        large_docs = []
-
-        # First pass: prepare all documents and categorize by size
+
         for shout in shouts:
             try:
-                text_fields = []
-                for field_name in ['title', 'subtitle', 'lead', 'body']:
+                # 1. Process title documents
+                if hasattr(shout, 'title') and shout.title and isinstance(shout.title, str):
+                    title_docs.append({
+                        "id": str(shout.id),
+                        "title": shout.title.strip()
+                    })
+
+                # 2. Process body documents (subtitle, lead, body)
+                body_text_parts = []
+                for field_name in ['subtitle', 'lead', 'body']:
                     field_value = getattr(shout, field_name, None)
                     if field_value and isinstance(field_value, str) and field_value.strip():
-                        text_fields.append(field_value.strip())
+                        body_text_parts.append(field_value.strip())

-                # Media field processing remains the same
+                # Process media content if available
                 media = getattr(shout, 'media', None)
                 if media:
                     if isinstance(media, str):
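For orientation, a hedged sketch of the fan-out pattern perform_index now uses: one request per endpoint, run concurrently with asyncio.gather(return_exceptions=True) so a failing endpoint is logged without aborting the others. It assumes an httpx.AsyncClient and a reachable search service; the base URL is invented.

```python
import asyncio

import httpx


async def index_document(client: httpx.AsyncClient, doc_id: str, title: str, body: str):
    # Build one coroutine per endpoint, mirroring the /index-title and /index-body calls.
    tasks = [
        client.post("/index-title", json={"id": doc_id, "title": title}),
        client.post("/index-body", json={"id": doc_id, "body": body}),
    ]
    # return_exceptions=True keeps one failure from cancelling the other requests.
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    for i, response in enumerate(responses):
        if isinstance(response, Exception):
            print(f"indexing task {i} failed: {response}")
        elif response.status_code >= 400:
            print(f"indexing task {i} returned HTTP {response.status_code}")


async def main():
    async with httpx.AsyncClient(base_url="http://search:8000") as client:  # assumed URL
        await index_document(client, "1", "Hello", "Hello world body text")

# asyncio.run(main())  # uncomment once a search service is actually reachable
```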
@@ -318,186 +420,180 @@ class SearchService:
                         media_json = json.loads(media)
                         if isinstance(media_json, dict):
                             if 'title' in media_json:
-                                text_fields.append(media_json['title'])
+                                body_text_parts.append(media_json['title'])
                             if 'body' in media_json:
-                                text_fields.append(media_json['body'])
+                                body_text_parts.append(media_json['body'])
                     except json.JSONDecodeError:
-                        text_fields.append(media)
+                        body_text_parts.append(media)
                 elif isinstance(media, dict):
                     if 'title' in media:
-                        text_fields.append(media['title'])
+                        body_text_parts.append(media['title'])
                     if 'body' in media:
-                        text_fields.append(media['body'])
+                        body_text_parts.append(media['body'])

-                text = " ".join(text_fields)
-
-                if not text.strip():
-                    total_skipped += 1
-                    continue
-
-                # Truncate text if it exceeds the maximum length
-                original_length = len(text)
-                if original_length > MAX_TEXT_LENGTH:
-                    text = text[:MAX_TEXT_LENGTH]
-                    total_truncated += 1
-
-                document = {
-                    "id": str(shout.id),
-                    "text": text
-                }
-
-                # Categorize by size
-                text_len = len(text)
-                if text_len > 5000:
-                    large_docs.append(document)
-                elif text_len > 2000:
-                    medium_docs.append(document)
-                else:
-                    small_docs.append(document)
-
-                total_indexed += 1
+                # Only add body document if we have body text
+                if body_text_parts:
+                    body_text = " ".join(body_text_parts)
+                    # Truncate if too long
+                    MAX_TEXT_LENGTH = 4000
+                    if len(body_text) > MAX_TEXT_LENGTH:
+                        body_text = body_text[:MAX_TEXT_LENGTH]
+
+                    body_docs.append({
+                        "id": str(shout.id),
+                        "body": body_text
+                    })
+
+                # 3. Process authors if available
+                authors = getattr(shout, 'authors', [])
+                for author in authors:
+                    author_id = str(getattr(author, 'id', 0))
+                    if not author_id or author_id == '0':
+                        continue
+
+                    # Skip if we've already processed this author
+                    if author_id in author_docs:
+                        continue
+
+                    name = getattr(author, 'name', '')
+
+                    # Combine bio and about fields
+                    bio_parts = []
+                    bio = getattr(author, 'bio', '')
+                    if bio and isinstance(bio, str):
+                        bio_parts.append(bio.strip())
+
+                    about = getattr(author, 'about', '')
+                    if about and isinstance(about, str):
+                        bio_parts.append(about.strip())
+
+                    combined_bio = " ".join(bio_parts)
+
+                    # Only add if we have author data
+                    if name:
+                        author_docs[author_id] = {
+                            "id": author_id,
+                            "name": name,
+                            "bio": combined_bio
+                        }

             except Exception as e:
                 logger.error(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing: {e}")
                 total_skipped += 1

-        # Process each category with appropriate batch sizes
-        logger.info(f"Documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large")
-
-        # Process small documents (larger batches)
-        if small_docs:
-            batch_size = min(max_batch_size, 15)
-            await self._process_document_batches(small_docs, batch_size, "small")
-
-        # Process medium documents (medium batches)
-        if medium_docs:
-            batch_size = min(max_batch_size, 10)
-            await self._process_document_batches(medium_docs, batch_size, "medium")
-
-        # Process large documents (small batches)
-        if large_docs:
-            batch_size = min(max_batch_size, 3)
-            await self._process_document_batches(large_docs, batch_size, "large")
+        # Convert author dict to list
+        author_docs_list = list(author_docs.values())
+
+        # Process each endpoint in parallel
+        indexing_tasks = [
+            self._index_endpoint(title_docs, "/bulk-index-titles", "title"),
+            self._index_endpoint(body_docs, "/bulk-index-bodies", "body"),
+            self._index_endpoint(author_docs_list, "/bulk-index-authors", "author")
+        ]
+
+        await asyncio.gather(*indexing_tasks)

         elapsed = time.time() - start_time
-        logger.info(f"Bulk indexing completed in {elapsed:.2f}s: {total_indexed} indexed, {total_skipped} skipped, {total_truncated} truncated, {total_retries} retries")
-
-    async def _process_document_batches(self, documents, batch_size, size_category):
-        """Process document batches with retry logic"""
-        # Check for possible database corruption before starting
-        db_error_count = 0
-
-        for i in range(0, len(documents), batch_size):
-            batch = documents[i:i+batch_size]
-            batch_id = f"{size_category}-{i//batch_size + 1}"
-            logger.info(f"Processing {size_category} batch {batch_id} of {len(batch)} documents")
-
-            retry_count = 0
-            max_retries = 3
-            success = False
-
-            # Process with retries
-            while not success and retry_count < max_retries:
-                try:
-                    logger.info(f"Sending batch {batch_id} of {len(batch)} documents to search service (attempt {retry_count+1})")
-                    response = await self.index_client.post(
-                        "/bulk-index",
-                        json=batch,
-                        timeout=120.0  # Explicit longer timeout for large batches
-                    )
-
-                    # Handle 422 validation errors - these won't be fixed by retrying
-                    if response.status_code == 422:
-                        error_detail = response.json()
-                        truncated_error = self._truncate_error_detail(error_detail)
-                        logger.error(f"Validation error from search service for batch {batch_id}: {truncated_error}")
-                        break
-
-                    # Handle 500 server errors - these might be fixed by retrying with smaller batches
-                    elif response.status_code == 500:
-                        db_error_count += 1
-
-                        # If we've seen multiple 500s, log a critical error
-                        if db_error_count >= 3:
-                            logger.critical(f"Multiple server errors detected (500). The search service may need manual intervention. Stopping batch {batch_id} processing.")
-                            break
-
-                        # Try again with exponential backoff
-                        if retry_count < max_retries - 1:
-                            retry_count += 1
-                            wait_time = (2 ** retry_count) + (random.random() * 0.5)  # Exponential backoff with jitter
-                            await asyncio.sleep(wait_time)
-                            continue
-
-                        # Final retry, split the batch
-                        elif len(batch) > 1:
-                            mid = len(batch) // 2
-                            await self._process_single_batch(batch[:mid], f"{batch_id}-A")
-                            await self._process_single_batch(batch[mid:], f"{batch_id}-B")
-                            break
-                        else:
-                            # Can't split a single document
-                            break
-
-                    # Normal success case
-                    response.raise_for_status()
-                    success = True
-                    db_error_count = 0  # Reset error counter on success
-
-                except Exception as e:
-                    error_str = str(e).lower()
-                    if "duplicate key" in error_str or "unique constraint" in error_str or "nonetype" in error_str:
-                        db_error_count += 1
-                        if db_error_count >= 2:
-                            logger.critical(f"Potential database corruption detected: {error_str}. The search service may need manual intervention. Stopping batch {batch_id} processing.")
-                            break
-
-                    if retry_count < max_retries - 1:
-                        retry_count += 1
-                        wait_time = (2 ** retry_count) + (random.random() * 0.5)
-                        await asyncio.sleep(wait_time)
-                    else:
-                        if len(batch) > 1:
-                            mid = len(batch) // 2
-                            await self._process_single_batch(batch[:mid], f"{batch_id}-A")
-                            await self._process_single_batch(batch[mid:], f"{batch_id}-B")
-                        break
-
-    async def _process_single_batch(self, documents, batch_id):
-        """Process a single batch with maximum reliability"""
-        max_retries = 3
-        retry_count = 0
-
-        while retry_count < max_retries:
-            try:
-                if not documents:
-                    return
-
-                response = await self.index_client.post(
-                    "/bulk-index",
-                    json=documents,
-                    timeout=90.0
-                )
-                response.raise_for_status()
-                return  # Success, exit the retry loop
-
-            except Exception as e:
-                error_str = str(e).lower()
-                retry_count += 1
-
-                if "dictionary changed size" in error_str or "transaction error" in error_str:
-                    wait_time = (2 ** retry_count) + (random.random() * 0.5)
-                    await asyncio.sleep(wait_time)  # Wait for txtai to recover
-                    continue
-
-                if retry_count >= max_retries and len(documents) > 1:
-                    for i, doc in enumerate(documents):
-                        try:
-                            resp = await self.index_client.post("/index", json=doc, timeout=30.0)
-                            resp.raise_for_status()
-                        except Exception as e2:
-                            pass
-                    return  # Exit after individual processing attempt
+        logger.info(
+            f"Multi-endpoint indexing completed in {elapsed:.2f}s: "
+            f"{len(title_docs)} titles, {len(body_docs)} bodies, {len(author_docs_list)} authors, "
+            f"{total_skipped} shouts skipped"
+        )
+
+    async def _index_endpoint(self, documents, endpoint, doc_type):
+        """Process and index documents to a specific endpoint"""
+        if not documents:
+            logger.info(f"No {doc_type} documents to index")
+            return
+
+        logger.info(f"Indexing {len(documents)} {doc_type} documents")
+
+        # Categorize documents by size
+        small_docs, medium_docs, large_docs = self._categorize_by_size(documents, doc_type)
+
+        # Process each category with appropriate batch sizes
+        batch_sizes = {
+            "small": min(MAX_BATCH_SIZE, 15),
+            "medium": min(MAX_BATCH_SIZE, 10),
+            "large": min(MAX_BATCH_SIZE, 3)
+        }
+
+        for category, docs in [("small", small_docs), ("medium", medium_docs), ("large", large_docs)]:
+            if docs:
+                batch_size = batch_sizes[category]
+                await self._process_batches(docs, batch_size, endpoint, f"{doc_type}-{category}")
+
+    def _categorize_by_size(self, documents, doc_type):
+        """Categorize documents by size for optimized batch processing"""
+        small_docs = []
+        medium_docs = []
+        large_docs = []
+
+        for doc in documents:
+            # Extract relevant text based on document type
+            if doc_type == "title":
+                text = doc.get("title", "")
+            elif doc_type == "body":
+                text = doc.get("body", "")
+            else:  # author
+                # For authors, consider both name and bio length
+                text = doc.get("name", "") + " " + doc.get("bio", "")
+
+            text_len = len(text)
+
+            if text_len > 5000:
+                large_docs.append(doc)
+            elif text_len > 2000:
+                medium_docs.append(doc)
+            else:
+                small_docs.append(doc)
+
+        logger.info(f"{doc_type.capitalize()} documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large")
+        return small_docs, medium_docs, large_docs
+
+    async def _process_batches(self, documents, batch_size, endpoint, batch_prefix):
+        """Process document batches with retry logic"""
+        for i in range(0, len(documents), batch_size):
+            batch = documents[i:i+batch_size]
+            batch_id = f"{batch_prefix}-{i//batch_size + 1}"
+
+            retry_count = 0
+            max_retries = 3
+            success = False
+
+            while not success and retry_count < max_retries:
+                try:
+                    logger.info(f"Sending batch {batch_id} ({len(batch)} docs) to {endpoint}")
+                    response = await self.index_client.post(
+                        endpoint,
+                        json=batch,
+                        timeout=90.0
+                    )
+
+                    if response.status_code == 422:
+                        error_detail = response.json()
+                        logger.error(f"Validation error from search service for batch {batch_id}: {self._truncate_error_detail(error_detail)}")
+                        break
+
+                    response.raise_for_status()
+                    success = True
+                    logger.info(f"Successfully indexed batch {batch_id}")
+
+                except Exception as e:
+                    retry_count += 1
+                    if retry_count >= max_retries:
+                        if len(batch) > 1:
+                            mid = len(batch) // 2
+                            logger.warning(f"Splitting batch {batch_id} into smaller batches for retry")
+                            await self._process_batches(batch[:mid], batch_size // 2, endpoint, f"{batch_prefix}-{i//batch_size}-A")
+                            await self._process_batches(batch[mid:], batch_size // 2, endpoint, f"{batch_prefix}-{i//batch_size}-B")
+                        else:
+                            logger.error(f"Failed to index single document in batch {batch_id} after {max_retries} attempts: {str(e)}")
+                        break
+
+                    wait_time = (2 ** retry_count) + (random.random() * 0.5)
+                    logger.warning(f"Retrying batch {batch_id} in {wait_time:.1f}s... (attempt {retry_count+1}/{max_retries})")
+                    await asyncio.sleep(wait_time)

     def _truncate_error_detail(self, error_detail):
         """Truncate error details for logging"""
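A compact, hedged sketch of the retry policy _process_batches applies: exponential backoff with jitter between attempts, and a final fallback that splits the batch in half so one bad document cannot sink the whole request. send is a stand-in for the real HTTP call.

```python
import asyncio
import random


async def post_with_retries(batch, send, max_retries=3):
    """Sketch only: retry with backoff, then split the batch in half as a last resort."""
    retry_count = 0
    while retry_count < max_retries:
        try:
            await send(batch)          # stand-in for index_client.post(endpoint, json=batch)
            return True
        except Exception:
            retry_count += 1
            if retry_count >= max_retries:
                if len(batch) > 1:
                    mid = len(batch) // 2
                    await post_with_retries(batch[:mid], send, max_retries)
                    await post_with_retries(batch[mid:], send, max_retries)
                return False
            # Exponential backoff with jitter, as in the diff.
            wait_time = (2 ** retry_count) + (random.random() * 0.5)
            await asyncio.sleep(wait_time)
    return False
```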
@@ -517,70 +613,132 @@ class SearchService:

         return truncated_detail

-    async def search(self, text, limit, offset):
-        """Search documents"""
-        if not self.available:
-            return []
-
-        if not isinstance(text, str) or not text.strip():
-            return []
-
-        logger.info(f"Searching for: '{text}' (limit={limit}, offset={offset})")
-
-        # Check if we can serve from cache
-        if SEARCH_CACHE_ENABLED:
-            has_cache = await self.cache.has_query(text)
-            if has_cache:
-                cached_results = await self.cache.get(text, limit, offset)
-                if cached_results is not None:
-                    return cached_results
-
-        # Not in cache or cache disabled, perform new search
-        try:
-            search_limit = limit
-            search_offset = offset
-
-            if SEARCH_CACHE_ENABLED:
-                search_limit = SEARCH_PREFETCH_SIZE
-                search_offset = 0
-            else:
-                search_limit = limit
-                search_offset = offset
-
-            response = await self.client.post(
-                "/search",
-                json={"text": text, "limit": search_limit, "offset": search_offset}
-            )
-            response.raise_for_status()
-
-            result = response.json()
-
-            formatted_results = result.get("results", [])
-
-            valid_results = []
-            for item in formatted_results:
-                doc_id = item.get("id")
-                if doc_id and doc_id.isdigit():
-                    valid_results.append(item)
-
-            if len(valid_results) != len(formatted_results):
-                formatted_results = valid_results
-
-            if SEARCH_MIN_SCORE > 0:
-                initial_count = len(formatted_results)
-                formatted_results = [r for r in formatted_results if r.get("score", 0) >= SEARCH_MIN_SCORE]
-
-            if SEARCH_CACHE_ENABLED:
-                await self.cache.store(text, formatted_results)
-                end_idx = offset + limit
-                page_results = formatted_results[offset:end_idx]
-                return page_results
-
-            return formatted_results
-        except Exception as e:
-            logger.error(f"Search error for '{text}': {e}", exc_info=True)
-            return []
+    #*******************
+    # Specialized search methods for titles, bodies, and authors
+
+    async def search_titles(self, text, limit=10, offset=0):
+        """Search only in titles using the specialized endpoint"""
+        if not self.available or not text.strip():
+            return []
+
+        cache_key = f"title:{text}"
+
+        # Try cache first if enabled
+        if SEARCH_CACHE_ENABLED:
+            if await self.cache.has_query(cache_key):
+                return await self.cache.get(cache_key, limit, offset)
+
+        try:
+            logger.info(f"Searching titles for: '{text}' (limit={limit}, offset={offset})")
+            response = await self.client.post(
+                "/search-title",
+                json={"text": text, "limit": limit + offset}
+            )
+            response.raise_for_status()
+
+            result = response.json()
+            title_results = result.get("results", [])
+
+            # Apply score filtering if needed
+            if SEARCH_MIN_SCORE > 0:
+                title_results = [r for r in title_results if r.get("score", 0) >= SEARCH_MIN_SCORE]
+
+            # Store in cache if enabled
+            if SEARCH_CACHE_ENABLED:
+                await self.cache.store(cache_key, title_results)
+
+            # Apply offset/limit (API might not support it directly)
+            return title_results[offset:offset+limit]
+
+        except Exception as e:
+            logger.error(f"Error searching titles for '{text}': {e}")
+            return []
+
+    async def search_bodies(self, text, limit=10, offset=0):
+        """Search only in document bodies using the specialized endpoint"""
+        if not self.available or not text.strip():
+            return []
+
+        cache_key = f"body:{text}"
+
+        # Try cache first if enabled
+        if SEARCH_CACHE_ENABLED:
+            if await self.cache.has_query(cache_key):
+                return await self.cache.get(cache_key, limit, offset)
+
+        try:
+            logger.info(f"Searching bodies for: '{text}' (limit={limit}, offset={offset})")
+            response = await self.client.post(
+                "/search-body",
+                json={"text": text, "limit": limit + offset}
+            )
+            response.raise_for_status()
+
+            result = response.json()
+            body_results = result.get("results", [])
+
+            # Apply score filtering if needed
+            if SEARCH_MIN_SCORE > 0:
+                body_results = [r for r in body_results if r.get("score", 0) >= SEARCH_MIN_SCORE]
+
+            # Store in cache if enabled
+            if SEARCH_CACHE_ENABLED:
+                await self.cache.store(cache_key, body_results)
+
+            # Apply offset/limit
+            return body_results[offset:offset+limit]
+
+        except Exception as e:
+            logger.error(f"Error searching bodies for '{text}': {e}")
+            return []
+
+    async def search_authors(self, text, limit=10, offset=0):
+        """Search only for authors using the specialized endpoint"""
+        if not self.available or not text.strip():
+            return []
+
+        cache_key = f"author:{text}"
+
+        # Try cache first if enabled
+        if SEARCH_CACHE_ENABLED:
+            if await self.cache.has_query(cache_key):
+                return await self.cache.get(cache_key, limit, offset)
+
+        try:
+            logger.info(f"Searching authors for: '{text}' (limit={limit}, offset={offset})")
+            response = await self.client.post(
+                "/search-author",
+                json={"text": text, "limit": limit + offset}
+            )
+            response.raise_for_status()
+
+            result = response.json()
+            author_results = result.get("results", [])
+
+            # Apply score filtering if needed
+            if SEARCH_MIN_SCORE > 0:
+                author_results = [r for r in author_results if r.get("score", 0) >= SEARCH_MIN_SCORE]
+
+            # Store in cache if enabled
+            if SEARCH_CACHE_ENABLED:
+                await self.cache.store(cache_key, author_results)
+
+            # Apply offset/limit
+            return author_results[offset:offset+limit]
+
+        except Exception as e:
+            logger.error(f"Error searching authors for '{text}': {e}")
+            return []
+
+    async def search(self, text, limit, offset):
+        """
+        Legacy search method that searches only bodies for backward compatibility.
+        Consider using the specialized search methods instead.
+        """
+        logger.warning("Using deprecated search() method - consider using search_bodies(), search_titles(), or search_authors()")
+        return await self.search_bodies(text, limit, offset)

     async def check_index_status(self):
         """Get detailed statistics about the search index health"""
         if not self.available:
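A small sketch of the caching convention the new search methods share: cache keys are namespaced by content type so title, body, and author results never collide, the service is asked for limit + offset results, and pagination is applied locally. The dict cache below is a stand-in for the real SearchCache.

```python
# Stand-in cache keyed by "<type>:<query>".
cache: dict = {}


def cache_key(doc_type: str, text: str) -> str:
    return f"{doc_type}:{text}"          # e.g. "title:deep learning"


def page(results: list, limit: int, offset: int) -> list:
    # The service is queried with limit + offset, then sliced locally.
    return results[offset:offset + limit]


results = [{"id": str(i), "score": 1.0 - i / 10} for i in range(5)]
cache[cache_key("title", "deep learning")] = results
print(page(cache["title:deep learning"], limit=2, offset=1))  # ids "1" and "2"
```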
@@ -605,19 +763,63 @@ class SearchService:
 search_service = SearchService()

 # API-compatible function to perform a search
-async def search_text(text: str, limit: int = 50, offset: int = 0):
-    payload = []
-    if search_service.available:
-        payload = await search_service.search(text, limit, offset)
-    return payload
-
-
-async def get_search_count(text: str):
-    """Get total count of results for a query without fetching all results"""
-    if search_service.available and SEARCH_CACHE_ENABLED:
-        if await search_service.cache.has_query(text):
-            return await search_service.cache.get_total_count(text)
-    results = await search_text(text, SEARCH_PREFETCH_SIZE, 0)
-    return len(results)
+async def search_title_text(text: str, limit: int = 10, offset: int = 0):
+    """Search titles API helper function"""
+    if search_service.available:
+        return await search_service.search_titles(text, limit, offset)
+    return []
+
+
+async def search_body_text(text: str, limit: int = 10, offset: int = 0):
+    """Search bodies API helper function"""
+    if search_service.available:
+        return await search_service.search_bodies(text, limit, offset)
+    return []
+
+
+async def search_author_text(text: str, limit: int = 10, offset: int = 0):
+    """Search authors API helper function"""
+    if search_service.available:
+        return await search_service.search_authors(text, limit, offset)
+    return []
+
+
+async def get_title_search_count(text: str):
+    """Get count of title search results"""
+    if not search_service.available:
+        return 0
+
+    if SEARCH_CACHE_ENABLED:
+        cache_key = f"title:{text}"
+        if await search_service.cache.has_query(cache_key):
+            return await search_service.cache.get_total_count(cache_key)
+
+    # If not found in cache, fetch from endpoint
+    return len(await search_title_text(text, SEARCH_PREFETCH_SIZE, 0))
+
+
+async def get_body_search_count(text: str):
+    """Get count of body search results"""
+    if not search_service.available:
+        return 0
+
+    if SEARCH_CACHE_ENABLED:
+        cache_key = f"body:{text}"
+        if await search_service.cache.has_query(cache_key):
+            return await search_service.cache.get_total_count(cache_key)
+
+    # If not found in cache, fetch from endpoint
+    return len(await search_body_text(text, SEARCH_PREFETCH_SIZE, 0))
+
+
+async def get_author_search_count(text: str):
+    """Get count of author search results"""
+    if not search_service.available:
+        return 0
+
+    if SEARCH_CACHE_ENABLED:
+        cache_key = f"author:{text}"
+        if await search_service.cache.has_query(cache_key):
+            return await search_service.cache.get_total_count(cache_key)
+
+    # If not found in cache, fetch from endpoint
+    return len(await search_author_text(text, SEARCH_PREFETCH_SIZE, 0))


 async def initialize_search_index(shouts_data):
     """Initialize search index with existing data during application startup"""
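One more hedged sketch, showing the count fast path the new get_*_search_count helpers share: if the namespaced query is already cached, the stored total is returned; otherwise a single prefetch-sized search runs and its length is used. cache and run_search are stand-ins for the real service objects.

```python
SEARCH_PREFETCH_SIZE = 200  # assumed default, mirroring the module's prefetch constant


async def count_results(text: str, doc_type: str, cache, run_search) -> int:
    """Sketch of the shared count logic; cache and run_search are injected stand-ins."""
    key = f"{doc_type}:{text}"
    if await cache.has_query(key):
        return await cache.get_total_count(key)
    return len(await run_search(text, SEARCH_PREFETCH_SIZE, 0))
```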
@@ -632,7 +834,7 @@ async def initialize_search_index(shouts_data):
         return

     index_stats = info.get("index_stats", {})
-    indexed_doc_count = index_stats.get("document_count", 0)
+    indexed_doc_count = index_stats.get("total_count", 0)

     index_status = await search_service.check_index_status()
     if index_status.get("status") == "inconsistent":
@@ -671,7 +873,8 @@ async def initialize_search_index(shouts_data):

     try:
         test_query = "test"
-        test_results = await search_text(test_query, 5)
+        # Use body search since that's most likely to return results
+        test_results = await search_body_text(test_query, 5)

         if test_results:
             categories = set()