
Commit 652f8ca

Merge branch 'main' into split/microsoft-common-schema-processor
2 parents 626e4ca + f018901 commit 652f8ca

45 files changed

Lines changed: 7831 additions & 2535 deletions


.github/copilot-instructions.md

Lines changed: 0 additions & 4 deletions
This file was deleted.

.github/workflows/pipeline-perf-on-label.yaml

Lines changed: 6 additions & 65 deletions
@@ -1,8 +1,8 @@
-# This action runs the pipeline perf continuous benchmarking suite on every PR.
-# - With 'pipelineperf' label: runs on dedicated Oracle bare-metal hardware for accurate benchmarks
-# - Without label: runs on ubuntu-latest for basic validation
-# In either case, the results does not update the charts.
-name: Pipeline Perf Pre-Merge
+# This action runs the pipeline perf benchmarking suite on dedicated Oracle
+# bare-metal hardware when the 'pipelineperf' label is added to a PR.
+# Basic perf validation on ubuntu-latest is handled by Rust-CI (rust-ci.yml).
+# The results from this workflow do not update the charts.
+name: Pipeline Perf Dedicated

 on:
   pull_request:
@@ -18,29 +18,9 @@ concurrency:
   cancel-in-progress: true

 jobs:
-  # Check for the pipelineperf label to determine which runner to use
-  label-check:
-    name: Check for pipelineperf label
-    runs-on: ubuntu-latest
-    outputs:
-      has_label: ${{ steps.check_label.outputs.has_label }}
-    steps:
-      - name: Check if PR has 'pipelineperf' label
-        id: check_label
-        run: |
-          labels=$(echo '${{ toJson(github.event.pull_request.labels) }}' | jq -r '.[].name')
-          if echo "$labels" | grep -q "pipelineperf"; then
-            echo "Label pipelineperf found - will use dedicated hardware"
-            echo "has_label=true" >> $GITHUB_OUTPUT
-          else
-            echo "Label 'pipelineperf' not found - will use ubuntu-latest"
-            echo "has_label=false" >> $GITHUB_OUTPUT
-          fi
-
   # Run on dedicated Oracle hardware when 'pipelineperf' label is present
   pipeline-perf-test-dedicated:
-    needs: label-check
-    if: needs.label-check.outputs.has_label == 'true'
+    if: contains(github.event.pull_request.labels.*.name, 'pipelineperf')
     runs-on: oracle-bare-metal-64cpu-1024gb-x86-64-ubuntu-24
     steps:
      - name: Harden the runner (Audit all outbound calls)
@@ -122,42 +102,3 @@ jobs:
          echo ""
          echo "=== Docker disk usage ==="
          docker system df -v 2>/dev/null || true
-
-  # Run on ubuntu-latest for basic validation when no label is present
-  pipeline-perf-test-basic:
-    needs: label-check
-    if: needs.label-check.outputs.has_label == 'false'
-    runs-on: ubuntu-latest
-    steps:
-      - name: Harden the runner (Audit all outbound calls)
-        uses: step-security/harden-runner@fe104658747b27e96e4f7e80cd0a94068e53901d # v2.16.1
-        with:
-          egress-policy: audit
-
-      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
-
-      - name: Set up Python
-        uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
-        with:
-          python-version: "3.14"
-
-      - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # v4.0.0
-
-      - name: Build dataflow_engine
-        run: |
-          git submodule init
-          git submodule update
-          cd rust/otap-dataflow
-          docker buildx build --load --build-context otel-arrow=../../ -f Dockerfile -t df_engine .
-          cd ../..
-
-      - name: Install dependencies
-        run: |
-          python -m pip install --user --require-hashes -r tools/pipeline_perf_test/orchestrator/requirements.lock.txt
-          python -m pip install --user --require-hashes -r tools/pipeline_perf_test/load_generator/requirements.lock.txt
-
-      - name: Run pipeline performance test suite
-        run: |
-          cd tools/pipeline_perf_test
-          python orchestrator/run_orchestrator.py --config test_suites/integration/continuous/100klrps-docker.yaml
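
The change above collapses the old two-job arrangement into a single job gated directly on the PR label: the separate label-check job, its jq parsing, and the GITHUB_OUTPUT handoff are replaced by one job-level `if:` expression. A minimal sketch of that pattern, with a hypothetical workflow name, job name, and label; the `types:` trigger list is an assumption and not part of this commit:

name: Example Label Gate
on:
  pull_request:
    types: [opened, synchronize, labeled]  # assumed; 'labeled' re-evaluates the gate when the label is added
jobs:
  heavy-benchmark:
    # Runs only when the PR carries the 'perf' label; otherwise the job is skipped.
    if: contains(github.event.pull_request.labels.*.name, 'perf')
    runs-on: ubuntu-latest
    steps:
      - run: echo "label present - running the expensive benchmark"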

.github/workflows/rust-ci.yml

Lines changed: 37 additions & 0 deletions
@@ -737,6 +737,38 @@ jobs:
          reporter: java-junit
          fail-on-error: false

+  # Pipeline performance test - validates that Rust changes don't regress performance.
+  pipeline_perf_test:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+        with:
+          submodules: true
+      - name: Set up Python
+        uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
+        with:
+          python-version: "3.14"
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # v4.0.0
+      - name: Free disk space
+        run: |
+          sudo rm -rf /usr/lib/jvm /usr/share/dotnet /usr/share/swift /usr/local/.ghcup
+          sudo rm -rf /usr/local/julia* /usr/local/lib/android /usr/local/share/chromium
+          sudo rm -rf /opt/microsoft /opt/google /opt/az /usr/local/share/powershell
+      - name: Build dataflow_engine
+        run: |
+          cd rust/otap-dataflow
+          docker buildx build --load --build-context otel-arrow=../../ -f Dockerfile -t df_engine .
+          cd ../..
+      - name: Install dependencies
+        run: |
+          python -m pip install --user --require-hashes -r tools/pipeline_perf_test/orchestrator/requirements.lock.txt
+          python -m pip install --user --require-hashes -r tools/pipeline_perf_test/load_generator/requirements.lock.txt
+      - name: Run pipeline performance test suite
+        run: |
+          cd tools/pipeline_perf_test
+          python orchestrator/run_orchestrator.py --config test_suites/integration/continuous/100klrps-docker.yaml
+
   # Aggregated status check - depends only on the required matrix combinations.
   # Add/remove jobs from the needs list to change what is required via PR,
   # rather than updating GitHub branch protection settings directly.
@@ -753,6 +785,7 @@ jobs:
      - compile_proto
      - pest-fmt
      - no_default_features_check
+      - pipeline_perf_test
    steps:
      - name: Check if all required jobs succeeded
        run: |
@@ -788,4 +821,8 @@ jobs:
            echo "no_default_features_check failed or was cancelled"
            exit 1
          fi
+          if [[ "${{ needs.pipeline_perf_test.result }}" != "success" ]]; then
+            echo "pipeline_perf_test failed or was cancelled"
+            exit 1
+          fi
          echo "All required checks passed!"

AGENTS.md

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+# Agent Instructions
+
+If working on Rust code (i.e., the `rust/` directory), read and follow all
+instructions in [rust/otap-dataflow/AGENTS.md](rust/otap-dataflow/AGENTS.md).

CLAUDE.md

Lines changed: 1 addition & 2 deletions
@@ -1,4 +1,3 @@
 # CLAUDE.md

-If working on Rust code (i.e., the `rust/` directory), read and follow all
-instructions in [rust/otap-dataflow/AGENTS.md](rust/otap-dataflow/AGENTS.md).
+See [AGENTS.md](AGENTS.md).

README.md

Lines changed: 1 addition & 1 deletion
@@ -211,7 +211,6 @@ repository](https://github.com/open-telemetry/community/blob/main/guides/contrib

 - [Cijo Thomas](https://github.com/cijothomas), Microsoft
 - [Lalit Kumar Bhasin](https://github.com/lalitb), Microsoft
-- [Lei Huang](https://github.com/v0y4g3r), Greptime
 - [Utkarsh Umesan Pillai](https://github.com/utpilla), Microsoft

 For more information about the approver role, see the [community
@@ -227,6 +226,7 @@ repository](https://github.com/open-telemetry/community/blob/main/guides/contrib
 ### Emeritus

 - [Alex Boten](https://github.com/codeboten), Approver
+- [Lei Huang](https://github.com/v0y4g3r), Approver
 - [Moh Osman](https://github.com/moh-osman3), Approver

 ### Thanks to all of our contributors

rust/otap-dataflow/README.md

Lines changed: 2 additions & 2 deletions
@@ -187,8 +187,8 @@ A simple component to produce synthetic data from semantic convention registries
 #### Batch processor

 A batching processor that works directly with OTAP records. This is
-[based on lower-level support in the `otal_arrow_rust`
-crate](../otel-arrow-rust/src/otap/batching.rs).
+[based on lower-level support in the `otap-df-pdata`
+crate](./crates/pdata/src/otap/batching.rs).

 #### OTAP exporter

rust/otap-dataflow/benchmarks/benches/attribute_transform/main.rs

Lines changed: 17 additions & 3 deletions
@@ -35,22 +35,27 @@ fn generate_native_keys_attr_batch(
     key_gen: impl Fn(usize) -> String,
 ) -> RecordBatch {
     let mut keys_arr = StringBuilder::new();
+    let mut parent_ids = Vec::new();
     for i in 0..num_rows {
         let attr_key = key_gen(i);
         keys_arr.append_value(attr_key);
+        parent_ids.push((i % 10) as u16);
     }
     let keys_arr = keys_arr.finish();
+    let parent_ids = UInt16Array::from(parent_ids);

     let type_arr = UInt8Array::from_iter_values(std::iter::repeat_n(
         AttributeValueType::Empty as u8,
         keys_arr.len(),
     ));
+
     RecordBatch::try_new(
         Arc::new(Schema::new(vec![
+            Field::new(consts::PARENT_ID, DataType::UInt16, false).with_plain_encoding(),
             Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false),
             Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false),
         ])),
-        vec![Arc::new(type_arr), Arc::new(keys_arr)],
+        vec![Arc::new(parent_ids), Arc::new(type_arr), Arc::new(keys_arr)],
     )
     .expect("expect no error")
 }
@@ -62,18 +67,24 @@ fn generate_dict_keys_attribute_batch(
 ) -> RecordBatch {
     let mut keys_dict_values_arr = StringBuilder::new();
     let mut keys_dict_keys_arr = PrimitiveBuilder::<UInt16Type>::new();
+    let mut parent_ids = Vec::new();
     for i in 0..num_keys {
         let attr_key = key_gen(i);
         keys_dict_values_arr.append_value(attr_key);
         keys_dict_keys_arr.append_value_n(i as u16, rows_per_key);
+        for j in 0..rows_per_key {
+            parent_ids.push(((i * rows_per_key + j) % 10) as u16);
+        }
     }

     let keys_arr = DictionaryArray::new(
         keys_dict_keys_arr.finish(),
         Arc::new(keys_dict_values_arr.finish()),
     );
+    let parent_ids = UInt16Array::from(parent_ids);

     let schema = Arc::new(Schema::new(vec![
+        Field::new(consts::PARENT_ID, DataType::UInt16, false).with_plain_encoding(),
         Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false),
         Field::new(
             consts::ATTRIBUTE_KEY,
@@ -85,8 +96,11 @@ fn generate_dict_keys_attribute_batch(
         AttributeValueType::Empty as u8,
         keys_arr.len(),
     ));
-    RecordBatch::try_new(schema, vec![Arc::new(type_arr), Arc::new(keys_arr)])
-        .expect("expect no error")
+    RecordBatch::try_new(
+        schema,
+        vec![Arc::new(parent_ids), Arc::new(type_arr), Arc::new(keys_arr)],
+    )
+    .expect("expect no error")
 }

 fn bench_transform_attributes(c: &mut Criterion) {

rust/otap-dataflow/crates/config/src/node.rs

Lines changed: 5 additions & 5 deletions
@@ -11,7 +11,7 @@
 use crate::error::Error;
 use crate::pipeline::telemetry::{AttributeValue, TelemetryAttribute};
 use crate::transport_headers_policy::{HeaderCapturePolicy, HeaderPropagationPolicy};
-use crate::{CapabilityId, Description, NodeId, NodeUrn, PortName};
+use crate::{CapabilityId, Description, ExtensionId, NodeUrn, PortName};
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -25,7 +25,7 @@ use std::collections::HashMap;
 /// and returns an error so the user gets immediate feedback.
 fn deserialize_no_dup_keys<'de, D>(
     deserializer: D,
-) -> Result<HashMap<CapabilityId, NodeId>, D::Error>
+) -> Result<HashMap<CapabilityId, ExtensionId>, D::Error>
 where
     D: serde::Deserializer<'de>,
 {
@@ -35,7 +35,7 @@ where
     struct NoDupVisitor;

     impl<'de> Visitor<'de> for NoDupVisitor {
-        type Value = HashMap<CapabilityId, NodeId>;
+        type Value = HashMap<CapabilityId, ExtensionId>;

         fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
             f.write_str("a map with no duplicate keys")
@@ -49,7 +49,7 @@ where
                     "duplicate capability key '{key}'"
                 )));
             }
-            let _ = result.insert(CapabilityId::from(key), NodeId::from(value));
+            let _ = result.insert(CapabilityId::from(key), ExtensionId::from(value));
         }
         Ok(result)
     }
@@ -115,7 +115,7 @@ pub struct NodeUserConfig {
         skip_serializing_if = "HashMap::is_empty",
         deserialize_with = "deserialize_no_dup_keys"
     )]
-    pub capabilities: HashMap<CapabilityId, NodeId>,
+    pub capabilities: HashMap<CapabilityId, ExtensionId>,

     /// Entity configuration for the node.
     ///

rust/otap-dataflow/crates/core-nodes/src/processors/attributes_processor/mod.rs

Lines changed: 65 additions & 0 deletions
@@ -716,6 +716,71 @@
             })
             .validate(|_| async move {});
     }
+    #[test]
+    fn test_rename_removes_duplicate_keys() {
+        // Prepare input with key "a" and "b"
+        let input = build_logs_with_attrs(
+            vec![],
+            vec![],
+            vec![
+                KeyValue::new("a", AnyValue::new_string("value_a")),
+                KeyValue::new("b", AnyValue::new_string("value_b")),
+            ],
+        );
+
+        let cfg = json!({
+            "actions": [
+                {"action": "rename", "source_key": "a", "destination_key": "b"}
+            ]
+        });
+
+        let telemetry_registry_handle = TelemetryRegistryHandle::new();
+        let controller_ctx = ControllerContext::new(telemetry_registry_handle);
+        let pipeline_ctx =
+            controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0);
+
+        let node = test_node("attributes-processor-test-dup");
+        let rt: TestRuntime<OtapPdata> = TestRuntime::new();
+        let mut node_config = NodeUserConfig::new_processor_config(ATTRIBUTES_PROCESSOR_URN);
+        node_config.config = cfg;
+        let proc =
+            create_attributes_processor(pipeline_ctx, node, Arc::new(node_config), rt.config())
+                .expect("create processor");
+        let phase = rt.set_processor(proc);
+
+        phase
+            .run_test(|mut ctx| async move {
+                let mut bytes = BytesMut::new();
+                input.encode(&mut bytes).expect("encode");
+                let bytes = bytes.freeze();
+                let pdata_in =
+                    OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
+                ctx.process(Message::PData(pdata_in))
+                    .await
+                    .expect("process");
+
+                let out = ctx.drain_pdata().await;
+                let first = out.into_iter().next().expect("one output").payload();
+
+                let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp");
+                let bytes = match otlp_bytes {
+                    OtlpProtoBytes::ExportLogsRequest(b) => b,
+                    _ => panic!("unexpected otlp variant"),
+                };
+                let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode");
+
+                let log_attrs = &decoded.resource_logs[0].scope_logs[0].log_records[0].attributes;
+
+                // Expect no "a" and exactly one "b"
+                assert!(!log_attrs.iter().any(|kv| kv.key == "a"));
+                let b_count = log_attrs.iter().filter(|kv| kv.key == "b").count();
+                assert_eq!(
+                    b_count, 1,
+                    "There should be exactly one key 'b' (no duplicates)"
+                );
+            })
+            .validate(|_| async move {});
+    }

     #[test]
     fn test_delete_applies_to_signal_only_by_default() {

0 commit comments
