Skip to content

Commit 783695d

Browse files
sgrebnovlukekim
authored andcommitted
Implement limit push down for IcebergTableProvider (#19)
Original PR: #19 Upstream PR: apache#1673
1 parent 668ed7c commit 783695d

8 files changed

Lines changed: 856 additions & 1209 deletions

File tree

Cargo.lock

Lines changed: 682 additions & 1203 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,7 @@ pub(crate) mod tests {
408408
project_field_ids: vec![],
409409
predicate: None,
410410
deletes: vec![pos_del_1, pos_del_2.clone()],
411+
limit: None,
411412
partition: None,
412413
partition_spec: None,
413414
name_mapping: None,
@@ -423,6 +424,7 @@ pub(crate) mod tests {
423424
project_field_ids: vec![],
424425
predicate: None,
425426
deletes: vec![pos_del_3],
427+
limit: None,
426428
partition: None,
427429
partition_spec: None,
428430
name_mapping: None,
@@ -478,6 +480,7 @@ pub(crate) mod tests {
478480
partition_spec_id: 0,
479481
equality_ids: None,
480482
}],
483+
limit: None,
481484
partition: None,
482485
partition_spec: None,
483486
name_mapping: None,

crates/iceberg/src/arrow/reader.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,9 @@ impl ArrowReader {
179179
row_group_filtering_enabled: bool,
180180
row_selection_enabled: bool,
181181
) -> Result<ArrowRecordBatchStream> {
182-
let should_load_page_index =
183-
(row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
182+
let should_load_page_index = (row_selection_enabled && task.predicate.is_some())
183+
|| !task.deletes.is_empty()
184+
|| task.limit.is_some();
184185

185186
let delete_filter_rx =
186187
delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
@@ -426,6 +427,10 @@ impl ArrowReader {
426427
record_batch_stream_builder.with_row_groups(selected_row_group_indices);
427428
}
428429

430+
if let Some(limit) = task.limit {
431+
record_batch_stream_builder = record_batch_stream_builder.with_limit(limit);
432+
}
433+
429434
// Build the batch stream and send all the RecordBatches that it generates
430435
// to the requester.
431436
let record_batch_stream =
@@ -2079,6 +2084,7 @@ message schema {
20792084
project_field_ids: vec![1],
20802085
predicate: Some(predicate.bind(schema, true).unwrap()),
20812086
deletes: vec![],
2087+
limit: None,
20822088
partition: None,
20832089
partition_spec: None,
20842090
name_mapping: None,

crates/iceberg/src/scan/context.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub(crate) struct ManifestFileContext {
4242

4343
field_ids: Arc<Vec<i32>>,
4444
bound_predicates: Option<Arc<BoundPredicates>>,
45+
limit: Option<usize>,
4546
object_cache: Arc<ObjectCache>,
4647
snapshot_schema: SchemaRef,
4748
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
@@ -61,6 +62,7 @@ pub(crate) struct ManifestEntryContext {
6162
pub snapshot_schema: SchemaRef,
6263
pub delete_file_index: DeleteFileIndex,
6364
pub case_sensitive: bool,
65+
pub limit: Option<usize>,
6466
}
6567

6668
impl ManifestFileContext {
@@ -76,6 +78,7 @@ impl ManifestFileContext {
7678
mut sender,
7779
expression_evaluator_cache,
7880
delete_file_index,
81+
limit,
7982
..
8083
} = self;
8184

@@ -92,6 +95,7 @@ impl ManifestFileContext {
9295
snapshot_schema: snapshot_schema.clone(),
9396
delete_file_index: delete_file_index.clone(),
9497
case_sensitive: self.case_sensitive,
98+
limit,
9599
};
96100

97101
sender
@@ -139,6 +143,8 @@ impl ManifestEntryContext {
139143
// TODO: Extract name_mapping from table metadata property "schema.name-mapping.default"
140144
name_mapping: None,
141145
case_sensitive: self.case_sensitive,
146+
147+
limit: self.limit,
142148
})
143149
}
144150
}
@@ -153,6 +159,7 @@ pub(crate) struct PlanContext {
153159
pub snapshot_schema: SchemaRef,
154160
pub case_sensitive: bool,
155161
pub predicate: Option<Arc<Predicate>>,
162+
pub limit: Option<usize>,
156163
pub snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
157164
pub object_cache: Arc<ObjectCache>,
158165
pub field_ids: Arc<Vec<i32>>,
@@ -276,6 +283,7 @@ impl PlanContext {
276283
manifest_file: manifest_file.clone(),
277284
bound_predicates,
278285
sender,
286+
limit: self.limit,
279287
object_cache: self.object_cache.clone(),
280288
snapshot_schema: self.snapshot_schema.clone(),
281289
field_ids: self.field_ids.clone(),

crates/iceberg/src/scan/mod.rs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ pub struct TableScanBuilder<'a> {
6060
concurrency_limit_manifest_files: usize,
6161
row_group_filtering_enabled: bool,
6262
row_selection_enabled: bool,
63+
64+
limit: Option<usize>,
6365
}
6466

6567
impl<'a> TableScanBuilder<'a> {
@@ -78,9 +80,16 @@ impl<'a> TableScanBuilder<'a> {
7880
concurrency_limit_manifest_files: num_cpus,
7981
row_group_filtering_enabled: true,
8082
row_selection_enabled: false,
83+
limit: None,
8184
}
8285
}
8386

87+
/// Sets the maximum number of records to return
88+
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
89+
self.limit = limit;
90+
self
91+
}
92+
8493
/// Sets the desired size of batches in the response
8594
/// to something other than the default
8695
pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
@@ -285,6 +294,7 @@ impl<'a> TableScanBuilder<'a> {
285294
snapshot_schema: schema,
286295
case_sensitive: self.case_sensitive,
287296
predicate: self.filter.map(Arc::new),
297+
limit: self.limit,
288298
snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
289299
object_cache: self.table.object_cache(),
290300
field_ids: Arc::new(field_ids),
@@ -1508,6 +1518,130 @@ pub mod tests {
15081518
assert_eq!(int64_arr.value(0), 2);
15091519
}
15101520

1521+
#[tokio::test]
1522+
async fn test_limit() {
1523+
let mut fixture = TableTestFixture::new();
1524+
fixture.setup_manifest_files().await;
1525+
1526+
let mut builder = fixture.table.scan();
1527+
builder = builder.with_limit(Some(1));
1528+
let table_scan = builder.build().unwrap();
1529+
1530+
let batch_stream = table_scan.to_arrow().await.unwrap();
1531+
1532+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1533+
1534+
assert_eq!(batches.len(), 2);
1535+
assert_eq!(batches[0].num_rows(), 1);
1536+
assert_eq!(batches[1].num_rows(), 1);
1537+
1538+
let col = batches[0].column_by_name("x").unwrap();
1539+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1540+
assert_eq!(int64_arr.value(0), 1);
1541+
1542+
let col = batches[0].column_by_name("y").unwrap();
1543+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1544+
assert_eq!(int64_arr.value(0), 2);
1545+
1546+
let col = batches[0].column_by_name("x").unwrap();
1547+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1548+
assert_eq!(int64_arr.value(0), 1);
1549+
1550+
let col = batches[0].column_by_name("y").unwrap();
1551+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1552+
assert_eq!(int64_arr.value(0), 2);
1553+
}
1554+
1555+
#[tokio::test]
1556+
async fn test_limit_with_predicate() {
1557+
let mut fixture = TableTestFixture::new();
1558+
fixture.setup_manifest_files().await;
1559+
1560+
// Filter: y > 3
1561+
let mut builder = fixture.table.scan();
1562+
let predicate = Reference::new("y").greater_than(Datum::long(3));
1563+
builder = builder.with_filter(predicate).with_limit(Some(1));
1564+
let table_scan = builder.build().unwrap();
1565+
1566+
let batch_stream = table_scan.to_arrow().await.unwrap();
1567+
1568+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1569+
1570+
assert_eq!(batches.len(), 2);
1571+
assert_eq!(batches[0].num_rows(), 1);
1572+
assert_eq!(batches[1].num_rows(), 1);
1573+
1574+
let col = batches[0].column_by_name("x").unwrap();
1575+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1576+
assert_eq!(int64_arr.value(0), 1);
1577+
1578+
let col = batches[0].column_by_name("y").unwrap();
1579+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1580+
assert_eq!(int64_arr.value(0), 4);
1581+
}
1582+
1583+
#[tokio::test]
1584+
async fn test_limit_with_predicate_and_row_selection() {
1585+
let mut fixture = TableTestFixture::new();
1586+
fixture.setup_manifest_files().await;
1587+
1588+
// Filter: y > 3
1589+
let mut builder = fixture.table.scan();
1590+
let predicate = Reference::new("y").greater_than(Datum::long(3));
1591+
builder = builder
1592+
.with_filter(predicate)
1593+
.with_limit(Some(1))
1594+
.with_row_selection_enabled(true);
1595+
let table_scan = builder.build().unwrap();
1596+
1597+
let batch_stream = table_scan.to_arrow().await.unwrap();
1598+
1599+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1600+
1601+
assert_eq!(batches.len(), 2);
1602+
assert_eq!(batches[0].num_rows(), 1);
1603+
assert_eq!(batches[1].num_rows(), 1);
1604+
1605+
let col = batches[0].column_by_name("x").unwrap();
1606+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1607+
assert_eq!(int64_arr.value(0), 1);
1608+
1609+
let col = batches[0].column_by_name("y").unwrap();
1610+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1611+
assert_eq!(int64_arr.value(0), 4);
1612+
}
1613+
1614+
#[tokio::test]
1615+
async fn test_limit_higher_than_total_rows() {
1616+
let mut fixture = TableTestFixture::new();
1617+
fixture.setup_manifest_files().await;
1618+
1619+
// Filter: y > 3
1620+
let mut builder = fixture.table.scan();
1621+
let predicate = Reference::new("y").greater_than(Datum::long(3));
1622+
builder = builder
1623+
.with_filter(predicate)
1624+
.with_limit(Some(100_000_000))
1625+
.with_row_selection_enabled(true);
1626+
let table_scan = builder.build().unwrap();
1627+
1628+
let batch_stream = table_scan.to_arrow().await.unwrap();
1629+
1630+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1631+
1632+
assert_eq!(batches.len(), 2);
1633+
assert_eq!(batches[0].num_rows(), 312);
1634+
assert_eq!(batches[1].num_rows(), 312);
1635+
1636+
let col = batches[0].column_by_name("x").unwrap();
1637+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1638+
assert_eq!(int64_arr.value(0), 1);
1639+
1640+
let col = batches[0].column_by_name("y").unwrap();
1641+
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1642+
assert_eq!(int64_arr.value(0), 4);
1643+
}
1644+
15111645
#[tokio::test]
15121646
async fn test_filter_on_arrow_gt_eq() {
15131647
let mut fixture = TableTestFixture::new();
@@ -1882,6 +2016,7 @@ pub mod tests {
18822016
record_count: Some(100),
18832017
data_file_format: DataFileFormat::Parquet,
18842018
deletes: vec![],
2019+
limit: None,
18852020
partition: None,
18862021
partition_spec: None,
18872022
name_mapping: None,
@@ -1900,6 +2035,7 @@ pub mod tests {
19002035
record_count: None,
19012036
data_file_format: DataFileFormat::Avro,
19022037
deletes: vec![],
2038+
limit: None,
19032039
partition: None,
19042040
partition_spec: None,
19052041
name_mapping: None,

crates/iceberg/src/scan/task.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ pub struct FileScanTask {
7878
/// The list of delete files that may need to be applied to this data file
7979
pub deletes: Vec<FileScanTaskDeleteFile>,
8080

81+
/// Maximum number of records to return, None means no limit
82+
pub limit: Option<usize>,
83+
8184
/// Partition data from the manifest entry, used to identify which columns can use
8285
/// constant values from partition metadata vs. reading from the data file.
8386
/// Per the Iceberg spec, only identity-transformed partition fields should use constants.

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ pub struct IcebergTableScan {
5151
projection: Option<Vec<String>>,
5252
/// Filters to apply to the table scan
5353
predicates: Option<Predicate>,
54+
/// Maximum number of records to return, None means no limit
55+
limit: Option<usize>,
5456
}
5557

5658
impl IcebergTableScan {
@@ -61,6 +63,7 @@ impl IcebergTableScan {
6163
schema: ArrowSchemaRef,
6264
projection: Option<&Vec<usize>>,
6365
filters: &[Expr],
66+
limit: Option<usize>,
6467
) -> Self {
6568
let output_schema = match projection {
6669
None => schema.clone(),
@@ -76,6 +79,7 @@ impl IcebergTableScan {
7679
plan_properties,
7780
projection,
7881
predicates,
82+
limit,
7983
}
8084
}
8185

@@ -143,6 +147,7 @@ impl ExecutionPlan for IcebergTableScan {
143147
self.snapshot_id,
144148
self.projection.clone(),
145149
self.predicates.clone(),
150+
self.limit,
146151
);
147152
let stream = futures::stream::once(fut).try_flatten();
148153

@@ -161,13 +166,14 @@ impl DisplayAs for IcebergTableScan {
161166
) -> std::fmt::Result {
162167
write!(
163168
f,
164-
"IcebergTableScan projection:[{}] predicate:[{}]",
169+
"IcebergTableScan projection:[{}] predicate:[{}] limit:[{}]",
165170
self.projection
166171
.clone()
167172
.map_or(String::new(), |v| v.join(",")),
168173
self.predicates
169174
.clone()
170-
.map_or(String::from(""), |p| format!("{p}"))
175+
.map_or(String::from(""), |p| format!("{p}")),
176+
self.limit.map_or(String::from(""), |p| format!("{p}")),
171177
)
172178
}
173179
}
@@ -182,6 +188,7 @@ async fn get_batch_stream(
182188
snapshot_id: Option<i64>,
183189
column_names: Option<Vec<String>>,
184190
predicates: Option<Predicate>,
191+
limit: Option<usize>,
185192
) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
186193
let scan_builder = match snapshot_id {
187194
Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
@@ -195,6 +202,9 @@ async fn get_batch_stream(
195202
if let Some(pred) = predicates {
196203
scan_builder = scan_builder.with_filter(pred);
197204
}
205+
206+
scan_builder = scan_builder.with_limit(limit);
207+
198208
let table_scan = scan_builder.build().map_err(to_datafusion_error)?;
199209

200210
let stream = table_scan

0 commit comments

Comments
 (0)