
Commit d1c7784

feat: add equality delete parsing

1 parent 3c5a38f
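
Equality delete files mark rows as deleted by value rather than by position: each row of the delete file holds values for a set of equality fields, and a data row is deleted if it matches every non-NULL value of at least one delete row. For example, a delete row (y = 1, z = 100) deletes all data rows where y = 1 AND z = 100. This commit parses each equality delete file into a single unbound predicate (the AND, over all delete rows, of the negated per-row equality condition) which the scan can later bind against a data file's schema to filter out deleted rows.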

File tree: 1 file changed, +219 −13 lines

crates/iceberg/src/arrow/delete_file_manager.rs
@@ -15,24 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::future::Future;
+use std::ops::Not;
 use std::pin::Pin;
 use std::sync::{Arc, OnceLock, RwLock};
 use std::task::{Context, Poll};
 
+use arrow_array::{
+    Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
+};
 use futures::channel::oneshot;
 use futures::future::join_all;
 use futures::{StreamExt, TryStreamExt};
+use itertools::Itertools;
 
 use crate::arrow::record_batch_transformer::RecordBatchTransformer;
-use crate::arrow::ArrowReader;
+use crate::arrow::{arrow_schema_to_schema, ArrowReader};
 use crate::delete_vector::DeleteVector;
 use crate::expr::Predicate::AlwaysTrue;
-use crate::expr::{Bind, BoundPredicate, Predicate};
+use crate::expr::{Bind, BoundPredicate, Predicate, Reference};
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile};
-use crate::spec::{DataContentType, Schema, SchemaRef};
+use crate::spec::{DataContentType, Datum, NestedFieldRef, PrimitiveType, Schema, SchemaRef};
 use crate::{Error, ErrorKind, Result};
 
 #[allow(unused)]
@@ -118,6 +124,7 @@ enum DeleteFileContext {
     FreshEqDel {
         batch_stream: ArrowRecordBatchStream,
         sender: oneshot::Sender<Predicate>,
+        equality_ids: HashSet<i32>,
     },
 }

@@ -287,6 +294,7 @@ impl CachingDeleteFileManager {
             )
             .await?,
             sender,
+            equality_ids: HashSet::from_iter(task.equality_ids),
         })
     }

@@ -310,9 +318,11 @@ impl CachingDeleteFileManager {
             DeleteFileContext::FreshEqDel {
                 sender,
                 batch_stream,
+                equality_ids,
             } => {
                 let predicate =
-                    Self::parse_equality_deletes_record_batch_stream(batch_stream).await?;
+                    Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids)
+                        .await?;
 
                 sender
                     .send(predicate)
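
Note: the `equality_ids` threaded through `FreshEqDel` are populated in the previous hunk from `task.equality_ids`, the Iceberg field IDs of the columns that participate in the equality match. The parser below filters the delete file's columns against this set, so any other columns present in the file are ignored.
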
@@ -407,14 +417,48 @@ impl CachingDeleteFileManager {
     ///
     /// Returns an unbound Predicate for each batch stream
     async fn parse_equality_deletes_record_batch_stream(
-        streams: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
+        eq_ids: HashSet<i32>,
     ) -> Result<Predicate> {
-        // TODO
+        let mut result_predicate = AlwaysTrue;
 
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "parsing of equality deletes is not yet supported",
-        ))
+        while let Some(record_batch) = stream.next().await {
+            let record_batch = record_batch?;
+
+            if record_batch.num_columns() == 0 {
+                return Ok(AlwaysTrue);
+            }
+
+            let batch_schema_arrow = record_batch.schema();
+            let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?;
+
+            let mut datum_columns_with_names: Vec<_> = record_batch
+                .columns()
+                .iter()
+                .zip(batch_schema_iceberg.as_struct().fields())
+                .filter(|(field, value)| eq_ids.contains(&value.id))
+                .map(|(column, field)| {
+                    let col_as_datum_vec = arrow_array_to_datum_iterator(column, field);
+                    col_as_datum_vec.map(|c| (c, field.name.to_string()))
+                })
+                .try_collect()?;
+
+            // consume all the iterators in lockstep, creating per-row predicates that get combined
+            // into a single final predicate
+            while datum_columns_with_names[0].0.len() > 0 {
+                let mut row_predicate = AlwaysTrue;
+                for (ref mut column, ref field_name) in &mut datum_columns_with_names {
+                    if let Some(item) = column.next() {
+                        if let Some(datum) = item? {
+                            row_predicate = row_predicate
+                                .and(Reference::new(field_name.clone()).equal_to(datum.clone()));
+                        }
+                    }
+                }
+                result_predicate = result_predicate.and(row_predicate.not());
+            }
+        }
+        Ok(result_predicate.rewrite_not())
     }
 
     /// Builds eq delete predicate for the provided task.
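
To make the row-to-predicate mapping concrete, the following is a minimal sketch (not part of the commit; the function name `sketch` and the external `iceberg::` import paths are illustrative) of the predicate the loop above builds for the two-row delete file used in the tests further down:

use std::ops::Not;

use iceberg::expr::{Predicate, Reference};
use iceberg::spec::Datum;

fn sketch() -> Predicate {
    use Predicate::AlwaysTrue;

    // Row 1: (y = 1, z = 100, a = "HELP") => NOT (y = 1 AND z = 100 AND a = "HELP")
    let row1 = AlwaysTrue
        .and(Reference::new("y").equal_to(Datum::long(1)))
        .and(Reference::new("z").equal_to(Datum::long(100)))
        .and(Reference::new("a").equal_to(Datum::string("HELP")));

    // Row 2: (y = 2, z = NULL, a = NULL) => the NULL cells add no condition,
    // leaving NOT (y = 2)
    let row2 = AlwaysTrue.and(Reference::new("y").equal_to(Datum::long(2)));

    // A data row is deleted if it matches ANY delete row, so the kept-rows
    // predicate is the conjunction of the negations; rewrite_not() then pushes
    // the NOTs inward, yielding:
    // (((y != 1) OR (z != 100)) OR (a != "HELP")) AND (y != 2)
    AlwaysTrue.and(row1.not()).and(row2.not()).rewrite_not()
}
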
@@ -484,6 +528,83 @@ pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
     matches!(f.file_type, DataContentType::EqualityDeletes)
 }
 
+macro_rules! prim_to_datum {
+    ($column:ident, $arr:ty, $dat:path) => {{
+        let arr = $column.as_any().downcast_ref::<$arr>().ok_or(Error::new(
+            ErrorKind::Unexpected,
+            format!("could not downcast ArrayRef to {}", stringify!($arr)),
+        ))?;
+        Ok(Box::new(arr.iter().map(|val| Ok(val.map($dat)))))
+    }};
+}
+
+fn eq_col_unsupported(ty: &str) -> Error {
+    Error::new(
+        ErrorKind::FeatureUnsupported,
+        format!(
+            "Equality deletes where a predicate acts upon a {} column are not yet supported",
+            ty
+        ),
+    )
+}
+
+fn arrow_array_to_datum_iterator<'a>(
+    column: &'a ArrayRef,
+    field: &NestedFieldRef,
+) -> Result<Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>> + 'a>> {
+    match field.field_type.as_primitive_type() {
+        Some(primitive_type) => match primitive_type {
+            PrimitiveType::Int => prim_to_datum!(column, Int32Array, Datum::int),
+            PrimitiveType::Boolean => {
+                prim_to_datum!(column, BooleanArray, Datum::bool)
+            }
+            PrimitiveType::Long => prim_to_datum!(column, Int64Array, Datum::long),
+            PrimitiveType::Float => {
+                prim_to_datum!(column, Float32Array, Datum::float)
+            }
+            PrimitiveType::Double => {
+                prim_to_datum!(column, Float64Array, Datum::double)
+            }
+            PrimitiveType::String => {
+                prim_to_datum!(column, StringArray, Datum::string)
+            }
+            PrimitiveType::Date => prim_to_datum!(column, Date32Array, Datum::date),
+            PrimitiveType::Timestamp => {
+                prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamp_micros)
+            }
+            PrimitiveType::Timestamptz => {
+                prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamptz_micros)
+            }
+            PrimitiveType::TimestampNs => {
+                prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamp_nanos)
+            }
+            PrimitiveType::TimestamptzNs => {
+                prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamptz_nanos)
+            }
+            PrimitiveType::Time => {
+                let arr = column
+                    .as_any()
+                    .downcast_ref::<Time64MicrosecondArray>()
+                    .ok_or(Error::new(
+                        ErrorKind::Unexpected,
+                        "could not downcast ArrayRef to Time64MicrosecondArray",
+                    ))?;
+                Ok(Box::new(arr.iter().map(|val| match val {
+                    None => Ok(None),
+                    Some(val) => Datum::time_micros(val).map(Some),
+                })))
+            }
+            PrimitiveType::Decimal { .. } => Err(eq_col_unsupported("Decimal")),
+            PrimitiveType::Uuid => Err(eq_col_unsupported("Uuid")),
+            PrimitiveType::Fixed(_) => Err(eq_col_unsupported("Fixed")),
+            PrimitiveType::Binary => Err(eq_col_unsupported("Binary")),
+        },
+        None => Err(eq_col_unsupported(
+            "non-primitive (i.e. Struct, List, or Map)",
+        )),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::fs::File;
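
For reference, here is a hand-expanded sketch of what one macro arm produces; the helper name `long_column_to_datums` is hypothetical, while `Error`, `Datum`, `ArrayRef`, and `Int64Array` are the same types imported at the top of the file:

// Expansion of the `PrimitiveType::Long` arm,
// prim_to_datum!(column, Int64Array, Datum::long):
fn long_column_to_datums<'a>(
    column: &'a ArrayRef,
) -> Result<Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>> + 'a>> {
    let arr = column.as_any().downcast_ref::<Int64Array>().ok_or(Error::new(
        ErrorKind::Unexpected,
        format!("could not downcast ArrayRef to {}", stringify!(Int64Array)),
    ))?;
    // Each Some(v) becomes Ok(Some(Datum::long(v))); each NULL slot becomes
    // Ok(None), which the row loop skips when building per-row predicates.
    Ok(Box::new(arr.iter().map(|val| Ok(val.map(Datum::long)))))
}
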
@@ -518,7 +639,7 @@ mod tests {
         // the call to the loader fails with the expected FeatureUnsupportedError.
         let delete_file_manager = CachingDeleteFileManager::new(file_io.clone(), 10);
 
-        let file_scan_tasks = setup(table_location);
+        let file_scan_tasks = setup_load_deletes_test_tasks(table_location);
 
         let result = delete_file_manager
             .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
@@ -527,7 +648,38 @@ mod tests {
         assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
     }
 
-    fn setup(table_location: &Path) -> Vec<FileScanTask> {
+    #[tokio::test]
+    async fn test_delete_file_manager_parse_equality_deletes() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+
+        let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);
+
+        let record_batch_stream = CachingDeleteFileManager::parquet_to_batch_stream(
+            &eq_delete_file_path,
+            file_io.clone(),
+        )
+        .await
+        .expect("could not get batch stream");
+
+        let eq_ids = HashSet::from_iter(vec![2, 3, 4]);
+
+        let parsed_eq_delete =
+            CachingDeleteFileManager::parse_equality_deletes_record_batch_stream(
+                record_batch_stream,
+                eq_ids,
+            )
+            .await
+            .expect("error parsing batch stream");
+        println!("{}", parsed_eq_delete);
+
+        let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (y != 2)".to_string();
+
+        assert_eq!(parsed_eq_delete.to_string(), expected);
+    }
+
+    fn setup_load_deletes_test_tasks(table_location: &Path) -> Vec<FileScanTask> {
         let data_file_schema = Arc::new(Schema::builder().build().unwrap());
         let positional_delete_schema = create_pos_del_schema();

@@ -637,4 +789,58 @@ mod tests {
         ];
         Arc::new(arrow_schema::Schema::new(fields))
     }
+
+    fn setup_write_equality_delete_file_1(table_location: &str) -> String {
+        let col_y_vals = vec![1, 2];
+        let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+        let col_z_vals = vec![Some(100), None];
+        let col_z = Arc::new(Int64Array::from(col_z_vals)) as ArrayRef;
+
+        let col_a_vals = vec![Some("HELP"), None];
+        let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef;
+
+        let equality_delete_schema = {
+            let fields = vec![
+                arrow_schema::Field::new("y", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
+                ),
+                arrow_schema::Field::new("z", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
+                ),
+                arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
+                ),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+
+        let equality_deletes_to_write =
+            RecordBatch::try_new(equality_delete_schema.clone(), vec![col_y, col_z, col_a])
+                .unwrap();
+
+        let path = format!("{}/equality-deletes-1.parquet", &table_location);
+
+        let file = File::create(&path).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(
+            file,
+            equality_deletes_to_write.schema(),
+            Some(props.clone()),
+        )
+        .unwrap();
+
+        writer
+            .write(&equality_deletes_to_write)
+            .expect("Writing batch");
+
+        // writer must be closed to write footer
+        writer.close().unwrap();
+
+        path
+    }
 }
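
The expected predicate in the new test follows directly from this logic: the delete file written above contains the rows (1, 100, "HELP") and (2, NULL, NULL). The first row negates into the OR chain ((y != 1) OR (z != 100)) OR (a != "HELP") after rewrite_not(), while the NULL cells of the second row contribute no conditions, leaving just (y != 2).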
