Skip to content

Conversation

@huleilei
Copy link
Contributor

@huleilei huleilei commented Jan 8, 2026

Changes Made

Related Issues

@github-actions github-actions bot added the feat label Jan 8, 2026
@huleilei huleilei marked this pull request as draft January 8, 2026 07:45
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Greptile Overview

Greptile Summary

Changed count pushdown implementation to distribute work across fragments instead of using a single task. The count operation now splits fragments into chunks (default 500 or configurable via fragment_group_size) and creates multiple scan tasks that each count a subset of fragments. The partial counts are then aggregated during query execution.

Key changes:

  • Modified _lancedb_count_result_function to accept fragment_ids parameter for counting specific fragment chunks
  • Updated _create_count_rows_scan_task to iterate through fragments and create multiple tasks
  • Each task now returns a partial count that will be aggregated later

Benefits:

  • Better parallelization for count operations on large datasets
  • Leverages distributed execution for count pushdown
  • Consistent with fragment grouping behavior in regular scan tasks

Confidence Score: 4/5

  • This PR is safe to merge with minimal risk
  • The changes are well-structured and follow existing patterns in the codebase. The implementation correctly adds fragment-wise distribution for count pushdown, and the fallback to dataset-level count when fragment_ids is None ensures backward compatibility. The chunking logic is consistent with similar implementations elsewhere in the file.
  • No files require special attention

Important Files Changed

File Analysis

Filename Score Overview
daft/io/lance/lance_scan.py 4/5 Added distributed fragment-wise count pushdown with chunking logic for better distributed processing

Sequence Diagram

sequenceDiagram
    participant Client
    participant LanceDBScanOperator
    participant _create_count_rows_scan_task
    participant ScanTask
    participant _lancedb_count_result_function
    participant LanceDataset
    
    Client->>LanceDBScanOperator: to_scan_tasks(pushdowns with count)
    LanceDBScanOperator->>_create_count_rows_scan_task: Create count scan tasks
    _create_count_rows_scan_task->>LanceDataset: get_fragments()
    LanceDataset-->>_create_count_rows_scan_task: All fragments
    
    loop For each chunk of fragments
        _create_count_rows_scan_task->>_create_count_rows_scan_task: Split into chunks (size 500 or fragment_group_size)
        _create_count_rows_scan_task->>ScanTask: Create task with fragment_ids
        ScanTask-->>LanceDBScanOperator: ScanTask
    end
    
    LanceDBScanOperator-->>Client: Iterator of ScanTasks
    
    Note over Client,LanceDataset: During execution (distributed)
    
    loop For each ScanTask
        Client->>_lancedb_count_result_function: Execute with fragment_ids chunk
        _lancedb_count_result_function->>LanceDataset: get_fragment(fid) for each fragment
        loop For each fragment in chunk
            _lancedb_count_result_function->>LanceDataset: fragment.count_rows(filter)
            LanceDataset-->>_lancedb_count_result_function: Row count
        end
        _lancedb_count_result_function->>_lancedb_count_result_function: Sum counts
        _lancedb_count_result_function-->>Client: RecordBatch with partial count
    end
    
    Client->>Client: Aggregate all partial counts
Loading

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant