feat: support incremental scan between 2 snapshots #13

Merged
216 changes: 213 additions & 3 deletions crates/iceberg/src/scan.rs
@@ -17,13 +17,14 @@

//! Table scan api.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};

use arrow_array::RecordBatch;
use futures::channel::mpsc::{channel, Sender};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use crate::arrow::ArrowReaderBuilder;
@@ -38,7 +39,7 @@ use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{
DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
ManifestList, ManifestStatus, Operation, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::utils::available_parallelism;
@@ -55,6 +56,10 @@ pub struct TableScanBuilder<'a> {
// Defaults to none which means select all columns
column_names: Option<Vec<String>>,
snapshot_id: Option<i64>,
/// Exclusive. Used for incremental scan.
from_snapshot_id: Option<i64>,
/// Inclusive. Used for incremental scan.
to_snapshot_id: Option<i64>,
batch_size: Option<usize>,
case_sensitive: bool,
filter: Option<Predicate>,
@@ -78,6 +83,8 @@ impl<'a> TableScanBuilder<'a> {
table,
column_names: None,
snapshot_id: None,
from_snapshot_id: None,
to_snapshot_id: None,
batch_size: None,
case_sensitive: true,
filter: None,
@@ -140,6 +147,18 @@ impl<'a> TableScanBuilder<'a> {
self
}

/// Set the starting snapshot id (exclusive) for incremental scan.
pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
self.from_snapshot_id = Some(from_snapshot_id);
self
}

/// Set the ending snapshot id (inclusive) for incremental scan.
pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self {
self.to_snapshot_id = Some(to_snapshot_id);
self
}
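
For orientation, here is a minimal caller-side sketch of the two setters above. It is illustrative only and assumes a `table: Table` loaded from a catalog and two known snapshot ids (`from_id`, `to_id`) already in scope:

// Hypothetical usage of the incremental-scan builder API added in this PR.
// `from_id` is the exclusive lower bound, `to_id` the inclusive upper bound.
let scan = table
    .scan()
    .from_snapshot_id(from_id)
    .to_snapshot_id(to_id)
    .build()?;
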

/// Sets the concurrency limit for both manifest files and manifest
/// entries for this scan
pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
Expand Down Expand Up @@ -206,6 +225,25 @@ impl<'a> TableScanBuilder<'a> {

/// Build the table scan.
pub fn build(self) -> Result<TableScan> {
// Validate that we have either a snapshot scan or an incremental scan configuration
if self.from_snapshot_id.is_some() || self.to_snapshot_id.is_some() {
// For incremental scan, we need to_snapshot_id to be set. from_snapshot_id is optional.
if self.to_snapshot_id.is_none() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Incremental scan requires to_snapshot_id to be set",
));
}

// snapshot_id should not be set for incremental scan
if self.snapshot_id.is_some() {
return Err(Error::new(
ErrorKind::DataInvalid,
"snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.",
));
}
}

let snapshot = match self.snapshot_id {
Some(snapshot_id) => self
.table
@@ -227,7 +265,6 @@ impl<'a> TableScanBuilder<'a> {
})?
.clone(),
};

let schema = snapshot.schema(self.table.metadata())?;

// Check that all column names exist in the schema.
Expand Down Expand Up @@ -297,6 +334,8 @@ impl<'a> TableScanBuilder<'a> {
snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
object_cache: self.table.object_cache(),
field_ids: Arc::new(field_ids),
from_snapshot_id: self.from_snapshot_id,
to_snapshot_id: self.to_snapshot_id,
partition_filter_cache: Arc::new(PartitionFilterCache::new()),
manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
@@ -358,6 +397,11 @@ struct PlanContext {
partition_filter_cache: Arc<PartitionFilterCache>,
manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,

// for incremental scan.
// If `to_snapshot_id` is set, it means incremental scan. `from_snapshot_id` can be `None`.
from_snapshot_id: Option<i64>,
to_snapshot_id: Option<i64>,
}

impl TableScan {
@@ -375,6 +419,65 @@ impl TableScan {
// used to stream the results back to the caller
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

if let Some(to_snapshot_id) = self.plan_context.to_snapshot_id {
// Incremental scan mode
let added_files = added_files_between(
&self.plan_context.object_cache,
&self.plan_context.table_metadata,
to_snapshot_id,
self.plan_context.from_snapshot_id,
)
.await?;

for entry in added_files {
let manifest_entry_context = ManifestEntryContext {
manifest_entry: entry,
expression_evaluator_cache: self
.plan_context
.expression_evaluator_cache
.clone(),
field_ids: self.plan_context.field_ids.clone(),
bound_predicates: None, // TODO: support predicates in incremental scan
partition_spec_id: 0, // TODO: get correct partition spec id
// It's used to skip any data file whose partition data indicates that it can't contain
// any data that matches this scan's filter
snapshot_schema: self.plan_context.snapshot_schema.clone(),
// delete is not supported in incremental scan
delete_file_index: None,
};

manifest_entry_data_ctx_tx
.clone()
.send(manifest_entry_context)
.await
.map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError"))?;
}

A review thread is attached to the `.send(...).await` call above:

Reviewer: I was looking over the code, and I'm wondering if there might be a chance for the logic to get stuck here when the channel's buffer size is smaller than the number of entries. It seems like the channel could block once the buffer fills up, and since the consumers only start after all entries are sent, it might cause a hang. I could be missing something, though. Do you think this could be an issue, or is there a detail I might not be seeing? I'd really appreciate your thoughts!

xxchan (Member Author): Hi, curious how you found your way here 👀

Reviewer: I came across your PR while browsing through this repo, and it caught my attention because I'm looking for a way to implement incremental ingestion for an Iceberg table using Rust. Great work on it!

xxchan (Member Author), Mar 20, 2025: Thanks, I think your insight is correct here. I've refined the implementation in #36.

let mut channel_for_manifest_entry_error = file_scan_task_tx.clone();

// Process the [`ManifestEntry`] stream in parallel
spawn(async move {
let result = manifest_entry_data_ctx_rx
.map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
.try_for_each_concurrent(
concurrency_limit_manifest_entries,
|(manifest_entry_context, tx)| async move {
spawn(async move {
Self::process_data_manifest_entry(manifest_entry_context, tx).await
})
.await
},
)
.await;

if let Err(error) = result {
let _ = channel_for_manifest_entry_error.send(Err(error)).await;
}
});

return Ok(file_scan_task_rx.boxed());
}

let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
if self.delete_file_processing_enabled {
Some(DeleteFileIndex::new())
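
As a side note on the review thread earlier in this hunk: the concern is that every `ManifestEntryContext` is sent into a bounded channel before the consumer task is spawned. Below is a minimal, self-contained sketch (not from this PR) of that failure mode with `futures::channel::mpsc`, using an arbitrary buffer size and item count:

use futures::channel::mpsc::channel;
use futures::{SinkExt, StreamExt};

fn main() {
    futures::executor::block_on(async {
        // Bounded channel with a small buffer; nothing is draining it yet.
        let (mut tx, mut rx) = channel::<usize>(2);

        for i in 0..10 {
            // Once the buffer (plus the sender's reserved slot) fills up, this
            // await cannot complete because no consumer reads from `rx` yet,
            // so the whole task stalls here.
            tx.send(i).await.expect("receiver dropped");
        }

        // Never reached if the loop above stalls.
        while let Some(v) = rx.next().await {
            println!("got {v}");
        }
    });
}
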
@@ -1146,6 +1249,113 @@ impl FileScanTask {
}
}

struct Ancestors {
next: Option<SnapshotRef>,
get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
}

impl Iterator for Ancestors {
type Item = SnapshotRef;

fn next(&mut self) -> Option<Self::Item> {
let snapshot = self.next.take()?;
let result = snapshot.clone();
self.next = snapshot
.parent_snapshot_id()
.and_then(|id| (self.get_snapshot)(id));
Some(result)
}
}

/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
fn ancestors_of(
table_metadata: &TableMetadataRef,
snapshot: i64,
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
let table_metadata = table_metadata.clone();
Box::new(Ancestors {
next: Some(snapshot.clone()),
get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
})
} else {
Box::new(std::iter::empty())
}
}

/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
fn ancestors_between(
table_metadata: &TableMetadataRef,
latest_snapshot_id: i64,
oldest_snapshot_id: Option<i64>,
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
let Some(oldest_snapshot_id) = oldest_snapshot_id else {
return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
};

if latest_snapshot_id == oldest_snapshot_id {
return Box::new(std::iter::empty());
}

Box::new(
ancestors_of(table_metadata, latest_snapshot_id)
.take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
)
}

/// Get all added files between two snapshots.
/// - data files in `Append` and `Overwrite` snapshots are included.
/// - delete files are ignored.
/// - `Replace` snapshots (e.g., compaction) are ignored.
///
/// `latest_snapshot_id` is inclusive, `oldest_snapshot_id` is exclusive.
async fn added_files_between(
object_cache: &ObjectCache,
table_metadata: &TableMetadataRef,
latest_snapshot_id: i64,
oldest_snapshot_id: Option<i64>,
) -> Result<Vec<ManifestEntryRef>> {
let mut added_files = vec![];

let snapshots = ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
.filter(|snapshot| {
matches!(
snapshot.summary().operation,
Operation::Append | Operation::Overwrite
)
})
.collect_vec();
let snapshot_ids: HashSet<i64> = snapshots
.iter()
.map(|snapshot| snapshot.snapshot_id())
.collect();

for snapshot in snapshots {
let manifest_list = object_cache
.get_manifest_list(&snapshot, table_metadata)
.await?;

for manifest_file in manifest_list.entries() {
if !snapshot_ids.contains(&manifest_file.added_snapshot_id) {
continue;
}
let manifest = object_cache.get_manifest(manifest_file).await?;
let entries = manifest.entries().iter().filter(|entry| {
matches!(entry.status(), ManifestStatus::Added)
&& matches!(entry.data_file().content_type(), DataContentType::Data)
&& (
// Is it possible that the snapshot id here is not contained?
entry.snapshot_id().is_none()
|| snapshot_ids.contains(&entry.snapshot_id().unwrap())
)
});
added_files.extend(entries.cloned());
}
}

Ok(added_files)
}
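
Finally, a hedged end-to-end sketch of how a caller might drive the incremental path above. It assumes `Table::scan()` and `TableScan::plan_files()` as the public entry points (their signatures are not shown in this diff excerpt) and uses the crate's own `Table`, `FileScanTask`, and `Result` types:

use futures::TryStreamExt;

// Illustrative only: collect the file scan tasks for data files added between two snapshots.
async fn plan_incremental(table: &Table, from_id: i64, to_id: i64) -> Result<Vec<FileScanTask>> {
    let scan = table
        .scan()
        .from_snapshot_id(from_id) // exclusive
        .to_snapshot_id(to_id) // inclusive
        .build()?;

    // The incremental branch of the planner yields only `Added` data entries
    // from `Append`/`Overwrite` snapshots in the requested range.
    let tasks: Vec<FileScanTask> = scan.plan_files().await?.try_collect().await?;
    Ok(tasks)
}
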

#[cfg(test)]
pub mod tests {
use std::collections::HashMap;