Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,71 @@ mod tests {
})
.validate(|_| async move {});
}
#[test]
fn test_rename_removes_duplicate_keys() {
// Prepare input with key "a" and "b"
let input = build_logs_with_attrs(
vec![],
vec![],
vec![
KeyValue::new("a", AnyValue::new_string("value_a")),
KeyValue::new("b", AnyValue::new_string("value_b")),
],
);

let cfg = json!({
"actions": [
{"action": "rename", "source_key": "a", "destination_key": "b"}
]
});

let telemetry_registry_handle = TelemetryRegistryHandle::new();
let controller_ctx = ControllerContext::new(telemetry_registry_handle);
let pipeline_ctx =
controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0);

let node = test_node("attributes-processor-test-dup");
let rt: TestRuntime<OtapPdata> = TestRuntime::new();
let mut node_config = NodeUserConfig::new_processor_config(ATTRIBUTES_PROCESSOR_URN);
node_config.config = cfg;
let proc =
create_attributes_processor(pipeline_ctx, node, Arc::new(node_config), rt.config())
.expect("create processor");
let phase = rt.set_processor(proc);

phase
.run_test(|mut ctx| async move {
let mut bytes = BytesMut::new();
input.encode(&mut bytes).expect("encode");
let bytes = bytes.freeze();
let pdata_in =
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
ctx.process(Message::PData(pdata_in))
.await
.expect("process");

let out = ctx.drain_pdata().await;
let first = out.into_iter().next().expect("one output").payload();

let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp");
let bytes = match otlp_bytes {
OtlpProtoBytes::ExportLogsRequest(b) => b,
_ => panic!("unexpected otlp variant"),
};
let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode");

let log_attrs = &decoded.resource_logs[0].scope_logs[0].log_records[0].attributes;

// Expect no "a" and exactly one "b"
assert!(!log_attrs.iter().any(|kv| kv.key == "a"));
let b_count = log_attrs.iter().filter(|kv| kv.key == "b").count();
assert_eq!(
b_count, 1,
"There should be exactly one key 'b' (no duplicates)"
);
})
.validate(|_| async move {});
}

#[test]
fn test_delete_applies_to_signal_only_by_default() {
Expand Down
Loading
Loading