feat: support incremental scan between 2 snapshots #13

Merged

207 changes: 204 additions & 3 deletions crates/iceberg/src/scan.rs
@@ -17,13 +17,14 @@

//! Table scan api.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};

use arrow_array::RecordBatch;
use futures::channel::mpsc::{channel, Sender};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use crate::arrow::ArrowReaderBuilder;
@@ -38,7 +39,7 @@ use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{
DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
ManifestList, ManifestStatus, Operation, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::utils::available_parallelism;
@@ -55,6 +56,10 @@ pub struct TableScanBuilder<'a> {
// Defaults to none which means select all columns
column_names: Option<Vec<String>>,
snapshot_id: Option<i64>,
/// Exclusive. Used for incremental scan.
from_snapshot_id: Option<i64>,
/// Inclusive. Used for incremental scan.
to_snapshot_id: Option<i64>,
batch_size: Option<usize>,
case_sensitive: bool,
filter: Option<Predicate>,
@@ -78,6 +83,8 @@ impl<'a> TableScanBuilder<'a> {
table,
column_names: None,
snapshot_id: None,
from_snapshot_id: None,
to_snapshot_id: None,
batch_size: None,
case_sensitive: true,
filter: None,
@@ -140,6 +147,18 @@ impl<'a> TableScanBuilder<'a> {
self
}

/// Set the starting snapshot id (exclusive) for incremental scan.
pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
self.from_snapshot_id = Some(from_snapshot_id);
self
}

/// Set the ending snapshot id (inclusive) for incremental scan.
pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self {
self.to_snapshot_id = Some(to_snapshot_id);
self
}

/// Sets the concurrency limit for both manifest files and manifest
/// entries for this scan
pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
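
As a usage illustration only (editor's sketch, not part of this diff): a caller could drive an incremental scan through the two new builder methods roughly as follows. table.scan(), build(), plan_files(), and TryStreamExt::try_collect already exist in iceberg-rust; table, from_id, and to_id are assumed to be provided by the caller.

// Hypothetical usage of the new incremental-scan builder API.
let incremental_scan = table
    .scan()
    .from_snapshot_id(from_id) // exclusive lower bound
    .to_snapshot_id(to_id)     // inclusive upper bound
    .build()?;
// The stream yields Result<FileScanTask>; collect the planned tasks.
let tasks: Vec<_> = incremental_scan.plan_files().await?.try_collect().await?;
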
@@ -206,6 +225,25 @@ impl<'a> TableScanBuilder<'a> {

/// Build the table scan.
pub fn build(self) -> Result<TableScan> {
// Validate that we have either a snapshot scan or an incremental scan configuration
if self.from_snapshot_id.is_some() || self.to_snapshot_id.is_some() {
// For incremental scan, we need to_snapshot_id to be set. from_snapshot_id is optional.
if self.to_snapshot_id.is_none() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Incremental scan requires to_snapshot_id to be set",
));
}

// snapshot_id should not be set for incremental scan
if self.snapshot_id.is_some() {
return Err(Error::new(
ErrorKind::DataInvalid,
"snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.",
));
}
}

let snapshot = match self.snapshot_id {
Some(snapshot_id) => self
.table
@@ -227,7 +265,6 @@ impl<'a> TableScanBuilder<'a> {
})?
.clone(),
};

let schema = snapshot.schema(self.table.metadata())?;

// Check that all column names exist in the schema.
Expand Down Expand Up @@ -297,6 +334,8 @@ impl<'a> TableScanBuilder<'a> {
snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
object_cache: self.table.object_cache(),
field_ids: Arc::new(field_ids),
from_snapshot_id: self.from_snapshot_id,
to_snapshot_id: self.to_snapshot_id,
partition_filter_cache: Arc::new(PartitionFilterCache::new()),
manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
@@ -358,6 +397,11 @@ struct PlanContext {
partition_filter_cache: Arc<PartitionFilterCache>,
manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,

// for incremental scan.
// If `to_snapshot_id` is set, it means incremental scan. `from_snapshot_id` can be `None`.
from_snapshot_id: Option<i64>,
to_snapshot_id: Option<i64>,
}

impl TableScan {
@@ -375,6 +419,65 @@ impl TableScan {
// used to stream the results back to the caller
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

if let Some(to_snapshot_id) = self.plan_context.to_snapshot_id {
// Incremental scan mode
let added_files = added_files_between(
&self.plan_context.object_cache,
&self.plan_context.table_metadata,
to_snapshot_id,
self.plan_context.from_snapshot_id,
)
.await?;

for entry in added_files {
let manifest_entry_context = ManifestEntryContext {
manifest_entry: entry,
expression_evaluator_cache: self
.plan_context
.expression_evaluator_cache
.clone(),
field_ids: self.plan_context.field_ids.clone(),
bound_predicates: None, // TODO: support predicates in incremental scan
partition_spec_id: 0, // TODO: get correct partition spec id
// It's used to skip any data file whose partition data indicates that it can't contain
// any data that matches this scan's filter
snapshot_schema: self.plan_context.snapshot_schema.clone(),
// delete is not supported in incremental scan
delete_file_index: None,
};

manifest_entry_data_ctx_tx
.clone()
.send(manifest_entry_context)
.await

I was looking over the code, and I’m wondering if there might be a chance for the logic to get stuck here when the channel’s buffer size is smaller than the number of entries.
It seems like the channel could block once the buffer fills up, and since the consumers only start after all entries are sent, it might cause a hang.
I could be missing something, though—do you think this could be an issue, or is there a detail I might not be seeing? I’d really appreciate your thoughts!
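
To make this concern concrete, here is a small, self-contained sketch (editor's illustration, not code from this PR) of how a bounded futures::channel::mpsc channel stops accepting sends once its buffer is full and nothing is draining it. If the consumer task is only spawned after the send loop, awaiting the blocked send would hang forever.

use futures::channel::mpsc::channel;
use futures::{FutureExt, SinkExt};

fn main() {
    // Bounded channel with a small buffer and no consumer draining it yet.
    let (mut tx, _rx) = channel::<i32>(4);

    let mut blocked_at = None;
    for i in 0..100 {
        // `SinkExt::send` resolves immediately while there is capacity; once
        // the buffer (plus the sender's reserved slot) is full, it can only
        // complete after the receiver takes an item.
        if tx.send(i).now_or_never().is_none() {
            blocked_at = Some(i);
            break;
        }
    }

    // With no consumer running, the producer gets stuck after a handful of
    // sends; in the planning code above that would mean awaiting forever.
    assert!(blocked_at.is_some());
}
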

Member Author:

Hi, curious how you found this 👀

I came across your PR while browsing through this repo, and it caught my attention because I'm looking for a way to implement incremental ingestion for an Iceberg table using Rust. Great work on it!

Member Author (@xxchan, Mar 20, 2025):

Thanks, I think your insight is correct. I've refined the implementation in #36.

.map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError"))?;
}

let mut channel_for_manifest_entry_error = file_scan_task_tx.clone();

// Process the [`ManifestEntry`] stream in parallel
spawn(async move {
let result = manifest_entry_data_ctx_rx
.map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
.try_for_each_concurrent(
concurrency_limit_manifest_entries,
|(manifest_entry_context, tx)| async move {
spawn(async move {
Self::process_data_manifest_entry(manifest_entry_context, tx).await
})
.await
},
)
.await;

if let Err(error) = result {
let _ = channel_for_manifest_entry_error.send(Err(error)).await;
}
});

return Ok(file_scan_task_rx.boxed());
}

let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
if self.delete_file_processing_enabled {
Some(DeleteFileIndex::new())
@@ -1146,6 +1249,104 @@ impl FileScanTask {
}
}

struct Ancestors {
next: Option<SnapshotRef>,
get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
}

impl Iterator for Ancestors {
type Item = SnapshotRef;

fn next(&mut self) -> Option<Self::Item> {
let snapshot = self.next.take()?;
let result = snapshot.clone();
self.next = snapshot
.parent_snapshot_id()
.and_then(|id| (self.get_snapshot)(id));
Some(result)
}
}

/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
fn ancestors_of(
table_metadata: &TableMetadataRef,
snapshot: i64,
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
let table_metadata = table_metadata.clone();
Box::new(Ancestors {
next: Some(snapshot.clone()),
get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
})
} else {
Box::new(std::iter::empty())
}
}

/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
fn ancestors_between(
table_metadata: &TableMetadataRef,
latest_snapshot_id: i64,
oldest_snapshot_id: Option<i64>,
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
let Some(oldest_snapshot_id) = oldest_snapshot_id else {
return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
};

if latest_snapshot_id == oldest_snapshot_id {
return Box::new(std::iter::empty());
}

Box::new(
ancestors_of(table_metadata, latest_snapshot_id)
.take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
)
}
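
As a concrete illustration of the bounds (editor's note): for a linear history s1 <- s2 <- s3, ancestors_between(&metadata, s3, Some(s1)) yields s3, s2 (latest inclusive, oldest exclusive), while ancestors_between(&metadata, s3, None) yields s3, s2, s1.
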

/// Get all added files between two snapshots.
/// The files in `latest_snapshot_id` (inclusive) but not in `oldest_snapshot_id` (exclusive).
async fn added_files_between(
object_cache: &ObjectCache,
table_metadata: &TableMetadataRef,
latest_snapshot_id: i64,
oldest_snapshot_id: Option<i64>,
) -> Result<Vec<ManifestEntryRef>> {
let mut added_files = vec![];

let append_snapshots =
ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
.filter(|snapshot| matches!(snapshot.summary().operation, Operation::Append))
.collect_vec();
let snapshot_ids: HashSet<i64> = append_snapshots
.iter()
.map(|snapshot| snapshot.snapshot_id())
.collect();

for snapshot in append_snapshots {
let manifest_list = object_cache
.get_manifest_list(&snapshot, table_metadata)
.await?;

for manifest_file in manifest_list.entries() {
if !snapshot_ids.contains(&manifest_file.added_snapshot_id) {
continue;
}
let manifest = object_cache.get_manifest(manifest_file).await?;
let entries = manifest.entries().iter().filter(|entry| {
matches!(entry.status(), ManifestStatus::Added)
&& (
// Is it possible that the snapshot id here is not contained?
entry.snapshot_id().is_none()
|| snapshot_ids.contains(&entry.snapshot_id().unwrap())
)
});

Collaborator (@chenzl25, Mar 17, 2025):

I think simply checking for manifest status Added is not enough, since a compaction will also add manifest entries with Added status, and we don't want to consume a compaction snapshot. We should detect whether a snapshot is a compaction snapshot and skip it if it is. To detect a compaction snapshot, I think we can just check whether there is a manifest entry with ManifestStatus::Deleted.
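
For illustration only (editor's sketch, relying on the ObjectCache helpers used later in this diff; the reply below argues the Operation::Append filter already excludes compaction snapshots), such a check could look roughly like this:

/// Hypothetical helper: treat a snapshot as compaction-like if any of its
/// manifests carries a Deleted entry, so callers can skip it.
async fn is_compaction_like(
    object_cache: &ObjectCache,
    table_metadata: &TableMetadataRef,
    snapshot: &SnapshotRef,
) -> Result<bool> {
    let manifest_list = object_cache
        .get_manifest_list(snapshot, table_metadata)
        .await?;
    for manifest_file in manifest_list.entries() {
        let manifest = object_cache.get_manifest(manifest_file).await?;
        if manifest
            .entries()
            .iter()
            .any(|entry| matches!(entry.status(), ManifestStatus::Deleted))
        {
            return Ok(true);
        }
    }
    Ok(false)
}
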

Member Author:

We have filtered Append snapshots above:

let append_snapshots =
        ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
            .filter(|snapshot| matches!(snapshot.summary().operation, Operation::Append))

The definition of the snapshot operation is as follows (a compaction snapshot should therefore be Replace):

/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
pub enum Operation {
    /// Only data files were added and no files were removed.
    Append,
    /// Data and delete files were added and removed without changing table data;
    /// i.e., compaction, changing the data file format, or relocating data files.
    Replace,
    /// Data and delete files were added and removed in a logical overwrite operation.
    Overwrite,
    /// Data files were removed and their contents logically deleted and/or delete files were added to delete rows.
    Delete,
}

Collaborator:

cc @Li0k

Member Author (@xxchan, Mar 17, 2025):

Found a problem: it seems delete files are also included in an Append snapshot. 🤔

Encountered an entry for a delete file in a data file manifest: 

ManifestEntry { status: Added, snapshot_id: Some(4746599406424318107), sequence_number: Some(4), file_sequence_number: Some(4), data_file: DataFile { content: EqualityDeletes, file_path: "s3://hummock001/iceberg_connection/public/t/data/11-00077-eq-del-0195a491-109c-75b1-a93c-e29d371c2d0b.parquet", file_format: Parquet, partition: Struct { fields: [], null_bitmap: BitVec<usize, bitvec::order::Lsb0> { addr: 0x8, head: 000000, bits: 0, capacity: 0 } [] }, record_count: 2, file_size_in_bytes: 680, column_sizes: {1: 82}, value_counts: {1: 2}, null_value_counts: {1: 0}, nan_value_counts: {}, lower_bounds: {1: Datum { type: Long, literal: Long(524235685507891200) }}, upper_bounds: {1: Datum { type: Long, literal: Long(524235873517572096) }}, key_metadata: None, split_offsets: [4], equality_ids: [1], sort_order_id: None } }

The Java code uses ignoreDeleted to further filter the manifests:
https://github.com/apache/iceberg/blob/6ec3de390d3fa6e797c6975b1eaaea41719db0fe/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L86
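
For example (editor's sketch, not in this diff), the entry filter above could additionally skip delete files so that only added data files are returned, which is roughly what the Java ignoreDeleted path achieves. This assumes a content_type() accessor on the manifest entry.

// Hypothetical refinement of the filter in this PR: keep only Added entries
// that are plain data files and that belong to one of the selected snapshots.
let entries = manifest.entries().iter().filter(|entry| {
    matches!(entry.status(), ManifestStatus::Added)
        // Skip position/equality delete files that may appear in the manifest.
        && matches!(entry.content_type(), DataContentType::Data)
        && entry
            .snapshot_id()
            .map_or(true, |id| snapshot_ids.contains(&id))
});
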

Collaborator:

/// Only data files were added and no files were removed.
Append,

Indeed, by definition there should be no delete files in an Append snapshot. However, RisingWave's Iceberg sink includes delete files in the Append snapshot. Shall we change the snapshot operation to Overwrite when the commit is not append-only? But once we change it to Overwrite, we can't read any incremental data between the snapshots, so it seems we should include the added data of Overwrite snapshots in the incremental read as well.

Member Author:

Some facts I found:

[image attachment]

Collaborator:

cc @ZENOTME If we commit delete files to Iceberg, it seems we should change the Iceberg snapshot operation to Overwrite instead of Append.

Collaborator:

Oh, it's used in SparkChangelogTable https://iceberg.apache.org/docs/latest/spark-procedures/?h=changes#carry-over-rows

I think we should handle Overwrite as well. Ignoring delete files is acceptable.

Member Author:

+1. kind of like "force-append-only"

added_files.extend(entries.cloned());
}
}

Ok(added_files)
}
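
Following up on the review thread above (editor's sketch, not part of this diff): one possible shape of the follow-up discussed there is to accept both Append and Overwrite snapshots when collecting ancestors, and to rely on a per-entry content check to drop any delete files they carry.

// Hypothetical adjustment to the snapshot filter in added_files_between.
let selected_snapshots =
    ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
        .filter(|snapshot| {
            matches!(
                snapshot.summary().operation,
                Operation::Append | Operation::Overwrite
            )
        })
        .collect_vec();
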

#[cfg(test)]
pub mod tests {
use std::collections::HashMap;