Commit d4d27e6

.

Signed-off-by: xxchan <[email protected]>
1 parent 683fb89 commit d4d27e6

File tree

1 file changed: +182 −4 lines changed

Diff for: crates/iceberg/src/scan.rs (+182 −4)
@@ -17,13 +17,14 @@
 
 //! Table scan api.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, RwLock};
 
 use arrow_array::RecordBatch;
 use futures::channel::mpsc::{channel, Sender};
 use futures::stream::BoxStream;
 use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 
 use crate::arrow::ArrowReaderBuilder;
@@ -36,8 +37,8 @@ use crate::io::object_cache::ObjectCache;
 use crate::io::FileIO;
 use crate::runtime::spawn;
 use crate::spec::{
-    DataContentType, DataFileFormat, ManifestEntryRef, ManifestFile, ManifestList, Schema,
-    SchemaRef, SnapshotRef, TableMetadataRef,
+    DataContentType, DataFileFormat, ManifestEntryRef, ManifestFile, ManifestList, ManifestStatus,
+    Operation, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
 };
 use crate::table::Table;
 use crate::utils::available_parallelism;
@@ -54,6 +55,10 @@ pub struct TableScanBuilder<'a> {
     // Defaults to none which means select all columns
     column_names: Option<Vec<String>>,
     snapshot_id: Option<i64>,
+    /// Exclusive. Used for incremental scan.
+    from_snapshot_id: Option<i64>,
+    /// Inclusive. Used for incremental scan.
+    to_snapshot_id: Option<i64>,
     batch_size: Option<usize>,
     case_sensitive: bool,
     filter: Option<Predicate>,
@@ -72,6 +77,8 @@ impl<'a> TableScanBuilder<'a> {
             table,
             column_names: None,
             snapshot_id: None,
+            from_snapshot_id: None,
+            to_snapshot_id: None,
             batch_size: None,
             case_sensitive: true,
             filter: None,
@@ -133,6 +140,18 @@
         self
     }
 
+    /// Set the starting snapshot id (exclusive) for incremental scan.
+    pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
+        self.from_snapshot_id = Some(from_snapshot_id);
+        self
+    }
+
+    /// Set the ending snapshot id (inclusive) for incremental scan.
+    pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self {
+        self.to_snapshot_id = Some(to_snapshot_id);
+        self
+    }
+
     /// Sets the concurrency limit for both manifest files and manifest
     /// entries for this scan
     pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
@@ -209,6 +228,8 @@ impl<'a> TableScanBuilder<'a> {
                 })?
                 .clone(),
         };
+        // TODO: we should validate either snapshot (snapshot scan) or
+        // from_snapshot_id and to_snapshot_id are set (incremental scan)
 
         let schema = snapshot.schema(self.table.metadata())?;
 
@@ -289,6 +310,8 @@ impl<'a> TableScanBuilder<'a> {
             snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
             object_cache: self.table.object_cache(),
             field_ids: Arc::new(field_ids),
+            from_snapshot_id: self.from_snapshot_id,
+            to_snapshot_id: self.to_snapshot_id,
             partition_filter_cache: Arc::new(PartitionFilterCache::new()),
             manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
             expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
@@ -344,6 +367,8 @@ struct PlanContext {
     snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
     object_cache: Arc<ObjectCache>,
     field_ids: Arc<Vec<i32>>,
+    from_snapshot_id: Option<i64>,
+    to_snapshot_id: Option<i64>,
 
     partition_filter_cache: Arc<PartitionFilterCache>,
     manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
@@ -362,6 +387,66 @@ impl TableScan {
         // used to stream the results back to the caller
         let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
 
+        if let (Some(from_snapshot_id), Some(to_snapshot_id)) = (
+            self.plan_context.from_snapshot_id,
+            self.plan_context.to_snapshot_id,
+        ) {
+            // Incremental scan mode
+            let added_files = added_files_between(
+                &self.plan_context.object_cache,
+                &self.plan_context.table_metadata,
+                to_snapshot_id,
+                from_snapshot_id,
+            )
+            .await?;
+
+            for entry in added_files {
+                let manifest_entry_context = ManifestEntryContext {
+                    manifest_entry: entry,
+                    expression_evaluator_cache: self
+                        .plan_context
+                        .expression_evaluator_cache
+                        .clone(),
+                    field_ids: self.plan_context.field_ids.clone(),
+                    bound_predicates: None, // TODO: support predicates in incremental scan
+                    partition_spec_id: 0,   // TODO: get correct partition spec id
+                    // It's used to skip any data file whose partition data indicates that it can't contain
+                    // any data that matches this scan's filter
+                    snapshot_schema: self.plan_context.snapshot_schema.clone(),
+                };
+
+                manifest_entry_ctx_tx
+                    .clone()
+                    .send(manifest_entry_context)
+                    .await
+                    .map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError"))?;
+            }
+
+            let mut channel_for_manifest_entry_error = file_scan_task_tx.clone();
+
+            // Process the [`ManifestEntry`] stream in parallel
+            spawn(async move {
+                let result = manifest_entry_ctx_rx
+                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
+                    .try_for_each_concurrent(
+                        concurrency_limit_manifest_entries,
+                        |(manifest_entry_context, tx)| async move {
+                            spawn(async move {
+                                Self::process_manifest_entry(manifest_entry_context, tx).await
+                            })
+                            .await
+                        },
+                    )
+                    .await;
+
+                if let Err(error) = result {
+                    let _ = channel_for_manifest_entry_error.send(Err(error)).await;
+                }
+            });
+
+            return Ok(file_scan_task_rx.boxed());
+        }
+
         let manifest_list = self.plan_context.get_manifest_list().await?;
 
         // get the [`ManifestFile`]s from the [`ManifestList`], filtering out
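Note that the incremental branch returns early with the same FileScanTaskStream shape as the regular snapshot-scan path, so consumers need no changes. For example, draining the planned tasks (a sketch; futures::TryStreamExt provides try_collect, and table_scan is the hypothetical scan built earlier):

    use futures::TryStreamExt;

    // Each task describes one data file added between the two snapshots.
    let tasks: Vec<FileScanTask> = table_scan.plan_files().await?.try_collect().await?;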
@@ -947,6 +1032,100 @@ impl FileScanTask {
     }
 }
 
+struct Ancestors {
+    next: Option<SnapshotRef>,
+    get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
+}
+
+impl Iterator for Ancestors {
+    type Item = SnapshotRef;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let snapshot = self.next.take()?;
+        let result = snapshot.clone();
+        self.next = snapshot
+            .parent_snapshot_id()
+            .and_then(|id| (self.get_snapshot)(id));
+        Some(result)
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
+fn ancestors_of(
+    table_metadata: &TableMetadataRef,
+    snapshot: i64,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
+        let table_metadata = table_metadata.clone();
+        Box::new(Ancestors {
+            next: Some(snapshot.clone()),
+            get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
+        })
+    } else {
+        Box::new(std::iter::empty())
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
+fn ancestors_between(
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: i64,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    if latest_snapshot_id == oldest_snapshot_id {
+        return Box::new(std::iter::empty());
+    }
+
+    Box::new(
+        ancestors_of(table_metadata, latest_snapshot_id)
+            .take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
+    )
+}
+
+/// Get all added files between two snapshots.
+/// The files in `latest_snapshot_id` (inclusive) but not in `oldest_snapshot_id` (exclusive).
+async fn added_files_between(
+    object_cache: &ObjectCache,
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: i64,
+) -> Result<Vec<ManifestEntryRef>> {
+    let mut added_files = vec![];
+
+    let append_snapshots =
+        ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
+            .filter(|snapshot| matches!(snapshot.summary().operation, Operation::Append))
+            .collect_vec();
+    let snapshot_ids: HashSet<i64> = append_snapshots
+        .iter()
+        .map(|snapshot| snapshot.snapshot_id())
+        .collect();
+
+    for snapshot in append_snapshots {
+        let manifest_list = object_cache
+            .get_manifest_list(&snapshot, &table_metadata)
+            .await?;
+
+        for manifest_file in manifest_list.entries() {
+            if !snapshot_ids.contains(&manifest_file.added_snapshot_id) {
+                continue;
+            }
+            let manifest = object_cache.get_manifest(&manifest_file).await?;
+            let entries = manifest.entries().into_iter().cloned().filter(|entry| {
+                matches!(entry.status(), ManifestStatus::Added)
+                    && (
+                        // Is it possible that the snapshot id here is not contained?
+                        entry.snapshot_id().is_none()
+                            || snapshot_ids.contains(&entry.snapshot_id().unwrap())
+                    )
+            });
+            added_files.extend(entries);
+        }
+    }
+
+    Ok(added_files)
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
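To make the new helpers concrete, consider a linear history S1 ← S2 ← S3 where S2 and S3 are Append snapshots. An incremental scan from S1 (exclusive) to S3 (inclusive) should surface only files added by S2 and S3. A hedged illustration (the snapshot ids 1, 2, 3 and table_metadata are hypothetical):

    // ancestors_between walks parent links from the latest snapshot and stops
    // before the exclusive lower bound, so S1 itself is never yielded.
    let ids: Vec<i64> = ancestors_between(&table_metadata, 3, 1)
        .map(|s| s.snapshot_id())
        .collect();
    assert_eq!(ids, vec![3, 2]);

    // added_files_between then keeps only manifest entries with status Added
    // whose adding snapshot is in {2, 3}, i.e. files appended in that range.

One caveat worth noting: if the lower bound is not actually an ancestor of the upper bound (e.g. after a rollback or branch), the take_while condition never matches and the walk continues all the way to the root.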
@@ -1709,7 +1888,6 @@ mod tests {
         let mut values = vec![2; 512];
         values.append(vec![3; 200].as_mut());
         values.append(vec![4; 300].as_mut());
-        values.append(vec![5; 12].as_mut());
         let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
         assert_eq!(col, &expected_y);
 