Skip to content

Commit 681a2c0

Browse files
committed
feat: add delta writer for row-level changes
This commit adds support for row-level changes (inserts, updates, and deletes) to Iceberg tables through a new DeltaWriter implementation. This enables CDC (Change Data Capture), upsert operations, and efficient row-level deletions.

Key Components:

1. RowDeltaAction Transaction
   - New transaction type for applying row-level changes atomically
   - Supports both data files and delete files in a single transaction
   - Located in transaction/row_delta.rs

2. Position Delete Writer
   - Writes position delete files for efficient row-level deletions
   - Deletes rows by file path and row position
   - Located in writer/base_writer/position_delete_writer.rs

3. Delta Writer (Combined Writer)
   - Orchestrates data file, position delete, and equality delete writers
   - Intelligently routes operations based on row tracking
   - Memory-bounded row tracking with configurable limits
   - Falls back to equality deletes for older/evicted rows
   - Located in writer/combined_writer/delta_writer.rs

Input Format:

The DeltaWriter expects a RecordBatch with an operations column:
- Value 1 = Insert/Update (write to data file)
- Value -1 = Delete (write to delete file)

Memory Management:
- Tracks recently written rows for efficient position deletes
- Configurable max_seen_rows limit (default: 100,000)
- FIFO eviction when limit is reached
- Can be disabled (set to 0) to use only equality deletes
1 parent c720895 commit 681a2c0

File tree

23 files changed

+2089
-44
lines changed

23 files changed

+2089
-44
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ arrow-array = "57.1"
4747
arrow-buffer = "57.1"
4848
arrow-cast = "57.1"
4949
arrow-ord = "57.1"
50+
arrow-row = "57.1"
5051
arrow-schema = "57.1"
5152
arrow-select = "57.1"
5253
arrow-string = "57.1"

crates/catalog/rest/src/catalog.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,13 @@ impl Catalog for RestCatalog {
716716
})
717717
.build()?;
718718

719+
println!(
720+
"Create table request: {:?}",
721+
request
722+
.body()
723+
.map(|b| String::from_utf8_lossy(b.as_bytes().unwrap_or(&[])))
724+
);
725+
719726
let http_response = context.client.query_catalog(request).await?;
720727

721728
let response = match http_response.status() {

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ arrow-array = { workspace = true }
4949
arrow-buffer = { workspace = true }
5050
arrow-cast = { workspace = true }
5151
arrow-ord = { workspace = true }
52+
arrow-row = { workspace = true }
5253
arrow-schema = { workspace = true }
5354
arrow-select = { workspace = true }
5455
arrow-string = { workspace = true }

crates/iceberg/src/transaction/append.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,19 @@ impl TransactionAction for FastAppendAction {
9090
self.key_metadata.clone(),
9191
self.snapshot_properties.clone(),
9292
self.added_data_files.clone(),
93+
vec![], // fast append doesn't support delete files
9394
);
9495

95-
// validate added files
96-
snapshot_producer.validate_added_data_files()?;
96+
// validate added files - ensure they are Data content type
97+
for data_file in &self.added_data_files {
98+
if data_file.content_type() != crate::spec::DataContentType::Data {
99+
return Err(crate::Error::new(
100+
crate::ErrorKind::DataInvalid,
101+
"Only data content type is allowed for fast append",
102+
));
103+
}
104+
}
105+
snapshot_producer.validate_added_data_files(&self.added_data_files)?;
97106

98107
// Checks duplicate files
99108
if self.check_duplicate {

crates/iceberg/src/transaction/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ mod action;
5454

5555
pub use action::*;
5656
mod append;
57+
mod row_delta;
5758
mod snapshot;
5859
mod sort_order;
5960
mod update_location;
@@ -71,6 +72,7 @@ use crate::spec::TableProperties;
7172
use crate::table::Table;
7273
use crate::transaction::action::BoxedTransactionAction;
7374
use crate::transaction::append::FastAppendAction;
75+
use crate::transaction::row_delta::RowDeltaAction;
7476
use crate::transaction::sort_order::ReplaceSortOrderAction;
7577
use crate::transaction::update_location::UpdateLocationAction;
7678
use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -141,6 +143,16 @@ impl Transaction {
141143
FastAppendAction::new()
142144
}
143145

146+
/// Creates a row delta action for row-level changes.
147+
///
148+
/// Use this action for:
149+
/// - CDC (Change Data Capture) ingestion
150+
/// - Upsert operations
151+
/// - Adding delete files (position or equality deletes)
152+
pub fn row_delta(&self) -> RowDeltaAction {
153+
RowDeltaAction::new()
154+
}
155+
144156
/// Creates replace sort order action.
145157
pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
146158
ReplaceSortOrderAction::new()

0 commit comments

Comments
 (0)