Commit 3636dc3

Commit message: .

Signed-off-by: xxchan <[email protected]>
1 parent 28c31ce commit 3636dc3

1 file changed: +191 −9 lines

Diff for: crates/iceberg/src/scan.rs (+191 −9)
@@ -17,13 +17,14 @@
 
 //! Table scan api.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, RwLock};
 
 use arrow_array::RecordBatch;
 use futures::channel::mpsc::{channel, Sender};
 use futures::stream::BoxStream;
 use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 
 use crate::arrow::ArrowReaderBuilder;
@@ -38,7 +39,7 @@ use crate::io::FileIO;
 use crate::runtime::spawn;
 use crate::spec::{
     DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
-    ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
+    ManifestList, ManifestStatus, Operation, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
 };
 use crate::table::Table;
 use crate::utils::available_parallelism;
@@ -55,6 +56,10 @@ pub struct TableScanBuilder<'a> {
     // Defaults to none which means select all columns
     column_names: Option<Vec<String>>,
     snapshot_id: Option<i64>,
+    /// Exclusive. Used for incremental scan.
+    from_snapshot_id: Option<i64>,
+    /// Inclusive. Used for incremental scan.
+    to_snapshot_id: Option<i64>,
     batch_size: Option<usize>,
     case_sensitive: bool,
     filter: Option<Predicate>,
@@ -78,6 +83,8 @@ impl<'a> TableScanBuilder<'a> {
             table,
             column_names: None,
             snapshot_id: None,
+            from_snapshot_id: None,
+            to_snapshot_id: None,
             batch_size: None,
             case_sensitive: true,
             filter: None,
@@ -140,6 +147,18 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Set the starting snapshot id (exclusive) for incremental scan.
+    pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
+        self.from_snapshot_id = Some(from_snapshot_id);
+        self
+    }
+
+    /// Set the ending snapshot id (inclusive) for incremental scan.
+    pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self {
+        self.to_snapshot_id = Some(to_snapshot_id);
+        self
+    }
+
     /// Sets the concurrency limit for both manifest files and manifest
     /// entries for this scan
     pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
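With these two builder methods in place, an incremental scan is configured like any other scan. A minimal usage sketch — `table`, `old_snapshot_id`, and `new_snapshot_id` are assumed values, not part of this diff:

```rust
// Hypothetical usage of the new builder API from this commit.
// `table` is an existing iceberg `Table`; the snapshot ids are assumed values.
let scan = table
    .scan()
    .from_snapshot_id(old_snapshot_id) // exclusive lower bound
    .to_snapshot_id(new_snapshot_id)   // inclusive upper bound
    .build()?;
```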
@@ -227,6 +246,8 @@ impl<'a> TableScanBuilder<'a> {
                 })?
                 .clone(),
         };
+        // TODO: we should validate either snapshot (snapshot scan) or
+        // from_snapshot_id and to_snapshot_id are set (incremental scan)
 
         let schema = snapshot.schema(self.table.metadata())?;
 
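The TODO above is left open by this commit. For illustration only, a hedged sketch of what that validation might look like, reusing the crate's existing `Error`/`ErrorKind` types:

```rust
// Sketch (not in this diff): reject configurations that mix a point-in-time
// scan with an incremental scan.
if self.snapshot_id.is_some()
    && (self.from_snapshot_id.is_some() || self.to_snapshot_id.is_some())
{
    return Err(Error::new(
        ErrorKind::DataInvalid,
        "snapshot_id cannot be combined with from_snapshot_id/to_snapshot_id",
    ));
}
```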
@@ -297,6 +318,8 @@ impl<'a> TableScanBuilder<'a> {
             snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
             object_cache: self.table.object_cache(),
             field_ids: Arc::new(field_ids),
+            from_snapshot_id: self.from_snapshot_id,
+            to_snapshot_id: self.to_snapshot_id,
             partition_filter_cache: Arc::new(PartitionFilterCache::new()),
             manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
             expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
@@ -354,6 +377,8 @@ struct PlanContext {
     snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
     object_cache: Arc<ObjectCache>,
     field_ids: Arc<Vec<i32>>,
+    from_snapshot_id: Option<i64>,
+    to_snapshot_id: Option<i64>,
 
     partition_filter_cache: Arc<PartitionFilterCache>,
     manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
@@ -375,13 +400,74 @@ impl TableScan {
         // used to stream the results back to the caller
         let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
 
+        if let (Some(from_snapshot_id), Some(to_snapshot_id)) = (
+            self.plan_context.from_snapshot_id,
+            self.plan_context.to_snapshot_id,
+        ) {
+            // Incremental scan mode
+            let added_files = added_files_between(
+                &self.plan_context.object_cache,
+                &self.plan_context.table_metadata,
+                to_snapshot_id,
+                from_snapshot_id,
+            )
+            .await?;
+
+            for entry in added_files {
+                let manifest_entry_context = ManifestEntryContext {
+                    manifest_entry: entry,
+                    expression_evaluator_cache: self
+                        .plan_context
+                        .expression_evaluator_cache
+                        .clone(),
+                    field_ids: self.plan_context.field_ids.clone(),
+                    bound_predicates: None, // TODO: support predicates in incremental scan
+                    partition_spec_id: 0,   // TODO: get correct partition spec id
+                    // It's used to skip any data file whose partition data indicates that it can't contain
+                    // any data that matches this scan's filter
+                    snapshot_schema: self.plan_context.snapshot_schema.clone(),
+                    // delete is not supported in incremental scan
+                    delete_file_index: None,
+                };
+
+                manifest_entry_data_ctx_tx
+                    .clone()
+                    .send(manifest_entry_context)
+                    .await
+                    .map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError"))?;
+            }
+
+            let mut channel_for_manifest_entry_error = file_scan_task_tx.clone();
+
+            // Process the [`ManifestEntry`] stream in parallel
+            spawn(async move {
+                let result = manifest_entry_data_ctx_rx
+                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
+                    .try_for_each_concurrent(
+                        concurrency_limit_manifest_entries,
+                        |(manifest_entry_context, tx)| async move {
+                            spawn(async move {
+                                Self::process_data_manifest_entry(manifest_entry_context, tx).await
+                            })
+                            .await
+                        },
+                    )
+                    .await;
+
+                if let Err(error) = result {
+                    let _ = channel_for_manifest_entry_error.send(Err(error)).await;
+                }
+            });
+
+            return Ok(file_scan_task_rx.boxed());
+        }
+
         let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
             if self.delete_file_processing_enabled {
                 Some(DeleteFileIndex::new())
             } else {
                 None
             };
-
         let manifest_list = self.plan_context.get_manifest_list().await?;
 
         // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
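When both snapshot ids are set, `plan_files` takes the incremental branch above and returns the task stream immediately, bypassing delete-file indexing and manifest filtering. Consuming the stream is unchanged from a regular scan; a sketch, assuming the `scan` value from the earlier builder example and that `FileScanTask` exposes a `data_file_path()` accessor:

```rust
use futures::TryStreamExt;

// Drain the stream returned by plan_files(); in incremental mode each item
// describes a data file added in (from_snapshot_id, to_snapshot_id].
let mut tasks = scan.plan_files().await?;
while let Some(task) = tasks.try_next().await? {
    println!("added data file: {}", task.data_file_path());
}
```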
@@ -1146,6 +1232,100 @@ impl FileScanTask {
     }
 }
 
+struct Ancestors {
+    next: Option<SnapshotRef>,
+    get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
+}
+
+impl Iterator for Ancestors {
+    type Item = SnapshotRef;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let snapshot = self.next.take()?;
+        let result = snapshot.clone();
+        self.next = snapshot
+            .parent_snapshot_id()
+            .and_then(|id| (self.get_snapshot)(id));
+        Some(result)
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
+fn ancestors_of(
+    table_metadata: &TableMetadataRef,
+    snapshot: i64,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
+        let table_metadata = table_metadata.clone();
+        Box::new(Ancestors {
+            next: Some(snapshot.clone()),
+            get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
+        })
+    } else {
+        Box::new(std::iter::empty())
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
+fn ancestors_between(
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: i64,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    if latest_snapshot_id == oldest_snapshot_id {
+        return Box::new(std::iter::empty());
+    }
+
+    Box::new(
+        ancestors_of(table_metadata, latest_snapshot_id)
+            .take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
+    )
+}
+
+/// Get all added files between two snapshots.
+/// The files in `latest_snapshot_id` (inclusive) but not in `oldest_snapshot_id` (exclusive).
+async fn added_files_between(
+    object_cache: &ObjectCache,
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: i64,
+) -> Result<Vec<ManifestEntryRef>> {
+    let mut added_files = vec![];
+
+    let append_snapshots =
+        ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
+            .filter(|snapshot| matches!(snapshot.summary().operation, Operation::Append))
+            .collect_vec();
+    let snapshot_ids: HashSet<i64> = append_snapshots
+        .iter()
+        .map(|snapshot| snapshot.snapshot_id())
+        .collect();
+
+    for snapshot in append_snapshots {
+        let manifest_list = object_cache
+            .get_manifest_list(&snapshot, &table_metadata)
+            .await?;
+
+        for manifest_file in manifest_list.entries() {
+            if !snapshot_ids.contains(&manifest_file.added_snapshot_id) {
+                continue;
+            }
+            let manifest = object_cache.get_manifest(&manifest_file).await?;
+            let entries = manifest.entries().into_iter().cloned().filter(|entry| {
+                matches!(entry.status(), ManifestStatus::Added)
+                    && (
+                        // Is it possible that the snapshot id here is not contained?
+                        entry.snapshot_id().is_none()
+                            || snapshot_ids.contains(&entry.snapshot_id().unwrap())
+                    )
+            });
+            added_files.extend(entries);
+        }
+    }
+
+    Ok(added_files)
+}
+
 #[cfg(test)]
 pub mod tests {
     use std::collections::HashMap;
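`Ancestors` walks parent pointers from the latest snapshot, and `ancestors_between` cuts the walk off at the (exclusive) oldest snapshot. A self-contained sketch of the same semantics on plain ids, with a hypothetical `ancestors_between_ids` helper:

```rust
use std::collections::HashMap;

// Hypothetical analogue of `ancestors_between`, using i64 ids and a parent
// map instead of SnapshotRef/TableMetadataRef.
fn ancestors_between_ids(
    parents: &HashMap<i64, Option<i64>>,
    latest: i64,
    oldest: i64,
) -> Vec<i64> {
    let mut out = vec![];
    let mut current = Some(latest);
    while let Some(id) = current {
        if id == oldest {
            break; // `oldest` is exclusive, mirroring the `take_while` above
        }
        out.push(id);
        current = parents.get(&id).copied().flatten(); // follow parent pointer
    }
    out
}

fn main() {
    // Linear history: 1 <- 2 <- 3, where 1 is the oldest snapshot.
    let parents = HashMap::from([(1, None), (2, Some(1)), (3, Some(2))]);
    assert_eq!(ancestors_between_ids(&parents, 3, 1), vec![3, 2]);
}
```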
@@ -1472,9 +1652,10 @@ pub mod tests {
         let values: BooleanArray = values.into();
         let col8 = Arc::new(values) as ArrayRef;
 
-        let to_write = RecordBatch::try_new(schema.clone(), vec![
-            col1, col2, col3, col4, col5, col6, col7, col8,
-        ])
+        let to_write = RecordBatch::try_new(
+            schema.clone(),
+            vec![col1, col2, col3, col4, col5, col6, col7, col8],
+        )
         .unwrap();
 
         // Write the Parquet files
@@ -1680,9 +1861,10 @@ pub mod tests {
         let values: BooleanArray = values.into();
         let col8 = Arc::new(values) as ArrayRef;
 
-        let to_write = RecordBatch::try_new(schema.clone(), vec![
-            col1, col2, col3, col4, col5, col6, col7, col8,
-        ])
+        let to_write = RecordBatch::try_new(
+            schema.clone(),
+            vec![col1, col2, col3, col4, col5, col6, col7, col8],
+        )
         .unwrap();
 
         // Write the Parquet files
