Skip to content

Commit dd1218b

Browse files
committed
feat: add eq delete parse test and fix logic bug
1 parent ae0fea1 commit dd1218b

File tree

1 file changed

+83
-5
lines changed

1 file changed

+83
-5
lines changed

crates/iceberg/src/arrow/delete_file_manager.rs

+83-5
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use std::collections::HashMap;
1919
use std::future::Future;
20-
use std::ops::BitOrAssign;
20+
use std::ops::{BitOrAssign, Not};
2121
use std::pin::Pin;
2222
use std::sync::{Arc, OnceLock, RwLock};
2323
use std::task::{Context, Poll};
@@ -413,10 +413,10 @@ impl DeleteFileManager {
413413
}
414414
}
415415
}
416-
result_predicate = result_predicate.or(row_predicate);
416+
result_predicate = result_predicate.and(row_predicate.not());
417417
}
418418
}
419-
Ok(result_predicate)
419+
Ok(result_predicate.rewrite_not())
420420
}
421421

422422
/// Builds eq delete predicate for the provided task.
@@ -596,7 +596,7 @@ mod tests {
596596
.build()
597597
.unwrap();
598598

599-
let file_scan_tasks = setup(table_location);
599+
let file_scan_tasks = setup_load_deletes_test_tasks(table_location);
600600

601601
delete_file_manager
602602
.load_deletes(&file_scan_tasks[0].deletes, file_io, 5)
@@ -614,7 +614,31 @@ mod tests {
614614
assert_eq!(result.len(), 3); // pos dels from pos del file 3
615615
}
616616

617-
fn setup(table_location: &Path) -> Vec<FileScanTask> {
617+
#[tokio::test]
618+
async fn test_delete_file_manager_parse_equality_deletes() {
619+
let tmp_dir = TempDir::new().unwrap();
620+
let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
621+
let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
622+
623+
let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);
624+
625+
let record_batch_stream =
626+
DeleteFileManager::parquet_to_batch_stream(&eq_delete_file_path, file_io.clone())
627+
.await
628+
.expect("could not get batch stream");
629+
630+
let parsed_eq_delete =
631+
DeleteFileManager::parse_equality_deletes_record_batch_stream(record_batch_stream)
632+
.await
633+
.expect("error parsing batch stream");
634+
println!("{}", parsed_eq_delete);
635+
636+
let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (y != 2)".to_string();
637+
638+
assert_eq!(parsed_eq_delete.to_string(), expected);
639+
}
640+
641+
fn setup_load_deletes_test_tasks(table_location: &Path) -> Vec<FileScanTask> {
618642
let data_file_schema = Arc::new(Schema::builder().build().unwrap());
619643
let positional_delete_schema = create_pos_del_schema();
620644

@@ -751,4 +775,58 @@ mod tests {
751775
];
752776
Arc::new(arrow_schema::Schema::new(fields))
753777
}
778+
779+
fn setup_write_equality_delete_file_1(table_location: &str) -> String {
780+
let col_y_vals = vec![1, 2];
781+
let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
782+
783+
let col_z_vals = vec![Some(100), None];
784+
let col_z = Arc::new(Int64Array::from(col_z_vals)) as ArrayRef;
785+
786+
let col_a_vals = vec![Some("HELP"), None];
787+
let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef;
788+
789+
let equality_delete_schema = {
790+
let fields = vec![
791+
arrow_schema::Field::new("y", arrow_schema::DataType::Int64, true).with_metadata(
792+
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
793+
),
794+
arrow_schema::Field::new("z", arrow_schema::DataType::Int64, true).with_metadata(
795+
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
796+
),
797+
arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata(
798+
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
799+
),
800+
];
801+
Arc::new(arrow_schema::Schema::new(fields))
802+
};
803+
804+
let equality_deletes_to_write =
805+
RecordBatch::try_new(equality_delete_schema.clone(), vec![col_y, col_z, col_a])
806+
.unwrap();
807+
808+
let path = format!("{}/equality-deletes-1.parquet", &table_location);
809+
810+
let file = File::create(&path).unwrap();
811+
812+
let props = WriterProperties::builder()
813+
.set_compression(Compression::SNAPPY)
814+
.build();
815+
816+
let mut writer = ArrowWriter::try_new(
817+
file,
818+
equality_deletes_to_write.schema(),
819+
Some(props.clone()),
820+
)
821+
.unwrap();
822+
823+
writer
824+
.write(&equality_deletes_to_write)
825+
.expect("Writing batch");
826+
827+
// writer must be closed to write footer
828+
writer.close().unwrap();
829+
830+
path
831+
}
754832
}

0 commit comments

Comments
 (0)