
Commit c8b4714

xxchan committed "."
Signed-off-by: xxchan <[email protected]>
1 parent 5f91098 commit c8b4714

File tree: crates/iceberg/src/scan.rs (1 file changed, +212 / -10 lines)


Diff for: crates/iceberg/src/scan.rs

@@ -17,13 +17,14 @@
 
 //! Table scan api.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, RwLock};
 
 use arrow_array::RecordBatch;
 use futures::channel::mpsc::{channel, Sender};
 use futures::stream::BoxStream;
 use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 
 use crate::arrow::ArrowReaderBuilder;
@@ -38,7 +39,7 @@ use crate::io::FileIO;
 use crate::runtime::spawn;
 use crate::spec::{
     DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
-    ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
+    ManifestList, ManifestStatus, Operation, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
 };
 use crate::table::Table;
 use crate::utils::available_parallelism;
@@ -55,6 +56,10 @@ pub struct TableScanBuilder<'a> {
     // Defaults to none which means select all columns
     column_names: Option<Vec<String>>,
     snapshot_id: Option<i64>,
+    /// Exclusive. Used for incremental scan.
+    from_snapshot_id: Option<i64>,
+    /// Inclusive. Used for incremental scan.
+    to_snapshot_id: Option<i64>,
     batch_size: Option<usize>,
     case_sensitive: bool,
     filter: Option<Predicate>,
@@ -78,6 +83,8 @@ impl<'a> TableScanBuilder<'a> {
             table,
             column_names: None,
             snapshot_id: None,
+            from_snapshot_id: None,
+            to_snapshot_id: None,
             batch_size: None,
             case_sensitive: true,
             filter: None,
@@ -140,6 +147,18 @@
         self
     }
 
+    /// Set the starting snapshot id (exclusive) for incremental scan.
+    pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
+        self.from_snapshot_id = Some(from_snapshot_id);
+        self
+    }
+
+    /// Set the ending snapshot id (inclusive) for incremental scan.
+    pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self {
+        self.to_snapshot_id = Some(to_snapshot_id);
+        self
+    }
+
     /// Sets the concurrency limit for both manifest files and manifest
     /// entries for this scan
     pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
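
The two builder methods above are the user-facing entry point for incremental scans. A usage sketch follows (not part of this diff); it assumes a `Table` handle already loaded from a catalog and two snapshot ids that exist in its history, both of which are placeholders:

use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

// Plan an incremental scan over the snapshot range (from_id, to_id].
// `table`, `from_id`, and `to_id` are supplied by the caller.
async fn plan_incremental(table: &Table, from_id: i64, to_id: i64) -> Result<()> {
    let scan = table
        .scan()
        .from_snapshot_id(from_id) // exclusive lower bound
        .to_snapshot_id(to_id) // inclusive upper bound
        .build()?;

    // Stream the file scan tasks for data files appended within the range.
    let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;
    println!("planned {} file scan tasks", tasks.len());
    Ok(())
}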
@@ -206,6 +225,25 @@
 
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
+        // Validate that we have either a snapshot scan or an incremental scan configuration
+        if self.from_snapshot_id.is_some() || self.to_snapshot_id.is_some() {
+            // For incremental scan, we need to_snapshot_id to be set. from_snapshot_id is optional.
+            if self.to_snapshot_id.is_none() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Incremental scan requires to_snapshot_id to be set",
+                ));
+            }
+
+            // snapshot_id should not be set for incremental scan
+            if self.snapshot_id.is_some() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.",
+                ));
+            }
+        }
+
         let snapshot = match self.snapshot_id {
             Some(snapshot_id) => self
                 .table
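
For clarity, these are the two configurations the validation above rejects. A sketch, assuming a loaded `Table` handle; the snapshot ids are placeholders and are never looked up because `build()` fails fast:

use iceberg::table::Table;

// Both calls fail in `build()` with ErrorKind::DataInvalid,
// before any snapshot lookup happens.
fn rejected_configurations(table: &Table) {
    // An incremental range needs to_snapshot_id; from_snapshot_id alone is rejected.
    assert!(table.scan().from_snapshot_id(1).build().is_err());

    // A point-in-time snapshot_id cannot be combined with an incremental range.
    assert!(table
        .scan()
        .snapshot_id(2)
        .to_snapshot_id(3)
        .build()
        .is_err());
}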
@@ -227,7 +265,6 @@
                 })?
                 .clone(),
         };
-
         let schema = snapshot.schema(self.table.metadata())?;
 
         // Check that all column names exist in the schema.
@@ -297,6 +334,8 @@
             snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
             object_cache: self.table.object_cache(),
             field_ids: Arc::new(field_ids),
+            from_snapshot_id: self.from_snapshot_id,
+            to_snapshot_id: self.to_snapshot_id,
             partition_filter_cache: Arc::new(PartitionFilterCache::new()),
             manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
             expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
@@ -358,6 +397,11 @@ struct PlanContext {
     partition_filter_cache: Arc<PartitionFilterCache>,
     manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
     expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
+
+    // for incremental scan.
+    // If `to_snapshot_id` is set, it means incremental scan. `from_snapshot_id` can be `None`.
+    from_snapshot_id: Option<i64>,
+    to_snapshot_id: Option<i64>,
 }
 
 impl TableScan {
@@ -375,13 +419,71 @@ impl TableScan {
         // used to stream the results back to the caller
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
 
+        if let Some(to_snapshot_id) = self.plan_context.to_snapshot_id {
+            // Incremental scan mode
+            let added_files = added_files_between(
+                &self.plan_context.object_cache,
+                &self.plan_context.table_metadata,
+                to_snapshot_id,
+                self.plan_context.from_snapshot_id,
+            )
+            .await?;
+
+            for entry in added_files {
+                let manifest_entry_context = ManifestEntryContext {
+                    manifest_entry: entry,
+                    expression_evaluator_cache: self
+                        .plan_context
+                        .expression_evaluator_cache
+                        .clone(),
+                    field_ids: self.plan_context.field_ids.clone(),
+                    bound_predicates: None, // TODO: support predicates in incremental scan
+                    partition_spec_id: 0,   // TODO: get correct partition spec id
+                    // It's used to skip any data file whose partition data indicates that it can't contain
+                    // any data that matches this scan's filter
+                    snapshot_schema: self.plan_context.snapshot_schema.clone(),
+                    // delete is not supported in incremental scan
+                    delete_file_index: None,
+                };
+
+                manifest_entry_data_ctx_tx
+                    .clone()
+                    .send(manifest_entry_context)
+                    .await
+                    .map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError"))?;
+            }
+
+            let mut channel_for_manifest_entry_error = file_scan_task_tx.clone();
+
+            // Process the [`ManifestEntry`] stream in parallel
+            spawn(async move {
+                let result = manifest_entry_data_ctx_rx
+                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
+                    .try_for_each_concurrent(
+                        concurrency_limit_manifest_entries,
+                        |(manifest_entry_context, tx)| async move {
+                            spawn(async move {
+                                Self::process_data_manifest_entry(manifest_entry_context, tx).await
+                            })
+                            .await
+                        },
+                    )
+                    .await;
+
+                if let Err(error) = result {
+                    let _ = channel_for_manifest_entry_error.send(Err(error)).await;
+                }
+            });
+
+            return Ok(file_scan_task_rx.boxed());
+        }
+
         let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
             if self.delete_file_processing_enabled {
                 Some(DeleteFileIndex::new())
             } else {
                 None
             };
-
         let manifest_list = self.plan_context.get_manifest_list().await?;
 
         // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
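
The incremental branch above reuses the scan's existing fan-out pattern: entries are pushed into a bounded mpsc channel, processed concurrently with `try_for_each_concurrent`, and any error is forwarded into the result channel before the receiver is returned as the task stream. A self-contained sketch of that pattern with plain integers instead of manifest entries (illustrative only; it uses the `futures` executor rather than the crate's runtime):

use futures::channel::mpsc::channel;
use futures::{SinkExt, StreamExt, TryStreamExt};

fn main() {
    futures::executor::block_on(async {
        let concurrency = 4;
        // Work items stand in for ManifestEntryContext; results stand in for FileScanTasks.
        let (mut work_tx, work_rx) = channel::<i32>(concurrency);
        let (result_tx, result_rx) = channel::<Result<i32, String>>(concurrency);

        // Producer: feed work into the bounded channel, then close it by dropping the sender.
        let produce = async move {
            for item in 0..8 {
                work_tx.send(item).await.expect("send work item");
            }
        };

        // Consumer: process items concurrently, forwarding any error into the result channel.
        let mut error_tx = result_tx.clone();
        let consume = async move {
            let outcome = work_rx
                .map(|item| Ok::<_, String>((item, result_tx.clone())))
                .try_for_each_concurrent(concurrency, |(item, mut tx)| async move {
                    // "Process" the item; a failure here would abort the loop.
                    tx.send(Ok(item * 2)).await.map_err(|e| e.to_string())
                })
                .await;
            if let Err(error) = outcome {
                let _ = error_tx.send(Err(error)).await;
            }
        };

        // Collector: drain results once all senders are dropped.
        let collect = async {
            let results: Vec<_> = result_rx.collect().await;
            println!("{results:?}");
        };

        futures::join!(produce, consume, collect);
    });
}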
@@ -1146,6 +1248,104 @@ impl FileScanTask {
     }
 }
 
+struct Ancestors {
+    next: Option<SnapshotRef>,
+    get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
+}
+
+impl Iterator for Ancestors {
+    type Item = SnapshotRef;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let snapshot = self.next.take()?;
+        let result = snapshot.clone();
+        self.next = snapshot
+            .parent_snapshot_id()
+            .and_then(|id| (self.get_snapshot)(id));
+        Some(result)
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
+fn ancestors_of(
+    table_metadata: &TableMetadataRef,
+    snapshot: i64,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
+        let table_metadata = table_metadata.clone();
+        Box::new(Ancestors {
+            next: Some(snapshot.clone()),
+            get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
+        })
+    } else {
+        Box::new(std::iter::empty())
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
+fn ancestors_between(
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: Option<i64>,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    let Some(oldest_snapshot_id) = oldest_snapshot_id else {
+        return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
+    };
+
+    if latest_snapshot_id == oldest_snapshot_id {
+        return Box::new(std::iter::empty());
+    }
+
+    Box::new(
+        ancestors_of(table_metadata, latest_snapshot_id)
+            .take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
+    )
+}
+
+/// Get all added files between two snapshots.
+/// The files in `latest_snapshot_id` (inclusive) but not in `oldest_snapshot_id` (exclusive).
+async fn added_files_between(
+    object_cache: &ObjectCache,
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: Option<i64>,
+) -> Result<Vec<ManifestEntryRef>> {
+    let mut added_files = vec![];
+
+    let append_snapshots =
+        ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
+            .filter(|snapshot| matches!(snapshot.summary().operation, Operation::Append))
+            .collect_vec();
+    let snapshot_ids: HashSet<i64> = append_snapshots
+        .iter()
+        .map(|snapshot| snapshot.snapshot_id())
+        .collect();
+
+    for snapshot in append_snapshots {
+        let manifest_list = object_cache
+            .get_manifest_list(&snapshot, table_metadata)
+            .await?;
+
+        for manifest_file in manifest_list.entries() {
+            if !snapshot_ids.contains(&manifest_file.added_snapshot_id) {
+                continue;
+            }
+            let manifest = object_cache.get_manifest(manifest_file).await?;
+            let entries = manifest.entries().iter().filter(|entry| {
+                matches!(entry.status(), ManifestStatus::Added)
+                    && (
+                        // Is it possible that the snapshot id here is not contained?
+                        entry.snapshot_id().is_none()
+                            || snapshot_ids.contains(&entry.snapshot_id().unwrap())
+                    )
+            });
+            added_files.extend(entries.cloned());
+        }
+    }
+
+    Ok(added_files)
+}
+
 #[cfg(test)]
 pub mod tests {
     use std::collections::HashMap;
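
The helpers above walk the snapshot parent chain to collect append snapshots in the requested range. A standalone illustration of the same walk with a toy snapshot type (not the crate's `SnapshotRef`), showing that the range is inclusive of the latest id and exclusive of the oldest id:

use std::collections::HashMap;

// Toy stand-in for a snapshot: an id plus an optional parent id.
#[derive(Clone, Debug)]
struct Snap {
    id: i64,
    parent: Option<i64>,
}

// Walk from `latest` (inclusive) towards the root, stopping before `oldest` (exclusive),
// mirroring the behaviour of `ancestors_between` above.
fn ancestors_between(snapshots: &HashMap<i64, Snap>, latest: i64, oldest: Option<i64>) -> Vec<i64> {
    let mut out = Vec::new();
    let mut next = snapshots.get(&latest).cloned();
    while let Some(snap) = next {
        if Some(snap.id) == oldest {
            break;
        }
        out.push(snap.id);
        next = snap.parent.and_then(|id| snapshots.get(&id).cloned());
    }
    out
}

fn main() {
    // Linear history: 1 <- 2 <- 3 (3 is the latest snapshot).
    let snapshots: HashMap<i64, Snap> = [
        (1, Snap { id: 1, parent: None }),
        (2, Snap { id: 2, parent: Some(1) }),
        (3, Snap { id: 3, parent: Some(2) }),
    ]
    .into_iter()
    .collect();

    // from_snapshot_id = 1 (exclusive), to_snapshot_id = 3 (inclusive): snapshots 3 and 2.
    assert_eq!(ancestors_between(&snapshots, 3, Some(1)), vec![3, 2]);
    // No lower bound: walk all the way to the root.
    assert_eq!(ancestors_between(&snapshots, 3, None), vec![3, 2, 1]);
    println!("ok");
}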
@@ -1472,9 +1672,10 @@ pub mod tests {
         let values: BooleanArray = values.into();
         let col8 = Arc::new(values) as ArrayRef;
 
-        let to_write = RecordBatch::try_new(schema.clone(), vec![
-            col1, col2, col3, col4, col5, col6, col7, col8,
-        ])
+        let to_write = RecordBatch::try_new(
+            schema.clone(),
+            vec![col1, col2, col3, col4, col5, col6, col7, col8],
+        )
         .unwrap();
 
         // Write the Parquet files
@@ -1680,9 +1881,10 @@ pub mod tests {
         let values: BooleanArray = values.into();
         let col8 = Arc::new(values) as ArrayRef;
 
-        let to_write = RecordBatch::try_new(schema.clone(), vec![
-            col1, col2, col3, col4, col5, col6, col7, col8,
-        ])
+        let to_write = RecordBatch::try_new(
+            schema.clone(),
+            vec![col1, col2, col3, col4, col5, col6, col7, col8],
+        )
         .unwrap();
 
         // Write the Parquet files
