Skip to content

Commit 6dc7880

Browse files
phillipleblanclukekim
authored andcommitted
feat: Add RowDeltaAction for row-level deletes via delete files (#28)
Add a new RowDeltaAction transaction action that supports adding both data files and delete files (equality and positional) in a single atomic operation. This enables row-level deletes using Iceberg's merge-on-read strategy. Changes: - Add RowDeltaAction in transaction/row_delta.rs with full test coverage - Extend SnapshotProducer to support writing delete manifests alongside data manifests - Add Transaction::row_delta() method for creating row delta actions - Operation is Delete when only delete files are added, Overwrite when both data and delete files are added - Validates format version >= 2 for delete files - Fix pre-existing test compilation errors (missing limit field)
1 parent 78c9b2c commit 6dc7880

5 files changed

Lines changed: 681 additions & 5 deletions

File tree

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,7 @@ mod tests {
940940
partition_spec: None,
941941
name_mapping: None,
942942
case_sensitive: false,
943+
limit: None,
943944
};
944945

945946
// Load the deletes - should handle both types without error

crates/iceberg/src/arrow/reader.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2411,6 +2411,7 @@ message schema {
24112411
partition_spec: None,
24122412
name_mapping: None,
24132413
case_sensitive: false,
2414+
limit: None,
24142415
};
24152416

24162417
// Task 2: read the second and third row groups
@@ -2428,6 +2429,7 @@ message schema {
24282429
partition_spec: None,
24292430
name_mapping: None,
24302431
case_sensitive: false,
2432+
limit: None,
24312433
};
24322434

24332435
let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
@@ -2556,6 +2558,7 @@ message schema {
25562558
partition_spec: None,
25572559
name_mapping: None,
25582560
case_sensitive: false,
2561+
limit: None,
25592562
})]
25602563
.into_iter(),
25612564
)) as FileScanTaskStream;
@@ -2728,6 +2731,7 @@ message schema {
27282731
partition_spec: None,
27292732
name_mapping: None,
27302733
case_sensitive: false,
2734+
limit: None,
27312735
};
27322736

27332737
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -2946,6 +2950,7 @@ message schema {
29462950
partition_spec: None,
29472951
name_mapping: None,
29482952
case_sensitive: false,
2953+
limit: None,
29492954
};
29502955

29512956
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3157,6 +3162,7 @@ message schema {
31573162
partition_spec: None,
31583163
name_mapping: None,
31593164
case_sensitive: false,
3165+
limit: None,
31603166
};
31613167

31623168
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3261,6 +3267,7 @@ message schema {
32613267
partition_spec: None,
32623268
name_mapping: None,
32633269
case_sensitive: false,
3270+
limit: None,
32643271
})]
32653272
.into_iter(),
32663273
)) as FileScanTaskStream;
@@ -3359,6 +3366,7 @@ message schema {
33593366
partition_spec: None,
33603367
name_mapping: None,
33613368
case_sensitive: false,
3369+
limit: None,
33623370
})]
33633371
.into_iter(),
33643372
)) as FileScanTaskStream;
@@ -3446,6 +3454,7 @@ message schema {
34463454
partition_spec: None,
34473455
name_mapping: None,
34483456
case_sensitive: false,
3457+
limit: None,
34493458
})]
34503459
.into_iter(),
34513460
)) as FileScanTaskStream;
@@ -3547,6 +3556,7 @@ message schema {
35473556
partition_spec: None,
35483557
name_mapping: None,
35493558
case_sensitive: false,
3559+
limit: None,
35503560
})]
35513561
.into_iter(),
35523562
)) as FileScanTaskStream;
@@ -3677,6 +3687,7 @@ message schema {
36773687
partition_spec: None,
36783688
name_mapping: None,
36793689
case_sensitive: false,
3690+
limit: None,
36803691
})]
36813692
.into_iter(),
36823693
)) as FileScanTaskStream;
@@ -3774,6 +3785,7 @@ message schema {
37743785
partition_spec: None,
37753786
name_mapping: None,
37763787
case_sensitive: false,
3788+
limit: None,
37773789
})]
37783790
.into_iter(),
37793791
)) as FileScanTaskStream;
@@ -3884,6 +3896,7 @@ message schema {
38843896
partition_spec: None,
38853897
name_mapping: None,
38863898
case_sensitive: false,
3899+
limit: None,
38873900
})]
38883901
.into_iter(),
38893902
)) as FileScanTaskStream;
@@ -4024,6 +4037,7 @@ message schema {
40244037
partition_spec: Some(partition_spec),
40254038
name_mapping: None,
40264039
case_sensitive: false,
4040+
limit: None,
40274041
})]
40284042
.into_iter(),
40294043
)) as FileScanTaskStream;

crates/iceberg/src/transaction/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ mod action;
5454

5555
pub use action::*;
5656
mod append;
57+
mod row_delta;
5758
mod snapshot;
5859
mod sort_order;
5960
mod update_location;
@@ -71,6 +72,7 @@ use crate::spec::TableProperties;
7172
use crate::table::Table;
7273
use crate::transaction::action::BoxedTransactionAction;
7374
use crate::transaction::append::FastAppendAction;
75+
use crate::transaction::row_delta::RowDeltaAction;
7476
use crate::transaction::sort_order::ReplaceSortOrderAction;
7577
use crate::transaction::update_location::UpdateLocationAction;
7678
use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -141,6 +143,11 @@ impl Transaction {
141143
FastAppendAction::new()
142144
}
143145

146+
/// Creates a row delta action for adding both data files and delete files.
147+
pub fn row_delta(&self) -> RowDeltaAction {
148+
RowDeltaAction::new()
149+
}
150+
144151
/// Creates replace sort order action.
145152
pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
146153
ReplaceSortOrderAction::new()

0 commit comments

Comments
 (0)