import asyncio
import json
import logging
import os
import random
import time

import httpx

# Set up proper logging
logger = logging.getLogger("search")
logger.setLevel(logging.INFO)  # Change to INFO to see more details

# Configuration for the search service
SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"])
TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "none")
MAX_BATCH_SIZE = int(os.environ.get("SEARCH_MAX_BATCH_SIZE", "25"))
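# Example configuration (illustrative values only; the service URL below is a
# hypothetical placeholder, not the project's actual endpoint):
#   SEARCH_ENABLED=true
#   TXTAI_SERVICE_URL=http://txtai-service:8000
#   SEARCH_MAX_BATCH_SIZE=25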


class SearchService:
    def __init__(self):
        logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}")
        self.available = SEARCH_ENABLED
        # Use different timeout settings for indexing and search requests
        self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL)
        self.index_client = httpx.AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL)

        if not self.available:
            logger.info("Search disabled (SEARCH_ENABLED=False)")

    async def info(self):
        """Return information about the search service"""
        if not self.available:
            return {"status": "disabled"}
        try:
            response = await self.client.get("/info")
            response.raise_for_status()
            result = response.json()
            logger.info(f"Search service info: {result}")
            return result
        except Exception as e:
            logger.error(f"Failed to get search info: {e}")
            return {"status": "error", "message": str(e)}

    def is_ready(self):
        """Check if the service is available"""
        return self.available

    async def verify_docs(self, doc_ids):
        """Verify which documents exist in the search index"""
        if not self.available:
            return {"status": "disabled"}
        try:
            logger.info(f"Verifying {len(doc_ids)} documents in search index")
            response = await self.client.post(
                "/verify-docs",
                json={"doc_ids": doc_ids},
                timeout=60.0,  # Longer timeout for potentially large ID lists
            )
            response.raise_for_status()
            result = response.json()
            # Log a summary of the verification results
            missing_count = len(result.get("missing", []))
            logger.info(f"Document verification complete: {missing_count} missing out of {len(doc_ids)} total")
            return result
        except Exception as e:
            logger.error(f"Document verification error: {e}")
            return {"status": "error", "message": str(e)}

    def index(self, shout):
        """Index a single document"""
        if not self.available:
            return
        logger.info(f"Indexing post {shout.id}")
        # Start in the background to avoid blocking the caller
        asyncio.create_task(self.perform_index(shout))

    async def perform_index(self, shout):
        """Actually perform the indexing operation"""
        if not self.available:
            return
        try:
            # Combine all text fields
            text = " ".join(filter(None, [
                shout.title or "",
                shout.subtitle or "",
                shout.lead or "",
                shout.body or "",
                shout.media or "",
            ]))
            if not text.strip():
                logger.warning(f"No text content to index for shout {shout.id}")
                return
            logger.info(f"Indexing document: ID={shout.id}, text length={len(text)}")
            # Send to the txtai service
            response = await self.client.post(
                "/index",
                json={"id": str(shout.id), "text": text},
            )
            response.raise_for_status()
            result = response.json()
            logger.info(f"Post {shout.id} successfully indexed: {result}")
        except Exception as e:
            logger.error(f"Indexing error for shout {shout.id}: {e}")

    async def bulk_index(self, shouts):
        """Index multiple documents at once with adaptive batch sizing"""
        if not self.available or not shouts:
            logger.warning(
                f"Bulk indexing skipped: available={self.available}, "
                f"shouts_count={len(shouts) if shouts else 0}"
            )
            return

        start_time = time.time()
        logger.info(f"Starting bulk indexing of {len(shouts)} documents")

        MAX_TEXT_LENGTH = 4000  # Maximum text length to send in a single request
        max_batch_size = MAX_BATCH_SIZE
        total_indexed = 0
        total_skipped = 0
        total_truncated = 0
        total_retries = 0

        # Group documents by size so smaller documents can be processed in larger batches
        small_docs = []
        medium_docs = []
        large_docs = []

        # First pass: prepare all documents and categorize them by size
        for shout in shouts:
            try:
                text_fields = []
                for field_name in ["title", "subtitle", "lead", "body"]:
                    field_value = getattr(shout, field_name, None)
                    if field_value and isinstance(field_value, str) and field_value.strip():
                        text_fields.append(field_value.strip())

                # Media can be a JSON string or a dict with optional title/body fields
                media = getattr(shout, "media", None)
                if media:
                    if isinstance(media, str):
                        try:
                            media_json = json.loads(media)
                            if isinstance(media_json, dict):
                                if "title" in media_json:
                                    text_fields.append(media_json["title"])
                                if "body" in media_json:
                                    text_fields.append(media_json["body"])
                        except json.JSONDecodeError:
                            text_fields.append(media)
                    elif isinstance(media, dict):
                        if "title" in media:
                            text_fields.append(media["title"])
                        if "body" in media:
                            text_fields.append(media["body"])

                text = " ".join(text_fields)
                if not text.strip():
                    logger.debug(f"Skipping shout {shout.id}: no text content")
                    total_skipped += 1
                    continue

                # Truncate text if it exceeds the maximum length
                original_length = len(text)
                if original_length > MAX_TEXT_LENGTH:
                    text = text[:MAX_TEXT_LENGTH]
                    logger.info(f"Truncated document {shout.id} from {original_length} to {MAX_TEXT_LENGTH} chars")
                    total_truncated += 1

                document = {
                    "id": str(shout.id),
                    "text": text,
                }

                # Categorize by size. Note: because text is already truncated to
                # MAX_TEXT_LENGTH (4000) above, the "large" bucket (>5000 chars) is
                # effectively unused with the current limits.
                text_len = len(text)
                if text_len > 5000:
                    large_docs.append(document)
                elif text_len > 2000:
                    medium_docs.append(document)
                else:
                    small_docs.append(document)
                total_indexed += 1
            except Exception as e:
                logger.error(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing: {e}")
                total_skipped += 1

        # Process each category with an appropriate batch size
        logger.info(f"Documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large")

        # Small documents: larger batches
        if small_docs:
            batch_size = min(max_batch_size, 15)
            await self._process_document_batches(small_docs, batch_size, "small")

        # Medium documents: medium batches
        if medium_docs:
            batch_size = min(max_batch_size, 10)
            await self._process_document_batches(medium_docs, batch_size, "medium")

        # Large documents: small batches
        if large_docs:
            batch_size = min(max_batch_size, 3)
            await self._process_document_batches(large_docs, batch_size, "large")

        elapsed = time.time() - start_time
        logger.info(
            f"Bulk indexing completed in {elapsed:.2f}s: {total_indexed} indexed, "
            f"{total_skipped} skipped, {total_truncated} truncated, {total_retries} retries"
        )

    async def _process_document_batches(self, documents, batch_size, size_category):
        """Process document batches with retry logic"""
        # Check for possible database corruption before starting
        db_error_count = 0

        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            batch_id = f"{size_category}-{i // batch_size + 1}"
            logger.info(f"Processing {size_category} batch {batch_id} of {len(batch)} documents")

            retry_count = 0
            max_retries = 3
            success = False

            # Process with retries
            while not success and retry_count < max_retries:
                try:
                    if batch:
                        sample = batch[0]
                        logger.info(f"Sample document in batch {batch_id}: id={sample['id']}, text_length={len(sample['text'])}")
                    logger.info(f"Sending batch {batch_id} of {len(batch)} documents to search service (attempt {retry_count + 1})")
                    response = await self.index_client.post(
                        "/bulk-index",
                        json=batch,
                        timeout=120.0,  # Explicit longer timeout for large batches
                    )

                    # Handle 422 validation errors - these won't be fixed by retrying
                    if response.status_code == 422:
                        error_detail = response.json()
                        truncated_error = self._truncate_error_detail(error_detail)
                        logger.error(f"Validation error from search service for batch {batch_id}: {truncated_error}")
                        break

                    # Handle 500 server errors - these might be fixed by retrying with smaller batches
                    elif response.status_code == 500:
                        db_error_count += 1
                        # If we've seen multiple 500s, log a critical error and stop
                        if db_error_count >= 3:
                            logger.critical(
                                f"Multiple server errors detected (500). The search service may need "
                                f"manual intervention. Stopping batch {batch_id} processing."
                            )
                            break
                        # Try again with exponential backoff
                        if retry_count < max_retries - 1:
                            retry_count += 1
                            wait_time = (2 ** retry_count) + (random.random() * 0.5)  # Exponential backoff with jitter
                            logger.warning(f"Server error for batch {batch_id}, retrying in {wait_time:.1f}s (attempt {retry_count + 1}/{max_retries})")
                            await asyncio.sleep(wait_time)
                            continue
                        # Final retry: split the batch
                        elif len(batch) > 1:
                            logger.warning(f"Splitting batch {batch_id} after repeated failures")
                            mid = len(batch) // 2
                            await self._process_single_batch(batch[:mid], f"{batch_id}-A")
                            await self._process_single_batch(batch[mid:], f"{batch_id}-B")
                            break
                        else:
                            # Can't split a single document
                            logger.error(f"Failed to index document {batch[0]['id']} after {max_retries} attempts")
                            break

                    # Normal success case
                    response.raise_for_status()
                    result = response.json()
                    logger.info(f"Batch {batch_id} indexed successfully: {result}")
                    success = True
                    db_error_count = 0  # Reset error counter on success
                except Exception as e:
                    # Check if the failure looks like a database corruption error
                    error_str = str(e).lower()
                    if "duplicate key" in error_str or "unique constraint" in error_str or "nonetype" in error_str:
                        db_error_count += 1
                        if db_error_count >= 2:
                            logger.critical(
                                f"Potential database corruption detected: {error_str}. The search service "
                                f"may need manual intervention. Stopping batch {batch_id} processing."
                            )
                            break

                    if retry_count < max_retries - 1:
                        retry_count += 1
                        wait_time = (2 ** retry_count) + (random.random() * 0.5)
                        logger.warning(f"Error for batch {batch_id}, retrying in {wait_time:.1f}s: {str(e)[:200]}")
                        await asyncio.sleep(wait_time)
                    else:
                        # Last resort: try to split the batch
                        if len(batch) > 1:
                            logger.warning(f"Splitting batch {batch_id} after exception: {str(e)[:200]}")
                            mid = len(batch) // 2
                            await self._process_single_batch(batch[:mid], f"{batch_id}-A")
                            await self._process_single_batch(batch[mid:], f"{batch_id}-B")
                        else:
                            logger.error(f"Failed to index document {batch[0]['id']} after {max_retries} attempts: {e}")
                        break

    async def _process_single_batch(self, documents, batch_id):
        """Process a single batch with maximum reliability"""
        max_retries = 3
        retry_count = 0
        while retry_count < max_retries:
            try:
                if not documents:
                    return
                logger.info(f"Processing sub-batch {batch_id} with {len(documents)} documents")
                response = await self.index_client.post(
                    "/bulk-index",
                    json=documents,
                    timeout=90.0,
                )
                response.raise_for_status()
                result = response.json()
                logger.info(f"Sub-batch {batch_id} indexed successfully: {result}")
                return  # Success, exit the retry loop
            except Exception as e:
                error_str = str(e).lower()
                retry_count += 1

                # Transient errors that txtai might recover from internally
                if "dictionary changed size" in error_str or "transaction error" in error_str:
                    wait_time = (2 ** retry_count) + (random.random() * 0.5)
                    logger.warning(f"Transient txtai error in sub-batch {batch_id}, waiting {wait_time:.1f}s for recovery: {str(e)[:200]}")
                    await asyncio.sleep(wait_time)  # Wait for txtai to recover
                    continue  # Try again

                # Other errors, or the final retry has failed
                logger.error(f"Error indexing sub-batch {batch_id} (attempt {retry_count}/{max_retries}): {str(e)[:200]}")

                # Only fall back to one-by-one indexing on the final retry
                if retry_count >= max_retries and len(documents) > 1:
                    logger.info(f"Processing documents in sub-batch {batch_id} individually")
                    for doc in documents:
                        try:
                            resp = await self.index_client.post("/index", json=doc, timeout=30.0)
                            resp.raise_for_status()
                            logger.info(f"Indexed document {doc['id']} individually")
                        except Exception as e2:
                            logger.error(f"Failed to index document {doc['id']} individually: {str(e2)[:100]}")
                    return  # Exit after the individual processing attempt

    def _truncate_error_detail(self, error_detail):
        """Truncate long document texts inside an error detail payload for logging"""
        # Note: dict.copy() is shallow, so nested structures are modified in place.
        truncated_detail = error_detail.copy() if isinstance(error_detail, dict) else error_detail

        if isinstance(truncated_detail, dict) and "detail" in truncated_detail and isinstance(truncated_detail["detail"], list):
            for item in truncated_detail["detail"]:
                if isinstance(item, dict) and "input" in item:
                    if isinstance(item["input"], dict) and any(k in item["input"] for k in ["documents", "text"]):
                        # Truncate each text in a documents list
                        if "documents" in item["input"] and isinstance(item["input"]["documents"], list):
                            for j, doc in enumerate(item["input"]["documents"]):
                                if "text" in doc and isinstance(doc["text"], str) and len(doc["text"]) > 100:
                                    item["input"]["documents"][j]["text"] = (
                                        f"{doc['text'][:100]}... [truncated, total {len(doc['text'])} chars]"
                                    )
                        # Truncate a direct text field
                        if "text" in item["input"] and isinstance(item["input"]["text"], str) and len(item["input"]["text"]) > 100:
                            item["input"]["text"] = (
                                f"{item['input']['text'][:100]}... [truncated, total {len(item['input']['text'])} chars]"
                            )
        return truncated_detail

    async def search(self, text, limit, offset):
        """Search documents"""
        if not self.available:
            logger.warning("Search not available")
            return []
        if not isinstance(text, str) or not text.strip():
            logger.warning(f"Invalid search text: {text}")
            return []

        logger.info(f"Searching for: '{text}' (limit={limit}, offset={offset})")
        try:
            response = await self.client.post(
                "/search",
                json={"text": text, "limit": limit, "offset": offset},
            )
            response.raise_for_status()
            logger.info(f"Raw search response: {response.text}")
            result = response.json()
            logger.info(f"Parsed search response: {result}")

            formatted_results = result.get("results", [])
            logger.info(f"Search for '{text}' returned {len(formatted_results)} results")
            if formatted_results:
                logger.info(f"Sample result: {formatted_results[0]}")
            else:
                logger.warning(f"No results found for '{text}'")
            return formatted_results
        except Exception as e:
            logger.error(f"Search error for '{text}': {e}", exc_info=True)
            return []


# Create the search service singleton
search_service = SearchService()


# API-compatible function to perform a search
async def search_text(text: str, limit: int = 50, offset: int = 0):
    payload = []
    if search_service.available:
        payload = await search_service.search(text, limit, offset)
    return payload
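
# Illustrative usage from an async caller (the calling context is an assumption,
# not part of this module):
#
#     results = await search_text("neural networks", limit=10)
#     found_ids = [r.get("id") for r in results]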


async def initialize_search_index(shouts_data):
    """Initialize the search index with existing data during application startup"""
    if not SEARCH_ENABLED:
        logger.info("Search indexing skipped (SEARCH_ENABLED=False)")
        return

    if not shouts_data:
        logger.warning("No shouts data provided for search indexing")
        return

    logger.info(f"Checking search index status for {len(shouts_data)} documents")

    # Get the current index info
    info = await search_service.info()
    if info.get("status") in ["error", "unavailable", "disabled"]:
        logger.error(f"Cannot initialize search index: {info}")
        return

    # Check whether the index holds approximately the right number of documents
    index_stats = info.get("index_stats", {})
    indexed_doc_count = index_stats.get("document_count", 0)

    # Log a summary of the documents in the database
    db_ids = [str(shout.id) for shout in shouts_data]
    logger.info(f"Database contains {len(shouts_data)} documents. Sample IDs: {', '.join(db_ids[:5])}...")

    # Analyze the numeric ID range to understand coverage
    try:
        numeric_ids = [int(sid) for sid in db_ids if sid.isdigit()]
        if numeric_ids:
            min_id = min(numeric_ids)
            max_id = max(numeric_ids)
            id_range = max_id - min_id + 1
            coverage_pct = (len(numeric_ids) / id_range) * 100 if id_range > 0 else 0
            logger.info(
                f"ID range analysis: min_id={min_id}, max_id={max_id}, range={id_range}, "
                f"coverage={coverage_pct:.1f}% ({len(numeric_ids)}/{id_range})"
            )
    except Exception as e:
        logger.warning(f"Could not analyze ID ranges: {e}")

    # If the counts differ significantly, verify which documents are missing
    if abs(indexed_doc_count - len(shouts_data)) > 10:
        logger.info(f"Document count mismatch: {indexed_doc_count} in index vs {len(shouts_data)} in database. Verifying...")

        # Verify which database documents are missing from the index
        verification = await search_service.verify_docs(db_ids)
        if verification.get("status") == "error":
            logger.error(f"Document verification failed: {verification.get('message')}")
            return

        # Index only the missing documents
        missing_ids = verification.get("missing", [])
        if missing_ids:
            logger.info(f"Found {len(missing_ids)} documents missing from index. Indexing them...")
            logger.info(f"Sample missing IDs: {', '.join(missing_ids[:10])}...")
            missing_docs = [shout for shout in shouts_data if str(shout.id) in missing_ids]
            await search_service.bulk_index(missing_docs)
        else:
            logger.info("All documents are already indexed.")
    else:
        logger.info(f"Search index appears to be in sync ({indexed_doc_count} documents indexed).")

        # Optional sample verification (can be slow with large document sets).
        # Uncomment to periodically check a random sample even when the counts match.
        """
        sample_size = 10
        if len(db_ids) > sample_size:
            sample_ids = random.sample(db_ids, sample_size)
            logger.info(f"Performing random sample verification on {sample_size} documents...")
            verification = await search_service.verify_docs(sample_ids)
            if verification.get("missing"):
                missing_count = len(verification.get("missing", []))
                logger.warning(
                    f"Random verification found {missing_count}/{sample_size} missing docs "
                    f"despite count match. Consider full verification."
                )
            else:
                logger.info("Random document sample verification passed.")
        """

    # Verify the index responds to a test query
    try:
        test_query = "test"
        logger.info(f"Verifying search index with query: '{test_query}'")
        test_results = await search_text(test_query, 5)
        if test_results:
            logger.info(f"Search verification successful: found {len(test_results)} results")
            # Log the categories covered by the search results
            categories = set()
            for result in test_results:
                result_id = result.get("id")
                matching_shouts = [s for s in shouts_data if str(s.id) == result_id]
                if matching_shouts and hasattr(matching_shouts[0], "category"):
                    categories.add(getattr(matching_shouts[0], "category", "unknown"))
            if categories:
                logger.info(f"Search results cover categories: {', '.join(categories)}")
        else:
            logger.warning("Search verification returned no results. Index may be empty or not working.")
    except Exception as e:
        logger.error(f"Error verifying search index: {e}")