From 106222b0e081b41e1fa662fb1716d5ff46df23ac Mon Sep 17 00:00:00 2001
From: Stepan Vladovskiy
Date: Mon, 7 Apr 2025 11:41:48 -0300
Subject: [PATCH] debug: remove debug logging and clean up

---
 resolvers/reader.py |  10 ----
 services/search.py  | 135 ++++----------------------------------------
 2 files changed, 10 insertions(+), 135 deletions(-)

diff --git a/resolvers/reader.py b/resolvers/reader.py
index e83ff6d4..aeb60e50 100644
--- a/resolvers/reader.py
+++ b/resolvers/reader.py
@@ -187,12 +187,10 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
     """
     shouts = []
     try:
-        # logger.info(f"Starting get_shouts_with_links with limit={limit}, offset={offset}")
         q = q.limit(limit).offset(offset)

         with local_session() as session:
             shouts_result = session.execute(q).all()
-            # logger.info(f"Got {len(shouts_result) if shouts_result else 0} shouts from query")

             if not shouts_result:
                 logger.warning("No shouts found in query result")
@@ -203,7 +201,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
                 shout = None
                 if hasattr(row, "Shout"):
                     shout = row.Shout
-                    # logger.debug(f"Processing shout#{shout.id} at index {idx}")
                 if shout:
                     shout_id = int(f"{shout.id}")
                     shout_dict = shout.dict()
@@ -231,20 +228,16 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
                     topics = None
                     if has_field(info, "topics") and hasattr(row, "topics"):
                         topics = orjson.loads(row.topics) if isinstance(row.topics, str) else row.topics
-                        # logger.debug(f"Shout#{shout_id} topics: {topics}")
                         shout_dict["topics"] = topics

                     if has_field(info, "main_topic"):
                         main_topic = None
                         if hasattr(row, "main_topic"):
-                            # logger.debug(f"Raw main_topic for shout#{shout_id}: {row.main_topic}")
                             main_topic = (
                                 orjson.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic
                             )
-                            # logger.debug(f"Parsed main_topic for shout#{shout_id}: {main_topic}")
                         if not main_topic and topics and len(topics) > 0:
-                            # logger.info(f"No main_topic found for shout#{shout_id}, using first topic from list")
                             main_topic = {
                                 "id": topics[0]["id"],
                                 "title": topics[0]["title"],
@@ -252,10 +245,8 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
                                 "is_main": True,
                             }
                         elif not main_topic:
-                            logger.debug(f"No main_topic and no topics found for shout#{shout_id}")
                             main_topic = {"id": 0, "title": "no topic", "slug": "notopic", "is_main": True}
                         shout_dict["main_topic"] = main_topic
-                        logger.debug(f"Final main_topic for shout#{shout_id}: {main_topic}")

                     if has_field(info, "authors") and hasattr(row, "authors"):
                         shout_dict["authors"] = (
@@ -282,7 +273,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
         logger.error(f"Fatal error in get_shouts_with_links: {e}", exc_info=True)
         raise
     finally:
-        logger.info(f"Returning {len(shouts)} shouts from get_shouts_with_links")
         return shouts


diff --git a/services/search.py b/services/search.py
index f0907d53..b8036644 100644
--- a/services/search.py
+++ b/services/search.py
@@ -84,7 +84,6 @@ class SearchCache:
             if cached_data:
                 all_results = json.loads(cached_data)
                 logger.info(f"Retrieved search results for '{query}' from Redis")
-                # Redis TTL is auto-extended when setting the key with expiry
         except Exception as e:
             logger.error(f"Error retrieving search results from Redis: {e}")

@@ -96,7 +95,7 @@ class SearchCache:

         # If not found in any cache
         if all_results is None:
-            logger.debug(f"Cache miss for query '{query}'")
+            logger.info(f"Cache miss for query '{query}'")
             return None

         # Return paginated subset
@@ -314,7 +313,6 @@ class SearchService:
                 # Media field processing remains the same
                 media = getattr(shout, 'media', None)
                 if media:
-                    # Your existing media processing logic
                     if isinstance(media, str):
                         try:
                             media_json = json.loads(media)
@@ -334,7 +332,6 @@ class SearchService:
                 text = " ".join(text_fields)

                 if not text.strip():
-                    logger.debug(f"Skipping shout {shout.id}: no text content")
                     total_skipped += 1
                     continue

@@ -342,7 +339,6 @@ class SearchService:
                 original_length = len(text)
                 if original_length > MAX_TEXT_LENGTH:
                     text = text[:MAX_TEXT_LENGTH]
-                    logger.info(f"Truncated document {shout.id} from {original_length} to {MAX_TEXT_LENGTH} chars")
                     total_truncated += 1

                 document = {
@@ -403,10 +399,6 @@ class SearchService:
                 # Process with retries
                 while not success and retry_count < max_retries:
                     try:
-                        if batch:
-                            sample = batch[0]
-                            logger.info(f"Sample document in batch {batch_id}: id={sample['id']}, text_length={len(sample['text'])}")

-                        logger.info(f"Sending batch {batch_id} of {len(batch)} documents to search service (attempt {retry_count+1})")
                         response = await self.index_client.post(
                             "/bulk-index",
@@ -434,31 +426,25 @@ class SearchService:
                         if retry_count < max_retries - 1:
                             retry_count += 1
                             wait_time = (2 ** retry_count) + (random.random() * 0.5)  # Exponential backoff with jitter
-                            logger.warning(f"Server error for batch {batch_id}, retrying in {wait_time:.1f}s (attempt {retry_count+1}/{max_retries})")
                             await asyncio.sleep(wait_time)
                             continue

                         # Final retry, split the batch
                         elif len(batch) > 1:
-                            logger.warning(f"Splitting batch {batch_id} after repeated failures")
                             mid = len(batch) // 2
                             await self._process_single_batch(batch[:mid], f"{batch_id}-A")
                             await self._process_single_batch(batch[mid:], f"{batch_id}-B")
                             break
                         else:
                             # Can't split a single document
-                            logger.error(f"Failed to index document {batch[0]['id']} after {max_retries} attempts")
                             break

                     # Normal success case
                     response.raise_for_status()
-                    result = response.json()
-                    logger.info(f"Batch {batch_id} indexed successfully: {result}")
                     success = True
                     db_error_count = 0  # Reset error counter on success

                 except Exception as e:
-                    # Check if it looks like a database corruption error
                     error_str = str(e).lower()
                     if "duplicate key" in error_str or "unique constraint" in error_str or "nonetype" in error_str:
                         db_error_count += 1
@@ -469,17 +455,12 @@ class SearchService:
                     if retry_count < max_retries - 1:
                         retry_count += 1
                         wait_time = (2 ** retry_count) + (random.random() * 0.5)
-                        logger.warning(f"Error for batch {batch_id}, retrying in {wait_time:.1f}s: {str(e)[:200]}")
                         await asyncio.sleep(wait_time)
                     else:
-                        # Last resort - try to split the batch
                         if len(batch) > 1:
-                            logger.warning(f"Splitting batch {batch_id} after exception: {str(e)[:200]}")
                             mid = len(batch) // 2
                             await self._process_single_batch(batch[:mid], f"{batch_id}-A")
                             await self._process_single_batch(batch[mid:], f"{batch_id}-B")
-                        else:
-                            logger.error(f"Failed to index document {batch[0]['id']} after {max_retries} attempts: {e}")
                         break

     async def _process_single_batch(self, documents, batch_id):
@@ -492,41 +473,30 @@ class SearchService:
                 if not documents:
                     return

-                logger.info(f"Processing sub-batch {batch_id} with {len(documents)} documents")
                 response = await self.index_client.post(
                     "/bulk-index",
                     json=documents,
                     timeout=90.0
                 )
                 response.raise_for_status()
-                result = response.json()
-                logger.info(f"Sub-batch {batch_id} indexed successfully: {result}")
                 return  # Success, exit the retry loop

             except Exception as e:
                 error_str = str(e).lower()
                 retry_count += 1

-                # Check if it's a transient error that txtai might recover from internally
                 if "dictionary changed size" in error_str or "transaction error" in error_str:
                     wait_time = (2 ** retry_count) + (random.random() * 0.5)
-                    logger.warning(f"Transient txtai error in sub-batch {batch_id}, waiting {wait_time:.1f}s for recovery: {str(e)[:200]}")
                     await asyncio.sleep(wait_time)  # Wait for txtai to recover
-                    continue  # Try again
+                    continue

-                # For other errors or final retry failure
-                logger.error(f"Error indexing sub-batch {batch_id} (attempt {retry_count}/{max_retries}): {str(e)[:200]}")
-
-                # Only try one-by-one on the final retry
                 if retry_count >= max_retries and len(documents) > 1:
-                    logger.info(f"Processing documents in sub-batch {batch_id} individually")
                     for i, doc in enumerate(documents):
                         try:
                             resp = await self.index_client.post("/index", json=doc, timeout=30.0)
                             resp.raise_for_status()
-                            logger.info(f"Indexed document {doc['id']} individually")
                         except Exception as e2:
-                            logger.error(f"Failed to index document {doc['id']} individually: {str(e2)[:100]}")
+                            pass
                     return  # Exit after individual processing attempt

     def _truncate_error_detail(self, error_detail):
@@ -537,13 +507,11 @@ class SearchService:
         """Truncate error details for logging"""
         truncated_detail = error_detail.copy() if isinstance(error_detail, dict) else error_detail

         if isinstance(truncated_detail, dict) and 'detail' in truncated_detail and isinstance(truncated_detail['detail'], list):
             for i, item in enumerate(truncated_detail['detail']):
                 if isinstance(item, dict) and 'input' in item:
                     if isinstance(item['input'], dict) and any(k in item['input'] for k in ['documents', 'text']):
-                        # Check for documents list
                         if 'documents' in item['input'] and isinstance(item['input']['documents'], list):
                             for j, doc in enumerate(item['input']['documents']):
                                 if 'text' in doc and isinstance(doc['text'], str) and len(doc['text']) > 100:
                                     item['input']['documents'][j]['text'] = f"{doc['text'][:100]}... [truncated, total {len(doc['text'])} chars]"

-                        # Check for direct text field
                         if 'text' in item['input'] and isinstance(item['input']['text'], str) and len(item['input']['text']) > 100:
                             item['input']['text'] = f"{item['input']['text'][:100]}... [truncated, total {len(item['input']['text'])} chars]"
@@ -552,11 +520,9 @@ class SearchService:
     async def search(self, text, limit, offset):
         """Search documents"""
         if not self.available:
-            logger.warning("Search not available")
             return []

         if not isinstance(text, str) or not text.strip():
-            logger.warning(f"Invalid search text: {text}")
             return []

         logger.info(f"Searching for: '{text}' (limit={limit}, offset={offset})")
@@ -567,7 +533,6 @@ class SearchService:
         if has_cache:
             cached_results = await self.cache.get(text, limit, offset)
             if cached_results is not None:
-                logger.info(f"Serving search results for '{text}' from cache (offset={offset}, limit={limit})")
                 return cached_results

         # Not in cache or cache disabled, perform new search
@@ -575,61 +540,40 @@ class SearchService:
             search_limit = limit
             search_offset = offset

-            # Always prefetch full results when caching is enabled
             if SEARCH_CACHE_ENABLED:
-                search_limit = SEARCH_PREFETCH_SIZE  # Always fetch a large set
-                search_offset = 0  # Always start from beginning
+                search_limit = SEARCH_PREFETCH_SIZE
+                search_offset = 0
             else:
                 search_limit = limit
                 search_offset = offset

-            logger.info(f"Sending search request: text='{text}', limit={search_limit}, offset={search_offset}")
             response = await self.client.post(
                 "/search", json={"text": text, "limit": search_limit, "offset": search_offset}
             )
             response.raise_for_status()

-            # logger.info(f"Raw search response: {response.text}")
             result = response.json()
-            # logger.info(f"Parsed search response: {result}")
             formatted_results = result.get("results", [])

-            # Filter out non-numeric IDs to prevent database errors
             valid_results = []
             for item in formatted_results:
                 doc_id = item.get("id")
                 if doc_id and doc_id.isdigit():
                     valid_results.append(item)
-                else:
-                    logger.warning(f"Filtered out non-numeric document ID: {doc_id}")

             if len(valid_results) != len(formatted_results):
-                logger.info(f"Filtered {len(formatted_results) - len(valid_results)} results with non-numeric IDs")
                 formatted_results = valid_results

-            # Filter out low-score results
             if SEARCH_MIN_SCORE > 0:
                 initial_count = len(formatted_results)
                 formatted_results = [r for r in formatted_results if r.get("score", 0) >= SEARCH_MIN_SCORE]
-                if len(formatted_results) != initial_count:
-                    logger.info(f"Filtered {initial_count - len(formatted_results)} results with score < {SEARCH_MIN_SCORE}")
-
-            logger.info(f"Search for '{text}' returned {len(formatted_results)} valid results")
-
-            if formatted_results:
-                logger.info(f"Sample result: {formatted_results[0]}")
-            else:
-                logger.warning(f"No results found for '{text}'")
-
             if SEARCH_CACHE_ENABLED:
-                logger.info(f"Storing {len(formatted_results)} results in cache for query '{text}'")
-                await self.cache.store(text, formatted_results)
-                # Return the proper page slice from the full results stored in cache
-                end_idx = offset + limit
+                await self.cache.store(text, formatted_results)
+                end_idx = offset + limit
                 page_results = formatted_results[offset:end_idx]
-                logger.info(f"Returning results from {offset} to {end_idx} (of {len(formatted_results)} total)")
                 return page_results

             return formatted_results
@@ -646,9 +590,7 @@ class SearchService:
             response = await self.client.get("/index-status")
             response.raise_for_status()
             result = response.json()
-            logger.info(f"Index status check: {result['status']}, {result['documents_count']} documents")

-            # Log warnings for any inconsistencies
             if result.get("consistency", {}).get("status") != "ok":
                 null_count = result.get("consistency", {}).get("null_embeddings_count", 0)
                 if null_count > 0:
@@ -674,126 +616,69 @@ async def get_search_count(text: str):
     if search_service.available and SEARCH_CACHE_ENABLED:
         if await search_service.cache.has_query(text):
             return await search_service.cache.get_total_count(text)
-    # If not cached, we'll need to perform the full search once
     results = await search_text(text, SEARCH_PREFETCH_SIZE, 0)
     return len(results)


 async def initialize_search_index(shouts_data):
     """Initialize search index with existing data during application startup"""
     if not SEARCH_ENABLED:
-        logger.info("Search indexing skipped (SEARCH_ENABLED=False)")
         return

     if not shouts_data:
-        logger.warning("No shouts data provided for search indexing")
         return

-    logger.info(f"Checking search index status for {len(shouts_data)} documents")
-
-    # Get the current index info
     info = await search_service.info()
     if info.get("status") in ["error", "unavailable", "disabled"]:
-        logger.error(f"Cannot initialize search index: {info}")
         return

-    # Check if index has approximately right number of documents
     index_stats = info.get("index_stats", {})
     indexed_doc_count = index_stats.get("document_count", 0)

-    # Add a more detailed status check
     index_status = await search_service.check_index_status()
-    if index_status.get("status") == "healthy":
-        logger.info("Index status check passed")
-    elif index_status.get("status") == "inconsistent":
-        logger.warning("Index status check found inconsistencies")
-
-        # Get documents with null embeddings
+    if index_status.get("status") == "inconsistent":
         problem_ids = index_status.get("consistency", {}).get("null_embeddings_sample", [])
         if problem_ids:
-            logger.info(f"Repairing {len(problem_ids)} documents with NULL embeddings")
             problem_docs = [shout for shout in shouts_data if str(shout.id) in problem_ids]
             if problem_docs:
                 await search_service.bulk_index(problem_docs)

-    # Log database document summary
     db_ids = [str(shout.id) for shout in shouts_data]
-    logger.info(f"Database contains {len(shouts_data)} documents. Sample IDs: {', '.join(db_ids[:5])}...")

-    # Calculate summary by ID range to understand the coverage
     try:
-        # Parse numeric IDs where possible to analyze coverage
         numeric_ids = [int(sid) for sid in db_ids if sid.isdigit()]
         if numeric_ids:
             min_id = min(numeric_ids)
             max_id = max(numeric_ids)
             id_range = max_id - min_id + 1
-            coverage_pct = (len(numeric_ids) / id_range) * 100 if id_range > 0 else 0
-            logger.info(f"ID range analysis: min_id={min_id}, max_id={max_id}, range={id_range}, "
-                        f"coverage={coverage_pct:.1f}% ({len(numeric_ids)}/{id_range})")
     except Exception as e:
-        logger.warning(f"Could not analyze ID ranges: {e}")
+        pass

-    # If counts are significantly different, do verification
     if abs(indexed_doc_count - len(shouts_data)) > 10:
-        logger.info(f"Document count mismatch: {indexed_doc_count} in index vs {len(shouts_data)} in database. Verifying...")
-
-        # Get all document IDs from your database
         doc_ids = [str(shout.id) for shout in shouts_data]

-        # Verify which ones are missing from the index
         verification = await search_service.verify_docs(doc_ids)

        if verification.get("status") == "error":
-            logger.error(f"Document verification failed: {verification.get('message')}")
             return

-        # Index only missing documents
         missing_ids = verification.get("missing", [])
         if missing_ids:
-            logger.info(f"Found {len(missing_ids)} documents missing from index. Indexing them...")
-            logger.info(f"Sample missing IDs: {', '.join(missing_ids[:10])}...")
             missing_docs = [shout for shout in shouts_data if str(shout.id) in missing_ids]
             await search_service.bulk_index(missing_docs)
-        else:
-            logger.info("All documents are already indexed.")
     else:
-        logger.info(f"Search index appears to be in sync ({indexed_doc_count} documents indexed).")
+        pass

-    # Optional sample verification (can be slow with large document sets)
-    # Uncomment if you want to periodically check a random sample even when counts match
-    """
-    sample_size = 10
-    if len(db_ids) > sample_size:
-        sample_ids = random.sample(db_ids, sample_size)
-        logger.info(f"Performing random sample verification on {sample_size} documents...")
-        verification = await search_service.verify_docs(sample_ids)
-        if verification.get("missing"):
-            missing_count = len(verification.get("missing", []))
-            logger.warning(f"Random verification found {missing_count}/{sample_size} missing docs "
-                           f"despite count match. Consider full verification.")
-        else:
-            logger.info("Random document sample verification passed.")
-    """
-
-    # Verify with test query
     try:
         test_query = "test"
-        logger.info(f"Verifying search index with query: '{test_query}'")
         test_results = await search_text(test_query, 5)

         if test_results:
-            logger.info(f"Search verification successful: found {len(test_results)} results")
-            # Log categories covered by search results
             categories = set()
             for result in test_results:
                 result_id = result.get("id")
                 matching_shouts = [s for s in shouts_data if str(s.id) == result_id]
                 if matching_shouts and hasattr(matching_shouts[0], 'category'):
                     categories.add(getattr(matching_shouts[0], 'category', 'unknown'))
-            if categories:
-                logger.info(f"Search results cover categories: {', '.join(categories)}")
-        else:
-            logger.warning("Search verification returned no results. Index may be empty or not working.")
     except Exception as e:
-        logger.error(f"Error verifying search index: {e}")
+        pass
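
Note on the retry strategy that survives this cleanup in services/search.py: bulk indexing retries with exponential backoff plus jitter (wait_time = (2 ** retry_count) + (random.random() * 0.5)) and, once retries are exhausted, halves a failing batch into "-A"/"-B" sub-batches. A minimal standalone sketch of that pattern follows; send_bulk and MAX_RETRIES are hypothetical stand-ins for this illustration only and are not names from the patched service.

    import asyncio
    import random

    MAX_RETRIES = 3  # assumed value for the sketch


    async def index_batch(batch, batch_id, send_bulk):
        # Retry with exponential backoff + jitter; split the batch when retries run out.
        retry_count = 0
        while retry_count < MAX_RETRIES:
            try:
                await send_bulk(batch)  # e.g. an async callable wrapping POST /bulk-index
                return
            except Exception:
                retry_count += 1
                if retry_count < MAX_RETRIES:
                    # 2s, 4s, 8s... plus up to 0.5s of jitter, mirroring the retained code
                    wait_time = (2 ** retry_count) + (random.random() * 0.5)
                    await asyncio.sleep(wait_time)
                elif len(batch) > 1:
                    # Final attempt failed: recurse on each half so one bad document
                    # cannot poison the whole batch
                    mid = len(batch) // 2
                    await index_batch(batch[:mid], f"{batch_id}-A", send_bulk)
                    await index_batch(batch[mid:], f"{batch_id}-B", send_bulk)
                    return
                else:
                    return  # a single document that keeps failing is given up on

The jitter term spreads retries from concurrent batches over time so they do not hit the search service in lockstep.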
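
Note on the caching path kept in search(): when SEARCH_CACHE_ENABLED is set, the service fetches SEARCH_PREFETCH_SIZE results in one request starting at offset 0, stores the full list, and serves each page as formatted_results[offset:offset + limit]. A compressed sketch of that idea, assuming a plain in-memory dict instead of the service's Redis/local cache classes and a hypothetical backend_search coroutine:

    SEARCH_PREFETCH_SIZE = 200  # assumed value for the sketch

    _cache = {}

    async def search_page(query, limit, offset, backend_search):
        # The first request for a query fetches one large result set starting at offset 0
        if query not in _cache:
            _cache[query] = await backend_search(query, SEARCH_PREFETCH_SIZE, 0)
        # Every page, including the first, is a slice of the prefetched list
        return _cache[query][offset:offset + limit]

Subsequent pages for the same query are then served without another backend round trip, which is why the patched code resets search_offset to 0 and raises search_limit to SEARCH_PREFETCH_SIZE before calling /search.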