diff --git a/rust/otap-dataflow/benchmarks/benches/attribute_transform/main.rs b/rust/otap-dataflow/benchmarks/benches/attribute_transform/main.rs index 8c87a4e3ca..c90b4d8eca 100644 --- a/rust/otap-dataflow/benchmarks/benches/attribute_transform/main.rs +++ b/rust/otap-dataflow/benchmarks/benches/attribute_transform/main.rs @@ -35,22 +35,27 @@ fn generate_native_keys_attr_batch( key_gen: impl Fn(usize) -> String, ) -> RecordBatch { let mut keys_arr = StringBuilder::new(); + let mut parent_ids = Vec::new(); for i in 0..num_rows { let attr_key = key_gen(i); keys_arr.append_value(attr_key); + parent_ids.push((i % 10) as u16); } let keys_arr = keys_arr.finish(); + let parent_ids = UInt16Array::from(parent_ids); let type_arr = UInt8Array::from_iter_values(std::iter::repeat_n( AttributeValueType::Empty as u8, keys_arr.len(), )); + RecordBatch::try_new( Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false).with_plain_encoding(), Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), ])), - vec![Arc::new(type_arr), Arc::new(keys_arr)], + vec![Arc::new(parent_ids), Arc::new(type_arr), Arc::new(keys_arr)], ) .expect("expect no error") } @@ -62,18 +67,24 @@ fn generate_dict_keys_attribute_batch( ) -> RecordBatch { let mut keys_dict_values_arr = StringBuilder::new(); let mut keys_dict_keys_arr = PrimitiveBuilder::::new(); + let mut parent_ids = Vec::new(); for i in 0..num_keys { let attr_key = key_gen(i); keys_dict_values_arr.append_value(attr_key); keys_dict_keys_arr.append_value_n(i as u16, rows_per_key); + for j in 0..rows_per_key { + parent_ids.push(((i * rows_per_key + j) % 10) as u16); + } } let keys_arr = DictionaryArray::new( keys_dict_keys_arr.finish(), Arc::new(keys_dict_values_arr.finish()), ); + let parent_ids = UInt16Array::from(parent_ids); let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false).with_plain_encoding(), 
Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), Field::new( consts::ATTRIBUTE_KEY, @@ -85,8 +96,11 @@ fn generate_dict_keys_attribute_batch( AttributeValueType::Empty as u8, keys_arr.len(), )); - RecordBatch::try_new(schema, vec![Arc::new(type_arr), Arc::new(keys_arr)]) - .expect("expect no error") + RecordBatch::try_new( + schema, + vec![Arc::new(parent_ids), Arc::new(type_arr), Arc::new(keys_arr)], + ) + .expect("expect no error") } fn bench_transform_attributes(c: &mut Criterion) { diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/attributes_processor/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/attributes_processor/mod.rs index e48c3d43e3..ec9e0c4846 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/processors/attributes_processor/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/processors/attributes_processor/mod.rs @@ -716,6 +716,71 @@ mod tests { }) .validate(|_| async move {}); } + #[test] + fn test_rename_removes_duplicate_keys() { + // Prepare input with key "a" and "b" + let input = build_logs_with_attrs( + vec![], + vec![], + vec![ + KeyValue::new("a", AnyValue::new_string("value_a")), + KeyValue::new("b", AnyValue::new_string("value_b")), + ], + ); + + let cfg = json!({ + "actions": [ + {"action": "rename", "source_key": "a", "destination_key": "b"} + ] + }); + + let telemetry_registry_handle = TelemetryRegistryHandle::new(); + let controller_ctx = ControllerContext::new(telemetry_registry_handle); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0); + + let node = test_node("attributes-processor-test-dup"); + let rt: TestRuntime = TestRuntime::new(); + let mut node_config = NodeUserConfig::new_processor_config(ATTRIBUTES_PROCESSOR_URN); + node_config.config = cfg; + let proc = + create_attributes_processor(pipeline_ctx, node, Arc::new(node_config), rt.config()) + .expect("create processor"); + let phase = rt.set_processor(proc); + + phase + 
.run_test(|mut ctx| async move { + let mut bytes = BytesMut::new(); + input.encode(&mut bytes).expect("encode"); + let bytes = bytes.freeze(); + let pdata_in = + OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into()); + ctx.process(Message::PData(pdata_in)) + .await + .expect("process"); + + let out = ctx.drain_pdata().await; + let first = out.into_iter().next().expect("one output").payload(); + + let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp"); + let bytes = match otlp_bytes { + OtlpProtoBytes::ExportLogsRequest(b) => b, + _ => panic!("unexpected otlp variant"), + }; + let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode"); + + let log_attrs = &decoded.resource_logs[0].scope_logs[0].log_records[0].attributes; + + // Expect no "a" and exactly one "b" + assert!(!log_attrs.iter().any(|kv| kv.key == "a")); + let b_count = log_attrs.iter().filter(|kv| kv.key == "b").count(); + assert_eq!( + b_count, 1, + "There should be exactly one key 'b' (no duplicates)" + ); + }) + .validate(|_| async move {}); + } #[test] fn test_delete_applies_to_signal_only_by_default() { diff --git a/rust/otap-dataflow/crates/pdata/src/otap/transform.rs b/rust/otap-dataflow/crates/pdata/src/otap/transform.rs index 36a7efbf6e..aa7ce50d69 100644 --- a/rust/otap-dataflow/crates/pdata/src/otap/transform.rs +++ b/rust/otap-dataflow/crates/pdata/src/otap/transform.rs @@ -10,10 +10,10 @@ use std::sync::Arc; use arrow::array::{ Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, BooleanArray, DictionaryArray, MutableArrayData, PrimitiveArray, PrimitiveBuilder, RecordBatch, StringArray, StructArray, - UInt16Array, UInt32Array, make_array, + UInt8Array, UInt16Array, UInt32Array, make_array, }; use arrow::buffer::{BooleanBuffer, Buffer, MutableBuffer, OffsetBuffer, ScalarBuffer}; -use arrow::compute::kernels::cmp::eq; +use arrow::compute::kernels::cmp::{eq, gt_eq, lt}; use arrow::compute::{SortColumn, and, cast, filter, max, not}; 
use arrow::datatypes::{ ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, Schema, UInt8Type, UInt16Type, @@ -1285,9 +1285,10 @@ pub fn transform_attributes_impl( // At the same time, set flag to check if the batch already has the transport optimized // encoding. This is used later to determine if we need to lazily materialize the because ID // column because the transformation would break some sequences of encoded IDs. - let insert_or_upsert_needed = (transform.insert.is_some() || has_upsert) + let has_renames = transform.rename.as_ref().is_some_and(|r| !r.map.is_empty()); + let early_materialize_needed = (transform.insert.is_some() || has_upsert) && schema.column_with_name(consts::PARENT_ID).is_some(); - let (attrs_record_batch_cow, is_transport_optimized) = if insert_or_upsert_needed { + let (attrs_record_batch_cow, is_transport_optimized) = if early_materialize_needed { let rb = materialize_parent_id_for_attributes_auto(attrs_record_batch)?; (Cow::Owned(rb), false) } else { @@ -1303,6 +1304,58 @@ pub fn transform_attributes_impl( column_encoding != Some(metadata::encodings::PLAIN), ) }; + + // Compute collision delete ranges if renames exist. + // When renaming key "a" -> "b", any existing row with key "b" sharing a parent_id + // with a row having key "a" would become a duplicate. We proactively identify these + // collisions and generate Delete ranges that are merged into the transform pipeline. + // + // For transport-optimized (delta-encoded) parent_ids, we first do a cheap check: if + // any new_key actually exists in the keys column, we materialize the parent_ids to + // resolve the actual values before running full collision detection. 
+ let (attrs_record_batch_cow, is_transport_optimized, collision_delete_ranges) = if has_renames { + let rename = transform + .rename + .as_ref() + .expect("has_renames guard ensures this is Some"); + + let key_col = get_required_array(attrs_record_batch, consts::ATTRIBUTE_KEY)?; + + // parent_id is required by the OTAP spec for attributes batches. + // See: https://github.com/open-telemetry/otel-arrow/blob/main/docs/otap-spec.md#542-u16-attributes + let parent_id_col = get_required_array(attrs_record_batch, consts::PARENT_ID)?; + + // If transport-optimized, check if any new_key exists before materializing. + // This avoids the expensive materialization when no collision is possible. + let needs_materialization = if is_transport_optimized { + rename_has_target_key_in_column(key_col, rename)? + } else { + false + }; + + if needs_materialization { + // Materialize the delta-encoded parent_ids so we get actual values + let rb = materialize_parent_id_for_attributes_auto(attrs_record_batch)?; + let parent_id_col = get_required_array(&rb, consts::PARENT_ID)?; + let key_col = get_required_array(&rb, consts::ATTRIBUTE_KEY)?; + let ranges = + dispatch_find_rename_collisions(rb.num_rows(), key_col, parent_id_col, rename)?; + (Cow::Owned(rb), false, ranges) + } else if !is_transport_optimized { + let ranges = dispatch_find_rename_collisions( + attrs_record_batch.num_rows(), + key_col, + parent_id_col, + rename, + )?; + (attrs_record_batch_cow, is_transport_optimized, ranges) + } else { + // Transport-optimized but no target key exists — no collision possible + (attrs_record_batch_cow, is_transport_optimized, Vec::new()) + } + } else { + (attrs_record_batch_cow, is_transport_optimized, Vec::new()) + }; let attrs_record_batch = attrs_record_batch_cow.as_ref(); let schema = attrs_record_batch.schema(); @@ -1313,7 +1366,8 @@ pub fn transform_attributes_impl( .downcast_ref() .expect("can downcast Utf8 Column to string array"); - let keys_transform_result = 
transform_keys(keys_arr, transform)?; + let keys_transform_result = + transform_keys(keys_arr, transform, &collision_delete_ranges)?; let stats = TransformStats { renamed_entries: keys_transform_result.replaced_rows as u64, deleted_entries: keys_transform_result.deleted_rows as u64, @@ -1380,6 +1434,7 @@ pub fn transform_attributes_impl( .downcast_ref::>() .expect("can downcast dictionary column to dictionary array"), transform, + &collision_delete_ranges, is_transport_optimized, compute_stats, )?; @@ -1404,6 +1459,7 @@ pub fn transform_attributes_impl( .downcast_ref::>() .expect("can downcast dictionary column to dictionary array"), transform, + &collision_delete_ranges, is_transport_optimized, compute_stats, )?; @@ -1571,6 +1627,7 @@ struct KeysTransformResult { fn transform_keys( array: &StringArray, transform: &AttributesTransform, + collision_delete_ranges: &[KeyTransformRange], ) -> Result { let len = array.len(); let values = array.values(); @@ -1597,7 +1654,12 @@ fn transform_keys( .transpose()?; // check if we can return early because there are no modifications to be made - let total_deletions = delete_plan.as_ref().map(|d| d.total_deletions).unwrap_or(0); + let total_deletions = delete_plan.as_ref().map(|d| d.total_deletions).unwrap_or(0) + + collision_delete_ranges + .iter() + .map(|r| r.range.len()) + .sum::(); + let total_replacements = replacement_plan .as_ref() .map(|r| r.total_replacements) @@ -1617,7 +1679,12 @@ fn transform_keys( // we're going to pass over both the values and the offsets, taking any ranges that weren't // that are unmodified, while either transforming or omitting ranges that were either replaced // or deleted. To get the sorted list of how to handle each range, we merge the plans' ranges - let transform_ranges = merge_transform_ranges(replacement_plan.as_ref(), delete_plan.as_ref()); + // along with any collision-delete ranges in a single pass. 
+ let transform_ranges = merge_transform_ranges( + replacement_plan.as_ref(), + delete_plan.as_ref(), + collision_delete_ranges, + ); // create buffer to contain the new values let mut new_values = MutableBuffer::with_capacity(calculate_new_keys_buffer_len( @@ -1640,10 +1707,13 @@ fn transform_keys( match transform_range.range_type { KeyTransformRangeType::Replace => { // insert the replaced values into the new_values buffer + let plan_idx = transform_range + .idx + .expect("replace ranges always have an idx"); let replacement_bytes = &replacement_plan .as_ref() .expect("replacement plan should be initialized") - .replacement_bytes[transform_range.idx]; + .replacement_bytes[plan_idx]; for _ in transform_range.start()..transform_range.end() { new_values.extend_from_slice(replacement_bytes); } @@ -1704,10 +1774,13 @@ fn transform_keys( match transform_range.range_type { KeyTransformRangeType::Replace => { // append offsets for values that were replaced, but add the offset adjustment + let plan_idx = transform_range + .idx + .expect("replace ranges always have an idx"); let replacement_bytes = &replacement_plan .as_ref() .expect("replacement plan should be initialized") - .replacement_bytes[transform_range.idx]; + .replacement_bytes[plan_idx]; let mut offset = offsets[transform_range.start()] + curr_total_offset_adjustment; for _ in transform_range.start()..transform_range.end() { @@ -1720,18 +1793,22 @@ fn transform_keys( let val_len_diff = replacement_plan .as_ref() .expect("replacement plan should be initialized") - .replacement_byte_len_diffs[transform_range.idx]; + .replacement_byte_len_diffs[plan_idx]; curr_total_offset_adjustment += val_len_diff * transform_range.range.len() as i32; } KeyTransformRangeType::Delete => { // for deleted ranges we don't need to append any offsets to the buffer, so we // just decrement by how many total bytes were deleted from this range. 
- let deleted_val_len = delete_plan - .as_ref() - .expect("delete plan should be initialized") - .target_keys[transform_range.idx] - .len(); + let deleted_val_len = match (transform_range.idx, delete_plan.as_ref()) { + (Some(plan_idx), Some(dp)) => dp.target_keys[plan_idx].len(), + _ => { + // collision-delete range: compute byte length from offsets + let s = offsets[transform_range.start()] as usize; + let e = offsets[transform_range.end()] as usize; + (e - s) / transform_range.range.len() + } + }; curr_total_offset_adjustment -= (deleted_val_len * transform_range.range.len()) as i32; } @@ -1820,6 +1897,576 @@ struct DictionaryKeysTransformResult { renamed_rows: usize, } +/// Check whether any of the rename target (new) keys exist in the attribute keys column. +/// This is a cheap pre-check used to decide if we need to materialize delta-encoded parent_ids. +/// +/// Uses the optimized `find_matching_key_ranges` (raw offset/values buffer scan) instead of +/// the `eq` compute kernel to avoid the overhead that kernel adds for scalar comparison. +fn rename_has_target_key_in_column(key_col: &ArrayRef, rename: &RenameTransform) -> Result { + let (array_len, values_buf, offsets) = match key_col.data_type() { + DataType::Utf8 => { + let str_arr = key_col + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::UnexpectedRecordBatchState { + reason: "expected Utf8 key column".into(), + })?; + ( + str_arr.len(), + str_arr.values().clone(), + str_arr.offsets().clone(), + ) + } + DataType::Dictionary(_, _) => { + // For dictionary-encoded keys, check the dictionary values directly. 
+ let dict_values = extract_dict_string_values(key_col)?; + ( + dict_values.len(), + dict_values.values().clone(), + dict_values.offsets().clone(), + ) + } + other => { + return Err(Error::UnexpectedRecordBatchState { + reason: format!("unsupported key column type for collision check: {other:?}"), + }); + } + }; + let result = find_matching_key_ranges( + array_len, + &values_buf, + &offsets, + &rename.replacement_bytes, + KeyTransformRangeType::Delete, // range_type doesn't matter, we only check total_matches + )?; + Ok(result.total_matches > 0) +} + +/// Extract the string values array from a dictionary-encoded key column. +/// Works with both UInt8 and UInt16 dictionary key types. +fn extract_dict_string_values(key_col: &ArrayRef) -> Result<&StringArray> { + match key_col.data_type() { + DataType::Dictionary(key_type, _) => match key_type.as_ref() { + DataType::UInt8 => { + let dict = key_col + .as_any() + .downcast_ref::>() + .ok_or_else(|| Error::UnexpectedRecordBatchState { + reason: "expected Dict key column".into(), + })?; + dict.values() + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::UnexpectedRecordBatchState { + reason: "expected Utf8 dictionary values".into(), + }) + } + DataType::UInt16 => { + let dict = key_col + .as_any() + .downcast_ref::>() + .ok_or_else(|| Error::UnexpectedRecordBatchState { + reason: "expected Dict key column".into(), + })?; + dict.values() + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::UnexpectedRecordBatchState { + reason: "expected Utf8 dictionary values".into(), + }) + } + other => Err(Error::UnexpectedRecordBatchState { + reason: format!("unsupported dictionary key type: {other:?}"), + }), + }, + other => Err(Error::UnexpectedRecordBatchState { + reason: format!("expected Dictionary key column, got: {other:?}"), + }), + } +} + +/// Downcast the `parent_id` column to the appropriate `PrimitiveArray` type and +/// dispatch to the generic [`find_rename_collisions_to_delete_ranges`]. 
+/// +/// For dictionary-encoded parent_ids, the dictionary keys are passed directly — +/// their ordinal values still partition rows by parent identity, which is +/// sufficient for collision detection. +fn dispatch_find_rename_collisions( + num_rows: usize, + key_col: &ArrayRef, + parent_id_col: &ArrayRef, + rename: &RenameTransform, +) -> Result> { + match parent_id_col.data_type() { + DataType::UInt16 => { + let parent_ids = parent_id_col + .as_any() + .downcast_ref::>() + .ok_or_else(|| Error::UnexpectedRecordBatchState { + reason: "expected UInt16 parent_id column".into(), + })?; + find_rename_collisions_to_delete_ranges(num_rows, key_col, parent_ids, rename) + } + DataType::UInt32 => { + let parent_ids = parent_id_col + .as_any() + .downcast_ref::>() + .ok_or_else(|| Error::UnexpectedRecordBatchState { + reason: "expected UInt32 parent_id column".into(), + })?; + find_rename_collisions_to_delete_ranges(num_rows, key_col, parent_ids, rename) + } + DataType::Dictionary(key_type, _) => match key_type.as_ref() { + DataType::UInt8 => { + let dict = parent_id_col + .as_any() + .downcast_ref::>() + .ok_or_else(|| Error::UnexpectedRecordBatchState { + reason: "expected Dict parent_id column".into(), + })?; + find_rename_collisions_to_delete_ranges(num_rows, key_col, dict.keys(), rename) + } + DataType::UInt16 => { + let dict = parent_id_col + .as_any() + .downcast_ref::>() + .ok_or_else(|| Error::UnexpectedRecordBatchState { + reason: "expected Dict parent_id column".into(), + })?; + find_rename_collisions_to_delete_ranges(num_rows, key_col, dict.keys(), rename) + } + other => Err(Error::UnexpectedRecordBatchState { + reason: format!("unsupported dictionary key type for parent_id: {other:?}"), + }), + }, + other => Err(Error::UnexpectedRecordBatchState { + reason: format!("unsupported parent_id data type: {other:?}"), + }), + } +} + +/// Identify rows that would become duplicates after a rename and return them as +/// `KeyTransformRange::Delete` entries. 
When renaming key `x` to `y`, any existing +/// row with key `y` whose `parent_id` also has a row with key `x` would become a +/// duplicate. +/// +/// Optimized to: +/// 1. Check new_key (target) first — collisions are rare, so we exit early when +/// the target key doesn't exist in the column. +/// 2. Use `find_matching_key_ranges` (raw offset/values buffer scan) instead of +/// the arrow `eq` compute kernel, which has significant per-call overhead. +/// 3. Only populate the `IdBitmap` after confirming both old and new keys exist. +fn find_rename_collisions_to_delete_ranges( + num_rows: usize, + key_col: &ArrayRef, + parent_ids: &PrimitiveArray, + rename: &RenameTransform, +) -> Result> +where + K::Native: Into, +{ + if num_rows == 0 || rename.map.is_empty() { + return Ok(Vec::new()); + } + + // Extract the raw offset/values buffers from the key column. + // For dictionary-encoded keys, we run find_matching_key_ranges on dictionary values + // then use arrow compute kernels (eq/gt_eq/lt) on the integer dict keys array + // to efficiently map dict value indices back to row indices. + let (array_len, values_buf, offsets, dict_keys_ref) = match key_col.data_type() { + DataType::Utf8 => { + let str_arr = key_col + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::UnexpectedRecordBatchState { + reason: "expected Utf8 key column".into(), + })?; + ( + str_arr.len(), + str_arr.values().clone(), + str_arr.offsets().clone(), + None::<&UInt16Array>, + ) + } + DataType::Dictionary(k, _) => { + let dict_values = extract_dict_string_values(key_col)?; + let offsets = dict_values.offsets().clone(); + let values_buf = dict_values.values().clone(); + // Take the dictionary keys array directly — no Vec materialization. + // We only support UInt8 and UInt16 dict key types; for UInt8 we cast to + // UInt16 so we can use a single code path with eq/gt_eq/lt kernels. 
+ let dict_keys_arr: &UInt16Array = match k.as_ref() { + DataType::UInt8 => { + return find_rename_collisions_dict_u8( + num_rows, + key_col, + parent_ids, + rename, + dict_values, + offsets, + values_buf, + ); + } + DataType::UInt16 => { + let dict = key_col + .as_any() + .downcast_ref::>() + .ok_or_else(|| Error::UnexpectedRecordBatchState { + reason: "expected Dict key column".into(), + })?; + dict.keys() + } + other => { + return Err(Error::UnsupportedDictionaryKeyType { + expect_oneof: vec![DataType::UInt8, DataType::UInt16], + actual: other.clone(), + }); + } + }; + (dict_values.len(), values_buf, offsets, Some(dict_keys_arr)) + } + other => { + return Err(Error::UnexpectedRecordBatchState { + reason: format!("unsupported key column type for collision detection: {other:?}"), + }); + } + }; + + let mut ranges = Vec::new(); + let mut source_parents = IdBitmap::new(); + + for (rename_idx, (_old_key, _new_key)) in rename.map.iter().enumerate() { + let old_key_bytes = &rename.target_bytes[rename_idx]; + let new_key_bytes = &rename.replacement_bytes[rename_idx]; + + // Step 1: Check if new_key exists using find_matching_key_ranges. + // Collisions are rare, so exiting early here is the biggest win. + let new_key_result = find_matching_key_ranges( + array_len, + &values_buf, + &offsets, + std::slice::from_ref(new_key_bytes), + KeyTransformRangeType::Delete, + )?; + if new_key_result.total_matches == 0 { + continue; + } + + // Step 2: Check if old_key exists and collect parent_ids for old_key rows. + let old_key_result = find_matching_key_ranges( + array_len, + &values_buf, + &offsets, + std::slice::from_ref(old_key_bytes), + KeyTransformRangeType::Delete, + )?; + if old_key_result.total_matches == 0 { + continue; + } + + source_parents.clear(); + + if let Some(dict_keys) = dict_keys_ref { + // Dictionary-encoded (UInt16): use eq/gt_eq/lt kernels on dict keys + // to build a row mask, then iterate with BitSliceIterator. 
+ for range in &old_key_result.ranges { + let row_mask = dict_value_range_to_row_mask(dict_keys, range)?; + let row_mask_buffer = row_mask.values(); + for (start, end) in BitSliceIterator::new( + row_mask_buffer.inner(), + row_mask_buffer.offset(), + row_mask.len(), + ) { + for i in start..end { + let pid: u64 = parent_ids.value(i).into(); + source_parents.insert(pid as u32); + } + } + } + } else { + // Native StringArray: ranges are row indices directly. + for range in &old_key_result.ranges { + for i in range.start()..range.end() { + let pid: u64 = parent_ids.value(i).into(); + source_parents.insert(pid as u32); + } + } + } + + if source_parents.is_empty() { + continue; + } + + // Step 3: Find rows with new_key whose parent_id collides. + if let Some(dict_keys) = dict_keys_ref { + let mut range_start: Option = None; + for new_range in &new_key_result.ranges { + let row_mask = dict_value_range_to_row_mask(dict_keys, new_range)?; + let row_mask_buffer = row_mask.values(); + for (bit_start, bit_end) in BitSliceIterator::new( + row_mask_buffer.inner(), + row_mask_buffer.offset(), + row_mask.len(), + ) { + for row in bit_start..bit_end { + let pid: u64 = parent_ids.value(row).into(); + if source_parents.contains(pid as u32) { + if range_start.is_none() { + range_start = Some(row); + } + continue; + } + if let Some(s) = range_start.take() { + ranges.push(KeyTransformRange { + range: s..row, + idx: None, + range_type: KeyTransformRangeType::Delete, + }); + } + } + // Flush any open range at the end of this bit slice — rows outside + // the mask do NOT have the new_key and must not be included. 
+ if let Some(s) = range_start.take() { + ranges.push(KeyTransformRange { + range: s..bit_end, + idx: None, + range_type: KeyTransformRangeType::Delete, + }); + } + } + } + if let Some(s) = range_start.take() { + ranges.push(KeyTransformRange { + range: s..num_rows, + idx: None, + range_type: KeyTransformRangeType::Delete, + }); + } + } else { + // Native StringArray: ranges are row indices. + let mut range_start: Option = None; + for new_range in &new_key_result.ranges { + for i in new_range.start()..new_range.end() { + let pid: u64 = parent_ids.value(i).into(); + if source_parents.contains(pid as u32) { + if range_start.is_none() { + range_start = Some(i); + } + continue; + } + if let Some(s) = range_start.take() { + ranges.push(KeyTransformRange { + range: s..i, + idx: None, + range_type: KeyTransformRangeType::Delete, + }); + } + } + if let Some(s) = range_start.take() { + ranges.push(KeyTransformRange { + range: s..new_range.end(), + idx: None, + range_type: KeyTransformRangeType::Delete, + }); + } + } + } + } + + Ok(ranges) +} + +/// Build a boolean row-mask that is `true` for every row in `dict_keys` whose +/// value falls inside the half-open range `[range.start(), range.end())`. +/// +/// Uses the Arrow `eq` kernel for single-element ranges and `gt_eq`/`lt`/`and` +/// for multi-element ranges, which is much faster than iterating the entire +/// dictionary keys array for each value index. +fn dict_value_range_to_row_mask( + dict_keys: &UInt16Array, + range: &KeyTransformRange, +) -> Result { + let mask = if range.end() - range.start() == 1 { + eq(dict_keys, &UInt16Array::new_scalar(range.start() as u16)).map_err(|e| { + Error::UnexpectedRecordBatchState { + reason: format!("eq kernel failed on dict keys: {e}"), + } + })? 
+ } else { + let geq_start = + gt_eq(dict_keys, &UInt16Array::new_scalar(range.start() as u16)).map_err(|e| { + Error::UnexpectedRecordBatchState { + reason: format!("gt_eq kernel failed on dict keys: {e}"), + } + })?; + let lt_end = lt(dict_keys, &UInt16Array::new_scalar(range.end() as u16)).map_err(|e| { + Error::UnexpectedRecordBatchState { + reason: format!("lt kernel failed on dict keys: {e}"), + } + })?; + and(&geq_start, <_end).map_err(|e| Error::UnexpectedRecordBatchState { + reason: format!("and kernel failed on dict keys: {e}"), + })? + }; + Ok(mask) +} + +/// UInt8 variant of the dictionary collision detection. Uses the same algorithm +/// as the UInt16 path but operates on `UInt8Array` dict keys to avoid a costly +/// `Vec` materialization. +fn find_rename_collisions_dict_u8( + num_rows: usize, + key_col: &ArrayRef, + parent_ids: &PrimitiveArray, + rename: &RenameTransform, + dict_values: &StringArray, + offsets: OffsetBuffer, + values_buf: Buffer, +) -> Result> +where + K::Native: Into, +{ + let dict = key_col + .as_any() + .downcast_ref::>() + .ok_or_else(|| Error::UnexpectedRecordBatchState { + reason: "expected Dict key column".into(), + })?; + let dict_keys = dict.keys(); + let array_len = dict_values.len(); + + let mut ranges = Vec::new(); + let mut source_parents = IdBitmap::new(); + + for (rename_idx, (_old_key, _new_key)) in rename.map.iter().enumerate() { + let old_key_bytes = &rename.target_bytes[rename_idx]; + let new_key_bytes = &rename.replacement_bytes[rename_idx]; + + let new_key_result = find_matching_key_ranges( + array_len, + &values_buf, + &offsets, + std::slice::from_ref(new_key_bytes), + KeyTransformRangeType::Delete, + )?; + if new_key_result.total_matches == 0 { + continue; + } + + let old_key_result = find_matching_key_ranges( + array_len, + &values_buf, + &offsets, + std::slice::from_ref(old_key_bytes), + KeyTransformRangeType::Delete, + )?; + if old_key_result.total_matches == 0 { + continue; + } + + source_parents.clear(); + 
+ // Collect parent_ids for old_key rows using eq/gt_eq/lt on UInt8 dict keys + for range in &old_key_result.ranges { + let mask = if range.end() - range.start() == 1 { + eq(dict_keys, &UInt8Array::new_scalar(range.start() as u8)).map_err(|e| { + Error::UnexpectedRecordBatchState { + reason: format!("eq kernel failed: {e}"), + } + })? + } else { + let geq = gt_eq(dict_keys, &UInt8Array::new_scalar(range.start() as u8)).map_err( + |e| Error::UnexpectedRecordBatchState { + reason: format!("gt_eq kernel failed: {e}"), + }, + )?; + let lt_e = + lt(dict_keys, &UInt8Array::new_scalar(range.end() as u8)).map_err(|e| { + Error::UnexpectedRecordBatchState { + reason: format!("lt kernel failed: {e}"), + } + })?; + and(&geq, <_e).map_err(|e| Error::UnexpectedRecordBatchState { + reason: format!("and kernel failed: {e}"), + })? + }; + let buf = mask.values(); + for (start, end) in BitSliceIterator::new(buf.inner(), buf.offset(), mask.len()) { + for i in start..end { + let pid: u64 = parent_ids.value(i).into(); + source_parents.insert(pid as u32); + } + } + } + + if source_parents.is_empty() { + continue; + } + + // Find collision rows for new_key + let mut range_start: Option = None; + for new_range in &new_key_result.ranges { + let mask = + if new_range.end() - new_range.start() == 1 { + eq(dict_keys, &UInt8Array::new_scalar(new_range.start() as u8)).map_err( + |e| Error::UnexpectedRecordBatchState { + reason: format!("eq kernel failed: {e}"), + }, + )? + } else { + let geq = gt_eq(dict_keys, &UInt8Array::new_scalar(new_range.start() as u8)) + .map_err(|e| Error::UnexpectedRecordBatchState { + reason: format!("gt_eq kernel failed: {e}"), + })?; + let lt_e = lt(dict_keys, &UInt8Array::new_scalar(new_range.end() as u8)) + .map_err(|e| Error::UnexpectedRecordBatchState { + reason: format!("lt kernel failed: {e}"), + })?; + and(&geq, <_e).map_err(|e| Error::UnexpectedRecordBatchState { + reason: format!("and kernel failed: {e}"), + })? 
+ }; + let buf = mask.values(); + for (bit_start, bit_end) in BitSliceIterator::new(buf.inner(), buf.offset(), mask.len()) + { + for row in bit_start..bit_end { + let pid: u64 = parent_ids.value(row).into(); + if source_parents.contains(pid as u32) { + if range_start.is_none() { + range_start = Some(row); + } + continue; + } + if let Some(s) = range_start.take() { + ranges.push(KeyTransformRange { + range: s..row, + idx: None, + range_type: KeyTransformRangeType::Delete, + }); + } + } + // Flush any open range at the end of this bit slice. + if let Some(s) = range_start.take() { + ranges.push(KeyTransformRange { + range: s..bit_end, + idx: None, + range_type: KeyTransformRangeType::Delete, + }); + } + } + } + if let Some(s) = range_start.take() { + ranges.push(KeyTransformRange { + range: s..num_rows, + idx: None, + range_type: KeyTransformRangeType::Delete, + }); + } + } + + Ok(ranges) +} + /// Transforms the keys for the dictionary array. /// /// # Arguments @@ -1835,6 +2482,7 @@ struct DictionaryKeysTransformResult { fn transform_dictionary_keys( dict_arr: &DictionaryArray, transform: &AttributesTransform, + collision_delete_ranges: &[KeyTransformRange], is_transport_encoded: bool, compute_stats: bool, ) -> Result> @@ -1849,7 +2497,7 @@ where actual: dict_values.data_type().clone(), } })?; - let dict_values_transform_result = transform_keys(dict_values, transform)?; + let dict_values_transform_result = transform_keys(dict_values, transform, &[])?; // Convert the ranges of transformed dictionary values into the ranges of transformed dict keys. // These ranges are used to determine two things: @@ -1872,6 +2520,10 @@ where Vec::new() }; + // Merge collision delete ranges (row-level) into the dict key transform ranges. 
+ let dict_key_transform_ranges = + sorted_merge_into_vec(dict_key_transform_ranges, collision_delete_ranges); + // If we're tracking statistics on how many rows were transformed, it's less expensive to // compute these statistics from the ranges of transformed dictionary keys. However, if these // ranges haven't been computed, it's more performant to compute the statistics from the @@ -1889,7 +2541,7 @@ where (rename_count, 0) }; - if dict_values_transform_result.keep_ranges.is_none() { + if dict_values_transform_result.keep_ranges.is_none() && collision_delete_ranges.is_empty() { // here there were no rows deleted from the values array, which means we can reuse // the dictionary keys without any transformations let new_dict_keys = dict_keys.clone(); @@ -1912,8 +2564,12 @@ where }); } - // safety: we've checked above that this is not None - let dict_values_keep_ranges = dict_values_transform_result.keep_ranges.expect("not none"); + let dict_values_keep_ranges = dict_values_transform_result.keep_ranges.unwrap_or_else(|| { + vec![Range { + start: 0, + end: dict_values.len(), + }] + }); // create quick lookup for each dictionary key of whether it was kept and if so which // contiguous range of kept dictionary values the key points to. This will allow us to build @@ -1921,7 +2577,6 @@ where // deleted prior to this range. 
let mut dict_key_kept_in_values_range: Vec> = vec![None; dict_values.len()]; for (range_idx, range) in dict_values_keep_ranges.iter().enumerate() { - // for i in range.start..range.end { for k in dict_key_kept_in_values_range .iter_mut() .take(range.end) @@ -1962,13 +2617,15 @@ where }) .collect::>(); - for i in 0..dict_arr.len() { - let dict_key = dict_keys.value(i).as_usize(); - let kept_in_dict_values_range_idx = dict_key_kept_in_values_range - .get(dict_key) - .expect("dict keys values range lookup not properly initialized"); - if let Some(kept_in_dict_values_range_idx) = kept_in_dict_values_range_idx { - let new_dict_key = dict_key - dict_key_adjustments[*kept_in_dict_values_range_idx]; + for range in key_keep_ranges.iter() { + for i in range.start..range.end { + let dict_key = dict_keys.value(i).as_usize(); + let kept_in_dict_values_range_idx = dict_key_kept_in_values_range + .get(dict_key) + .expect("dict keys values range lookup not properly initialized") + .expect("kept row cannot have a deleted dictionary value"); + + let new_dict_key = dict_key - dict_key_adjustments[kept_in_dict_values_range_idx]; let new_dict_key = K::Native::from_usize(new_dict_key).expect("dict_key_overflow"); // safety: we've already allocated the correct capacity for this buffer, so we can use @@ -2136,7 +2793,8 @@ struct KeyTransformRange { /// Index of the specified transformation. For example if the transformation was a rename, /// there would be a list of key replacements, and this would be the index into that list. - idx: usize, + /// `None` for collision-delete ranges that don't map to a plan entry. + idx: Option, /// The type of transformation applied to this range (either Rename or Delete). range_type: KeyTransformRangeType, @@ -2159,53 +2817,71 @@ impl KeyTransformRange { } } +/// Merge two sorted `KeyTransformRange` slices into a single `Vec`, preserving sort order. 
+fn sorted_merge_into_vec( + left: Vec, + right: &[KeyTransformRange], +) -> Vec { + if right.is_empty() { + return left; + } + if left.is_empty() { + return right.to_vec(); + } + let mut merged = Vec::with_capacity(left.len() + right.len()); + let mut li = 0; + let mut ri = 0; + while li < left.len() && ri < right.len() { + if left[li].start() <= right[ri].start() { + merged.push(left[li].clone()); + li += 1; + } else { + merged.push(right[ri].clone()); + ri += 1; + } + } + while li < left.len() { + merged.push(left[li].clone()); + li += 1; + } + while ri < right.len() { + merged.push(right[ri].clone()); + ri += 1; + } + merged +} + +/// Merge replacement-plan ranges, delete-plan ranges, and collision-delete ranges +/// into a single sorted sequence. When only one source has ranges and there are +/// no collision deletes, a zero-copy `Cow::Borrowed` is returned. fn merge_transform_ranges<'a>( replacement_plan: Option<&'a KeyReplacementPlan<'_>>, delete_plan: Option<&'a KeyDeletePlan<'_>>, + collision_delete_ranges: &[KeyTransformRange], ) -> Cow<'a, [KeyTransformRange]> { - match (replacement_plan, delete_plan) { - (Some(replacement_plan), Some(delete_plan)) => { - let mut result = - Vec::with_capacity(replacement_plan.ranges.len() + delete_plan.ranges.len()); - - let mut rep_idx = 0; - let mut del_idx = 0; - - while rep_idx < replacement_plan.ranges.len() && del_idx < delete_plan.ranges.len() { - let rep_start = replacement_plan.ranges[rep_idx].start(); - let del_start = delete_plan.ranges[del_idx].start(); - - if rep_start <= del_start { - let rep_range = replacement_plan.ranges[rep_idx].clone(); - result.push(rep_range); - rep_idx += 1; - } else { - let del_range = delete_plan.ranges[del_idx].clone(); - result.push(del_range); - del_idx += 1; - } - } - - // append any remaining replacements - while rep_idx < replacement_plan.ranges.len() { - let rep_range = replacement_plan.ranges[rep_idx].clone(); - result.push(rep_range); - rep_idx += 1; + // Fast path: 
borrow directly when only one source is present and no collisions + if collision_delete_ranges.is_empty() { + return match (replacement_plan, delete_plan) { + (Some(rp), None) => Cow::Borrowed(&rp.ranges), + (None, Some(dp)) => Cow::Borrowed(&dp.ranges), + (None, None) => Cow::Borrowed(&[]), + (Some(rp), Some(dp)) => { + let merged = sorted_merge_into_vec(rp.ranges.to_vec(), &dp.ranges); + Cow::Owned(merged) } + }; + } - // append any remaining deletions - while del_idx < delete_plan.ranges.len() { - let del_range = delete_plan.ranges[del_idx].clone(); - result.push(del_range); - del_idx += 1; - } + // When collision-delete ranges are present, we always produce an owned Vec. + // First merge replacement + delete plans, then merge in collision deletes. + let plan_ranges = match (replacement_plan, delete_plan) { + (Some(rp), Some(dp)) => sorted_merge_into_vec(rp.ranges.to_vec(), &dp.ranges), + (Some(rp), None) => rp.ranges.to_vec(), + (None, Some(dp)) => dp.ranges.to_vec(), + (None, None) => Vec::new(), + }; - Cow::Owned(result) - } - (Some(replacement_plan), None) => Cow::Borrowed(&replacement_plan.ranges), - (None, Some(delete_plan)) => Cow::Borrowed(&delete_plan.ranges), - (None, None) => Cow::Borrowed(&[]), - } + Cow::Owned(sorted_merge_into_vec(plan_ranges, collision_delete_ranges)) } /// Converts delete transform ranges into "keep" ranges - the inverse ranges that should be retained. 
@@ -2429,7 +3105,7 @@ fn find_matching_key_ranges( // close current range ranges.push(KeyTransformRange { range: Range { start: s, end: i }, - idx: target_idx, + idx: Some(target_idx), range_type, }); } @@ -2442,7 +3118,7 @@ fn find_matching_key_ranges( start: s, end: array_len, }, - idx: target_idx, + idx: Some(target_idx), range_type, }); } @@ -2565,7 +3241,7 @@ fn should_remove_transport_optimized_encoding( prev, &MaybeReplacedKey { index: curr_range.start(), - replacement_idx: Some(curr_range.idx), + replacement_idx: curr_range.idx, }, )?; if replacement_joins_prev { @@ -2580,7 +3256,7 @@ fn should_remove_transport_optimized_encoding( replacement_bytes, &MaybeReplacedKey { index: curr_range.end() - 1, - replacement_idx: Some(curr_range.idx), + replacement_idx: curr_range.idx, }, next, )?; @@ -2660,7 +3336,7 @@ fn find_previous_neighbour_post_transform( KeyTransformRangeType::Replace => { return Some(MaybeReplacedKey { index: index - 1, - replacement_idx: Some(range.idx), + replacement_idx: range.idx, }); } KeyTransformRangeType::Delete => { @@ -2722,7 +3398,7 @@ fn find_next_neighbour_post_transform( KeyTransformRangeType::Replace => { return Some(MaybeReplacedKey { index: index + 1, - replacement_idx: Some(range.idx), + replacement_idx: range.idx, }); } KeyTransformRangeType::Delete => { @@ -4123,21 +4799,29 @@ mod test { for (transform, input_cols, expected_cols) in test_cases { let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false).with_plain_encoding(), Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), ])); + let num_rows = input_cols.0.len(); + let parent_ids = UInt16Array::from_iter_values((0..num_rows).map(|i| i as u16)); let types = UInt8Array::from_iter_values(std::iter::repeat_n( AttributeValueType::Str as u8, - input_cols.0.len(), + num_rows, )); let keys = 
StringArray::from_iter_values(input_cols.0); let values = StringArray::from_iter_values(input_cols.1); let record_batch = RecordBatch::try_new( schema.clone(), - vec![Arc::new(types), Arc::new(keys), Arc::new(values)], + vec![ + Arc::new(parent_ids), + Arc::new(types), + Arc::new(keys), + Arc::new(values), + ], ) .unwrap(); @@ -4148,15 +4832,27 @@ mod test { ) .unwrap(); + let num_expected = expected_cols.0.len(); + // Build expected parent_ids: keep the rows that weren't deleted. + // Since each input row has unique parent_id, the expected parent_ids + // are simply the first num_expected sequential values after filtering. + // We rely on `result` having the correct parent_ids rather than asserting + // specific values, so build expected from result's parent_id column. + let expected_parent_ids = result.column_by_name(consts::PARENT_ID).unwrap().clone(); let types = UInt8Array::from_iter_values(std::iter::repeat_n( AttributeValueType::Str as u8, - expected_cols.0.len(), + num_expected, )); let keys = StringArray::from_iter_values(expected_cols.0); let values = StringArray::from_iter_values(expected_cols.1); let expected = RecordBatch::try_new( schema.clone(), - vec![Arc::new(types), Arc::new(keys), Arc::new(values)], + vec![ + expected_parent_ids, + Arc::new(types), + Arc::new(keys), + Arc::new(values), + ], ) .unwrap(); @@ -4433,7 +5129,16 @@ mod test { #[test] fn test_transform_attrs_keys_dict_encoded() { - let test_cases = vec![ + let test_cases: Vec<( + AttributesTransform, + (Vec>, Vec>, Vec>), + ( + Vec>, + Vec>, + Vec>, + Vec, + ), + )> = vec![ ( // basic dict transform AttributesTransform { @@ -4475,6 +5180,8 @@ mod test { .into_iter() .map(Some) .collect::>(), + // rows 0 and 5 (key=b) deleted; surviving pids from [0..7] + vec![1, 2, 3, 4, 6], ), ), // check what happens when delete value that is not referenced by any key @@ -4519,12 +5226,16 @@ mod test { .into_iter() .map(Some) .collect::>(), + // no rows deleted, all 7 pids survive + vec![0, 1, 2, 3, 
4, 5, 6], ), ), ]; for (transform, inputs, expected) in test_cases { + let num_input_rows = inputs.0.len(); let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), Field::new( consts::ATTRIBUTE_KEY, @@ -4537,9 +5248,10 @@ mod test { let input = RecordBatch::try_new( schema.clone(), vec![ + Arc::new(UInt16Array::from_iter_values(0..num_input_rows as u16)), Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n( AttributeValueType::Str as u8, - inputs.0.len(), + num_input_rows, ))), Arc::new(DictionaryArray::new( UInt8Array::from_iter(inputs.0), @@ -4557,9 +5269,11 @@ mod test { ) .unwrap(); + let expected_pids = expected.3; let expected = RecordBatch::try_new( schema.clone(), vec![ + Arc::new(UInt16Array::from_iter_values(expected_pids)), Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n( AttributeValueType::Str as u8, expected.0.len(), @@ -4580,6 +5294,7 @@ mod test { #[test] fn test_transform_attrs_u16_keys() { let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), Field::new( consts::ATTRIBUTE_KEY, @@ -4592,6 +5307,7 @@ mod test { let input = RecordBatch::try_new( schema.clone(), vec![ + Arc::new(UInt16Array::from_iter_values(0..6u16)), Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n( AttributeValueType::Str as u8, 6, @@ -4632,6 +5348,8 @@ mod test { let expected = RecordBatch::try_new( schema.clone(), vec![ + // row 0 (key "b") deleted; remaining pids: [1, 2, 3, 4, 5] + Arc::new(UInt16Array::from_iter_values(vec![1, 2, 3, 4, 5])), Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n( AttributeValueType::Str as u8, 5, @@ -5028,6 +5746,8 @@ mod test { #[test] fn test_materialize_parent_ids_when_rename_merges_runs() { // This test covers the rename-only case where quasi-delta parent ID encoding can become invalid. 
+ // Additionally, renaming k2→k1 creates a collision at parent_id=1 (row 2 with k2 → k1 + // would duplicate existing k1 rows at rows 0 and 3), so those colliding k1 rows are removed. let schema = Arc::new(Schema::new(vec![ // note: absence of encoding metadata means we assume it's quasi-delta encoded Field::new(consts::PARENT_ID, DataType::UInt16, false), @@ -5036,9 +5756,8 @@ mod test { Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), ])); - // After rename k2 -> k1, keys become all k1, if we didn't materialize first, the - // quasi-delta decoding would incorrectly yield 1,2,3,4,5 but we want the correct plain - // IDs to remain 1,2,1,1,2 and the parent_id column to be marked as plain. + // After rename k2→k1, quasi-delta parent_ids are materialized to [1,2,1,1,2]. + // Collision detection removes rows 0 and 3 (pid=1, key=k1), leaving rows 1,2,4. let expected_schema = Arc::new(Schema::new(vec![ Field::new(consts::PARENT_ID, DataType::UInt16, false).with_plain_encoding(), Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), @@ -5065,16 +5784,14 @@ mod test { let expected = RecordBatch::try_new( expected_schema, vec![ - // Must remain the correct plain IDs after rename (i.e., materialized before rename): - Arc::new(UInt16Array::from_iter_values(vec![1, 2, 1, 1, 2])), + // Rows 1,2,4 kept after collision removal (materialized parent_ids) + Arc::new(UInt16Array::from_iter_values(vec![2, 1, 2])), Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n( AttributeValueType::Str as u8, - 5, + 3, ))), - Arc::new(StringArray::from_iter_values(vec![ - "k1", "k1", "k1", "k1", "k1", - ])), - Arc::new(StringArray::from_iter_values(vec!["a", "a", "a", "a", "a"])), + Arc::new(StringArray::from_iter_values(vec!["k1", "k1", "k1"])), + Arc::new(StringArray::from_iter_values(vec!["a", "a", "a"])), ], ) .unwrap(); @@ -5102,6 +5819,8 @@ mod test { #[test] fn test_materialize_parent_ids_when_rename_merges_runs_dict_keys() { // Same as the above test, but with 
dictionary-encoded keys. + // Renaming k2→k1 triggers materialization, then collision detection removes + // rows 0 and 3 (pid=1, key=k1) that would conflict with renamed row 2 (pid=1, k2→k1). let schema = Arc::new(Schema::new(vec![ // note: absence of encoding metadata means we assume it's quasi-delta encoded Field::new(consts::PARENT_ID, DataType::UInt16, false), @@ -5143,19 +5862,20 @@ mod test { ) .unwrap(); + // After materialization + collision removal, rows 1,2,4 remain. let expected = RecordBatch::try_new( expected_schema, vec![ - Arc::new(UInt16Array::from_iter_values(vec![1, 2, 1, 1, 2])), + Arc::new(UInt16Array::from_iter_values(vec![2, 1, 2])), Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n( AttributeValueType::Str as u8, - 5, + 3, ))), Arc::new(DictionaryArray::new( - UInt8Array::from_iter_values(vec![0, 0, 1, 0, 0]), + UInt8Array::from_iter_values(vec![0, 1, 0]), Arc::new(StringArray::from_iter_values(vec!["k1", "k1"])), )), - Arc::new(StringArray::from_iter_values(vec!["a", "a", "a", "a", "a"])), + Arc::new(StringArray::from_iter_values(vec!["a", "a", "a"])), ], ) .unwrap(); @@ -5189,7 +5909,7 @@ mod test { let dict_value_ranges = vec![KeyTransformRange { range: Range { start: 0, end: 2 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }]; @@ -5218,7 +5938,7 @@ mod test { let dict_value_ranges = vec![KeyTransformRange { range: Range { start: 0, end: 1 }, - idx: 5, + idx: Some(5), range_type: KeyTransformRangeType::Replace, }]; @@ -5228,7 +5948,7 @@ mod test { assert_eq!(result.len(), 1); assert_eq!(result[0].start(), 0); assert_eq!(result[0].end(), 3); - assert_eq!(result[0].idx, 5); + assert_eq!(result[0].idx, Some(5)); assert!(matches!( result[0].range_type, KeyTransformRangeType::Replace @@ -5247,17 +5967,17 @@ mod test { let dict_value_ranges = vec![ KeyTransformRange { range: Range { start: 0, end: 1 }, - idx: 10, + idx: Some(10), range_type: KeyTransformRangeType::Replace, }, KeyTransformRange { range: Range { 
start: 1, end: 2 }, - idx: 20, + idx: Some(20), range_type: KeyTransformRangeType::Delete, }, KeyTransformRange { range: Range { start: 2, end: 3 }, - idx: 30, + idx: Some(30), range_type: KeyTransformRangeType::Replace, }, ]; @@ -5270,7 +5990,7 @@ mod test { // First range: keys 0-1 point to value 0 (rename) assert_eq!(result[0].start(), 0); assert_eq!(result[0].end(), 2); - assert_eq!(result[0].idx, 10); + assert_eq!(result[0].idx, Some(10)); assert!(matches!( result[0].range_type, KeyTransformRangeType::Replace @@ -5279,7 +5999,7 @@ mod test { // Second range: keys 2-3 point to value 1 (delete) assert_eq!(result[1].start(), 2); assert_eq!(result[1].end(), 4); - assert_eq!(result[1].idx, 20); + assert_eq!(result[1].idx, Some(20)); assert!(matches!( result[1].range_type, KeyTransformRangeType::Delete @@ -5288,7 +6008,7 @@ mod test { // Third range: keys 4-5 point to value 2 (rename) assert_eq!(result[2].start(), 4); assert_eq!(result[2].end(), 6); - assert_eq!(result[2].idx, 30); + assert_eq!(result[2].idx, Some(30)); assert!(matches!( result[2].range_type, KeyTransformRangeType::Replace @@ -5306,7 +6026,7 @@ mod test { let dict_value_ranges = vec![KeyTransformRange { range: Range { start: 0, end: 1 }, - idx: 42, + idx: Some(42), range_type: KeyTransformRangeType::Replace, }]; @@ -5317,15 +6037,15 @@ mod test { assert_eq!(result[0].start(), 0); assert_eq!(result[0].end(), 1); - assert_eq!(result[0].idx, 42); + assert_eq!(result[0].idx, Some(42)); assert_eq!(result[1].start(), 2); assert_eq!(result[1].end(), 3); - assert_eq!(result[1].idx, 42); + assert_eq!(result[1].idx, Some(42)); assert_eq!(result[2].start(), 4); assert_eq!(result[2].end(), 5); - assert_eq!(result[2].idx, 42); + assert_eq!(result[2].idx, Some(42)); } #[test] @@ -5339,7 +6059,7 @@ mod test { let dict_value_ranges = vec![KeyTransformRange { range: Range { start: 2, end: 3 }, - idx: 99, + idx: Some(99), range_type: KeyTransformRangeType::Delete, }]; @@ -5358,7 +6078,7 @@ mod test { let 
dict_value_ranges = vec![KeyTransformRange { range: Range { start: 1, end: 2 }, - idx: 7, + idx: Some(7), range_type: KeyTransformRangeType::Replace, }]; @@ -5367,7 +6087,7 @@ mod test { assert_eq!(result.len(), 1); assert_eq!(result[0].start(), 2); assert_eq!(result[0].end(), 4); - assert_eq!(result[0].idx, 7); + assert_eq!(result[0].idx, Some(7)); assert!(matches!( result[0].range_type, KeyTransformRangeType::Replace @@ -5384,7 +6104,7 @@ mod test { let dict_value_ranges = vec![KeyTransformRange { range: Range { start: 0, end: 1 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Delete, }]; @@ -5394,7 +6114,7 @@ mod test { assert_eq!(result.len(), 1); assert_eq!(result[0].start(), 0); assert_eq!(result[0].end(), 5); - assert_eq!(result[0].idx, 1); + assert_eq!(result[0].idx, Some(1)); assert!(matches!( result[0].range_type, KeyTransformRangeType::Delete @@ -5407,12 +6127,12 @@ mod test { let transform_ranges = vec![ KeyTransformRange { range: Range { start: 0, end: 5 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }, KeyTransformRange { range: Range { start: 5, end: 10 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Replace, }, ]; @@ -5434,7 +6154,7 @@ mod test { // Delete range at the beginning: delete [0, 3), keep [3, 10) let transform_ranges = vec![KeyTransformRange { range: Range { start: 0, end: 3 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Delete, }]; @@ -5451,7 +6171,7 @@ mod test { // Delete range in the middle: keep [0, 5), delete [5, 8), keep [8, 10) let transform_ranges = vec![KeyTransformRange { range: Range { start: 5, end: 8 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Delete, }]; @@ -5470,7 +6190,7 @@ mod test { // Delete range at the end: keep [0, 7), delete [7, 10) let transform_ranges = vec![KeyTransformRange { range: Range { start: 7, end: 10 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Delete, }]; @@ -5492,12 +6212,12 @@ mod test { 
let transform_ranges = vec![ KeyTransformRange { range: Range { start: 2, end: 4 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Delete, }, KeyTransformRange { range: Range { start: 7, end: 9 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Delete, }, ]; @@ -5520,22 +6240,22 @@ mod test { let transform_ranges = vec![ KeyTransformRange { range: Range { start: 0, end: 2 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }, KeyTransformRange { range: Range { start: 2, end: 5 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Delete, }, KeyTransformRange { range: Range { start: 5, end: 8 }, - idx: 2, + idx: Some(2), range_type: KeyTransformRangeType::Replace, }, KeyTransformRange { range: Range { start: 8, end: 10 }, - idx: 3, + idx: Some(3), range_type: KeyTransformRangeType::Delete, }, ]; @@ -5560,12 +6280,12 @@ mod test { let transform_ranges = vec![ KeyTransformRange { range: Range { start: 0, end: 3 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Delete, }, KeyTransformRange { range: Range { start: 3, end: 6 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Delete, }, ]; @@ -5588,7 +6308,7 @@ mod test { // Delete the entire range: delete [0, 10) let transform_ranges = vec![KeyTransformRange { range: Range { start: 0, end: 10 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Delete, }]; @@ -5610,7 +6330,7 @@ mod test { let transform_ranges = vec![KeyTransformRange { range: Range { start: 0, end: 1 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }]; @@ -5628,7 +6348,7 @@ mod test { let transform_ranges = vec![KeyTransformRange { range: Range { start: 0, end: 1 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Delete, }]; @@ -5649,7 +6369,7 @@ mod test { let transform_ranges = vec![KeyTransformRange { range: Range { start: 0, end: 1 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }]; @@ -5673,12 
+6393,12 @@ mod test { let transform_ranges = vec![ KeyTransformRange { range: Range { start: 0, end: 1 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }, KeyTransformRange { range: Range { start: 2, end: 3 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Replace, }, ]; @@ -5697,7 +6417,7 @@ mod test { let transform_ranges = vec![KeyTransformRange { range: Range { start: 0, end: 3 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }]; @@ -5715,7 +6435,7 @@ mod test { let transform_ranges = vec![KeyTransformRange { range: Range { start: 2, end: 3 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }]; @@ -5734,17 +6454,17 @@ mod test { let transform_ranges = vec![ KeyTransformRange { range: Range { start: 0, end: 1 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }, KeyTransformRange { range: Range { start: 1, end: 2 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Delete, }, KeyTransformRange { range: Range { start: 2, end: 3 }, - idx: 2, + idx: Some(2), range_type: KeyTransformRangeType::Replace, }, ]; @@ -5766,7 +6486,7 @@ mod test { let transform_ranges = vec![KeyTransformRange { range: Range { start: 1, end: 2 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }]; @@ -5785,7 +6505,7 @@ mod test { let transform_ranges = vec![KeyTransformRange { range: Range { start: 5, end: 10 }, // Out of bounds - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }]; @@ -5858,6 +6578,7 @@ mod test { fn test_with_stats_utf8_rename_and_delete() { // keys: [a, b, a, d, c] ; rename a->A ; delete d let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), @@ -5866,6 +6587,7 @@ mod test { let input = 
RecordBatch::try_new( schema.clone(), vec![ + Arc::new(UInt16Array::from_iter_values(0..5u16)), Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n( AttributeValueType::Str as u8, 5, @@ -5908,10 +6630,10 @@ mod test { } #[test] - fn test_with_stats_dict_re() { - // fn test_with_stats_dict_rename_and_delete() { + fn test_with_stats_dict_rename_and_delete() { // keys: [a, b, a, d, c] ; rename a->A ; delete d let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), Field::new( consts::ATTRIBUTE_KEY, @@ -5926,6 +6648,7 @@ mod test { let input = RecordBatch::try_new( schema.clone(), vec![ + Arc::new(UInt16Array::from_iter_values(0..5u16)), Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n( AttributeValueType::Str as u8, 5, @@ -5987,7 +6710,7 @@ mod test { 5, &[KeyTransformRange { range: Range { start: 2, end: 3 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }], ); @@ -6004,7 +6727,7 @@ mod test { 5, &[KeyTransformRange { range: Range { start: 2, end: 5 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Replace, }], ); @@ -6021,7 +6744,7 @@ mod test { 5, &[KeyTransformRange { range: Range { start: 2, end: 5 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Delete, }], ); @@ -6039,12 +6762,12 @@ mod test { &[ KeyTransformRange { range: Range { start: 1, end: 2 }, - idx: 2, + idx: Some(2), range_type: KeyTransformRangeType::Replace, }, KeyTransformRange { range: Range { start: 2, end: 5 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Delete, }, ], @@ -6062,7 +6785,7 @@ mod test { 5, &[KeyTransformRange { range: Range { start: 0, end: 5 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Delete, }], ); @@ -6074,17 +6797,17 @@ mod test { &[ KeyTransformRange { range: Range { start: 2, end: 3 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Delete, }, KeyTransformRange 
{ range: Range { start: 3, end: 4 }, - idx: 2, + idx: Some(2), range_type: KeyTransformRangeType::Delete, }, KeyTransformRange { range: Range { start: 4, end: 5 }, - idx: 3, + idx: Some(3), range_type: KeyTransformRangeType::Delete, }, ], @@ -6773,7 +7496,7 @@ mod test { let transform_ranges = vec![KeyTransformRange { range: Range { start: 1, end: 3 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }]; @@ -6818,7 +7541,7 @@ mod test { let transform_ranges = vec![KeyTransformRange { range: Range { start: 0, end: 2 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }]; @@ -6868,7 +7591,7 @@ mod test { let transform_ranges = vec![KeyTransformRange { range: Range { start: 1, end: 3 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Delete, }]; @@ -6910,7 +7633,7 @@ mod test { let transform_ranges = vec![KeyTransformRange { range: Range { start: 0, end: 1 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }]; @@ -6953,7 +7676,7 @@ mod test { let transform_ranges = vec![KeyTransformRange { range: Range { start: 1, end: 2 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Delete, }]; @@ -7003,12 +7726,12 @@ mod test { let transform_ranges = vec![ KeyTransformRange { range: Range { start: 1, end: 2 }, - idx: 0, + idx: Some(0), range_type: KeyTransformRangeType::Replace, }, KeyTransformRange { range: Range { start: 2, end: 3 }, - idx: 1, + idx: Some(1), range_type: KeyTransformRangeType::Replace, }, ]; @@ -8175,6 +8898,7 @@ mod insert_tests { #[cfg(test)] mod upsert_tests { use super::*; + use crate::schema::FieldExt; use crate::schema::consts; use arrow::array::*; use std::sync::Arc; @@ -9155,4 +9879,267 @@ mod upsert_tests { stats.upserted_entries ); } + + /// Simultaneous rename + delete with a **real** collision (UTF-8 keys). 
+ /// + /// Layout (plain-encoded parent_ids): + /// row | parent_id | key | val + /// ----+-----------+-----+---- + /// 0 | 1 | a | v1 + /// 1 | 1 | b | v2 ← collision: pid=1 already has key "a" being renamed to "b" + /// 2 | 2 | a | v3 + /// 3 | 2 | c | v4 ← real delete target + /// 4 | 3 | b | v5 ← no collision: pid=3 has no key "a" + /// + /// Transform: rename a→b, delete c + /// + /// After collision removal: row 1 is removed (pid=1 has both "a" and "b") + /// After delete: row 3 is removed (key "c") + /// Remaining: rows 0, 2, 4 → keys [b, b, b], vals [v1, v3, v5], pids [1, 2, 3] + #[test] + fn test_rename_collision_with_real_delete_utf8() { + let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false).with_plain_encoding(), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + ])); + + let input = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![1, 1, 2, 2, 3])), + Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n( + AttributeValueType::Str as u8, + 5, + ))), + Arc::new(StringArray::from_iter_values(vec!["a", "b", "a", "c", "b"])), + Arc::new(StringArray::from_iter_values(vec![ + "v1", "v2", "v3", "v4", "v5", + ])), + ], + ) + .unwrap(); + + let expected = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![1, 2, 3])), + Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n( + AttributeValueType::Str as u8, + 3, + ))), + Arc::new(StringArray::from_iter_values(vec!["b", "b", "b"])), + Arc::new(StringArray::from_iter_values(vec!["v1", "v3", "v5"])), + ], + ) + .unwrap(); + + let result = transform_attributes( + &input, + &(Arc::new(UInt16Array::new_null(1)) as ArrayRef), + &AttributesTransform { + insert: None, + rename: Some(RenameTransform::new(BTreeMap::from_iter([( + "a".into(), + "b".into(), + 
)]))), + delete: Some(DeleteTransform::new(BTreeSet::from_iter(["c".into()]))), + upsert: None, + }, + ) + .unwrap(); + + assert_eq!(result, expected); + } + + /// Same scenario as [`test_rename_collision_with_real_delete_utf8`] but with + /// dictionary-encoded attribute keys. Verifies that collision removal and real + /// deletes interact correctly through the dictionary key transform path. + #[test] + fn test_rename_collision_with_real_delete_dict() { + let schema = Arc::new(Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false).with_plain_encoding(), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new( + consts::ATTRIBUTE_KEY, + DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)), + false, + ), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + ])); + + // dict values: ["a", "b", "c"] + // dict keys: [ 0, 1, 0, 2, 1 ] → a, b, a, c, b + let input = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![1, 1, 2, 2, 3])), + Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n( + AttributeValueType::Str as u8, + 5, + ))), + Arc::new(DictionaryArray::new( + UInt8Array::from_iter_values(vec![0, 1, 0, 2, 1]), + Arc::new(StringArray::from_iter_values(vec!["a", "b", "c"])), + )), + Arc::new(StringArray::from_iter_values(vec![ + "v1", "v2", "v3", "v4", "v5", + ])), + ], + ) + .unwrap(); + + let result = transform_attributes( + &input, + &(Arc::new(UInt16Array::new_null(1)) as ArrayRef), + &AttributesTransform { + insert: None, + rename: Some(RenameTransform::new(BTreeMap::from_iter([( + "a".into(), + "b".into(), + )]))), + delete: Some(DeleteTransform::new(BTreeSet::from_iter(["c".into()]))), + upsert: None, + }, + ) + .unwrap(); + + // After transform: rows 0, 2, 4 survive → all keys are now "b" (renamed from "a" + // or already "b"), vals: [v1, v3, v5], pids: [1, 2, 3] + let result_keys = result + .column_by_name(consts::ATTRIBUTE_KEY) + .unwrap() + 
.as_any() + .downcast_ref::>() + .unwrap(); + + assert_eq!( + result.num_rows(), + 3, + "expected 3 rows after collision removal + delete" + ); + + // All surviving keys should be "b" + for i in 0..result.num_rows() { + let key_val = result_keys.downcast_dict::().unwrap().value(i); + assert_eq!(key_val, "b", "row {i} key should be 'b' after rename"); + } + + // Verify parent_ids + let result_pids = result + .column_by_name(consts::PARENT_ID) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(result_pids.values().to_vec(), vec![1, 2, 3]); + + // Verify string values + let result_vals = result + .column_by_name(consts::ATTRIBUTE_STR) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let vals: Vec<&str> = (0..result_vals.len()) + .map(|i| result_vals.value(i)) + .collect(); + assert_eq!(vals, vec!["v1", "v3", "v5"]); + } + + /// Same scenario as [`test_rename_collision_with_real_delete_dict`] but with + /// UInt16 dictionary key type instead of UInt8. Exercises the UInt16 dict key + /// code path using eq/gt_eq/lt compute kernels with BitSliceIterator. 
+    #[test]
+    fn test_rename_collision_with_real_delete_dict_u16() {
+        // Fixture rows as (parent_id, key, value):
+        //   (1, a, v1), (1, b, v2), (2, a, v3), (2, c, v4), (3, b, v5)
+        // The attribute key column is dictionary-encoded with UInt16 keys.
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(consts::PARENT_ID, DataType::UInt16, false).with_plain_encoding(),
+            Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false),
+            Field::new(
+                consts::ATTRIBUTE_KEY,
+                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
+                false,
+            ),
+            Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true),
+        ]));
+
+        // dict values: ["a", "b", "c"]
+        // dict keys:   [ 0, 1, 0, 2, 1 ] → a, b, a, c, b
+        let input = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(UInt16Array::from_iter_values(vec![1, 1, 2, 2, 3])),
+                Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n(
+                    AttributeValueType::Str as u8,
+                    5,
+                ))),
+                Arc::new(DictionaryArray::new(
+                    UInt16Array::from_iter_values(vec![0, 1, 0, 2, 1]),
+                    Arc::new(StringArray::from_iter_values(vec!["a", "b", "c"])),
+                )),
+                Arc::new(StringArray::from_iter_values(vec![
+                    "v1", "v2", "v3", "v4", "v5",
+                ])),
+            ],
+        )
+        .unwrap();
+
+        // Rename "a" → "b" and delete "c" in a single transform call.
+        let result = transform_attributes(
+            &input,
+            &(Arc::new(UInt16Array::new_null(1)) as ArrayRef),
+            &AttributesTransform {
+                insert: None,
+                rename: Some(RenameTransform::new(BTreeMap::from_iter([(
+                    "a".into(),
+                    "b".into(),
+                )]))),
+                delete: Some(DeleteTransform::new(BTreeSet::from_iter(["c".into()]))),
+                upsert: None,
+            },
+        )
+        .unwrap();
+
+        // After transform: rows 0, 2, 4 survive → all keys are now "b" (renamed from "a"
+        // or already "b"), vals: [v1, v3, v5], pids: [1, 2, 3]
+        //
+        // The downcast doubles as a type assertion: it panics unless the key column is
+        // still a dictionary with UInt16 keys. The key type is spelled with its full path
+        // so the test does not rely on a `use` that is not visible from this hunk.
+        let result_keys = result
+            .column_by_name(consts::ATTRIBUTE_KEY)
+            .unwrap()
+            .as_any()
+            .downcast_ref::<DictionaryArray<arrow::datatypes::UInt16Type>>()
+            .unwrap();
+
+        assert_eq!(
+            result.num_rows(),
+            3,
+            "expected 3 rows after collision removal + delete"
+        );
+
+        // All surviving keys should be "b". Resolve the typed view once instead of
+        // once per row.
+        let typed_keys = result_keys.downcast_dict::<StringArray>().unwrap();
+        for i in 0..result.num_rows() {
+            let key_val = typed_keys.value(i);
+            assert_eq!(key_val, "b", "row {i} key should be 'b' after rename");
+        }
+
+        // Verify parent_ids
+        let result_pids = result
+            .column_by_name(consts::PARENT_ID)
+            .unwrap()
+            .as_any()
+            .downcast_ref::<UInt16Array>()
+            .unwrap();
+        assert_eq!(result_pids.values().to_vec(), vec![1, 2, 3]);
+
+        // Verify string values
+        let result_vals = result
+            .column_by_name(consts::ATTRIBUTE_STR)
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let vals: Vec<&str> = (0..result_vals.len())
+            .map(|i| result_vals.value(i))
+            .collect();
+        assert_eq!(vals, vec!["v1", "v3", "v5"]);
+    }
 }