Skip to content

Commit 368a65c

Browse files
committed
feat: add equality delete parsing
1 parent 0c6f842 commit 368a65c

File tree

1 file changed

+204
-10
lines changed

1 file changed

+204
-10
lines changed

crates/iceberg/src/arrow/delete_file_manager.rs

+204-10
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,27 @@
1717

1818
use std::collections::HashMap;
1919
use std::future::Future;
20-
use std::ops::BitOrAssign;
20+
use std::ops::{BitOrAssign, Not};
2121
use std::pin::Pin;
2222
use std::sync::{Arc, OnceLock, RwLock};
2323
use std::task::{Context, Poll};
2424

25-
use arrow_array::{Int64Array, StringArray};
25+
use arrow_array::{
26+
Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
27+
StringArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
28+
};
2629
use futures::channel::oneshot;
2730
use futures::future::join_all;
2831
use futures::{StreamExt, TryStreamExt};
32+
use itertools::Itertools;
2933
use roaring::RoaringTreemap;
3034

31-
use crate::arrow::ArrowReader;
35+
use crate::arrow::{arrow_schema_to_schema, ArrowReader};
3236
use crate::expr::Predicate::AlwaysTrue;
33-
use crate::expr::{Bind, BoundPredicate, Predicate};
37+
use crate::expr::{Bind, BoundPredicate, Predicate, Reference};
3438
use crate::io::FileIO;
3539
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile};
36-
use crate::spec::DataContentType;
40+
use crate::spec::{DataContentType, Datum, NestedFieldRef, PrimitiveType};
3741
use crate::{Error, ErrorKind, Result};
3842

3943
// Equality deletes may apply to more than one DataFile in a scan, and so
@@ -340,11 +344,46 @@ impl DeleteFileManager {
340344
///
341345
/// Returns an unbound Predicate for each batch stream
342346
async fn parse_equality_deletes_record_batch_stream(
343-
streams: ArrowRecordBatchStream,
347+
mut stream: ArrowRecordBatchStream,
344348
) -> Result<Predicate> {
345-
// TODO
349+
let mut result_predicate = AlwaysTrue;
350+
351+
while let Some(record_batch) = stream.next().await {
352+
let record_batch = record_batch?;
353+
354+
if record_batch.num_columns() == 0 {
355+
return Ok(AlwaysTrue);
356+
}
346357

347-
Ok(AlwaysTrue)
358+
let batch_schema_arrow = record_batch.schema();
359+
let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?;
360+
361+
let mut datum_columns_with_names: Vec<_> = record_batch
362+
.columns()
363+
.iter()
364+
.zip(batch_schema_iceberg.as_struct().fields())
365+
.map(|(column, field)| {
366+
let col_as_datum_vec = arrow_array_to_datum_iterator(column, field);
367+
col_as_datum_vec.map(|c| (c, field.name.to_string()))
368+
})
369+
.try_collect()?;
370+
371+
// consume all the iterators in lockstep, creating per-row predicates that get combined
372+
// into a single final predicate
373+
while datum_columns_with_names[0].0.len() > 0 {
374+
let mut row_predicate = AlwaysTrue;
375+
for (ref mut column, ref field_name) in &mut datum_columns_with_names {
376+
if let Some(item) = column.next() {
377+
if let Some(datum) = item? {
378+
row_predicate = row_predicate
379+
.and(Reference::new(field_name.clone()).equal_to(datum.clone()));
380+
}
381+
}
382+
}
383+
result_predicate = result_predicate.and(row_predicate.not());
384+
}
385+
}
386+
Ok(result_predicate.rewrite_not())
348387
}
349388

350389
/// Builds eq delete predicate for the provided task.
@@ -413,6 +452,83 @@ pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
413452
matches!(f.file_type, DataContentType::EqualityDeletes)
414453
}
415454

