diff --git a/orm/shout.py b/orm/shout.py
index d74e84d4..30a8dc69 100644
--- a/orm/shout.py
+++ b/orm/shout.py
@@ -71,6 +71,34 @@ class ShoutAuthor(Base):
 class Shout(Base):
     """
     Публикация в системе.
+
+    Attributes:
+        body (str)
+        slug (str)
+        cover (str) : "Cover image url"
+        cover_caption (str) : "Cover image alt caption"
+        lead (str)
+        title (str)
+        subtitle (str)
+        layout (str)
+        media (dict)
+        authors (list[Author])
+        topics (list[Topic])
+        reactions (list[Reaction])
+        lang (str)
+        version_of (int)
+        oid (str)
+        seo (str) : JSON
+        draft (int)
+        created_at (int)
+        updated_at (int)
+        published_at (int)
+        featured_at (int)
+        deleted_at (int)
+        created_by (int)
+        updated_by (int)
+        deleted_by (int)
+        community (int)
     """

     __tablename__ = "shout"
diff --git a/services/db.py b/services/db.py
index ccca6c8a..935c25b4 100644
--- a/services/db.py
+++ b/services/db.py
@@ -19,7 +19,7 @@ from sqlalchemy import (
     inspect,
     text,
 )
-from sqlalchemy.orm import Session, configure_mappers, declarative_base
+from sqlalchemy.orm import Session, configure_mappers, declarative_base, joinedload
 from sqlalchemy.sql.schema import Table

 from settings import DB_URL
@@ -260,8 +260,11 @@ def get_json_builder():
 # Используем их в коде
 json_builder, json_array_builder, json_cast = get_json_builder()

+# Fetch all shouts, with authors preloaded
+# This function is used for search indexing
+
 async def fetch_all_shouts(session=None):
-    """Fetch all published shouts for search indexing"""
+    """Fetch all published shouts for search indexing with authors preloaded"""
     from orm.shout import Shout

     close_session = False
@@ -270,8 +273,10 @@ async def fetch_all_shouts(session=None):
         close_session = True

     try:
-        # Fetch only published and non-deleted shouts
-        query = session.query(Shout).filter(
+        # Fetch only published and non-deleted shouts with authors preloaded
+        query = session.query(Shout).options(
+            joinedload(Shout.authors)
+        ).filter(
             Shout.published_at.is_not(None),
             Shout.deleted_at.is_(None)
         )
diff --git a/services/search.py b/services/search.py
index b8036644..75e56efe 100644
--- a/services/search.py
+++ b/services/search.py
@@ -216,8 +216,9 @@ class SearchService:
         """Check if service is available"""
         return self.available

+
     async def verify_docs(self, doc_ids):
-        """Verify which documents exist in the search index"""
+        """Verify which documents exist in the search index across all content types"""
         if not self.available:
             return {"status": "disabled"}

@@ -231,14 +232,36 @@
             response.raise_for_status()
             result = response.json()

-            # Log summary of verification results
-            missing_count = len(result.get("missing", []))
-            logger.info(f"Document verification complete: {missing_count} missing out of {len(doc_ids)} total")
+            # Process the more detailed response format
+            bodies_missing = set(result.get("bodies", {}).get("missing", []))
+            titles_missing = set(result.get("titles", {}).get("missing", []))

-            return result
+            # Combine missing IDs from both bodies and titles
+            # A document is considered missing if it's missing from either index
+            all_missing = list(bodies_missing.union(titles_missing))
+
+            # Log summary of verification results
+            bodies_missing_count = len(bodies_missing)
+            titles_missing_count = len(titles_missing)
+            total_missing_count = len(all_missing)
+
+            logger.info(f"Document verification complete: {bodies_missing_count} bodies missing, {titles_missing_count} titles missing")
+            logger.info(f"Total unique missing documents: {total_missing_count} out of {len(doc_ids)} total")
+
+            # Return in a backwards-compatible format plus the detailed breakdown
+            return {
+                "missing": all_missing,
+                "details": {
+                    "bodies_missing": list(bodies_missing),
+                    "titles_missing": list(titles_missing),
+                    "bodies_missing_count": bodies_missing_count,
+                    "titles_missing_count": titles_missing_count
+                }
+            }
         except Exception as e:
             logger.error(f"Document verification error: {e}")
             return {"status": "error", "message": str(e)}

+
     def index(self, shout):
         """Index a single document"""
@@ -249,68 +272,147 @@ class SearchService:
         asyncio.create_task(self.perform_index(shout))

     async def perform_index(self, shout):
-        """Actually perform the indexing operation"""
+        """Index a single document across multiple endpoints"""
         if not self.available:
             return

         try:
-            # Combine all text fields
-            text = " ".join(filter(None, [
-                shout.title or "",
-                shout.subtitle or "",
-                shout.lead or "",
-                shout.body or "",
-                shout.media or ""
-            ]))
+            logger.info(f"Indexing document {shout.id} to individual endpoints")
+            indexing_tasks = []

-            if not text.strip():
-                logger.warning(f"No text content to index for shout {shout.id}")
-                return
+            # 1. Index title if available
+            if hasattr(shout, 'title') and shout.title and isinstance(shout.title, str):
+                title_doc = {
+                    "id": str(shout.id),
+                    "title": shout.title.strip()
+                }
+                indexing_tasks.append(
+                    self.index_client.post("/index-title", json=title_doc)
+                )
+
+            # 2. Index body content (subtitle, lead, body)
+            body_text_parts = []
+            for field_name in ['subtitle', 'lead', 'body']:
+                field_value = getattr(shout, field_name, None)
+                if field_value and isinstance(field_value, str) and field_value.strip():
+                    body_text_parts.append(field_value.strip())
+
+            # Process media content if available
+            media = getattr(shout, 'media', None)
+            if media:
+                if isinstance(media, str):
+                    try:
+                        media_json = json.loads(media)
+                        if isinstance(media_json, dict):
+                            if 'title' in media_json:
+                                body_text_parts.append(media_json['title'])
+                            if 'body' in media_json:
+                                body_text_parts.append(media_json['body'])
+                    except json.JSONDecodeError:
+                        body_text_parts.append(media)
+                elif isinstance(media, dict):
+                    if 'title' in media:
+                        body_text_parts.append(media['title'])
+                    if 'body' in media:
+                        body_text_parts.append(media['body'])
+
+            if body_text_parts:
+                body_text = " ".join(body_text_parts)
+                # Truncate if too long
+                MAX_TEXT_LENGTH = 4000
+                if len(body_text) > MAX_TEXT_LENGTH:
+                    body_text = body_text[:MAX_TEXT_LENGTH]

-            logger.info(f"Indexing document: ID={shout.id}, Text length={len(text)}")
+                body_doc = {
+                    "id": str(shout.id),
+                    "body": body_text
+                }
+                indexing_tasks.append(
+                    self.index_client.post("/index-body", json=body_doc)
+                )

-            # Send to txtai service
-            response = await self.client.post(
-                "/index",
-                json={"id": str(shout.id), "text": text}
-            )
-            response.raise_for_status()
-            result = response.json()
-            logger.info(f"Post {shout.id} successfully indexed: {result}")
+            # 3. Index authors
+            authors = getattr(shout, 'authors', [])
+            for author in authors:
+                author_id = str(getattr(author, 'id', 0))
+                if not author_id or author_id == '0':
+                    continue
+
+                name = getattr(author, 'name', '')
+
+                # Combine bio and about fields
+                bio_parts = []
+                bio = getattr(author, 'bio', '')
+                if bio and isinstance(bio, str):
+                    bio_parts.append(bio.strip())
+
+                about = getattr(author, 'about', '')
+                if about and isinstance(about, str):
+                    bio_parts.append(about.strip())
+
+                combined_bio = " ".join(bio_parts)
+
+                if name:
+                    author_doc = {
+                        "id": author_id,
+                        "name": name,
+                        "bio": combined_bio
+                    }
+                    indexing_tasks.append(
+                        self.index_client.post("/index-author", json=author_doc)
+                    )
+
+            # Run all indexing tasks in parallel
+            if indexing_tasks:
+                responses = await asyncio.gather(*indexing_tasks, return_exceptions=True)
+
+                # Check for errors in responses
+                for i, response in enumerate(responses):
+                    if isinstance(response, Exception):
+                        logger.error(f"Error in indexing task {i}: {response}")
+                    elif hasattr(response, 'status_code') and response.status_code >= 400:
+                        # httpx exposes .text as a property, not an awaitable method
+                        logger.error(f"Error response in indexing task {i}: {response.status_code}, {response.text}")
+
+                logger.info(f"Document {shout.id} indexed across {len(indexing_tasks)} endpoints")
+            else:
+                logger.warning(f"No content to index for shout {shout.id}")
+
         except Exception as e:
             logger.error(f"Indexing error for shout {shout.id}: {e}")

     async def bulk_index(self, shouts):
-        """Index multiple documents at once with adaptive batch sizing"""
+        """Index multiple documents across three separate endpoints"""
         if not self.available or not shouts:
             logger.warning(f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}")
             return

         start_time = time.time()
-        logger.info(f"Starting bulk indexing of {len(shouts)} documents")
+        logger.info(f"Starting multi-endpoint bulk indexing of {len(shouts)} documents")

-        MAX_TEXT_LENGTH = 4000  # Maximum text length to send in a single request
-        max_batch_size = MAX_BATCH_SIZE
-        total_indexed = 0
+        # Prepare documents for different endpoints
+        title_docs = []
+        body_docs = []
+        author_docs = {}  # Use dict to prevent duplicate authors
+
         total_skipped = 0
-        total_truncated = 0
-        total_retries = 0

-        # Group documents by size to process smaller documents in larger batches
-        small_docs = []
-        medium_docs = []
-        large_docs = []
-
-        # First pass: prepare all documents and categorize by size
         for shout in shouts:
             try:
-                text_fields = []
-                for field_name in ['title', 'subtitle', 'lead', 'body']:
+                # 1. Process title documents
+                if hasattr(shout, 'title') and shout.title and isinstance(shout.title, str):
+                    title_docs.append({
+                        "id": str(shout.id),
+                        "title": shout.title.strip()
+                    })
+
+                # 2. Process body documents (subtitle, lead, body)
+                body_text_parts = []
+                for field_name in ['subtitle', 'lead', 'body']:
                     field_value = getattr(shout, field_name, None)
                     if field_value and isinstance(field_value, str) and field_value.strip():
-                        text_fields.append(field_value.strip())
+                        body_text_parts.append(field_value.strip())

-                # Media field processing remains the same
+                # Process media content if available
                 media = getattr(shout, 'media', None)
                 if media:
                     if isinstance(media, str):
@@ -318,186 +420,180 @@ class SearchService:
                             media_json = json.loads(media)
                             if isinstance(media_json, dict):
                                 if 'title' in media_json:
-                                    text_fields.append(media_json['title'])
+                                    body_text_parts.append(media_json['title'])
                                 if 'body' in media_json:
-                                    text_fields.append(media_json['body'])
+                                    body_text_parts.append(media_json['body'])
                         except json.JSONDecodeError:
-                            text_fields.append(media)
+                            body_text_parts.append(media)
                     elif isinstance(media, dict):
                         if 'title' in media:
-                            text_fields.append(media['title'])
+                            body_text_parts.append(media['title'])
                         if 'body' in media:
-                            text_fields.append(media['body'])
+                            body_text_parts.append(media['body'])

-                text = " ".join(text_fields)
-
-                if not text.strip():
-                    total_skipped += 1
-                    continue
-
-                # Truncate text if it exceeds the maximum length
-                original_length = len(text)
-                if original_length > MAX_TEXT_LENGTH:
-                    text = text[:MAX_TEXT_LENGTH]
-                    total_truncated += 1
-
-                document = {
-                    "id": str(shout.id),
-                    "text": text
-                }
-
-                # Categorize by size
-                text_len = len(text)
-                if text_len > 5000:
-                    large_docs.append(document)
-                elif text_len > 2000:
-                    medium_docs.append(document)
-                else:
-                    small_docs.append(document)
-
-                total_indexed += 1
+                # Only add body document if we have body text
+                if body_text_parts:
+                    body_text = " ".join(body_text_parts)
+                    # Truncate if too long
+                    MAX_TEXT_LENGTH = 4000
+                    if len(body_text) > MAX_TEXT_LENGTH:
+                        body_text = body_text[:MAX_TEXT_LENGTH]
+
+                    body_docs.append({
+                        "id": str(shout.id),
+                        "body": body_text
+                    })
+                # 3. Process authors if available
+                authors = getattr(shout, 'authors', [])
+                for author in authors:
+                    author_id = str(getattr(author, 'id', 0))
+                    if not author_id or author_id == '0':
+                        continue
+
+                    # Skip if we've already processed this author
+                    if author_id in author_docs:
+                        continue
+
+                    name = getattr(author, 'name', '')
+
+                    # Combine bio and about fields
+                    bio_parts = []
+                    bio = getattr(author, 'bio', '')
+                    if bio and isinstance(bio, str):
+                        bio_parts.append(bio.strip())
+
+                    about = getattr(author, 'about', '')
+                    if about and isinstance(about, str):
+                        bio_parts.append(about.strip())
+
+                    combined_bio = " ".join(bio_parts)
+
+                    # Only add if we have author data
+                    if name:
+                        author_docs[author_id] = {
+                            "id": author_id,
+                            "name": name,
+                            "bio": combined_bio
+                        }
+
             except Exception as e:
                 logger.error(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing: {e}")
                 total_skipped += 1

-        # Process each category with appropriate batch sizes
-        logger.info(f"Documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large")
+        # Convert author dict to list
+        author_docs_list = list(author_docs.values())

-        # Process small documents (larger batches)
-        if small_docs:
-            batch_size = min(max_batch_size, 15)
-            await self._process_document_batches(small_docs, batch_size, "small")
-
-        # Process medium documents (medium batches)
-        if medium_docs:
-            batch_size = min(max_batch_size, 10)
-            await self._process_document_batches(medium_docs, batch_size, "medium")
-
-        # Process large documents (small batches)
-        if large_docs:
-            batch_size = min(max_batch_size, 3)
-            await self._process_document_batches(large_docs, batch_size, "large")
+        # Process each endpoint in parallel
+        indexing_tasks = [
+            self._index_endpoint(title_docs, "/bulk-index-titles", "title"),
+            self._index_endpoint(body_docs, "/bulk-index-bodies", "body"),
+            self._index_endpoint(author_docs_list, "/bulk-index-authors", "author")
+        ]
+
+        await asyncio.gather(*indexing_tasks)

         elapsed = time.time() - start_time
-        logger.info(f"Bulk indexing completed in {elapsed:.2f}s: {total_indexed} indexed, {total_skipped} skipped, {total_truncated} truncated, {total_retries} retries")
-
-    async def _process_document_batches(self, documents, batch_size, size_category):
-        """Process document batches with retry logic"""
-        # Check for possible database corruption before starting
-        db_error_count = 0
-
-        for i in range(0, len(documents), batch_size):
-            batch = documents[i:i+batch_size]
-            batch_id = f"{size_category}-{i//batch_size + 1}"
-            logger.info(f"Processing {size_category} batch {batch_id} of {len(batch)} documents")
-
-            retry_count = 0
-            max_retries = 3
-            success = False
-
-            # Process with retries
-            while not success and retry_count < max_retries:
-                try:
-                    logger.info(f"Sending batch {batch_id} of {len(batch)} documents to search service (attempt {retry_count+1})")
-                    response = await self.index_client.post(
-                        "/bulk-index",
-                        json=batch,
-                        timeout=120.0  # Explicit longer timeout for large batches
-                    )
-
-                    # Handle 422 validation errors - these won't be fixed by retrying
-                    if response.status_code == 422:
-                        error_detail = response.json()
-                        truncated_error = self._truncate_error_detail(error_detail)
-                        logger.error(f"Validation error from search service for batch {batch_id}: {truncated_error}")
-                        break
-
-                    # Handle 500 server errors - these might be fixed by retrying with smaller batches
-                    elif response.status_code == 500:
-                        db_error_count += 1
-
-                        # If we've seen multiple 500s, log a critical error
-                        if db_error_count >= 3:
- logger.critical(f"Multiple server errors detected (500). The search service may need manual intervention. Stopping batch {batch_id} processing.") - break - - # Try again with exponential backoff - if retry_count < max_retries - 1: - retry_count += 1 - wait_time = (2 ** retry_count) + (random.random() * 0.5) # Exponential backoff with jitter - await asyncio.sleep(wait_time) - continue - - # Final retry, split the batch - elif len(batch) > 1: - mid = len(batch) // 2 - await self._process_single_batch(batch[:mid], f"{batch_id}-A") - await self._process_single_batch(batch[mid:], f"{batch_id}-B") - break - else: - # Can't split a single document - break - - # Normal success case - response.raise_for_status() - success = True - db_error_count = 0 # Reset error counter on success - - except Exception as e: - error_str = str(e).lower() - if "duplicate key" in error_str or "unique constraint" in error_str or "nonetype" in error_str: - db_error_count += 1 - if db_error_count >= 2: - logger.critical(f"Potential database corruption detected: {error_str}. The search service may need manual intervention. Stopping batch {batch_id} processing.") - break - - if retry_count < max_retries - 1: - retry_count += 1 - wait_time = (2 ** retry_count) + (random.random() * 0.5) - await asyncio.sleep(wait_time) - else: - if len(batch) > 1: - mid = len(batch) // 2 - await self._process_single_batch(batch[:mid], f"{batch_id}-A") - await self._process_single_batch(batch[mid:], f"{batch_id}-B") - break - - async def _process_single_batch(self, documents, batch_id): - """Process a single batch with maximum reliability""" - max_retries = 3 - retry_count = 0 + logger.info( + f"Multi-endpoint indexing completed in {elapsed:.2f}s: " + f"{len(title_docs)} titles, {len(body_docs)} bodies, {len(author_docs_list)} authors, " + f"{total_skipped} shouts skipped" + ) - while retry_count < max_retries: - try: - if not documents: - return + async def _index_endpoint(self, documents, endpoint, doc_type): + """Process and index documents to a specific endpoint""" + if not documents: + logger.info(f"No {doc_type} documents to index") + return + + logger.info(f"Indexing {len(documents)} {doc_type} documents") + + # Categorize documents by size + small_docs, medium_docs, large_docs = self._categorize_by_size(documents, doc_type) + + # Process each category with appropriate batch sizes + batch_sizes = { + "small": min(MAX_BATCH_SIZE, 15), + "medium": min(MAX_BATCH_SIZE, 10), + "large": min(MAX_BATCH_SIZE, 3) + } + + for category, docs in [("small", small_docs), ("medium", medium_docs), ("large", large_docs)]: + if docs: + batch_size = batch_sizes[category] + await self._process_batches(docs, batch_size, endpoint, f"{doc_type}-{category}") + + def _categorize_by_size(self, documents, doc_type): + """Categorize documents by size for optimized batch processing""" + small_docs = [] + medium_docs = [] + large_docs = [] + + for doc in documents: + # Extract relevant text based on document type + if doc_type == "title": + text = doc.get("title", "") + elif doc_type == "body": + text = doc.get("body", "") + else: # author + # For authors, consider both name and bio length + text = doc.get("name", "") + " " + doc.get("bio", "") + + text_len = len(text) + + if text_len > 5000: + large_docs.append(doc) + elif text_len > 2000: + medium_docs.append(doc) + else: + small_docs.append(doc) + + logger.info(f"{doc_type.capitalize()} documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large") + return small_docs, 
+        return small_docs, medium_docs, large_docs
+
+    async def _process_batches(self, documents, batch_size, endpoint, batch_prefix):
+        """Process document batches with retry logic"""
+        for i in range(0, len(documents), batch_size):
+            batch = documents[i:i+batch_size]
+            batch_id = f"{batch_prefix}-{i//batch_size + 1}"
+
+            retry_count = 0
+            max_retries = 3
+            success = False
+
+            while not success and retry_count < max_retries:
+                try:
+                    logger.info(f"Sending batch {batch_id} ({len(batch)} docs) to {endpoint}")
+                    response = await self.index_client.post(
+                        endpoint,
+                        json=batch,
+                        timeout=90.0
+                    )
+
+                    if response.status_code == 422:
+                        error_detail = response.json()
+                        logger.error(f"Validation error from search service for batch {batch_id}: {self._truncate_error_detail(error_detail)}")
+                        break
+
+                    response.raise_for_status()
+                    success = True
+                    logger.info(f"Successfully indexed batch {batch_id}")
+
+                except Exception as e:
+                    retry_count += 1
+                    if retry_count >= max_retries:
+                        if len(batch) > 1:
+                            mid = len(batch) // 2
+                            logger.warning(f"Splitting batch {batch_id} into smaller batches for retry")
+                            await self._process_batches(batch[:mid], batch_size // 2, endpoint, f"{batch_prefix}-{i//batch_size}-A")
+                            await self._process_batches(batch[mid:], batch_size // 2, endpoint, f"{batch_prefix}-{i//batch_size}-B")
+                        else:
+                            logger.error(f"Failed to index single document in batch {batch_id} after {max_retries} attempts: {str(e)}")
+                        break

-                response = await self.index_client.post(
-                    "/bulk-index",
-                    json=documents,
-                    timeout=90.0
-                )
-                response.raise_for_status()
-                return  # Success, exit the retry loop
-
-            except Exception as e:
-                error_str = str(e).lower()
-                retry_count += 1
-
-                if "dictionary changed size" in error_str or "transaction error" in error_str:
                     wait_time = (2 ** retry_count) + (random.random() * 0.5)
-                    await asyncio.sleep(wait_time)  # Wait for txtai to recover
-                    continue
-
-                if retry_count >= max_retries and len(documents) > 1:
-                    for i, doc in enumerate(documents):
-                        try:
-                            resp = await self.index_client.post("/index", json=doc, timeout=30.0)
-                            resp.raise_for_status()
-                        except Exception as e2:
-                            pass
-                    return  # Exit after individual processing attempt
+                    logger.warning(f"Retrying batch {batch_id} in {wait_time:.1f}s... (attempt {retry_count+1}/{max_retries})")
+                    await asyncio.sleep(wait_time)

     def _truncate_error_detail(self, error_detail):
         """Truncate error details for logging"""
@@ -632,7 +728,7 @@ async def initialize_search_index(shouts_data):
         return

     index_stats = info.get("index_stats", {})
-    indexed_doc_count = index_stats.get("document_count", 0)
+    indexed_doc_count = index_stats.get("total_count", 0)

     index_status = await search_service.check_index_status()
     if index_status.get("status") == "inconsistent":
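
Not part of the patch: a minimal sketch of how a caller might consume the new `verify_docs()` return shape introduced above. The flat `missing` list stays backwards-compatible, while `details` adds the per-index breakdown. The `reindex_missing` helper and the `from services.search import search_service` import path are assumptions for illustration; `verify_docs()` and `perform_index()` are the methods changed in this diff.

```python
# Sketch only, under the assumptions stated above.
from services.search import search_service  # module-level singleton assumed


async def reindex_missing(shouts):
    """Hypothetical helper: re-submit shouts missing from either the title or body index."""
    doc_ids = [str(shout.id) for shout in shouts]
    result = await search_service.verify_docs(doc_ids)

    # Service disabled or errored: nothing to reconcile.
    if result.get("status") in ("disabled", "error"):
        return

    # "missing" keeps the old flat shape; "details" carries the per-index counts.
    missing = set(result.get("missing", []))
    details = result.get("details", {})
    print(
        f"{details.get('bodies_missing_count', 0)} bodies and "
        f"{details.get('titles_missing_count', 0)} titles missing"
    )

    # perform_index() now fans each shout out to /index-title, /index-body and /index-author.
    for shout in shouts:
        if str(shout.id) in missing:
            await search_service.perform_index(shout)
```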