Skip to content

Commit d6faf63

Browse files
committed
feat: support incremental scan between 2 snapshots (#13)
1 parent 9bef8de commit d6faf63

File tree

2 files changed

+194
-21
lines changed

2 files changed

+194
-21
lines changed

Diff for: crates/iceberg/src/scan/context.rs

+137-8
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::HashSet;
1819
use std::sync::Arc;
1920

2021
use futures::channel::mpsc::Sender;
2122
use futures::{SinkExt, TryFutureExt};
23+
use itertools::Itertools;
2224

2325
use crate::delete_file_index::DeleteFileIndex;
2426
use crate::expr::{Bind, BoundPredicate, Predicate};
@@ -28,11 +30,12 @@ use crate::scan::{
2830
PartitionFilterCache,
2931
};
3032
use crate::spec::{
31-
ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef,
32-
TableMetadataRef,
33+
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList,
34+
ManifestStatus, Operation, SchemaRef, SnapshotRef, TableMetadataRef,
3335
};
3436
use crate::{Error, ErrorKind, Result};
3537

38+
/// Signature of a predicate over manifest entries: returns `true` for entries
/// that should be kept. The `Send + Sync` bounds allow it to be shared behind an `Arc`.
type ManifestEntryFilterFn = dyn Fn(&ManifestEntryRef) -> bool + Send + Sync;
3639
/// Wraps a [`ManifestFile`] alongside the objects that are needed
3740
/// to process it in a thread-safe manner
3841
pub(crate) struct ManifestFileContext {
@@ -46,6 +49,10 @@ pub(crate) struct ManifestFileContext {
4649
snapshot_schema: SchemaRef,
4750
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
4851
delete_file_index: Option<DeleteFileIndex>,
52+
53+
/// Optional predicate used to filter manifest entries.
54+
/// Used for different kinds of scans, e.g., scanning only newly added data files while skipping delete files.
55+
filter_fn: Option<Arc<ManifestEntryFilterFn>>,
4956
}
5057

5158
/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -74,12 +81,13 @@ impl ManifestFileContext {
7481
mut sender,
7582
expression_evaluator_cache,
7683
delete_file_index,
77-
..
84+
filter_fn,
7885
} = self;
86+
let filter_fn = filter_fn.unwrap_or_else(|| Arc::new(|_| true));
7987

8088
let manifest = object_cache.get_manifest(&manifest_file).await?;
8189

82-
for manifest_entry in manifest.entries() {
90+
for manifest_entry in manifest.entries().iter().filter(|e| filter_fn(e)) {
8391
let manifest_entry_context = ManifestEntryContext {
8492
// TODO: refactor to avoid the expensive ManifestEntry clone
8593
manifest_entry: manifest_entry.clone(),
@@ -153,6 +161,11 @@ pub(crate) struct PlanContext {
153161
pub partition_filter_cache: Arc<PartitionFilterCache>,
154162
pub manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
155163
pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
164+
165+
// for incremental scan.
166+
// If `to_snapshot_id` is set, it means incremental scan. `from_snapshot_id` can be `None`.
167+
pub from_snapshot_id: Option<i64>,
168+
pub to_snapshot_id: Option<i64>,
156169
}
157170

158171
impl PlanContext {
@@ -184,18 +197,77 @@ impl PlanContext {
184197
Ok(partition_filter)
185198
}
186199

187-
pub(crate) fn build_manifest_file_contexts(
200+
pub(crate) async fn build_manifest_file_contexts(
188201
&self,
189-
manifest_list: Arc<ManifestList>,
190202
tx_data: Sender<ManifestEntryContext>,
191203
delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<ManifestEntryContext>)>,
192204
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
193-
let manifest_files = manifest_list.entries().iter();
205+
let mut filter_fn: Option<Arc<ManifestEntryFilterFn>> = None;
206+
let manifest_files = {
207+
if let Some(to_snapshot_id) = self.to_snapshot_id {
208+
// Incremental scan mode:
209+
// Get all added files between two snapshots.
210+
// - data files in `Append` and `Overwrite` snapshots are included.
211+
// - delete files are ignored
212+
// - `Replace` snapshots (e.g., compaction) are ignored.
213+
//
214+
// `to_snapshot_id` is inclusive, `from_snapshot_id` is exclusive.
215+
216+
// prevent misuse
217+
assert!(
218+
delete_file_idx_and_tx.is_none(),
219+
"delete file is not supported in incremental scan mode"
220+
);
221+
222+
let snapshots =
223+
ancestors_between(&self.table_metadata, to_snapshot_id, self.from_snapshot_id)
224+
.filter(|snapshot| {
225+
matches!(
226+
snapshot.summary().operation,
227+
Operation::Append | Operation::Overwrite
228+
)
229+
})
230+
.collect_vec();
231+
let snapshot_ids: HashSet<i64> = snapshots
232+
.iter()
233+
.map(|snapshot| snapshot.snapshot_id())
234+
.collect();
235+
236+
let mut manifest_files = vec![];
237+
for snapshot in snapshots {
238+
let manifest_list = self
239+
.object_cache
240+
.get_manifest_list(&snapshot, &self.table_metadata)
241+
.await?;
242+
for entry in manifest_list.entries() {
243+
if !snapshot_ids.contains(&entry.added_snapshot_id) {
244+
continue;
245+
}
246+
manifest_files.push(entry.clone());
247+
}
248+
}
249+
250+
filter_fn = Some(Arc::new(move |entry: &ManifestEntryRef| {
251+
matches!(entry.status(), ManifestStatus::Added)
252+
&& matches!(entry.data_file().content_type(), DataContentType::Data)
253+
&& (
254+
// Is it possible that the snapshot id here is not contained?
255+
entry.snapshot_id().is_none()
256+
|| snapshot_ids.contains(&entry.snapshot_id().unwrap())
257+
)
258+
}));
259+
260+
manifest_files
261+
} else {
262+
let manifest_list = self.get_manifest_list().await?;
263+
manifest_list.entries().to_vec()
264+
}
265+
};
194266

195267
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
196268
let mut filtered_mfcs = vec![];
197269

198-
for manifest_file in manifest_files {
270+
for manifest_file in &manifest_files {
199271
let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes {
200272
let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else {
201273
continue;
@@ -234,6 +306,7 @@ impl PlanContext {
234306
partition_bound_predicate,
235307
tx,
236308
delete_file_idx,
309+
filter_fn.clone(),
237310
);
238311

239312
filtered_mfcs.push(Ok(mfc));
@@ -248,6 +321,7 @@ impl PlanContext {
248321
partition_filter: Option<Arc<BoundPredicate>>,
249322
sender: Sender<ManifestEntryContext>,
250323
delete_file_index: Option<DeleteFileIndex>,
324+
filter_fn: Option<Arc<ManifestEntryFilterFn>>,
251325
) -> ManifestFileContext {
252326
let bound_predicates =
253327
if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
@@ -270,6 +344,61 @@ impl PlanContext {
270344
field_ids: self.field_ids.clone(),
271345
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
272346
delete_file_index,
347+
filter_fn,
273348
}
274349
}
275350
}
351+
352+
struct Ancestors {
353+
next: Option<SnapshotRef>,
354+
get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
355+
}
356+
357+
impl Iterator for Ancestors {
358+
type Item = SnapshotRef;
359+
360+
fn next(&mut self) -> Option<Self::Item> {
361+
let snapshot = self.next.take()?;
362+
let result = snapshot.clone();
363+
self.next = snapshot
364+
.parent_snapshot_id()
365+
.and_then(|id| (self.get_snapshot)(id));
366+
Some(result)
367+
}
368+
}
369+
370+
/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
371+
fn ancestors_of(
372+
table_metadata: &TableMetadataRef,
373+
snapshot: i64,
374+
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
375+
if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
376+
let table_metadata = table_metadata.clone();
377+
Box::new(Ancestors {
378+
next: Some(snapshot.clone()),
379+
get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
380+
})
381+
} else {
382+
Box::new(std::iter::empty())
383+
}
384+
}
385+
386+
/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
387+
fn ancestors_between(
388+
table_metadata: &TableMetadataRef,
389+
latest_snapshot_id: i64,
390+
oldest_snapshot_id: Option<i64>,
391+
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
392+
let Some(oldest_snapshot_id) = oldest_snapshot_id else {
393+
return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
394+
};
395+
396+
if latest_snapshot_id == oldest_snapshot_id {
397+
return Box::new(std::iter::empty());
398+
}
399+
400+
Box::new(
401+
ancestors_of(table_metadata, latest_snapshot_id)
402+
.take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
403+
)
404+
}

Diff for: crates/iceberg/src/scan/mod.rs

+57-13
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod cache;
2121
use cache::*;
2222
mod context;
2323
use context::*;
24+
pub use task::*;
2425
mod task;
2526

2627
use std::sync::Arc;
@@ -29,7 +30,6 @@ use arrow_array::RecordBatch;
2930
use futures::channel::mpsc::{channel, Sender};
3031
use futures::stream::BoxStream;
3132
use futures::{SinkExt, StreamExt, TryStreamExt};
32-
pub use task::*;
3333

3434
use crate::arrow::ArrowReaderBuilder;
3535
use crate::delete_file_index::DeleteFileIndex;
@@ -51,6 +51,10 @@ pub struct TableScanBuilder<'a> {
5151
// Defaults to none which means select all columns
5252
column_names: Option<Vec<String>>,
5353
snapshot_id: Option<i64>,
54+
/// Exclusive. Used for incremental scan.
55+
from_snapshot_id: Option<i64>,
56+
/// Inclusive. Used for incremental scan.
57+
to_snapshot_id: Option<i64>,
5458
batch_size: Option<usize>,
5559
case_sensitive: bool,
5660
filter: Option<Predicate>,
@@ -74,6 +78,8 @@ impl<'a> TableScanBuilder<'a> {
7478
table,
7579
column_names: None,
7680
snapshot_id: None,
81+
from_snapshot_id: None,
82+
to_snapshot_id: None,
7783
batch_size: None,
7884
case_sensitive: true,
7985
filter: None,
@@ -136,6 +142,18 @@ impl<'a> TableScanBuilder<'a> {
136142
self
137143
}
138144

145+
/// Set the starting snapshot id (exclusive) for incremental scan.
146+
pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
147+
self.from_snapshot_id = Some(from_snapshot_id);
148+
self
149+
}
150+
151+
/// Set the ending snapshot id (inclusive) for incremental scan.
152+
pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self {
153+
self.to_snapshot_id = Some(to_snapshot_id);
154+
self
155+
}
156+
139157
/// Sets the concurrency limit for both manifest files and manifest
140158
/// entries for this scan
141159
pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
@@ -202,6 +220,32 @@ impl<'a> TableScanBuilder<'a> {
202220

203221
/// Build the table scan.
204222
pub fn build(self) -> Result<TableScan> {
223+
// Validate that we have either a snapshot scan or an incremental scan configuration
224+
if self.from_snapshot_id.is_some() || self.to_snapshot_id.is_some() {
225+
// For incremental scan, we need to_snapshot_id to be set. from_snapshot_id is optional.
226+
if self.to_snapshot_id.is_none() {
227+
return Err(Error::new(
228+
ErrorKind::DataInvalid,
229+
"Incremental scan requires to_snapshot_id to be set",
230+
));
231+
}
232+
233+
// snapshot_id should not be set for incremental scan
234+
if self.snapshot_id.is_some() {
235+
return Err(Error::new(
236+
ErrorKind::DataInvalid,
237+
"snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.",
238+
));
239+
}
240+
241+
if self.delete_file_processing_enabled {
242+
return Err(Error::new(
243+
ErrorKind::DataInvalid,
244+
"delete_file_processing_enabled should not be set for incremental scan",
245+
));
246+
}
247+
}
248+
205249
let snapshot = match self.snapshot_id {
206250
Some(snapshot_id) => self
207251
.table
@@ -223,7 +267,6 @@ impl<'a> TableScanBuilder<'a> {
223267
})?
224268
.clone(),
225269
};
226-
227270
let schema = snapshot.schema(self.table.metadata())?;
228271

229272
// Check that all column names exist in the schema.
@@ -293,6 +336,8 @@ impl<'a> TableScanBuilder<'a> {
293336
snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
294337
object_cache: self.table.object_cache(),
295338
field_ids: Arc::new(field_ids),
339+
from_snapshot_id: self.from_snapshot_id,
340+
to_snapshot_id: self.to_snapshot_id,
296341
partition_filter_cache: Arc::new(PartitionFilterCache::new()),
297342
manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
298343
expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
@@ -359,18 +404,18 @@ impl TableScan {
359404
None
360405
};
361406

362-
let manifest_list = self.plan_context.get_manifest_list().await?;
363-
364407
// get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
365408
// whose partitions cannot match this
366409
// scan's filter
367-
let manifest_file_contexts = self.plan_context.build_manifest_file_contexts(
368-
manifest_list,
369-
manifest_entry_data_ctx_tx,
370-
delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
371-
(delete_file_idx.clone(), manifest_entry_delete_ctx_tx)
372-
}),
373-
)?;
410+
let manifest_file_contexts = self
411+
.plan_context
412+
.build_manifest_file_contexts(
413+
manifest_entry_data_ctx_tx,
414+
delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
415+
(delete_file_idx.clone(), manifest_entry_delete_ctx_tx)
416+
}),
417+
)
418+
.await?;
374419

375420
let mut channel_for_manifest_error = file_scan_task_tx.clone();
376421

@@ -387,8 +432,6 @@ impl TableScan {
387432
}
388433
});
389434

390-
let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
391-
392435
if let Some((_, delete_file_tx)) = delete_file_idx_and_tx {
393436
let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
394437

@@ -417,6 +460,7 @@ impl TableScan {
417460
.await;
418461
}
419462

463+
let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
420464
// Process the data file [`ManifestEntry`] stream in parallel
421465
spawn(async move {
422466
let result = manifest_entry_data_ctx_rx

0 commit comments

Comments
 (0)