Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,71 @@ mod tests {
})
.validate(|_| async move {});
}
/// Verifies that a `rename` action removes a pre-existing row under the
/// destination key: renaming "a" -> "b" when "b" already exists must yield
/// exactly one "b" attribute (the renamed one) and no duplicate keys.
#[test]
fn test_rename_removes_duplicate_keys() {
    // Prepare input with key "a" and "b" on the same log record, so the
    // rename "a" -> "b" collides with the pre-existing "b" attribute.
    let input = build_logs_with_attrs(
        vec![],
        vec![],
        vec![
            KeyValue::new("a", AnyValue::new_string("value_a")),
            KeyValue::new("b", AnyValue::new_string("value_b")),
        ],
    );

    // Processor config: a single rename action mapping "a" -> "b".
    let cfg = json!({
        "actions": [
            {"action": "rename", "source_key": "a", "destination_key": "b"}
        ]
    });

    // Minimal controller/pipeline scaffolding needed to host the attributes
    // processor inside the test runtime.
    let telemetry_registry_handle = TelemetryRegistryHandle::new();
    let controller_ctx = ControllerContext::new(telemetry_registry_handle);
    let pipeline_ctx =
        controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0);

    let node = test_node("attributes-processor-test-dup");
    let rt: TestRuntime<OtapPdata> = TestRuntime::new();
    let mut node_config = NodeUserConfig::new_processor_config(ATTRIBUTES_PROCESSOR_URN);
    node_config.config = cfg;
    let proc =
        create_attributes_processor(pipeline_ctx, node, Arc::new(node_config), rt.config())
            .expect("create processor");
    let phase = rt.set_processor(proc);

    phase
        .run_test(|mut ctx| async move {
            // Encode the input logs as OTLP proto bytes and feed them
            // through the processor as pipeline data.
            let mut bytes = BytesMut::new();
            input.encode(&mut bytes).expect("encode");
            let bytes = bytes.freeze();
            let pdata_in =
                OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
            ctx.process(Message::PData(pdata_in))
                .await
                .expect("process");

            // Drain the processor output and decode it back into an OTLP
            // logs request for inspection.
            let out = ctx.drain_pdata().await;
            let first = out.into_iter().next().expect("one output").payload();

            let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp");
            let bytes = match otlp_bytes {
                OtlpProtoBytes::ExportLogsRequest(b) => b,
                _ => panic!("unexpected otlp variant"),
            };
            let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode");

            let log_attrs = &decoded.resource_logs[0].scope_logs[0].log_records[0].attributes;

            // Expect no "a" and exactly one "b": the original "b" row was
            // removed as a rename collision and "a" was renamed to "b".
            assert!(!log_attrs.iter().any(|kv| kv.key == "a"));
            let b_count = log_attrs.iter().filter(|kv| kv.key == "b").count();
            assert_eq!(
                b_count, 1,
                "There should be exactly one key 'b' (no duplicates)"
            );
        })
        .validate(|_| async move {});
}

