
Commit f62ac3f

feat: add equality delete parsing
1 parent 3c5a38f commit f62ac3f

File tree

1 file changed: +220 −13 lines

crates/iceberg/src/arrow/delete_file_manager.rs

Lines changed: 220 additions & 13 deletions
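In brief: each row of an equality delete file becomes the negation of a conjunction of column = value terms (restricted to the file's equality_ids), and the per-row negations are ANDed into a single residual predicate. A minimal sketch of that shape, using the expression API visible in the diff below (values invented for illustration):

    // One delete row (y = 1, z = 100) contributes NOT (y = 1 AND z = 100);
    // rewrite_not() later rewrites this into (y != 1) OR (z != 100).
    let row = Reference::new("y".to_string())
        .equal_to(Datum::long(1))
        .and(Reference::new("z".to_string()).equal_to(Datum::long(100)));
    let residual = row.not().rewrite_not();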
@@ -15,24 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.

-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::future::Future;
+use std::ops::Not;
 use std::pin::Pin;
 use std::sync::{Arc, OnceLock, RwLock};
 use std::task::{Context, Poll};

+use arrow_array::{
+    Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
+};
 use futures::channel::oneshot;
 use futures::future::join_all;
 use futures::{StreamExt, TryStreamExt};
+use itertools::Itertools;

 use crate::arrow::record_batch_transformer::RecordBatchTransformer;
-use crate::arrow::ArrowReader;
+use crate::arrow::{arrow_schema_to_schema, ArrowReader};
 use crate::delete_vector::DeleteVector;
 use crate::expr::Predicate::AlwaysTrue;
-use crate::expr::{Bind, BoundPredicate, Predicate};
+use crate::expr::{Bind, BoundPredicate, Predicate, Reference};
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile};
-use crate::spec::{DataContentType, Schema, SchemaRef};
+use crate::spec::{DataContentType, Datum, NestedFieldRef, PrimitiveType, Schema, SchemaRef};
 use crate::{Error, ErrorKind, Result};

 #[allow(unused)]
@@ -118,6 +124,7 @@ enum DeleteFileContext {
118124
FreshEqDel {
119125
batch_stream: ArrowRecordBatchStream,
120126
sender: oneshot::Sender<Predicate>,
127+
equality_ids: HashSet<i32>,
121128
},
122129
}
123130

@@ -287,6 +294,7 @@ impl CachingDeleteFileManager {
                 )
                 .await?,
                 sender,
+                equality_ids: HashSet::from_iter(task.equality_ids),
             })
         }

@@ -310,9 +318,11 @@ impl CachingDeleteFileManager {
             DeleteFileContext::FreshEqDel {
                 sender,
                 batch_stream,
+                equality_ids,
             } => {
                 let predicate =
-                    Self::parse_equality_deletes_record_batch_stream(batch_stream).await?;
+                    Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids)
+                        .await?;

                 sender
                     .send(predicate)
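Note on the FreshEqDel arm: the parsed predicate is handed back through a oneshot channel so that every task awaiting the same delete file observes one result. A minimal standalone sketch of that handoff pattern (inside an async context; not the manager's actual wiring):

    use futures::channel::oneshot;

    let (sender, receiver) = oneshot::channel::<Predicate>();
    // The parsing side sends exactly one value; send() only fails if the
    // receiver was dropped, which the real code surfaces as an error.
    sender.send(AlwaysTrue).ok();
    // The waiting side resolves once the parse completes (Err if cancelled).
    let predicate = receiver.await;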
@@ -407,14 +417,49 @@ impl CachingDeleteFileManager {
     ///
     /// Returns an unbound Predicate for each batch stream
     async fn parse_equality_deletes_record_batch_stream(
-        streams: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
+        equality_ids: HashSet<i32>,
     ) -> Result<Predicate> {
-        // TODO
+        let mut result_predicate = AlwaysTrue;

-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "parsing of equality deletes is not yet supported",
-        ))
+        while let Some(record_batch) = stream.next().await {
+            let record_batch = record_batch?;
+
+            if record_batch.num_columns() == 0 {
+                return Ok(AlwaysTrue);
+            }
+
+            let batch_schema_arrow = record_batch.schema();
+            let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?;
+
+            let mut datum_columns_with_names: Vec<_> = record_batch
+                .columns()
+                .iter()
+                .zip(batch_schema_iceberg.as_struct().fields())
+                // only use columns that are in the set of equality_ids for this delete file
+                .filter(|(_, field)| equality_ids.contains(&field.id))
+                .map(|(column, field)| {
+                    let col_as_datum_vec = arrow_array_to_datum_iterator(column, field);
+                    col_as_datum_vec.map(|c| (c, field.name.to_string()))
+                })
+                .try_collect()?;
+
+            // consume all the iterators in lockstep, creating per-row predicates that get combined
+            // into a single final predicate
+            while datum_columns_with_names[0].0.len() > 0 {
+                let mut row_predicate = AlwaysTrue;
+                for (ref mut column, ref field_name) in &mut datum_columns_with_names {
+                    if let Some(item) = column.next() {
+                        if let Some(datum) = item? {
+                            row_predicate = row_predicate
+                                .and(Reference::new(field_name.clone()).equal_to(datum.clone()));
+                        }
+                    }
+                }
+                result_predicate = result_predicate.and(row_predicate.not());
+            }
+        }
+        Ok(result_predicate.rewrite_not())
     }

     /// Builds eq delete predicate for the provided task.
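To make the lockstep loop concrete: each delete row yields a conjunction of column = value terms, NULL cells are skipped, the conjunction is negated, and the negations are ANDed together. A distilled sketch over two pre-decoded columns with made-up values, using the same expression API (the real code drives boxed Datum iterators instead):

    let ys = [Some(1i64), Some(2)];
    let zs = [Some(100i64), None];
    let mut result = AlwaysTrue;
    for (y, z) in ys.iter().zip(zs.iter()) {
        let mut row = AlwaysTrue;
        if let Some(y) = y {
            row = row.and(Reference::new("y".to_string()).equal_to(Datum::long(*y)));
        }
        if let Some(z) = z {
            row = row.and(Reference::new("z".to_string()).equal_to(Datum::long(*z)));
        }
        result = result.and(row.not());
    }
    // After rewrite_not(), this should read "((y != 1) OR (z != 100)) AND (y != 2)",
    // mirroring the shape asserted by the test further down.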
@@ -484,6 +529,83 @@ pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
     matches!(f.file_type, DataContentType::EqualityDeletes)
 }

+macro_rules! prim_to_datum {
+    ($column:ident, $arr:ty, $dat:path) => {{
+        let arr = $column.as_any().downcast_ref::<$arr>().ok_or(Error::new(
+            ErrorKind::Unexpected,
+            format!("could not downcast ArrayRef to {}", stringify!($arr)),
+        ))?;
+        Ok(Box::new(arr.iter().map(|val| Ok(val.map($dat)))))
+    }};
+}
+
+fn eq_col_unsupported(ty: &str) -> Error {
+    Error::new(
+        ErrorKind::FeatureUnsupported,
+        format!(
+            "Equality deletes where a predicate acts upon a {} column are not yet supported",
+            ty
+        ),
+    )
+}
+
+fn arrow_array_to_datum_iterator<'a>(
+    column: &'a ArrayRef,
+    field: &NestedFieldRef,
+) -> Result<Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>> + 'a>> {
+    match field.field_type.as_primitive_type() {
+        Some(primitive_type) => match primitive_type {
+            PrimitiveType::Int => prim_to_datum!(column, Int32Array, Datum::int),
+            PrimitiveType::Boolean => {
+                prim_to_datum!(column, BooleanArray, Datum::bool)
+            }
+            PrimitiveType::Long => prim_to_datum!(column, Int64Array, Datum::long),
+            PrimitiveType::Float => {
+                prim_to_datum!(column, Float32Array, Datum::float)
+            }
+            PrimitiveType::Double => {
+                prim_to_datum!(column, Float64Array, Datum::double)
+            }
+            PrimitiveType::String => {
+                prim_to_datum!(column, StringArray, Datum::string)
+            }
+            PrimitiveType::Date => prim_to_datum!(column, Date32Array, Datum::date),
+            PrimitiveType::Timestamp => {
+                prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamp_micros)
+            }
+            PrimitiveType::Timestamptz => {
+                prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamptz_micros)
+            }
+            PrimitiveType::TimestampNs => {
+                prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamp_nanos)
+            }
+            PrimitiveType::TimestamptzNs => {
+                prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamptz_nanos)
+            }
+            PrimitiveType::Time => {
+                let arr = column
+                    .as_any()
+                    .downcast_ref::<Time64MicrosecondArray>()
+                    .ok_or(Error::new(
+                        ErrorKind::Unexpected,
+                        "could not downcast ArrayRef to Time64MicrosecondArray",
+                    ))?;
+                Ok(Box::new(arr.iter().map(|val| match val {
+                    None => Ok(None),
+                    Some(val) => Datum::time_micros(val).map(Some),
+                })))
+            }
+            PrimitiveType::Decimal { .. } => Err(eq_col_unsupported("Decimal")),
+            PrimitiveType::Uuid => Err(eq_col_unsupported("Uuid")),
+            PrimitiveType::Fixed(_) => Err(eq_col_unsupported("Fixed")),
+            PrimitiveType::Binary => Err(eq_col_unsupported("Binary")),
+        },
+        None => Err(eq_col_unsupported(
+            "non-primitive (i.e. Struct, List, or Map)",
+        )),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::fs::File;
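For orientation, hand-expanding one invocation, prim_to_datum!(column, Int64Array, Datum::long), shows why every match arm produces a boxed iterator of Result<Option<Datum>>:

    let arr = column.as_any().downcast_ref::<Int64Array>().ok_or(Error::new(
        ErrorKind::Unexpected,
        format!("could not downcast ArrayRef to {}", "Int64Array"),
    ))?;
    // arr.iter() yields Option<i64>; each element is wrapped as Ok(Option<Datum>).
    Ok(Box::new(arr.iter().map(|val| Ok(val.map(Datum::long)))))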
@@ -518,7 +640,7 @@ mod tests {
         // the call to the loader fails with the expected FeatureUnsupportedError.
         let delete_file_manager = CachingDeleteFileManager::new(file_io.clone(), 10);

-        let file_scan_tasks = setup(table_location);
+        let file_scan_tasks = setup_load_deletes_test_tasks(table_location);

         let result = delete_file_manager
             .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
@@ -527,7 +649,38 @@ mod tests {
         assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
     }

-    fn setup(table_location: &Path) -> Vec<FileScanTask> {
+    #[tokio::test]
+    async fn test_delete_file_manager_parse_equality_deletes() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+
+        let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);
+
+        let record_batch_stream = CachingDeleteFileManager::parquet_to_batch_stream(
+            &eq_delete_file_path,
+            file_io.clone(),
+        )
+        .await
+        .expect("could not get batch stream");
+
+        let eq_ids = HashSet::from_iter(vec![2, 3, 4]);
+
+        let parsed_eq_delete =
+            CachingDeleteFileManager::parse_equality_deletes_record_batch_stream(
+                record_batch_stream,
+                eq_ids,
+            )
+            .await
+            .expect("error parsing batch stream");
+        println!("{}", parsed_eq_delete);
+
+        let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (y != 2)".to_string();
+
+        assert_eq!(parsed_eq_delete.to_string(), expected);
+    }
+
+    fn setup_load_deletes_test_tasks(table_location: &Path) -> Vec<FileScanTask> {
         let data_file_schema = Arc::new(Schema::builder().build().unwrap());
         let positional_delete_schema = create_pos_del_schema();


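Where the expected string comes from: the delete file written by setup_write_equality_delete_file_1 below holds two rows over y (field id 2), z (id 3), and a (id 4), all of which are in eq_ids. Row one, (y=1, z=100, a="HELP"), contributes NOT(y = 1 AND z = 100 AND a = "HELP"), which rewrite_not() distributes into (((y != 1) OR (z != 100)) OR (a != "HELP")). Row two, (y=2, z=NULL, a=NULL), skips its NULL cells and contributes only (y != 2); ANDing the two rows gives the asserted predicate.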
@@ -637,4 +790,58 @@ mod tests {
         ];
         Arc::new(arrow_schema::Schema::new(fields))
     }
+
+    fn setup_write_equality_delete_file_1(table_location: &str) -> String {
+        let col_y_vals = vec![1, 2];
+        let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+        let col_z_vals = vec![Some(100), None];
+        let col_z = Arc::new(Int64Array::from(col_z_vals)) as ArrayRef;
+
+        let col_a_vals = vec![Some("HELP"), None];
+        let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef;
+
+        let equality_delete_schema = {
+            let fields = vec![
+                arrow_schema::Field::new("y", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
+                ),
+                arrow_schema::Field::new("z", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
+                ),
+                arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
+                ),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+
+        let equality_deletes_to_write =
+            RecordBatch::try_new(equality_delete_schema.clone(), vec![col_y, col_z, col_a])
+                .unwrap();
+
+        let path = format!("{}/equality-deletes-1.parquet", &table_location);
+
+        let file = File::create(&path).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(
+            file,
+            equality_deletes_to_write.schema(),
+            Some(props.clone()),
+        )
+        .unwrap();
+
+        writer
+            .write(&equality_deletes_to_write)
+            .expect("Writing batch");
+
+        // writer must be closed to write footer
+        writer.close().unwrap();
+
+        path
+    }
 }