455+
/// Downcasts `$column` (an `ArrayRef`) to the concrete Arrow array type
/// `$arr` and maps each element through the infallible Datum constructor
/// `$dat`, yielding a boxed iterator of `Result<Option<Datum>>`.
///
/// Only usable for infallible constructors; fallible ones (e.g.
/// `Datum::time_micros`) need a hand-written branch.
macro_rules! prim_to_datum {
    ($column:ident, $arr:ty, $dat:path) => {{
        // ok_or_else: build the error lazily so the format! only runs on the
        // failure path.
        let arr = $column.as_any().downcast_ref::<$arr>().ok_or_else(|| {
            Error::new(
                ErrorKind::Unexpected,
                format!("could not downcast ArrayRef to {}", stringify!($arr)),
            )
        })?;
        Ok(Box::new(arr.iter().map(|val| Ok(val.map($dat)))))
    }};
}
464+
465+
fn eq_col_unsupported(ty: &str) -> Error {
466+
Error::new(
467+
ErrorKind::FeatureUnsupported,
468+
format!(
469+
"Equality deletes where a predicate acts upon a {} column are not yet supported",
470+
ty
471+
),
472+
)
473+
}
474+
475+
fn arrow_array_to_datum_iterator<'a>(
476+
column: &'a ArrayRef,
477+
field: &NestedFieldRef,
478+
) -> Result<Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>> + 'a>> {
479+
match field.field_type.as_primitive_type() {
480+
Some(primitive_type) => match primitive_type {
481+
PrimitiveType::Int => prim_to_datum!(column, Int32Array, Datum::int),
482+
PrimitiveType::Boolean => {
483+
prim_to_datum!(column, BooleanArray, Datum::bool)
484+
}
485+
PrimitiveType::Long => prim_to_datum!(column, Int64Array, Datum::long),
486+
PrimitiveType::Float => {
487+
prim_to_datum!(column, Float32Array, Datum::float)
488+
}
489+
PrimitiveType::Double => {
490+
prim_to_datum!(column, Float64Array, Datum::double)
491+
}
492+
PrimitiveType::String => {
493+
prim_to_datum!(column, StringArray, Datum::string)
494+
}
495+
PrimitiveType::Date => prim_to_datum!(column, Date32Array, Datum::date),
496+
PrimitiveType::Timestamp => {
497+
prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamp_micros)
498+
}
499+
PrimitiveType::Timestamptz => {
500+
prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamptz_micros)
501+
}
502+
PrimitiveType::TimestampNs => {
503+
prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamp_nanos)
504+
}
505+
PrimitiveType::TimestamptzNs => {
506+
prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamptz_nanos)
507+
}
508+
PrimitiveType::Time => {
509+
let arr = column
510+
.as_any()
511+
.downcast_ref::<Time64MicrosecondArray>()
512+
.ok_or(Error::new(
513+
ErrorKind::Unexpected,
514+
"could not downcast ArrayRef to Time64MicrosecondArray",
515+
))?;
516+
Ok(Box::new(arr.iter().map(|val| match val {
517+
None => Ok(None),
518+
Some(val) => Datum::time_micros(val).map(Some),
519+
})))
520+
}
521+
PrimitiveType::Decimal { .. } => Err(eq_col_unsupported("Decimal")),
522+
PrimitiveType::Uuid => Err(eq_col_unsupported("Uuid")),
523+
PrimitiveType::Fixed(_) => Err(eq_col_unsupported("Fixed")),
524+
PrimitiveType::Binary => Err(eq_col_unsupported("Binary")),
525+
},
526+
None => Err(eq_col_unsupported(
527+
"non-primitive (i.e. Struct, List, or Map)",
528+
)),
529+
}
530+
}
531+
416532
#[cfg(test)]
417533
mod tests {
418534
use std::fs::File;
@@ -447,15 +563,39 @@ mod tests {
447563
.build()
448564
.unwrap();
449565

450-
let file_scan_tasks = setup(table_location);
566+
let file_scan_tasks = setup_load_deletes_test_tasks(table_location);
451567

452568
delete_file_manager
453569
.load_deletes(&file_scan_tasks[0].deletes, file_io, 5)
454570
.await
455571
.unwrap();
456572
}
457573

458-
fn setup(table_location: &Path) -> Vec<FileScanTask> {
574+
#[tokio::test]
575+
async fn test_delete_file_manager_parse_equality_deletes() {
576+
let tmp_dir = TempDir::new().unwrap();
577+
let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
578+
let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
579+
580+
let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);
581+
582+
let record_batch_stream =
583+
DeleteFileManager::parquet_to_batch_stream(&eq_delete_file_path, file_io.clone())
584+
.await
585+
.expect("could not get batch stream");
586+
587+
let parsed_eq_delete =
588+
DeleteFileManager::parse_equality_deletes_record_batch_stream(record_batch_stream)
589+
.await
590+
.expect("error parsing batch stream");
591+
println!("{}", parsed_eq_delete);
592+
593+
let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (y != 2)".to_string();
594+
595+
assert_eq!(parsed_eq_delete.to_string(), expected);
596+
}
597+
598+
fn setup_load_deletes_test_tasks(table_location: &Path) -> Vec<FileScanTask> {
459599
let data_file_schema = Arc::new(Schema::builder().build().unwrap());
460600
let positional_delete_schema = create_pos_del_schema();
461601

@@ -562,4 +702,58 @@ mod tests {
562702
];
563703
Arc::new(arrow_schema::Schema::new(fields))
564704
}
705+
706+
fn setup_write_equality_delete_file_1(table_location: &str) -> String {
707+
let col_y_vals = vec![1, 2];
708+
let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
709+
710+
let col_z_vals = vec![Some(100), None];
711+
let col_z = Arc::new(Int64Array::from(col_z_vals)) as ArrayRef;
712+
713+
let col_a_vals = vec![Some("HELP"), None];
714+
let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef;
715+
716+
let equality_delete_schema = {
717+
let fields = vec![
718+
arrow_schema::Field::new("y", arrow_schema::DataType::Int64, true).with_metadata(
719+
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
720+
),
721+
arrow_schema::Field::new("z", arrow_schema::DataType::Int64, true).with_metadata(
722+
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
723+
),
724+
arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata(
725+
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
726+
),
727+
];
728+
Arc::new(arrow_schema::Schema::new(fields))
729+
};
730+
731+
let equality_deletes_to_write =
732+
RecordBatch::try_new(equality_delete_schema.clone(), vec![col_y, col_z, col_a])
733+
.unwrap();
734+
735+
let path = format!("{}/equality-deletes-1.parquet", &table_location);
736+
737+
let file = File::create(&path).unwrap();
738+
739+
let props = WriterProperties::builder()
740+
.set_compression(Compression::SNAPPY)
741+
.build();
742+
743+
let mut writer = ArrowWriter::try_new(
744+
file,
745+
equality_deletes_to_write.schema(),
746+
Some(props.clone()),
747+
)
748+
.unwrap();
749+
750+
writer
751+
.write(&equality_deletes_to_write)
752+
.expect("Writing batch");
753+
754+
// writer must be closed to write footer
755+
writer.close().unwrap();
756+
757+
path
758+
}
565759
}

0 commit comments

Comments
 (0)