#[test]
fn test_delete_applies_to_signal_only_by_default() {
Expand Down
213 changes: 200 additions & 13 deletions rust/otap-dataflow/crates/pdata/src/otap/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1285,9 +1285,10 @@ pub fn transform_attributes_impl(
// At the same time, set flag to check if the batch already has the transport optimized
// encoding. This is used later to determine if we need to lazily materialize the because ID
// column because the transformation would break some sequences of encoded IDs.
let insert_or_upsert_needed = (transform.insert.is_some() || has_upsert)
let has_renames = transform.rename.as_ref().is_some_and(|r| !r.map.is_empty());
let early_materialize_needed = (transform.insert.is_some() || has_upsert)
&& schema.column_with_name(consts::PARENT_ID).is_some();
let (attrs_record_batch_cow, is_transport_optimized) = if insert_or_upsert_needed {
let (attrs_record_batch_cow, is_transport_optimized) = if early_materialize_needed {
let rb = materialize_parent_id_for_attributes_auto(attrs_record_batch)?;
(Cow::Owned(rb), false)
} else {
Expand All @@ -1306,14 +1307,46 @@ pub fn transform_attributes_impl(
let attrs_record_batch = attrs_record_batch_cow.as_ref();
let schema = attrs_record_batch.schema();

// Compute collision delete ranges if renames exist.
// When renaming key "a" -> "b", any existing row with key "b" sharing a parent_id
// with a row having key "a" would become a duplicate. We proactively identify these
// collisions and generate Delete ranges that are merged into the transform pipeline.
// Only when parent_ids are plain-encoded (not transport-optimized) — quasi-delta
// encoded values don't represent actual parent IDs.
Comment thread
albertlockett marked this conversation as resolved.
Outdated
let collision_delete_ranges = if has_renames && !is_transport_optimized {
let rename = transform
.rename
.as_ref()
.expect("has_renames guard ensures this is Some");
let parent_ids_vec = attrs_record_batch
.column_by_name(consts::PARENT_ID)
.map(read_parent_ids_as_u32)
.transpose()?;
if let Some(parent_ids) = parent_ids_vec.as_ref() {
Comment thread
albertlockett marked this conversation as resolved.
Outdated
let key_col = get_required_array(attrs_record_batch, consts::ATTRIBUTE_KEY)?;
let key_accessor = StringArrayAccessor::try_new(key_col)?;
find_rename_collisions_to_delete_ranges(
attrs_record_batch.num_rows(),
&key_accessor,
parent_ids,
rename,
)
} else {
Vec::new()
}
} else {
Vec::new()
};

let (mut rb, mut stats) = match key_column.data_type() {
DataType::Utf8 => {
let keys_arr = key_column
.as_any()
.downcast_ref()
.expect("can downcast Utf8 Column to string array");

let keys_transform_result = transform_keys(keys_arr, transform)?;
let keys_transform_result =
transform_keys(keys_arr, transform, &collision_delete_ranges)?;
let stats = TransformStats {
renamed_entries: keys_transform_result.replaced_rows as u64,
deleted_entries: keys_transform_result.deleted_rows as u64,
Expand Down Expand Up @@ -1380,6 +1413,7 @@ pub fn transform_attributes_impl(
.downcast_ref::<DictionaryArray<UInt8Type>>()
.expect("can downcast dictionary column to dictionary array"),
transform,
&collision_delete_ranges,
is_transport_optimized,
compute_stats,
)?;
Expand All @@ -1404,6 +1438,7 @@ pub fn transform_attributes_impl(
.downcast_ref::<DictionaryArray<UInt16Type>>()
.expect("can downcast dictionary column to dictionary array"),
transform,
&collision_delete_ranges,
is_transport_optimized,
compute_stats,
)?;
Expand Down Expand Up @@ -1571,6 +1606,7 @@ struct KeysTransformResult {
fn transform_keys(
array: &StringArray,
transform: &AttributesTransform,
collision_delete_ranges: &[KeyTransformRange],
) -> Result<KeysTransformResult> {
let len = array.len();
let values = array.values();
Expand Down Expand Up @@ -1617,7 +1653,15 @@ fn transform_keys(
// we're going to pass over both the values and the offsets, taking any ranges that weren't
// that are unmodified, while either transforming or omitting ranges that were either replaced
// or deleted. To get the sorted list of how to handle each range, we merge the plans' ranges
let transform_ranges = merge_transform_ranges(replacement_plan.as_ref(), delete_plan.as_ref());
let mut transform_ranges =
merge_transform_ranges(replacement_plan.as_ref(), delete_plan.as_ref()).into_owned();

// Merge collision delete ranges into the transform plan
if !collision_delete_ranges.is_empty() {
transform_ranges.extend_from_slice(collision_delete_ranges);
transform_ranges.sort_by_key(|r| r.start());
}
let transform_ranges: Cow<'_, [KeyTransformRange]> = Cow::Owned(transform_ranges);
Comment thread
albertlockett marked this conversation as resolved.
Outdated

// create buffer to contain the new values
let mut new_values = MutableBuffer::with_capacity(calculate_new_keys_buffer_len(
Expand Down Expand Up @@ -1727,11 +1771,21 @@ fn transform_keys(
KeyTransformRangeType::Delete => {
// for deleted ranges we don't need to append any offsets to the buffer, so we
// just decrement by how many total bytes were deleted from this range.
let deleted_val_len = delete_plan
.as_ref()
.expect("delete plan should be initialized")
.target_keys[transform_range.idx]
.len();
let deleted_val_len = if let Some(ref dp) = delete_plan {
if transform_range.idx < dp.target_keys.len() {
Comment thread
albertlockett marked this conversation as resolved.
Outdated
dp.target_keys[transform_range.idx].len()
} else {
// collision-delete range: compute byte length from offsets
let s = offsets[transform_range.start()] as usize;
let e = offsets[transform_range.end()] as usize;
(e - s) / transform_range.range.len()
}
} else {
// collision-delete range without a delete plan
let s = offsets[transform_range.start()] as usize;
let e = offsets[transform_range.end()] as usize;
(e - s) / transform_range.range.len()
};
curr_total_offset_adjustment -=
(deleted_val_len * transform_range.range.len()) as i32;
}
Expand Down Expand Up @@ -1820,6 +1874,127 @@ struct DictionaryKeysTransformResult<K: ArrowDictionaryKeyType> {
renamed_rows: usize,
}

/// Identify rows that would become duplicates after a rename and return them as
/// `KeyTransformRange::Delete` entries. When renaming key `x` to `y`, any existing
/// row with key `y` whose `parent_id` also has a row with key `x` would become a
/// duplicate. Uses [`IdBitmap`] for efficient parent-id set intersection.
///
/// Returns a list of `KeyTransformRange` with `range_type = Delete` that can
/// be merged into the existing transformation plans, removing the row cleanly.
fn find_rename_collisions_to_delete_ranges(
    num_rows: usize,
    key_accessor: &StringArrayAccessor<'_>,
    parent_ids: &[u32],
    rename: &RenameTransform,
) -> Vec<KeyTransformRange> {
    // Per-row flags: true => this row must be deleted to avoid a post-rename
    // duplicate.
    let mut collision_delete_rows = vec![false; num_rows];
    let mut has_collisions = false;

    for (old_key, new_key) in &rename.map {
        // Guard against a degenerate identity rename (x -> x): it is a no-op,
        // but without this check every row carrying the key would match both
        // passes below and be wrongly marked for deletion.
        if old_key == new_key {
            continue;
        }

        // Pass 1: build an IdBitmap of parent_ids that have the source key
        // (old_key).
        let mut source_parents = IdBitmap::new();
        for (i, &pid) in parent_ids.iter().enumerate() {
            if let Some(k) = key_accessor.str_at(i) {
                if k == old_key.as_str() {
                    source_parents.insert(pid);
                }
            }
        }

        // No row carries the source key: this rename cannot create duplicates.
        if source_parents.is_empty() {
            continue;
        }

        // Pass 2: mark target-key rows whose parent_id overlaps with
        // source_parents.
        for (i, &pid) in parent_ids.iter().enumerate() {
            if let Some(k) = key_accessor.str_at(i) {
                if k == new_key.as_str() && source_parents.contains(pid) {
                    collision_delete_rows[i] = true;
                    has_collisions = true;
                }
            }
        }
    }

    if !has_collisions {
        return Vec::new();
    }

    // Convert per-row booleans into contiguous KeyTransformRange::Delete
    // entries (run-length encode the flag vector).
    //
    // `idx` is deliberately an out-of-range sentinel: collision deletes have no
    // entry in the delete plan's `target_keys`, and the consumer in
    // `transform_keys` falls back to computing the deleted byte length from the
    // offsets buffer only when `idx` is not a valid delete-plan index. Using 0
    // here would alias the delete plan's first target key whenever one exists.
    let mut ranges = Vec::new();
    let mut start = None;
    for (i, &should_delete) in collision_delete_rows.iter().enumerate() {
        if should_delete {
            if start.is_none() {
                start = Some(i);
            }
        } else if let Some(s) = start.take() {
            ranges.push(KeyTransformRange {
                range: s..i,
                idx: usize::MAX,
                range_type: KeyTransformRangeType::Delete,
            });
        }
    }
    // Close a run that extends to the end of the batch.
    if let Some(s) = start {
        ranges.push(KeyTransformRange {
            range: s..num_rows,
            idx: usize::MAX,
            range_type: KeyTransformRangeType::Delete,
        });
    }
    ranges
}

/// Read the parent_id column into a flat `Vec<u32>`, handling UInt16, UInt32,
/// and dictionary-encoded variants.
fn read_parent_ids_as_u32(col: &ArrayRef) -> Result<Vec<u32>> {
let n = col.len();
let mut result = Vec::with_capacity(n);
match col.data_type() {
DataType::UInt16 => {
let arr = col.as_any().downcast_ref::<UInt16Array>().ok_or_else(|| {
Error::UnexpectedRecordBatchState {
reason: "expected UInt16 for parent_id".into(),
}
})?;
for i in 0..n {
result.push(arr.value(i) as u32);
}
}
DataType::UInt32 => {
let arr = col.as_any().downcast_ref::<UInt32Array>().ok_or_else(|| {
Error::UnexpectedRecordBatchState {
reason: "expected UInt32 for parent_id".into(),
}
})?;
for i in 0..n {
result.push(arr.value(i));
}
}
DataType::Dictionary(key_type, _) => {
let arr =
cast(col, &DataType::UInt32).map_err(|e| Error::UnexpectedRecordBatchState {
reason: format!("cannot cast dict({key_type:?}) parent_id to u32: {e}"),
})?;
let u32_arr = arr.as_any().downcast_ref::<UInt32Array>().ok_or_else(|| {
Error::UnexpectedRecordBatchState {
reason: "cast to UInt32 failed".into(),
}
})?;
for i in 0..n {
result.push(u32_arr.value(i));
}
}
other => {
return Err(Error::UnexpectedRecordBatchState {
reason: format!("unsupported parent_id data type: {other:?}"),
});
}
}
Ok(result)
}

/// Transforms the keys for the dictionary array.
///
/// # Arguments
Expand All @@ -1835,6 +2010,7 @@ struct DictionaryKeysTransformResult<K: ArrowDictionaryKeyType> {
fn transform_dictionary_keys<K>(
dict_arr: &DictionaryArray<K>,
transform: &AttributesTransform,
collision_delete_ranges: &[KeyTransformRange],
is_transport_encoded: bool,
compute_stats: bool,
) -> Result<DictionaryKeysTransformResult<K>>
Expand All @@ -1849,7 +2025,7 @@ where
actual: dict_values.data_type().clone(),
}
})?;
let dict_values_transform_result = transform_keys(dict_values, transform)?;
let dict_values_transform_result = transform_keys(dict_values, transform, &[])?;

// Convert the ranges of transformed dictionary values into the ranges of transformed dict keys.
// These ranges are used to determine two things:
Expand All @@ -1872,6 +2048,13 @@ where
Vec::new()
};

// Merge collision delete ranges (row-level) into the dict key transform ranges
let mut dict_key_transform_ranges = dict_key_transform_ranges;
if !collision_delete_ranges.is_empty() {
dict_key_transform_ranges.extend_from_slice(collision_delete_ranges);
dict_key_transform_ranges.sort_by_key(|r| r.start());
}

// If we're tracking statistics on how many rows were transformed, it's less expensive to
// compute these statistics from the ranges of transformed dictionary keys. However, if these
// ranges haven't been computed, it's more performant to compute the statistics from the
Expand All @@ -1889,7 +2072,7 @@ where
(rename_count, 0)
};

if dict_values_transform_result.keep_ranges.is_none() {
if dict_values_transform_result.keep_ranges.is_none() && collision_delete_ranges.is_empty() {
// here there were no rows deleted from the values array, which means we can reuse
// the dictionary keys without any transformations
let new_dict_keys = dict_keys.clone();
Expand All @@ -1912,8 +2095,12 @@ where
});
}

// safety: we've checked above that this is not None
let dict_values_keep_ranges = dict_values_transform_result.keep_ranges.expect("not none");
let dict_values_keep_ranges = dict_values_transform_result.keep_ranges.unwrap_or_else(|| {
vec![Range {
start: 0,
end: dict_values.len(),
}]
});

// create quick lookup for each dictionary key of whether it was kept and if so which
// contiguous range of kept dictionary values the key points to. This will allow us to build
Expand Down
Loading