Commit dcfe6fa

feat: add equality delete parsing

1 parent 5d77476

1 file changed: crates/iceberg/src/arrow/delete_file_manager.rs (+208 -8)
@@ -17,21 +17,27 @@

 use std::collections::HashMap;
 use std::future::Future;
+use std::ops::Not;
 use std::pin::Pin;
 use std::sync::{Arc, OnceLock, RwLock};
 use std::task::{Context, Poll};

+use arrow_array::{
+    Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
+};
 use futures::channel::oneshot;
 use futures::future::join_all;
 use futures::{StreamExt, TryStreamExt};
+use itertools::Itertools;

-use crate::arrow::ArrowReader;
+use crate::arrow::{arrow_schema_to_schema, ArrowReader};
 use crate::delete_vector::DeleteVector;
 use crate::expr::Predicate::AlwaysTrue;
-use crate::expr::{Bind, BoundPredicate, Predicate};
+use crate::expr::{Bind, BoundPredicate, Predicate, Reference};
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile};
-use crate::spec::DataContentType;
+use crate::spec::{DataContentType, Datum, NestedFieldRef, PrimitiveType};
 use crate::{Error, ErrorKind, Result};

 #[allow(unused)]
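
Note: `itertools::Itertools` is brought in for the `try_collect()` call in the new parsing loop, and `std::ops::Not` for the `.not()` that negates each per-row predicate.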
@@ -301,11 +307,46 @@ impl CachingDeleteFileManager {
     ///
     /// Returns an unbound Predicate for each batch stream
     async fn parse_equality_deletes_record_batch_stream(
-        streams: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
     ) -> Result<Predicate> {
-        // TODO
+        let mut result_predicate = AlwaysTrue;
+
+        while let Some(record_batch) = stream.next().await {
+            let record_batch = record_batch?;
+
+            if record_batch.num_columns() == 0 {
+                return Ok(AlwaysTrue);
+            }

-        Ok(AlwaysTrue)
+            let batch_schema_arrow = record_batch.schema();
+            let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?;
+
+            let mut datum_columns_with_names: Vec<_> = record_batch
+                .columns()
+                .iter()
+                .zip(batch_schema_iceberg.as_struct().fields())
+                .map(|(column, field)| {
+                    let col_as_datum_vec = arrow_array_to_datum_iterator(column, field);
+                    col_as_datum_vec.map(|c| (c, field.name.to_string()))
+                })
+                .try_collect()?;
+
+            // consume all the iterators in lockstep, creating per-row predicates that get combined
+            // into a single final predicate
+            while datum_columns_with_names[0].0.len() > 0 {
+                let mut row_predicate = AlwaysTrue;
+                for (ref mut column, ref field_name) in &mut datum_columns_with_names {
+                    if let Some(item) = column.next() {
+                        if let Some(datum) = item? {
+                            row_predicate = row_predicate
+                                .and(Reference::new(field_name.clone()).equal_to(datum.clone()));
+                        }
+                    }
+                }
+                result_predicate = result_predicate.and(row_predicate.not());
+            }
+        }
+        Ok(result_predicate.rewrite_not())
     }

     /// Builds eq delete predicate for the provided task.
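
Note: a minimal sketch (not part of the patch) of how two equality-delete rows fold into one predicate, using the same `Reference`/`Predicate` API the new code uses. The values mirror the test fixture added below (row 1: y=1, z=100; row 2: y=2, z=NULL); the function name is illustrative only.

    use std::ops::Not;

    use iceberg::expr::{Predicate, Reference};
    use iceberg::spec::Datum;

    fn combine_two_delete_rows() -> Predicate {
        // Each non-null cell contributes one equality term to its row's predicate.
        let row1 = Predicate::AlwaysTrue
            .and(Reference::new("y").equal_to(Datum::long(1)))
            .and(Reference::new("z").equal_to(Datum::long(100)));
        // NULL cells contribute no term, so row 2 reduces to a single comparison.
        let row2 = Predicate::AlwaysTrue.and(Reference::new("y").equal_to(Datum::long(2)));
        // A data row is deleted if it matches ANY delete row, so each row predicate
        // is negated and the negations are ANDed; rewrite_not() then pushes the
        // NOTs inward, which should yield ((y != 1) OR (z != 100)) AND (y != 2).
        Predicate::AlwaysTrue.and(row1.not()).and(row2.not()).rewrite_not()
    }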
@@ -375,6 +416,83 @@ pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
     matches!(f.file_type, DataContentType::EqualityDeletes)
 }

+macro_rules! prim_to_datum {
+    ($column:ident, $arr:ty, $dat:path) => {{
+        let arr = $column.as_any().downcast_ref::<$arr>().ok_or(Error::new(
+            ErrorKind::Unexpected,
+            format!("could not downcast ArrayRef to {}", stringify!($arr)),
+        ))?;
+        Ok(Box::new(arr.iter().map(|val| Ok(val.map($dat)))))
+    }};
+}
+
+fn eq_col_unsupported(ty: &str) -> Error {
+    Error::new(
+        ErrorKind::FeatureUnsupported,
+        format!(
+            "Equality deletes where a predicate acts upon a {} column are not yet supported",
+            ty
+        ),
+    )
+}
+
+fn arrow_array_to_datum_iterator<'a>(
+    column: &'a ArrayRef,
+    field: &NestedFieldRef,
+) -> Result<Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>> + 'a>> {
+    match field.field_type.as_primitive_type() {
+        Some(primitive_type) => match primitive_type {
+            PrimitiveType::Int => prim_to_datum!(column, Int32Array, Datum::int),
+            PrimitiveType::Boolean => {
+                prim_to_datum!(column, BooleanArray, Datum::bool)
+            }
+            PrimitiveType::Long => prim_to_datum!(column, Int64Array, Datum::long),
+            PrimitiveType::Float => {
+                prim_to_datum!(column, Float32Array, Datum::float)
+            }
+            PrimitiveType::Double => {
+                prim_to_datum!(column, Float64Array, Datum::double)
+            }
+            PrimitiveType::String => {
+                prim_to_datum!(column, StringArray, Datum::string)
+            }
+            PrimitiveType::Date => prim_to_datum!(column, Date32Array, Datum::date),
+            PrimitiveType::Timestamp => {
+                prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamp_micros)
+            }
+            PrimitiveType::Timestamptz => {
+                prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamptz_micros)
+            }
+            PrimitiveType::TimestampNs => {
+                prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamp_nanos)
+            }
+            PrimitiveType::TimestamptzNs => {
+                prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamptz_nanos)
+            }
+            PrimitiveType::Time => {
+                let arr = column
+                    .as_any()
+                    .downcast_ref::<Time64MicrosecondArray>()
+                    .ok_or(Error::new(
+                        ErrorKind::Unexpected,
+                        "could not downcast ArrayRef to Time64MicrosecondArray",
+                    ))?;
+                Ok(Box::new(arr.iter().map(|val| match val {
+                    None => Ok(None),
+                    Some(val) => Datum::time_micros(val).map(Some),
+                })))
+            }
+            PrimitiveType::Decimal { .. } => Err(eq_col_unsupported("Decimal")),
+            PrimitiveType::Uuid => Err(eq_col_unsupported("Uuid")),
+            PrimitiveType::Fixed(_) => Err(eq_col_unsupported("Fixed")),
+            PrimitiveType::Binary => Err(eq_col_unsupported("Binary")),
+        },
+        None => Err(eq_col_unsupported(
+            "non-primitive (i.e. Struct, List, or Map)",
+        )),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::fs::File;
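
Note: the `prim_to_datum!` arms above are terse; substituting `$arr = Int32Array` and `$dat = Datum::int`, the `PrimitiveType::Int` arm expands to roughly the following:

    // Body of arrow_array_to_datum_iterator for the Int case, post-expansion:
    let arr = column.as_any().downcast_ref::<Int32Array>().ok_or(Error::new(
        ErrorKind::Unexpected,
        format!("could not downcast ArrayRef to {}", stringify!(Int32Array)),
    ))?;
    // Each Some(i32) cell becomes Some(Datum::int(..)); NULL cells stay None,
    // which the parsing loop treats as "no equality term for this column".
    Ok(Box::new(arr.iter().map(|val| Ok(val.map(Datum::int)))))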
@@ -409,15 +527,43 @@ mod tests {
         // the call to the loader does not fail.
         let delete_file_manager = CachingDeleteFileManager::new(file_io.clone(), 10);

-        let file_scan_tasks = setup(table_location);
+        let file_scan_tasks = setup_load_deletes_test_tasks(table_location);

         delete_file_manager
             .load_deletes(&file_scan_tasks[0].deletes)
             .await
             .unwrap();
     }

-    fn setup(table_location: &Path) -> Vec<FileScanTask> {
+    #[tokio::test]
+    async fn test_delete_file_manager_parse_equality_deletes() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+
+        let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);
+
+        let record_batch_stream = CachingDeleteFileManager::parquet_to_batch_stream(
+            &eq_delete_file_path,
+            file_io.clone(),
+        )
+        .await
+        .expect("could not get batch stream");
+
+        let parsed_eq_delete =
+            CachingDeleteFileManager::parse_equality_deletes_record_batch_stream(
+                record_batch_stream,
+            )
+            .await
+            .expect("error parsing batch stream");
+        println!("{}", parsed_eq_delete);
+
+        let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (y != 2)".to_string();
+
+        assert_eq!(parsed_eq_delete.to_string(), expected);
+    }
+
+    fn setup_load_deletes_test_tasks(table_location: &Path) -> Vec<FileScanTask> {
         let data_file_schema = Arc::new(Schema::builder().build().unwrap());
         let positional_delete_schema = create_pos_del_schema();

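Note: the expected string follows from the fixture written by `setup_write_equality_delete_file_1` below. Its first row is (y = 1, z = 100, a = "HELP"), whose negation rewrites to ((y != 1) OR (z != 100)) OR (a != "HELP"); its second row is (y = 2, z = NULL, a = NULL), which reduces to (y != 2) because NULL cells contribute no equality term. ANDing the two gives the asserted predicate.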
@@ -524,4 +670,58 @@ mod tests {
         ];
         Arc::new(arrow_schema::Schema::new(fields))
     }
+
+    fn setup_write_equality_delete_file_1(table_location: &str) -> String {
+        let col_y_vals = vec![1, 2];
+        let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+        let col_z_vals = vec![Some(100), None];
+        let col_z = Arc::new(Int64Array::from(col_z_vals)) as ArrayRef;
+
+        let col_a_vals = vec![Some("HELP"), None];
+        let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef;
+
+        let equality_delete_schema = {
+            let fields = vec![
+                arrow_schema::Field::new("y", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
+                ),
+                arrow_schema::Field::new("z", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
+                ),
+                arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
+                ),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+
+        let equality_deletes_to_write =
+            RecordBatch::try_new(equality_delete_schema.clone(), vec![col_y, col_z, col_a])
+                .unwrap();
+
+        let path = format!("{}/equality-deletes-1.parquet", &table_location);
+
+        let file = File::create(&path).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(
+            file,
+            equality_deletes_to_write.schema(),
+            Some(props.clone()),
+        )
+        .unwrap();
+
+        writer
+            .write(&equality_deletes_to_write)
+            .expect("Writing batch");
+
+        // writer must be closed to write footer
+        writer.close().unwrap();
+
+        path
+    }
 }
