Conversation

@bfoley13
Collaborator

Reason for Change:
This PR, in line with the AutoIndexer Proposal, creates the AutoIndexer service that will run as a job within users' clusters; the proposal lays out the definitions and the reasoning behind them. The PR includes the initial service with support for Static data source types, PDF handling, and Kubernetes AutoIndexer CRD status updates.

Requirements

  • added unit tests and e2e tests (if applicable).

Issue Fixed:

Notes for Reviewers:

@kaito-pr-agent

Title

feat: Implement static data source handler with PDF support and unit tests


Description

  • Added StaticDataSourceHandler for static data sources with PDF extraction

  • Implemented comprehensive unit tests for static data source handling

  • Created KAITORAGClient for RAG engine API interactions

  • Added utility functions for file extension language detection


Changes walkthrough 📝

Relevant files

Tests (1 file)
  • test_static_handler.py: Added comprehensive unit tests for StaticDataSourceHandler (+606/-0)

Enhancement (3 files)
  • static_handler.py: Implemented static data source handler with PDF extraction (+619/-0)
  • rag_client.py: Created RAG client for API interactions (+136/-0)
  • utils.py: Added utility functions for file extension handling (+27/-0)

Miscellaneous (1 file)
  • __init__.py: Added package initialization for tests (+17/-0)

Configuration changes (1 file)
  • config.py: Added environment variable configuration (+13/-0)

Additional files (15 files)
  • Makefile (+19/-0)
  • Dockerfile (+28/-0)
  • __init__.py (+14/-0)
  • __init__.py (+13/-0)
  • git_handler.py (+58/-0)
  • handler.py (+44/-0)
  • __init__.py (+13/-0)
  • k8s_client.py (+343/-0)
  • main.py (+364/-0)
  • __init__.py (+13/-0)
  • requirements-test.txt (+5/-0)
  • requirements.txt (+7/-0)
  • conftest.py (+54/-0)
  • test_k8s_client.py (+474/-0)
  • test_main_integration.py (+534/-0)

Need help?
  • Type /help how to ... in the comments thread for any questions about PR-Agent usage.
  • Check out the documentation for more information.

    @kaito-pr-agent

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 PR contains tests
    🔒 Security concerns

    SSRF Risk:
    The _fetch_content_from_url method allows fetching content from arbitrary URLs. If the AutoIndexer runs in an environment with access to internal services, this could be used to access sensitive internal endpoints.
    Credential Exposure:
    The code handles credentials passed in configuration. Ensure credentials are stored and passed securely (e.g., via Kubernetes secrets) to avoid exposure in logs or status updates.
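
    A minimal pre-fetch guard along these lines could reduce the SSRF exposure. This is a sketch only; the helper name _is_url_allowed and where it is called from are illustrative, not part of the PR:

    import ipaddress
    import socket
    from urllib.parse import urlparse

    def _is_url_allowed(url: str) -> bool:
        """Reject non-HTTP(S) schemes and hosts resolving to private, loopback, or link-local addresses."""
        parsed = urlparse(url)
        if parsed.scheme not in ("http", "https") or not parsed.hostname:
            return False
        try:
            # Resolve every address the hostname maps to and check each one.
            infos = socket.getaddrinfo(parsed.hostname, parsed.port or 443)
        except socket.gaierror:
            return False
        for info in infos:
            # info[4][0] is the resolved address; strip any IPv6 zone suffix before parsing.
            addr = ipaddress.ip_address(info[4][0].split("%")[0])
            if addr.is_private or addr.is_loopback or addr.is_link_local:
                return False
        return True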

    ⚡ Recommended focus areas for review

    Error Handling

    The error handling in update_index() catches Exception, which is too broad. Consider catching more specific exceptions to avoid masking unexpected errors.

    except DataSourceError as e:
        error_msg = f"Data source error: {e}"
        logger.error(error_msg)
        self.errors.append(error_msg)
    except Exception as e:
        error_msg = f"Unexpected error during indexing: {e}"
        logger.error(error_msg)
        self.errors.append(error_msg)
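
    As a sketch of what a narrower catch chain could look like (same context as the excerpt above is assumed; requests.RequestException is an assumption about the HTTP library, and _fetch_documents is a placeholder for the indexing body):

    import requests

    try:
        documents = self._fetch_documents()  # placeholder; stands in for the body of update_index()
    except DataSourceError as e:
        error_msg = f"Data source error: {e}"
        logger.error(error_msg)
        self.errors.append(error_msg)
    except requests.RequestException as e:
        # Network-level failures: timeouts, DNS errors, non-2xx responses raised by raise_for_status()
        error_msg = f"HTTP error during indexing: {e}"
        logger.error(error_msg)
        self.errors.append(error_msg)
    except (ValueError, KeyError) as e:
        # Malformed configuration or unexpected response shapes
        error_msg = f"Invalid data during indexing: {e}"
        logger.error(error_msg)
        self.errors.append(error_msg)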
    PDF Extraction

    The PDF text extraction logic has multiple fallback mechanisms but lacks proper cleanup of resources like BytesIO streams, which could cause resource leaks.

    extracted_text = ""
    
    # Try PyPDF2 first (lighter weight)
    try:
        import PyPDF2
    
        pdf_stream = io.BytesIO(raw_content)
        pdf_reader = PyPDF2.PdfReader(pdf_stream)
    
        logger.info(f"PDF from {url} has {len(pdf_reader.pages)} pages")
    
        text_parts = []
        for page_num, page in enumerate(pdf_reader.pages, 1):
            try:
                page_text = page.extract_text()
                if page_text.strip():
                    text_parts.append(f"--- Page {page_num} ---\n{page_text.strip()}")
                    logger.debug(f"Extracted {len(page_text)} characters from page {page_num}")
            except Exception as e:
                logger.warning(f"Failed to extract text from page {page_num} of {url}: {e}")
                continue
    
        if text_parts:
            extracted_text = "\n\n".join(text_parts)
            logger.info(f"Successfully extracted {len(extracted_text)} characters from PDF using PyPDF2")
        else:
            logger.warning(f"No text extracted from PDF {url} using PyPDF2, trying alternative method")
    
    except ImportError:
        logger.debug("PyPDF2 not available")
    except Exception as e:
        logger.warning(f"PyPDF2 extraction failed for {url}: {e}, trying alternative method")
    
    # If PyPDF2 didn't work or extracted no text, try pdfplumber
    if not extracted_text.strip():
        try:
            import pdfplumber
    
            pdf_stream = io.BytesIO(raw_content)
    
            with pdfplumber.open(pdf_stream) as pdf:
                logger.info(f"PDF from {url} has {len(pdf.pages)} pages (pdfplumber)")
    
                text_parts = []
                for page_num, page in enumerate(pdf.pages, 1):
                    try:
                        page_text = page.extract_text()
                        if page_text and page_text.strip():
                            text_parts.append(f"--- Page {page_num} ---\n{page_text.strip()}")
                            logger.debug(f"Extracted {len(page_text)} characters from page {page_num} using pdfplumber")
    
                        # Also try to extract tables if available
                        tables = page.extract_tables()
                        if tables:
                            for table_num, table in enumerate(tables, 1):
                                try:
                                    # Convert table to text format
                                    table_text = self._table_to_text(table)
                                    if table_text.strip():
                                        text_parts.append(f"--- Page {page_num} Table {table_num} ---\n{table_text.strip()}")
                                except Exception as e:
                                    logger.debug(f"Failed to extract table {table_num} from page {page_num}: {e}")
    
                    except Exception as e:
                        logger.warning(f"Failed to extract text from page {page_num} of {url} using pdfplumber: {e}")
                        continue
    
                if text_parts:
                    extracted_text = "\n\n".join(text_parts)
                    logger.info(f"Successfully extracted {len(extracted_text)} characters from PDF using pdfplumber")
    
        except ImportError:
            logger.debug("pdfplumber not available")
        except Exception as e:
            logger.error(f"pdfplumber extraction failed for {url}: {e}")
    
    # Final validation
    if not extracted_text.strip():
        logger.error(f"Failed to extract any text from PDF {url}")
        raise DataSourceError(f"Unable to extract text from PDF: {url}")
    
    logger.info(f"Successfully extracted {len(extracted_text)} characters from PDF {url}")
    return extracted_text
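
    One way to address the cleanup concern, sketched for the PyPDF2 path under the assumption that raw_content, url, and logger are the same as in the excerpt above, is to open the stream with a context manager so it is always closed:

    import io

    try:
        import PyPDF2

        # BytesIO supports the context-manager protocol, so the buffer is released
        # as soon as the block exits, including when extraction fails part-way.
        with io.BytesIO(raw_content) as pdf_stream:
            pdf_reader = PyPDF2.PdfReader(pdf_stream)
            logger.info(f"PDF from {url} has {len(pdf_reader.pages)} pages")

            text_parts = []
            for page_num, page in enumerate(pdf_reader.pages, 1):
                page_text = page.extract_text()
                if page_text and page_text.strip():
                    text_parts.append(f"--- Page {page_num} ---\n{page_text.strip()}")

            extracted_text = "\n\n".join(text_parts)
    except ImportError:
        logger.debug("PyPDF2 not available")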
    Encoding Handling

    The content decoding logic attempts multiple encodings, but it doesn't explicitly handle the case where chardet detects an encoding only with low confidence or fails to detect one at all.

    def _decode_content(self, raw_content: bytes, content_type: str, url: str) -> str:
        """
        Decode raw content bytes to string, handling various encodings.
    
        Args:
            raw_content: Raw bytes content
            content_type: HTTP content type header
            url: Original URL for context
    
        Returns:
            str: Decoded text content
        """
        # If content is empty
        if not raw_content:
            logger.warning(f"Empty content received from {url}")
            return ""
    
        # Try to determine encoding from content-type header
        encoding = None
        if 'charset=' in content_type:
            with contextlib.suppress(Exception):
                encoding = content_type.split('charset=')[1].split(';')[0].strip()
    
        # List of encodings to try in order
        encodings_to_try = []
    
        if encoding:
            encodings_to_try.append(encoding)
    
        # Add common encodings
        encodings_to_try.extend(['utf-8', 'utf-8-sig', 'latin1', 'cp1252', 'iso-8859-1'])
    
        # Try chardet detection if available
        try:
            detected = chardet.detect(raw_content)
            if detected and detected.get('encoding') and detected.get('confidence', 0) > 0.7:
                detected_encoding = detected['encoding']
                if detected_encoding not in encodings_to_try:
                    encodings_to_try.insert(1, detected_encoding)
        except Exception as e:
            logger.debug(f"Chardet detection failed for {url}: {e}")
    
        # Try each encoding
        for enc in encodings_to_try:
            try:
                decoded_content = raw_content.decode(enc)
                logger.debug(f"Successfully decoded content from {url} using encoding: {enc}")
    
                # Handle specific content types
                if 'application/json' in content_type:
                    try:
                        json_data = json.loads(decoded_content)
                        # Try to extract text content from JSON
                        if isinstance(json_data, dict):
                            if 'content' in json_data:
                                return str(json_data['content'])
                            elif 'text' in json_data:
                                return str(json_data['text'])
                            elif 'body' in json_data:
                                return str(json_data['body'])
                        # Return formatted JSON as string
                        return json.dumps(json_data, indent=2, ensure_ascii=False)
                    except json.JSONDecodeError:
                        # If JSON parsing fails, return as plain text
                        pass
    
                return decoded_content
    
            except UnicodeDecodeError:
                continue
            except Exception as e:
                logger.debug(f"Failed to decode with {enc}: {e}")
                continue
    
        # If all encodings fail, try with error handling
        try:
            content = raw_content.decode('utf-8', errors='replace')
            logger.warning(f"Used UTF-8 with error replacement for {url}")
            return content
        except Exception as e:
            logger.error(f"Failed to decode content from {url}: {e}")
            raise DataSourceError(f"Unable to decode content from {url}: {e}")
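
    A small sketch of handling the low-confidence case explicitly, using the same variable names as the excerpt and keeping the 0.7 threshold:

    detected = chardet.detect(raw_content)
    detected_encoding = detected.get("encoding")
    confidence = detected.get("confidence") or 0.0

    if detected_encoding and confidence > 0.7:
        if detected_encoding not in encodings_to_try:
            encodings_to_try.insert(1, detected_encoding)
    elif detected_encoding:
        # Low confidence: still try the guess, but only after the common encodings,
        # and log it so a bad decode can be traced back to the detection step.
        logger.warning(
            f"chardet detected {detected_encoding} for {url} with low confidence "
            f"({confidence:.2f}); trying it after the common encodings"
        )
        if detected_encoding not in encodings_to_try:
            encodings_to_try.append(detected_encoding)
    else:
        logger.warning(f"chardet could not detect an encoding for {url}; "
                       f"falling back to the common encodings list")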

    @kaito-pr-agent

    PR Code Suggestions ✨

    Explore these optional code suggestions:

    Possible issue
    Continue processing after URL errors

    Re-raising the exception after appending it to errors will terminate URL processing
    prematurely. Remove the re-raise to continue processing other URLs after logging the
    error.

    presets/autoindexer/data_source_handler/static_handler.py [178-181]

     except Exception as e:
    -    logger.error(f"Failed to fetch content from {url}: {e}")
    -    self.errors.append(f"Failed to fetch content from {url}: {e}")
    -    raise DataSourceError(f"Failed to fetch content from {url}: {e}")
    +    error_msg = f"Failed to fetch content from {url}: {e}"
    +    logger.error(error_msg)
    +    self.errors.append(error_msg)
    Suggestion importance[1-10]: 9

    Why: Fixes a logic error where one URL failure would halt the entire indexing process, significantly improving resilience.

    Impact: High
    Handle missing AutoIndexer CRD

    The code accesses current_ai["status"] without checking if current_ai exists. If
    get_autoindexer() returns None, this lookup will fail (subscripting None raises a TypeError).
    Add a guard clause to handle the case where the AutoIndexer CRD is not found.

    presets/autoindexer/data_source_handler/static_handler.py [84-85]

     current_ai = self.autoindexer_client.get_autoindexer()
    +if not current_ai:
    +    logger.error("AutoIndexer CRD not found, cannot update status")
    +    return
     current_status = current_ai["status"]
    Suggestion importance[1-10]: 8

    Why: Prevents a crash when the AutoIndexer CRD is missing, which is a critical error handling improvement.

    Impact: Medium
    Improve file size limit test accuracy

    The test uses a fixed 6MB chunk to simulate large content, but this doesn't account
    for the configurable max_file_size (5MB). Instead, generate a chunk that exactly
    exceeds the configured limit to ensure the size validation logic is properly tested.

    presets/autoindexer/tests/test_static_handler.py [271-276]

    -mock_response = Mock()
    -mock_response.raise_for_status.return_value = None
    -mock_response.headers = {'content-type': 'text/plain'}
    -# Create a large chunk that exceeds the limit
    -large_chunk = b'x' * (6 * 1024 * 1024)  # 6MB chunk
    -mock_response.iter_content.return_value = [large_chunk]
    +max_size = valid_config["max_file_size"]
    +large_chunk = b'x' * (max_size + 1)  # Exceed by 1 byte
    Suggestion importance[1-10]: 7

    Why: The suggestion improves test accuracy by using the configurable max_file_size value instead of hardcoded 6MB, making the test more maintainable and realistic.

    Impact: Medium
    Validate specific exception message

    The test expects an exception during service initialization but doesn't specify the
    exact exception type. This could lead to false positives if unrelated exceptions
    occur. Use a more specific exception check to ensure only the intended error is
    caught.

    presets/autoindexer/tests/test_main_integration.py [342-349]

     def test_status_updates_without_k8s_client(self, valid_env_vars, mock_rag_client, mock_static_handler):
         """Test status update methods when K8s client is not available."""
         with patch('autoindexer.main.AutoIndexerK8sClient') as mock_class:
             mock_class.side_effect = Exception("K8s unavailable")
             
             with patch.dict(os.environ, valid_env_vars), \
    -             pytest.raises(Exception):
    +             pytest.raises(Exception, match="K8s unavailable"):
                 AutoIndexerService()
    Suggestion importance[1-10]: 7

    Why: Adding exception message validation improves test precision by ensuring only intended errors are caught, making the test more robust.

    Impact: Medium
    Verify config fallback behavior

    The test doesn't verify that kubeconfig loading was called after incluster config
    fails. Add an assertion to confirm the fallback mechanism is properly triggered when
    incluster config fails.

    presets/autoindexer/tests/test_k8s_client.py [110-118]

     def test_init_kubeconfig_fallback(self, mock_k8s_config, mock_custom_api, mock_core_api):
         # Mock incluster config to fail, kubeconfig to succeed
         mock_k8s_config.load_incluster_config.side_effect = ConfigException("Not in cluster")
         mock_k8s_config.load_kube_config.return_value = None  # Success
         
         with patch('autoindexer.k8s.k8s_client.NAMESPACE', 'test-ns'), \
              patch('autoindexer.k8s.k8s_client.AUTOINDEXER_NAME', 'test-ai'):
             client = AutoIndexerK8sClient()
    +        
    +    # Verify fallback was triggered
    +    mock_k8s_config.load_kube_config.assert_called_once()
    Suggestion importance[1-10]: 7

    Why: Adding an assertion for config fallback behavior improves test completeness by verifying the fallback mechanism works as expected.

    Impact: Medium
    Strengthen error message validation

    The test doesn't verify the specific error message when no URLs are provided. Add an
    exact error message check to ensure the handler returns the expected error for this
    specific failure case.

    presets/autoindexer/tests/test_static_handler.py [228-232]

    -handler = StaticDataSourceHandler("test-index", config, mock_rag_client, mock_autoindexer_client)
     errors = handler.update_index()
    +assert errors == ["No documents fetched from static data source"]
     
    -assert len(errors) == 1
    -assert "No documents fetched" in errors[0]
    -
    Suggestion importance[1-10]: 6

    Why: The suggestion enhances test precision by verifying the exact error message content, which improves test reliability but has moderate impact since the current test already checks key error components.

    Impact: Low
    Use config values in tests

    The test hardcodes a 20MB value instead of using the config's max_file_size. Derive
    the test value from the configuration to ensure consistency and avoid magic numbers.

    presets/autoindexer/tests/test_static_handler.py [260]

    -mock_response.headers = {'content-length': str(20 * 1024 * 1024)}  # 20MB
    +oversized = valid_config["max_file_size"] * 4  # 20MB when max=5MB
    +mock_response.headers = {'content-length': str(oversized)}
    Suggestion importance[1-10]: 6

    Why: The suggestion improves test maintainability by deriving test values from configuration rather than using magic numbers.

    Impact: Low
    Track status update failures

    The status update failure is logged but not handled. Add the error to self.errors to
    ensure it's properly reported in the indexing results.

    presets/autoindexer/data_source_handler/static_handler.py [125-126]

     if not self.autoindexer_client.update_autoindexer_status(status_update, update_success_or_failure=True):
    -    logger.error("Failed to update AutoIndexer status")
    +    error_msg = "Failed to update AutoIndexer status"
    +    logger.error(error_msg)
    +    self.errors.append(error_msg)
    Suggestion importance[1-10]: 6

    Why: Ensures status update failures are properly tracked in error logs, improving error reporting consistency.

    Impact: Low
    Ensure time mocking covers all calls

    The test mocks time.time with multiple calls but doesn't account for the exact
    number of calls needed during the run. This could cause flakiness if the actual
    number of time calls differs. Instead, explicitly define the time sequence to match
    expected call patterns during the test execution.

    presets/autoindexer/tests/test_main_integration.py [252-262]

     with patch.dict(os.environ, valid_env_vars):
         service = AutoIndexerService()
         
         # Mock exception during indexing
         service.data_source_handler.update_index.side_effect = Exception("Unexpected error")
         service.rag_client.list_documents.return_value = {"total": 0}
         
    -    with patch('time.time', side_effect=[1000.0] + [1020.0] * 10):  # 20 second duration, multiple calls
    +    # Define explicit time sequence: start + 10 intervals
    +    time_sequence = [1000.0] + [1020.0] * 12  # Allow extra calls
    +    with patch('time.time', side_effect=time_sequence):
             result = service.run()
         
         assert result is False
    Suggestion importance[1-10]: 6

    Why: The suggestion improves test reliability by ensuring time mocking covers all calls, but it only addresses test flakiness rather than production functionality.

    Impact: Low
    Improve file size validation

    The size check occurs after processing each chunk, but large files could exceed the
    limit before being detected. Move the size check before appending to immediately
    reject oversized files.

    presets/autoindexer/data_source_handler/static_handler.py [280-285]

     for chunk in response.iter_content(chunk_size=8192, decode_unicode=False):
         if chunk:
    -        total_size += len(chunk)
    -        if total_size > max_size:
    +        chunk_size = len(chunk)
    +        if total_size + chunk_size > max_size:
                 raise DataSourceError(f"File too large: exceeds limit of {max_size} bytes")
    +        total_size += chunk_size
             content_chunks.append(chunk)
    Suggestion importance[1-10]: 5

    Why: Provides more accurate size validation but offers marginal improvement since the file would be rejected anyway.

    Impact: Low
