Skip to content

Commit 11a6be2

Browse files
authored
Implement limit push down (#19)
* Implement limit push down * Minor fixes
1 parent cc4f03f commit 11a6be2

6 files changed

Lines changed: 163 additions & 1 deletion

File tree

crates/iceberg/src/arrow/reader.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,10 @@ impl ArrowReader {
306306
record_batch_stream_builder.with_row_groups(selected_row_group_indices);
307307
}
308308

309+
if let Some(limit) = task.limit {
310+
record_batch_stream_builder = record_batch_stream_builder.with_limit(limit);
311+
}
312+
309313
// Build the batch stream and send all the RecordBatches that it generates
310314
// to the requester.
311315
let record_batch_stream =
@@ -1744,6 +1748,7 @@ message schema {
17441748
project_field_ids: vec![1],
17451749
predicate: Some(predicate.bind(schema, true).unwrap()),
17461750
deletes: vec![],
1751+
limit: None,
17471752
})]
17481753
.into_iter(),
17491754
)) as FileScanTaskStream;

crates/iceberg/src/scan/context.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub(crate) struct ManifestFileContext {
4242

4343
field_ids: Arc<Vec<i32>>,
4444
bound_predicates: Option<Arc<BoundPredicates>>,
45+
limit: Option<usize>,
4546
object_cache: Arc<ObjectCache>,
4647
snapshot_schema: SchemaRef,
4748
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
@@ -59,6 +60,7 @@ pub(crate) struct ManifestEntryContext {
5960
pub partition_spec_id: i32,
6061
pub snapshot_schema: SchemaRef,
6162
pub delete_file_index: Option<DeleteFileIndex>,
63+
pub limit: Option<usize>,
6264
}
6365

6466
impl ManifestFileContext {
@@ -74,6 +76,7 @@ impl ManifestFileContext {
7476
mut sender,
7577
expression_evaluator_cache,
7678
delete_file_index,
79+
limit,
7780
..
7881
} = self;
7982

@@ -89,6 +92,7 @@ impl ManifestFileContext {
8992
bound_predicates: bound_predicates.clone(),
9093
snapshot_schema: snapshot_schema.clone(),
9194
delete_file_index: delete_file_index.clone(),
95+
limit,
9296
};
9397

9498
sender
@@ -132,6 +136,8 @@ impl ManifestEntryContext {
132136
.map(|x| x.as_ref().snapshot_bound_predicate.clone()),
133137

134138
deletes,
139+
140+
limit: self.limit,
135141
})
136142
}
137143
}
@@ -146,6 +152,7 @@ pub(crate) struct PlanContext {
146152
pub snapshot_schema: SchemaRef,
147153
pub case_sensitive: bool,
148154
pub predicate: Option<Arc<Predicate>>,
155+
pub limit: Option<usize>,
149156
pub snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
150157
pub object_cache: Arc<ObjectCache>,
151158
pub field_ids: Arc<Vec<i32>>,
@@ -265,6 +272,7 @@ impl PlanContext {
265272
manifest_file: manifest_file.clone(),
266273
bound_predicates,
267274
sender,
275+
limit: self.limit,
268276
object_cache: self.object_cache.clone(),
269277
snapshot_schema: self.snapshot_schema.clone(),
270278
field_ids: self.field_ids.clone(),

crates/iceberg/src/scan/mod.rs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ pub struct TableScanBuilder<'a> {
6464
// is still being worked on but will switch to a default of true
6565
// once this work is complete
6666
delete_file_processing_enabled: bool,
67+
68+
limit: Option<usize>,
6769
}
6870

6971
impl<'a> TableScanBuilder<'a> {
@@ -83,9 +85,16 @@ impl<'a> TableScanBuilder<'a> {
8385
row_group_filtering_enabled: true,
8486
row_selection_enabled: false,
8587
delete_file_processing_enabled: false,
88+
limit: None,
8689
}
8790
}
8891

92+
/// Sets the maximum number of records to read from each file scan task (applied per task, so the scan as a whole may return more rows)
93+
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
94+
self.limit = limit;
95+
self
96+
}
97+
8998
/// Sets the desired size of batches in the response
9099
/// to something other than the default
91100
pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
@@ -299,6 +308,7 @@ impl<'a> TableScanBuilder<'a> {
299308
snapshot_schema: schema,
300309
case_sensitive: self.case_sensitive,
301310
predicate: self.filter.map(Arc::new),
311+
limit: self.limit,
302312
snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
303313
object_cache: self.table.object_cache(),
304314
field_ids: Arc::new(field_ids),
@@ -1441,6 +1451,130 @@ pub mod tests {
14411451
assert_eq!(int64_arr.value(0), 2);
14421452
}
14431453

1454+
#[tokio::test]
1455+
async fn test_limit() {
1456+
let mut fixture = TableTestFixture::new();
1457+
fixture.setup_manifest_files().await;
1458+
1459+
let mut builder = fixture.table.scan();
1460+
builder = builder.with_limit(Some(1));
1461+
let table_scan = builder.build().unwrap();
1462+
1463+
let batch_stream = table_scan.to_arrow().await.unwrap();
1464+
1465+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1466+
1467+
assert_eq!(batches.len(), 2);
1468+
assert_eq!(batches[0].num_rows(), 1);
1469+
assert_eq!(batches[1].num_rows(), 1);
1470+
1471+
let col = batches[0].column_by_name("x").unwrap();
1472+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1473+
assert_eq!(int64_arr.value(0), 1);
1474+
1475+
let col = batches[0].column_by_name("y").unwrap();
1476+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1477+
assert_eq!(int64_arr.value(0), 2);
1478+
1479+
let col = batches[0].column_by_name("x").unwrap();
1480+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1481+
assert_eq!(int64_arr.value(0), 1);
1482+
1483+
let col = batches[0].column_by_name("y").unwrap();
1484+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1485+
assert_eq!(int64_arr.value(0), 2);
1486+
}
1487+
1488+
#[tokio::test]
1489+
async fn test_limit_with_predicate() {
1490+
let mut fixture = TableTestFixture::new();
1491+
fixture.setup_manifest_files().await;
1492+
1493+
// Filter: y > 3
1494+
let mut builder = fixture.table.scan();
1495+
let predicate = Reference::new("y").greater_than(Datum::long(3));
1496+
builder = builder.with_filter(predicate).with_limit(Some(1));
1497+
let table_scan = builder.build().unwrap();
1498+
1499+
let batch_stream = table_scan.to_arrow().await.unwrap();
1500+
1501+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1502+
1503+
assert_eq!(batches.len(), 2);
1504+
assert_eq!(batches[0].num_rows(), 1);
1505+
assert_eq!(batches[1].num_rows(), 1);
1506+
1507+
let col = batches[0].column_by_name("x").unwrap();
1508+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1509+
assert_eq!(int64_arr.value(0), 1);
1510+
1511+
let col = batches[0].column_by_name("y").unwrap();
1512+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1513+
assert_eq!(int64_arr.value(0), 4);
1514+
}
1515+
1516+
#[tokio::test]
1517+
async fn test_limit_with_predicate_and_row_selection() {
1518+
let mut fixture = TableTestFixture::new();
1519+
fixture.setup_manifest_files().await;
1520+
1521+
// Filter: y > 3
1522+
let mut builder = fixture.table.scan();
1523+
let predicate = Reference::new("y").greater_than(Datum::long(3));
1524+
builder = builder
1525+
.with_filter(predicate)
1526+
.with_limit(Some(1))
1527+
.with_row_selection_enabled(true);
1528+
let table_scan = builder.build().unwrap();
1529+
1530+
let batch_stream = table_scan.to_arrow().await.unwrap();
1531+
1532+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1533+
1534+
assert_eq!(batches.len(), 2);
1535+
assert_eq!(batches[0].num_rows(), 1);
1536+
assert_eq!(batches[1].num_rows(), 1);
1537+
1538+
let col = batches[0].column_by_name("x").unwrap();
1539+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1540+
assert_eq!(int64_arr.value(0), 1);
1541+
1542+
let col = batches[0].column_by_name("y").unwrap();
1543+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1544+
assert_eq!(int64_arr.value(0), 4);
1545+
}
1546+
1547+
#[tokio::test]
1548+
async fn test_limit_higher_than_total_rows() {
1549+
let mut fixture = TableTestFixture::new();
1550+
fixture.setup_manifest_files().await;
1551+
1552+
// Filter: y > 3
1553+
let mut builder = fixture.table.scan();
1554+
let predicate = Reference::new("y").greater_than(Datum::long(3));
1555+
builder = builder
1556+
.with_filter(predicate)
1557+
.with_limit(Some(100_000_000))
1558+
.with_row_selection_enabled(true);
1559+
let table_scan = builder.build().unwrap();
1560+
1561+
let batch_stream = table_scan.to_arrow().await.unwrap();
1562+
1563+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1564+
1565+
assert_eq!(batches.len(), 2);
1566+
assert_eq!(batches[0].num_rows(), 312);
1567+
assert_eq!(batches[1].num_rows(), 312);
1568+
1569+
let col = batches[0].column_by_name("x").unwrap();
1570+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1571+
assert_eq!(int64_arr.value(0), 1);
1572+
1573+
let col = batches[0].column_by_name("y").unwrap();
1574+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1575+
assert_eq!(int64_arr.value(0), 4);
1576+
}
1577+
14441578
#[tokio::test]
14451579
async fn test_filter_on_arrow_gt_eq() {
14461580
let mut fixture = TableTestFixture::new();
@@ -1816,6 +1950,7 @@ pub mod tests {
18161950
record_count: Some(100),
18171951
data_file_format: DataFileFormat::Parquet,
18181952
deletes: vec![],
1953+
limit: None,
18191954
};
18201955
test_fn(task);
18211956

@@ -1831,6 +1966,7 @@ pub mod tests {
18311966
record_count: None,
18321967
data_file_format: DataFileFormat::Avro,
18331968
deletes: vec![],
1969+
limit: None,
18341970
};
18351971
test_fn(task);
18361972
}

crates/iceberg/src/scan/task.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ pub struct FileScanTask {
5757

5858
/// The list of delete files that may need to be applied to this data file
5959
pub deletes: Vec<FileScanTaskDeleteFile>,
60+
61+
/// Maximum number of records to return for this task; `None` means no limit
62+
pub limit: Option<usize>,
6063
}
6164

6265
impl FileScanTask {

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ pub(crate) struct IcebergTableScan {
5151
projection: Option<Vec<String>>,
5252
/// Filters to apply to the table scan
5353
predicates: Option<Predicate>,
54+
/// Maximum number of records to return, None means no limit
55+
limit: Option<usize>,
5456
}
5557

5658
impl IcebergTableScan {
@@ -61,6 +63,7 @@ impl IcebergTableScan {
6163
schema: ArrowSchemaRef,
6264
projection: Option<&Vec<usize>>,
6365
filters: &[Expr],
66+
limit: Option<usize>,
6467
) -> Self {
6568
let output_schema = match projection {
6669
None => schema.clone(),
@@ -76,6 +79,7 @@ impl IcebergTableScan {
7679
plan_properties,
7780
projection,
7881
predicates,
82+
limit,
7983
}
8084
}
8185

@@ -127,6 +131,7 @@ impl ExecutionPlan for IcebergTableScan {
127131
self.snapshot_id,
128132
self.projection.clone(),
129133
self.predicates.clone(),
134+
self.limit,
130135
);
131136
let stream = futures::stream::once(fut).try_flatten();
132137

@@ -166,6 +171,7 @@ async fn get_batch_stream(
166171
snapshot_id: Option<i64>,
167172
column_names: Option<Vec<String>>,
168173
predicates: Option<Predicate>,
174+
limit: Option<usize>,
169175
) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
170176
let scan_builder = match snapshot_id {
171177
Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
@@ -179,6 +185,9 @@ async fn get_batch_stream(
179185
if let Some(pred) = predicates {
180186
scan_builder = scan_builder.with_filter(pred);
181187
}
188+
189+
scan_builder = scan_builder.with_limit(limit);
190+
182191
let table_scan = scan_builder.build().map_err(to_datafusion_error)?;
183192

184193
let stream = table_scan

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ impl TableProvider for IcebergTableProvider {
139139
_state: &dyn Session,
140140
projection: Option<&Vec<usize>>,
141141
filters: &[Expr],
142-
_limit: Option<usize>,
142+
limit: Option<usize>,
143143
) -> DFResult<Arc<dyn ExecutionPlan>> {
144144
// Get the latest table metadata from the catalog if it exists
145145
let table = if let Some(catalog) = &self.catalog {
@@ -158,6 +158,7 @@ impl TableProvider for IcebergTableProvider {
158158
self.schema.clone(),
159159
projection,
160160
filters,
161+
limit,
161162
)))
162163
}
163164

0 commit comments

Comments
 (0)