Skip to content

Commit ba45f20

Browse files
committed
Implement limit push down
1 parent ea81659 commit ba45f20

6 files changed

Lines changed: 117 additions & 1 deletion

File tree

crates/iceberg/src/arrow/reader.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,10 @@ impl ArrowReader {
306306
record_batch_stream_builder.with_row_groups(selected_row_group_indices);
307307
}
308308

309+
if let Some(limit) = task.limit {
310+
record_batch_stream_builder = record_batch_stream_builder.with_limit(limit);
311+
}
312+
309313
// Build the batch stream and send all the RecordBatches that it generates
310314
// to the requester.
311315
let record_batch_stream =
@@ -1744,6 +1748,7 @@ message schema {
17441748
project_field_ids: vec![1],
17451749
predicate: Some(predicate.bind(schema, true).unwrap()),
17461750
deletes: vec![],
1751+
limit: None,
17471752
})]
17481753
.into_iter(),
17491754
)) as FileScanTaskStream;

crates/iceberg/src/scan/context.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub(crate) struct ManifestFileContext {
4242

4343
field_ids: Arc<Vec<i32>>,
4444
bound_predicates: Option<Arc<BoundPredicates>>,
45+
limit: Option<usize>,
4546
object_cache: Arc<ObjectCache>,
4647
snapshot_schema: SchemaRef,
4748
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
@@ -59,6 +60,7 @@ pub(crate) struct ManifestEntryContext {
5960
pub partition_spec_id: i32,
6061
pub snapshot_schema: SchemaRef,
6162
pub delete_file_index: Option<DeleteFileIndex>,
63+
pub limit: Option<usize>,
6264
}
6365

6466
impl ManifestFileContext {
@@ -74,6 +76,7 @@ impl ManifestFileContext {
7476
mut sender,
7577
expression_evaluator_cache,
7678
delete_file_index,
79+
limit,
7780
..
7881
} = self;
7982

@@ -89,6 +92,7 @@ impl ManifestFileContext {
8992
bound_predicates: bound_predicates.clone(),
9093
snapshot_schema: snapshot_schema.clone(),
9194
delete_file_index: delete_file_index.clone(),
95+
limit,
9296
};
9397

9498
sender
@@ -132,6 +136,8 @@ impl ManifestEntryContext {
132136
.map(|x| x.as_ref().snapshot_bound_predicate.clone()),
133137

134138
deletes,
139+
140+
limit: self.limit,
135141
})
136142
}
137143
}
@@ -146,6 +152,7 @@ pub(crate) struct PlanContext {
146152
pub snapshot_schema: SchemaRef,
147153
pub case_sensitive: bool,
148154
pub predicate: Option<Arc<Predicate>>,
155+
pub limit: Option<usize>,
149156
pub snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
150157
pub object_cache: Arc<ObjectCache>,
151158
pub field_ids: Arc<Vec<i32>>,
@@ -265,6 +272,7 @@ impl PlanContext {
265272
manifest_file: manifest_file.clone(),
266273
bound_predicates,
267274
sender,
275+
limit: self.limit,
268276
object_cache: self.object_cache.clone(),
269277
snapshot_schema: self.snapshot_schema.clone(),
270278
field_ids: self.field_ids.clone(),

crates/iceberg/src/scan/mod.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use context::*;
2424
mod task;
2525

2626
use std::sync::Arc;
27+
use std::sync::atomic::{AtomicUsize, Ordering};
2728

2829
use arrow_array::RecordBatch;
2930
use futures::channel::mpsc::{Sender, channel};
@@ -64,6 +65,8 @@ pub struct TableScanBuilder<'a> {
6465
// is still being worked on but will switch to a default of true
6566
// once this work is complete
6667
delete_file_processing_enabled: bool,
68+
69+
limit: Option<usize>,
6770
}
6871

6972
impl<'a> TableScanBuilder<'a> {
@@ -83,9 +86,16 @@ impl<'a> TableScanBuilder<'a> {
8386
row_group_filtering_enabled: true,
8487
row_selection_enabled: false,
8588
delete_file_processing_enabled: false,
89+
limit: None,
8690
}
8791
}
8892

93+
/// Sets the maximum number of records to return
94+
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
95+
self.limit = limit;
96+
self
97+
}
98+
8999
/// Sets the desired size of batches in the response
90100
/// to something other than the default
91101
pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
@@ -299,6 +309,7 @@ impl<'a> TableScanBuilder<'a> {
299309
snapshot_schema: schema,
300310
case_sensitive: self.case_sensitive,
301311
predicate: self.filter.map(Arc::new),
312+
limit: self.limit,
302313
snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
303314
object_cache: self.table.object_cache(),
304315
field_ids: Arc::new(field_ids),
@@ -1441,6 +1452,83 @@ pub mod tests {
14411452
assert_eq!(int64_arr.value(0), 2);
14421453
}
14431454

1455+
#[tokio::test]
1456+
async fn test_limit() {
1457+
let mut fixture = TableTestFixture::new();
1458+
fixture.setup_manifest_files().await;
1459+
1460+
let mut builder = fixture.table.scan();
1461+
builder = builder.with_limit(Some(1));
1462+
let table_scan = builder.build().unwrap();
1463+
1464+
let batch_stream = table_scan.to_arrow().await.unwrap();
1465+
1466+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1467+
1468+
assert_eq!(batches.len(), 2);
1469+
assert_eq!(batches[0].num_rows(), 1);
1470+
assert_eq!(batches[1].num_rows(), 1);
1471+
1472+
let col = batches[0].column_by_name("x").unwrap();
1473+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1474+
assert_eq!(int64_arr.value(0), 1);
1475+
1476+
let col = batches[0].column_by_name("y").unwrap();
1477+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1478+
assert_eq!(int64_arr.value(0), 2);
1479+
}
1480+
1481+
#[tokio::test]
1482+
async fn test_limit_with_predicate() {
1483+
let mut fixture = TableTestFixture::new();
1484+
fixture.setup_manifest_files().await;
1485+
1486+
// Filter: y < 3
1487+
let mut builder = fixture.table.scan();
1488+
let predicate = Reference::new("y").greater_than(Datum::long(3));
1489+
builder = builder.with_filter(predicate).with_limit(Some(1));
1490+
let table_scan = builder.build().unwrap();
1491+
1492+
let batch_stream = table_scan.to_arrow().await.unwrap();
1493+
1494+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1495+
1496+
assert_eq!(batches.len(), 2);
1497+
assert_eq!(batches[0].num_rows(), 1);
1498+
assert_eq!(batches[1].num_rows(), 1);
1499+
}
1500+
1501+
#[tokio::test]
1502+
async fn test_limit_with_predicate_and_row_selection() {
1503+
let mut fixture = TableTestFixture::new();
1504+
fixture.setup_manifest_files().await;
1505+
1506+
// Filter: y < 3
1507+
let mut builder = fixture.table.scan();
1508+
let predicate = Reference::new("y").greater_than(Datum::long(3));
1509+
builder = builder
1510+
.with_filter(predicate)
1511+
.with_limit(Some(1))
1512+
.with_row_selection_enabled(true);
1513+
let table_scan = builder.build().unwrap();
1514+
1515+
let batch_stream = table_scan.to_arrow().await.unwrap();
1516+
1517+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1518+
1519+
assert_eq!(batches.len(), 2);
1520+
assert_eq!(batches[0].num_rows(), 1);
1521+
assert_eq!(batches[1].num_rows(), 1);
1522+
1523+
// let col = batches[0].column_by_name("x").unwrap();
1524+
// let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1525+
// assert_eq!(int64_arr.value(0), 1);
1526+
//
1527+
// let col = batches[0].column_by_name("y").unwrap();
1528+
// let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1529+
// assert_eq!(int64_arr.value(0), 2);
1530+
}
1531+
14441532
#[tokio::test]
14451533
async fn test_filter_on_arrow_gt_eq() {
14461534
let mut fixture = TableTestFixture::new();
@@ -1816,6 +1904,7 @@ pub mod tests {
18161904
record_count: Some(100),
18171905
data_file_format: DataFileFormat::Parquet,
18181906
deletes: vec![],
1907+
limit: None,
18191908
};
18201909
test_fn(task);
18211910

@@ -1831,6 +1920,7 @@ pub mod tests {
18311920
record_count: None,
18321921
data_file_format: DataFileFormat::Avro,
18331922
deletes: vec![],
1923+
limit: None,
18341924
};
18351925
test_fn(task);
18361926
}

crates/iceberg/src/scan/task.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ pub struct FileScanTask {
5757

5858
/// The list of delete files that may need to be applied to this data file
5959
pub deletes: Vec<FileScanTaskDeleteFile>,
60+
61+
/// Maximum number of records to return, None means no limit
62+
pub limit: Option<usize>,
6063
}
6164

6265
impl FileScanTask {

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ pub(crate) struct IcebergTableScan {
5151
projection: Option<Vec<String>>,
5252
/// Filters to apply to the table scan
5353
predicates: Option<Predicate>,
54+
/// Maximum number of records to return, None means no limit
55+
limit: Option<usize>,
5456
}
5557

5658
impl IcebergTableScan {
@@ -61,6 +63,7 @@ impl IcebergTableScan {
6163
schema: ArrowSchemaRef,
6264
projection: Option<&Vec<usize>>,
6365
filters: &[Expr],
66+
limit: Option<usize>,
6467
) -> Self {
6568
let output_schema = match projection {
6669
None => schema.clone(),
@@ -76,6 +79,7 @@ impl IcebergTableScan {
7679
plan_properties,
7780
projection,
7881
predicates,
82+
limit,
7983
}
8084
}
8185

@@ -127,6 +131,7 @@ impl ExecutionPlan for IcebergTableScan {
127131
self.snapshot_id,
128132
self.projection.clone(),
129133
self.predicates.clone(),
134+
self.limit,
130135
);
131136
let stream = futures::stream::once(fut).try_flatten();
132137

@@ -166,6 +171,7 @@ async fn get_batch_stream(
166171
snapshot_id: Option<i64>,
167172
column_names: Option<Vec<String>>,
168173
predicates: Option<Predicate>,
174+
limit: Option<usize>,
169175
) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
170176
let scan_builder = match snapshot_id {
171177
Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
@@ -179,6 +185,9 @@ async fn get_batch_stream(
179185
if let Some(pred) = predicates {
180186
scan_builder = scan_builder.with_filter(pred);
181187
}
188+
189+
scan_builder = scan_builder.with_limit(limit);
190+
182191
let table_scan = scan_builder.build().map_err(to_datafusion_error)?;
183192

184193
let stream = table_scan

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ impl TableProvider for IcebergTableProvider {
139139
_state: &dyn Session,
140140
projection: Option<&Vec<usize>>,
141141
filters: &[Expr],
142-
_limit: Option<usize>,
142+
limit: Option<usize>,
143143
) -> DFResult<Arc<dyn ExecutionPlan>> {
144144
// Get the latest table metadata from the catalog if it exists
145145
let table = if let Some(catalog) = &self.catalog {
@@ -158,6 +158,7 @@ impl TableProvider for IcebergTableProvider {
158158
self.schema.clone(),
159159
projection,
160160
filters,
161+
limit,
161162
)))
162163
}
163164

0 commit comments

Comments
 (0)