Skip to content

Commit 5a097f0

Browse files
authored
feat: exec pushdown with global aggregates (#124)
* Exec pushdown: internal __lance_exec for global aggregates Add a small ExecIR container (reusing existing FilterIR bytes) and an optimizer rewrite that replaces ungrouped aggregates over Lance scans with the internal __lance_exec table function. The Rust side validates the IR and executes it via DataFusion on top of the Lance scanner, returning a (typically tiny) Arrow stream back to DuckDB. The rewrite is always enabled but remains conservative: on any encode/validate/type mismatch it falls back to the original plan. Tests: GEN=ninja make test_debug * Fix ci * Fix exec pushdown on table_filters-keyed scans LogicalGet.table_filters uses column IDs as map keys, not scan column positions. Exec pushdown previously fed these filters into BuildLanceTableFilterIRParts with a column_ids vector sized to the projected columns, causing all_filters_pushed=false and a silent fallback for TPC-H Q6. Fix by using an identity column_ids mapping sized by the max filter column id and collecting extra scan columns by column id. Add a sqllogictest that asserts Q6 rewrites to __lance_exec. Tests: GEN=ninja make test_debug * Enable always-on exec pushdown for group/order aggregates * Fix sqllogictest expectations under always-on exec pushdown * ExecIR: encode CAST to preserve GROUP BY semantics * Add formatting requirement to AGENTS
1 parent 1fec57c commit 5a097f0

25 files changed

Lines changed: 2785 additions & 19 deletions

AGENTS.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ This repository is configured with `uv`, so you can run formatting via:
6060
uv run make format
6161
```
6262

63+
PR requirement: Before submitting a PR, run `uv run make format` and commit any resulting formatting changes (as a separate commit if it helps review).
64+
6365
### Testing
6466

6567
The `release` build can be slow. For fast iteration, prefer `test_debug` when available.

CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ set(EXTENSION_SOURCES src/lance_extension.cpp src/lance_scan.cpp
1313
src/lance_search.cpp src/lance_common.cpp
1414
src/lance_secrets.cpp
1515
src/lance_filter_ir.cpp
16+
src/lance_exec_ir.cpp
17+
src/lance_logical_exec.cpp
1618
src/lance_storage.cpp
1719
src/lance_metadata.cpp
1820
src/lance_replacement.cpp
@@ -117,10 +119,13 @@ set(RUST_FFI_DEPENDS
117119
${CMAKE_CURRENT_LIST_DIR}/Cargo.toml
118120
${CMAKE_CURRENT_LIST_DIR}/Cargo.lock
119121
${CMAKE_CURRENT_LIST_DIR}/rust/constants.rs
122+
${CMAKE_CURRENT_LIST_DIR}/rust/datafusion_stream.rs
120123
${CMAKE_CURRENT_LIST_DIR}/rust/error.rs
124+
${CMAKE_CURRENT_LIST_DIR}/rust/exec_ir.rs
121125
${CMAKE_CURRENT_LIST_DIR}/rust/filter_ir.rs
122126
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/arrow_export.rs
123127
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/dataset.rs
128+
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/exec.rs
124129
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/index.rs
125130
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/knn.rs
126131
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/mod.rs

rust/datafusion_stream.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use arrow::array::RecordBatch;
2+
use arrow::datatypes::SchemaRef;
3+
use datafusion::physical_plan::SendableRecordBatchStream;
4+
use futures::StreamExt;
5+
use tokio::runtime::Handle;
6+
7+
pub struct DataFusionStream {
8+
handle: Handle,
9+
stream: SendableRecordBatchStream,
10+
}
11+
12+
impl DataFusionStream {
13+
pub fn try_new(stream: SendableRecordBatchStream) -> Result<Self, Box<dyn std::error::Error>> {
14+
let handle = crate::runtime::handle()?;
15+
Ok(Self { handle, stream })
16+
}
17+
18+
pub fn schema(&self) -> SchemaRef {
19+
self.stream.schema()
20+
}
21+
22+
pub fn next(&mut self) -> Result<Option<RecordBatch>, datafusion_common::DataFusionError> {
23+
self.handle.block_on(async {
24+
let next = self.stream.next().await;
25+
match next {
26+
Some(Ok(batch)) => Ok(Some(batch)),
27+
Some(Err(err)) => Err(err),
28+
None => Ok(None),
29+
}
30+
})
31+
}
32+
}

rust/error.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ pub enum ErrorCode {
6565
NamespaceDescribeTableInfo = 49,
6666
NamespaceCreateEmptyTable = 50,
6767
NamespaceDropTable = 51,
68+
Exec = 52,
6869
}
6970

7071
struct LastError {

0 commit comments

Comments
 (0)