1 change: 1 addition & 0 deletions rust/otap-dataflow/Cargo.toml
@@ -279,6 +279,7 @@ geneva-exporter = ["otap-df-contrib-nodes/geneva-exporter"]
azure-monitor-exporter = ["otap-df-contrib-nodes/azure-monitor-exporter"]
# Contrib processors (opt-in) - now in contrib-nodes
contrib-processors = ["otap-df-contrib-nodes/contrib-processors"]
microsoft-common-schema-processor = ["otap-df-contrib-nodes/microsoft-common-schema-processor"]
condense-attributes-processor = ["otap-df-contrib-nodes/condense-attributes-processor"]
recordset-kql-processor = ["otap-df-contrib-nodes/recordset-kql-processor"]
resource-validator-processor = ["otap-df-contrib-nodes/resource-validator-processor"]
14 changes: 9 additions & 5 deletions rust/otap-dataflow/crates/contrib-nodes/Cargo.toml
@@ -14,29 +14,30 @@ workspace = true

[dependencies]
# Workspace crates (likely used by most/all features)
otap-df-channel = { workspace = true }
otap-df-config = { workspace = true }
otap-df-engine = { workspace = true }
otap-df-otap = { workspace = true }
otap-df-pdata = { workspace = true }
otap-df-pdata-views = { workspace = true }
otap-df-telemetry = { workspace = true }
otap-df-channel = { workspace = true }
otap-df-telemetry-macros = { workspace = true }

arrow.workspace = true
async-trait.workspace = true
futures.workspace = true
tokio.workspace = true
tracing.workspace = true
bytes.workspace = true
prost.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
linkme.workspace = true
tokio.workspace = true
tracing.workspace = true
chrono = { workspace = true, optional = true }
geneva-uploader = { workspace = true, optional = true }
humantime-serde = { workspace = true, optional = true }
itoa = { workspace = true, optional = true }
geneva-uploader = { workspace = true, optional = true }
opentelemetry-proto = { workspace = true, optional = true }
data_engine_recordset = { workspace = true, optional = true }
data_engine_recordset_otlp_bridge = { workspace = true, optional = true }
@@ -45,7 +46,6 @@ azure_core = { workspace = true, optional = true, features = ["reqwest"] }
azure_identity = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
http = { workspace = true, optional = true }
itoa = { workspace = true, optional = true }
rand = { workspace = true, optional = true }
ryu = { workspace = true, optional = true }
reqwest = { workspace = true, optional = true, features = ["rustls-no-provider"] }
@@ -79,10 +79,14 @@ azure-monitor-exporter = [
]

contrib-processors = [
    "microsoft-common-schema-processor",
    "condense-attributes-processor",
    "recordset-kql-processor",
    "resource-validator-processor",
]
microsoft-common-schema-processor = [
    "dep:chrono",
]
condense-attributes-processor = []
recordset-kql-processor = [
    "dep:data_engine_recordset_otlp_bridge",
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/contrib-nodes/README.md
@@ -1,3 +1,5 @@
<!-- markdownlint-disable MD013 -->

# Contrib Nodes

This crate contains optional (feature-gated) contrib processors and exporters.
@@ -29,6 +31,7 @@ Aggregate flags enable all nodes in their category.

| Feature | Enables Node | Node URN | Module |
| ------- | ------------ | -------- | ------ |
| `microsoft-common-schema-processor` | Microsoft Common Schema processor | `urn:microsoft:processor:common_schema_otel_logs` | `src/processors/microsoft_common_schema_processor/` |
| `condense-attributes-processor` | Condense Attributes processor | `urn:otel:processor:condense_attributes` | `src/processors/condense_attributes_processor/` |
| `recordset-kql-processor` | RecordSet KQL processor | `urn:microsoft:processor:recordset_kql` | `src/processors/recordset_kql_processor/` |
| `resource-validator-processor` | Resource Validator processor | `urn:otel:processor:resource_validator` | `src/processors/resource_validator_processor/` |
@@ -0,0 +1,79 @@
# Microsoft Common Schema Processor

URN: `urn:microsoft:processor:common_schema_otel_logs`

This processor promotes decoded Microsoft Common Schema log attributes into typed
OTLP log fields. It is intended for pipelines where the producer is known to
emit Microsoft Common Schema records, such as the Linux `user_events` receiver
using `event_header` format.

## Configuration

The processor currently has no options:

```yaml
processors:
  common_schema:
    kind: "urn:microsoft:processor:common_schema_otel_logs"
    config: {}
```

Unknown configuration fields are rejected.
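Since the processor takes no options, "reject unknown fields" reduces to requiring an empty config map. A minimal sketch of that check (illustrative function and error type, not the actual implementation):

```rust
use std::collections::BTreeMap;

/// Hypothetical sketch: the processor accepts no options, so any key present
/// in the user-supplied config map is an unknown field and is rejected.
fn validate_config(cfg: &BTreeMap<String, String>) -> Result<(), String> {
    match cfg.keys().next() {
        Some(key) => Err(format!("unknown configuration field: {key}")),
        None => Ok(()),
    }
}

fn main() {
    let empty: BTreeMap<String, String> = BTreeMap::new();
    assert!(validate_config(&empty).is_ok());

    let mut bad = BTreeMap::new();
    bad.insert("typo_field".to_string(), "x".to_string());
    assert!(validate_config(&bad).is_err());
}
```

In the real crate this would typically be handled by the config deserializer itself (e.g. serde's strict-field behavior) rather than a manual check.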

## Promotion Rules

Only records with `__csver__ == 0x400` and `PartB._typeName == "Log"` are
promoted. Other records are forwarded unchanged.

| Common Schema attribute | OTLP destination |
| --- | --- |
| `PartA.time` | `time_unix_nano` |
| `PartA.name` | `event_name` when `PartB.name` is absent |
| `PartB.name` | `event_name` |
| `PartA.ext_dt_traceId` | `trace_id` |
| `PartA.ext_dt_spanId` | `span_id` |
| `PartA.ext_dt_traceFlags` | `flags` |
| `PartA.ext_cloud_role` | `service.name` attribute |
| `PartA.ext_cloud_roleInstance` | `service.instance.id` attribute |
| `PartB.body` | `body` |
| `PartB.severityNumber` | `severity_number` |
| `PartB.severityText` | `severity_text` |
| `PartC.*` | Attribute with the `PartC.` prefix removed |

`PartB.name` takes precedence over `PartA.name`. Severity values above the OTLP
range are clamped to `24`; severity `0` is preserved as OTLP `UNSPECIFIED`.
Recognized `PartA.*` and `PartB.*` fields are removed after promotion. Unknown
`PartA.*` and `PartB.*` fields are preserved with their original names.

Malformed trace and span IDs are kept as attributes named `trace.id` and
`span.id` instead of being promoted to typed ID fields.
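The clamping and ID-gating rules above can be sketched as follows. The hex-string criterion for "malformed" is an assumption for illustration (the README does not define it); function names are hypothetical:

```rust
/// Clamp a severity into the OTLP range: values above 24 clamp to 24,
/// and 0 is preserved (OTLP SEVERITY_NUMBER_UNSPECIFIED).
fn clamp_severity(n: u64) -> u64 {
    n.min(24)
}

/// Assumed gating rule for promotion to a typed trace_id field: exactly
/// 32 ASCII hex characters (16 bytes). Anything else stays behind as a
/// `trace.id` attribute instead. (Span IDs would analogously be 16 hex chars.)
fn is_valid_trace_id(s: &str) -> bool {
    s.len() == 32 && s.chars().all(|c| c.is_ascii_hexdigit())
}

fn main() {
    assert_eq!(clamp_severity(0), 0); // preserved as UNSPECIFIED
    assert_eq!(clamp_severity(17), 17); // in range, unchanged
    assert_eq!(clamp_severity(99), 24); // above range, clamped
    assert!(is_valid_trace_id("4bf92f3577b34da6a3ce929d0e0e4736"));
    assert!(!is_valid_trace_id("not-a-trace-id"));
}
```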

## Arrow Conversion Cost

The current implementation promotes records by converting payloads to OTLP proto
bytes and mutating OTLP `LogsData` / `LogRecord` values. Incoming Arrow batches
that do not contain a `__csver__` log attribute are forwarded without this
conversion. Arrow batches that contain a `__csver__` log attribute still pay the
Arrow-to-OTLP conversion cost. If no records are promoted after inspection, the
original Arrow payload is forwarded unchanged.

This processor should be wired only where Common Schema records are expected.
Using it in a heterogeneous logs pipeline may add conversion work to batches
that do not benefit from promotion.

The full Arrow-native implementation is deferred. It needs a reusable
cross-batch transform that can read log attributes by `parent_id`, set typed
root log columns, and rebuild `LogAttrs` with promoted fields removed while
preserving the remaining attributes.
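The batch-level fast path described above amounts to a three-way decision per Arrow batch. A sketch with hypothetical names, assuming exactly the checks the text describes:

```rust
/// What to do with an incoming Arrow logs batch under the fast-path rules.
#[derive(Debug, PartialEq)]
enum BatchAction {
    /// No `__csver__` log attribute: forward without Arrow-to-OTLP conversion.
    ForwardArrowUnchanged,
    /// `__csver__` present but no record matched after inspection: forward the
    /// original Arrow payload (the conversion cost was still paid).
    ForwardArrowAfterInspection,
    /// At least one record promoted: emit the mutated OTLP payload.
    EmitPromotedOtlp,
}

fn route_batch(has_csver_attribute: bool, any_record_promoted: bool) -> BatchAction {
    if !has_csver_attribute {
        BatchAction::ForwardArrowUnchanged
    } else if any_record_promoted {
        BatchAction::EmitPromotedOtlp
    } else {
        BatchAction::ForwardArrowAfterInspection
    }
}

fn main() {
    assert_eq!(route_batch(false, false), BatchAction::ForwardArrowUnchanged);
    assert_eq!(route_batch(true, false), BatchAction::ForwardArrowAfterInspection);
    assert_eq!(route_batch(true, true), BatchAction::EmitPromotedOtlp);
}
```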

## Metrics

The processor reports these metrics:

- `records_seen`: log records inspected after conversion to OTLP.
- `records_promoted`: log records promoted as Microsoft Common Schema logs.
- `records_skipped_not_common_schema`: inspected records that did not match
Common Schema gating.
- `batches_promoted`: log batches with at least one promoted record.
- `arrow_batches_skipped_no_csver`: Arrow batches forwarded without OTLP
conversion because no `__csver__` attribute was present.
@@ -0,0 +1,29 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Metrics for the Microsoft Common Schema processor.

use otap_df_telemetry::instrument::Counter;
use otap_df_telemetry_macros::metric_set;

/// Internal telemetry for the Microsoft Common Schema processor.
#[metric_set(name = "microsoft.common_schema.processor.metrics")]
#[derive(Debug, Default, Clone)]
pub(super) struct MicrosoftCommonSchemaProcessorMetrics {
    /// Number of log records inspected by the processor.
    #[metric(unit = "{item}")]
    pub records_seen: Counter<u64>,
    /// Number of log records promoted as Microsoft Common Schema logs.
    #[metric(unit = "{item}")]
    pub records_promoted: Counter<u64>,
    /// Number of log records inspected but not promoted as Microsoft Common Schema.
    #[metric(unit = "{item}")]
    pub records_skipped_not_common_schema: Counter<u64>,
    /// Number of log batches that had at least one promoted record.
    #[metric(unit = "{batch}")]
    pub batches_promoted: Counter<u64>,
    /// Number of Arrow log batches forwarded without OTLP conversion because no
    /// `__csver__` attribute was present.
    #[metric(unit = "{batch}")]
    pub arrow_batches_skipped_no_csver: Counter<u64>,
}