diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index 2e480d7800..5443b6a090 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -279,8 +279,12 @@ crypto-openssl = ["otap-df-otap/crypto-openssl"] contrib-exporters = ["otap-df-contrib-nodes/contrib-exporters"] geneva-exporter = ["otap-df-contrib-nodes/geneva-exporter"] azure-monitor-exporter = ["otap-df-contrib-nodes/azure-monitor-exporter"] +# Contrib receivers (opt-in) - now in contrib-nodes +contrib-receivers = ["otap-df-contrib-nodes/contrib-receivers"] +userevents-receiver = ["otap-df-contrib-nodes/userevents-receiver"] # Contrib processors (opt-in) - now in contrib-nodes contrib-processors = ["otap-df-contrib-nodes/contrib-processors"] +microsoft-common-schema-processor = ["otap-df-contrib-nodes/microsoft-common-schema-processor"] condense-attributes-processor = ["otap-df-contrib-nodes/condense-attributes-processor"] recordset-kql-processor = ["otap-df-contrib-nodes/recordset-kql-processor"] resource-validator-processor = ["otap-df-contrib-nodes/resource-validator-processor"] diff --git a/rust/otap-dataflow/crates/contrib-nodes/Cargo.toml b/rust/otap-dataflow/crates/contrib-nodes/Cargo.toml index 7b0030906c..2f8e09620a 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/Cargo.toml +++ b/rust/otap-dataflow/crates/contrib-nodes/Cargo.toml @@ -14,6 +14,7 @@ workspace = true [dependencies] # Workspace crates (likely used by most/all features) +otap-df-channel = { workspace = true } otap-df-config = { workspace = true } otap-df-engine = { workspace = true } otap-df-engine-macros = { workspace = true } @@ -21,23 +22,25 @@ otap-df-otap = { workspace = true } otap-df-pdata = { workspace = true } otap-df-pdata-views = { workspace = true } otap-df-telemetry = { workspace = true } -otap-df-channel = { workspace = true } otap-df-telemetry-macros = { workspace = true } arrow.workspace = true async-trait.workspace = true futures.workspace = true -tokio.workspace = 
true -tracing.workspace = true bytes.workspace = true prost.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true linkme.workspace = true +tokio.workspace = true +tracing.workspace = true +base64 = { workspace = true, optional = true } chrono = { workspace = true, optional = true } -geneva-uploader = { workspace = true, optional = true } humantime-serde = { workspace = true, optional = true } +tracepoint_decode = { version = "0.5.0", optional = true } +itoa = { workspace = true, optional = true } +geneva-uploader = { workspace = true, optional = true } opentelemetry-proto = { workspace = true, optional = true } data_engine_recordset = { workspace = true, optional = true } data_engine_recordset_otlp_bridge = { workspace = true, optional = true } @@ -46,14 +49,28 @@ azure_core = { workspace = true, optional = true, features = ["reqwest"] } azure_identity = { workspace = true, optional = true } flate2 = { workspace = true, optional = true } http = { workspace = true, optional = true } -itoa = { workspace = true, optional = true } rand = { workspace = true, optional = true } ryu = { workspace = true, optional = true } reqwest = { workspace = true, optional = true, features = ["rustls-no-provider"] } sysinfo = { workspace = true, optional = true } urlencoding = { workspace = true, optional = true } +[target.'cfg(target_os = "linux")'.dependencies] +# TODO: Remove this pinned git dependency once one-collect is upstream in the +# normal dependency graph and no longer needs to be pulled directly here. 
+one_collect = { git = "https://github.com/microsoft/one-collect.git", rev = "2d33e674348168f4a19bd3ab40d92f1c556407ec", features = ["scripting"], optional = true } + [features] +contrib-receivers = [ + "userevents-receiver", +] +userevents-receiver = [ + "dep:base64", + "dep:chrono", + "dep:humantime-serde", + "dep:one_collect", + "dep:tracepoint_decode", +] contrib-exporters = [ "geneva-exporter", "azure-monitor-exporter", @@ -80,10 +97,14 @@ azure-monitor-exporter = [ ] contrib-processors = [ + "microsoft-common-schema-processor", "condense-attributes-processor", "recordset-kql-processor", "resource-validator-processor", ] +microsoft-common-schema-processor = [ + "dep:chrono", +] condense-attributes-processor = [] recordset-kql-processor = [ "dep:data_engine_recordset_otlp_bridge", @@ -97,11 +118,16 @@ axum.workspace = true clap.workspace = true criterion = { workspace = true, features = ["html_reports"] } humantime.workspace = true +core_affinity.workspace = true +eventheader_dynamic = "0.5.0" +otap-df-channel = { workspace = true } otap-df-engine = { workspace = true, features = ["test-utils"] } otap-df-otap = { workspace = true, features = ["test-utils"] } +otap-df-pdata = { workspace = true, features = ["testing"] } pretty_assertions.workspace = true rand.workspace = true serde_yaml.workspace = true +tracing-subscriber = { workspace = true, features = ["fmt", "std", "env-filter"] } wiremock.workspace = true [[bench]] diff --git a/rust/otap-dataflow/crates/contrib-nodes/README.md b/rust/otap-dataflow/crates/contrib-nodes/README.md index 0893379b95..2a99c1d90f 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/README.md +++ b/rust/otap-dataflow/crates/contrib-nodes/README.md @@ -1,11 +1,15 @@ + + # Contrib Nodes -This crate contains optional (feature-gated) contrib processors and exporters. +This crate contains contrib receivers, processors, and exporters. 
## Folder Layout - `src/exporters/` - Contrib exporters +- `src/receivers/` + - Contrib receivers - `src/processors/` - Contrib processors @@ -14,6 +18,18 @@ This crate contains optional (feature-gated) contrib processors and exporters. Feature flags are grouped into aggregate categories and individual node flags. Aggregate flags enable all nodes in their category. +### Receivers + +| Node | URN | Module | +| ---- | --- | ------ | +| userevents_receiver | `urn:otel:receiver:userevents` | `src/receivers/userevents_receiver/` | + +#### userevents_receiver + +- Reads Linux `user_events` tracepoints through per-CPU perf sessions +- Supports single-tracepoint and multi-tracepoint configuration +- Supports tracefs structural decoding and EventHeader decoding + ### Exporters - `contrib-exporters` (enables all contrib exporters) @@ -29,6 +45,7 @@ Aggregate flags enable all nodes in their category. | Feature | Enables Node | Node URN | Module | | ------- | ------------ | -------- | ------ | +| `microsoft-common-schema-processor` | Microsoft Common Schema processor | `urn:microsoft:processor:common_schema_otel_logs` | `src/processors/microsoft_common_schema_processor/` | | `condense-attributes-processor` | Condense Attributes processor | `urn:otel:processor:condense_attributes` | `src/processors/condense_attributes_processor/` | | `recordset-kql-processor` | RecordSet KQL processor | `urn:microsoft:processor:recordset_kql` | `src/processors/recordset_kql_processor/` | | `resource-validator-processor` | Resource Validator processor | `urn:otel:processor:resource_validator` | `src/processors/resource_validator_processor/` | diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/lib.rs b/rust/otap-dataflow/crates/contrib-nodes/src/lib.rs index 2b5cb59c14..929a13382d 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/lib.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/lib.rs @@ -6,5 +6,8 @@ /// Exporter implementations for contrib nodes. 
pub mod exporters; +/// Receiver implementations for contrib nodes. +pub mod receivers; + /// Processor implementations for contrib nodes. pub mod processors; diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/processors/microsoft_common_schema_processor/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/processors/microsoft_common_schema_processor/mod.rs new file mode 100644 index 0000000000..784059becd --- /dev/null +++ b/rust/otap-dataflow/crates/contrib-nodes/src/processors/microsoft_common_schema_processor/mod.rs @@ -0,0 +1,639 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Microsoft Common Schema log promotion processor. +//! +//! This processor expects Microsoft Common Schema fields that have already been decoded +//! into log attributes, for example by the Linux `user_events` receiver in +//! `event_header` mode. It promotes known Microsoft Common Schema fields to +//! typed OTLP log fields and leaves non-Microsoft-Common-Schema records unchanged. +//! +//! TODO: Promote directly on `OtapArrowRecords`. The current implementation +//! converts input payloads to OTLP proto bytes for promotion, so Arrow-native +//! pipelines still pay an Arrow-to-OTLP conversion before this processor runs. 
+ +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::BytesMut; +use chrono::DateTime; +use linkme::distributed_slice; +use otap_df_config::SignalType; +use otap_df_config::error::Error as ConfigError; +use otap_df_config::node::NodeUserConfig; +use otap_df_engine::MessageSourceLocalEffectHandlerExtension; +use otap_df_engine::config::ProcessorConfig; +use otap_df_engine::context::PipelineContext; +use otap_df_engine::error::{Error as EngineError, ProcessorErrorKind}; +use otap_df_engine::local::processor as local; +use otap_df_engine::message::Message; +use otap_df_engine::node::NodeId; +use otap_df_engine::processor::ProcessorWrapper; +use otap_df_otap::{OTAP_PROCESSOR_FACTORIES, pdata::OtapPdata}; +use otap_df_pdata::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value}; +use otap_df_pdata::proto::opentelemetry::logs::v1::{LogRecord, LogsData}; +use otap_df_pdata::{OtapPayload, OtlpProtoBytes}; +use prost::Message as _; +use serde::Deserialize; + +/// URN for the Microsoft Common Schema processor. +pub const MICROSOFT_COMMON_SCHEMA_PROCESSOR_URN: &str = + "urn:microsoft:processor:common_schema_otel_logs"; + +/// Configuration for the Microsoft Common Schema processor. +#[derive(Debug, Clone, Default, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct Config {} + +/// Processor that promotes decoded Microsoft Common Schema log attributes. +pub struct MicrosoftCommonSchemaProcessor; + +impl MicrosoftCommonSchemaProcessor { + /// Creates a processor from user configuration. 
+    pub fn from_config(config: &serde_json::Value) -> Result<Self, ConfigError> {
+        let _config: Config =
+            serde_json::from_value(config.clone()).map_err(|e| ConfigError::InvalidUserConfig {
+                error: format!("Failed to parse MicrosoftCommonSchemaProcessor configuration: {e}"),
+            })?;
+        Ok(Self)
+    }
+}
+
+#[async_trait(?Send)]
+impl local::Processor<OtapPdata> for MicrosoftCommonSchemaProcessor {
+    async fn process(
+        &mut self,
+        msg: Message<OtapPdata>,
+        effect_handler: &mut local::EffectHandler<OtapPdata>,
+    ) -> Result<(), EngineError> {
+pub fn create_microsoft_common_schema_processor(
+    _pipeline_ctx: PipelineContext,
+    node: NodeId,
+    node_config: Arc<NodeUserConfig>,
+    processor_config: &ProcessorConfig,
+) -> Result<ProcessorWrapper<OtapPdata>, ConfigError> {
+    let proc = MicrosoftCommonSchemaProcessor::from_config(&node_config.config)?;
+    Ok(ProcessorWrapper::local(
+        proc,
+        node,
+        node_config,
+        processor_config,
+    ))
+}
+
+/// Register MicrosoftCommonSchemaProcessor as an OTAP processor factory.
+#[allow(unsafe_code)]
+#[distributed_slice(OTAP_PROCESSOR_FACTORIES)]
+pub static MICROSOFT_COMMON_SCHEMA_PROCESSOR_FACTORY: otap_df_engine::ProcessorFactory<OtapPdata> =
+    otap_df_engine::ProcessorFactory {
+        name: MICROSOFT_COMMON_SCHEMA_PROCESSOR_URN,
+        create: |pipeline_ctx: PipelineContext,
+                 node: NodeId,
+                 node_config: Arc<NodeUserConfig>,
+                 proc_cfg: &ProcessorConfig| {
+            create_microsoft_common_schema_processor(pipeline_ctx, node, node_config, proc_cfg)
+        },
+        wiring_contract: otap_df_engine::wiring_contract::WiringContract::UNRESTRICTED,
+        validate_config: otap_df_config::validation::validate_typed_config::<Config>,
+    };
part_a_name = None; + let mut part_b_name = None; + let has_part_b_event_id = find_attr_value(&log.attributes, "PartB.eventId").is_some(); + for attr in log.attributes.drain(..) { + match attr.key.as_str() { + "__csver__" | "PartB._typeName" => {} + "PartA.time" => { + if let Some(time) = attr.value.as_ref().and_then(any_str) { + if let Ok(dt) = DateTime::parse_from_rfc3339(time) { + if let Some(nanos) = dt.timestamp_nanos_opt() { + if let Ok(nanos) = u64::try_from(nanos) { + log.time_unix_nano = nanos; + } + } + } + } + } + "PartA.name" => { + if let Some(name) = attr.value.as_ref().and_then(any_str) { + if !name.is_empty() { + part_a_name = Some(name.to_owned()); + } + } + } + "PartB.name" => { + if let Some(name) = attr.value.as_ref().and_then(any_str) { + if !name.is_empty() { + part_b_name = Some(name.to_owned()); + } + } + } + "PartA.ext_dt_traceId" => { + if let Some(trace_id) = attr.value.as_ref().and_then(any_str) { + if let Some(bytes) = parse_hex_bytes::<16>(trace_id) { + log.trace_id = bytes.to_vec(); + } else { + promoted.push(KeyValue::new("trace.id", AnyValue::new_string(trace_id))); + } + } + } + "PartA.ext_dt_spanId" => { + if let Some(span_id) = attr.value.as_ref().and_then(any_str) { + if let Some(bytes) = parse_hex_bytes::<8>(span_id) { + log.span_id = bytes.to_vec(); + } else { + promoted.push(KeyValue::new("span.id", AnyValue::new_string(span_id))); + } + } + } + "PartA.ext_dt_traceFlags" => { + if let Some(flags) = attr.value.as_ref().and_then(any_int) { + log.flags = u32::try_from(flags).unwrap_or(u32::MAX); + } + } + "PartA.ext_cloud_role" => { + if let Some(service_name) = attr.value.as_ref().and_then(any_str) { + if !service_name.is_empty() { + promoted.push(KeyValue::new( + "service.name", + AnyValue::new_string(service_name), + )); + } + } + } + "PartA.ext_cloud_roleInstance" => { + if let Some(instance) = attr.value.as_ref().and_then(any_str) { + if !instance.is_empty() { + promoted.push(KeyValue::new( + "service.instance.id", + 
AnyValue::new_string(instance), + )); + } + } + } + "PartB.body" => { + log.body = attr.value; + } + "PartB.severityNumber" => { + if let Some(number) = attr.value.as_ref().and_then(any_int) { + if let Ok(number) = i32::try_from(number) { + log.severity_number = number.clamp(1, 24); + } + } + } + "PartB.severityText" => { + if let Some(text) = attr.value.as_ref().and_then(any_str) { + log.severity_text = text.to_owned(); + } + } + "PartB.eventId" => { + if let Some(value) = attr.value { + promoted.push(KeyValue::new("eventId", value)); + } + } + key if key.starts_with("PartC.") => { + if let Some(value) = attr.value { + let key = key.trim_start_matches("PartC."); + if key != "eventId" || !has_part_b_event_id { + promoted.push(KeyValue::new(key, value)); + } + } + } + key if key.starts_with("PartA.") || key.starts_with("PartB.") => {} + _ => promoted.push(attr), + } + } + if let Some(name) = part_b_name.or(part_a_name) { + log.event_name = name; + } + log.attributes = promoted; + true +} + +fn find_attr_value<'a>(attrs: &'a [KeyValue], key: &str) -> Option<&'a AnyValue> { + attrs + .iter() + .find(|attr| attr.key == key) + .and_then(|attr| attr.value.as_ref()) +} + +fn any_str(value: &AnyValue) -> Option<&str> { + match value.value.as_ref()? { + any_value::Value::StringValue(value) => Some(value), + _ => None, + } +} + +fn any_int(value: &AnyValue) -> Option { + match value.value.as_ref()? 
{ + any_value::Value::IntValue(value) => Some(*value), + _ => None, + } +} + +fn parse_hex_bytes(value: &str) -> Option<[u8; N]> { + if value.len() != N * 2 { + return None; + } + let mut bytes = [0u8; N]; + for (index, chunk) in value.as_bytes().chunks_exact(2).enumerate() { + let high = hex_nibble(chunk[0])?; + let low = hex_nibble(chunk[1])?; + bytes[index] = (high << 4) | low; + } + Some(bytes) +} + +fn hex_nibble(byte: u8) -> Option { + match byte { + b'0'..=b'9' => Some(byte - b'0'), + b'a'..=b'f' => Some(byte - b'a' + 10), + b'A'..=b'F' => Some(byte - b'A' + 10), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use otap_df_pdata::proto::opentelemetry::logs::v1::{ResourceLogs, ScopeLogs}; + + fn attr<'a>(log: &'a LogRecord, key: &str) -> Option<&'a AnyValue> { + find_attr_value(&log.attributes, key) + } + + #[test] + fn promotes_microsoft_common_schema_fields() { + let mut log = LogRecord { + time_unix_nano: 1, + attributes: vec![ + KeyValue::new("__csver__", AnyValue::new_int(0x400)), + KeyValue::new("PartB._typeName", AnyValue::new_string("Log")), + KeyValue::new( + "PartA.time", + AnyValue::new_string("2024-06-15T12:00:00+05:30"), + ), + KeyValue::new( + "PartA.ext_dt_traceId", + AnyValue::new_string("0102030405060708090a0b0c0d0e0f10"), + ), + KeyValue::new( + "PartA.ext_dt_spanId", + AnyValue::new_string("a1b2c3d4e5f60718"), + ), + KeyValue::new("PartA.ext_dt_traceFlags", AnyValue::new_int(1)), + KeyValue::new("PartA.ext_cloud_role", AnyValue::new_string("checkout")), + KeyValue::new("PartB.body", AnyValue::new_string("failed")), + KeyValue::new("PartB.severityNumber", AnyValue::new_int(17)), + KeyValue::new("PartB.severityText", AnyValue::new_string("ERROR")), + KeyValue::new("PartB.name", AnyValue::new_string("CheckoutFailure")), + KeyValue::new("PartC.status", AnyValue::new_int(500)), + ], + ..Default::default() + }; + + assert!(promote_microsoft_common_schema_log(&mut log)); + + let expected_time = 
DateTime::parse_from_rfc3339("2024-06-15T12:00:00+05:30") + .expect("valid time") + .timestamp_nanos_opt() + .expect("representable timestamp") as u64; + assert_eq!(log.time_unix_nano, expected_time); + assert_eq!(log.event_name, "CheckoutFailure"); + assert_eq!(log.severity_number, 17); + assert_eq!(log.severity_text, "ERROR"); + assert_eq!(log.flags, 1); + assert_eq!( + log.trace_id, + parse_hex_bytes::<16>("0102030405060708090a0b0c0d0e0f10") + .expect("hex") + .to_vec() + ); + assert_eq!( + log.span_id, + parse_hex_bytes::<8>("a1b2c3d4e5f60718") + .expect("hex") + .to_vec() + ); + assert_eq!( + log.body.as_ref().and_then(any_str), + Some("failed"), + "PartB.body should become the typed body" + ); + assert!(find_attr_value(&log.attributes, "__csver__").is_none()); + assert_eq!( + attr(&log, "service.name").and_then(any_str), + Some("checkout") + ); + assert_eq!(attr(&log, "status").and_then(any_int), Some(500)); + } + + #[test] + fn leaves_non_microsoft_common_schema_record_unchanged() { + let original = vec![KeyValue::new("PartB.body", AnyValue::new_string("not cs"))]; + let mut log = LogRecord { + attributes: original.clone(), + ..Default::default() + }; + + assert!(!promote_microsoft_common_schema_log(&mut log)); + assert_eq!(log.attributes, original); + assert!(log.body.is_none()); + } + + #[test] + fn rejects_wrong_microsoft_common_schema_version() { + let original = vec![ + KeyValue::new("__csver__", AnyValue::new_int(0x300)), + KeyValue::new("PartB._typeName", AnyValue::new_string("Log")), + KeyValue::new("PartB.body", AnyValue::new_string("body")), + ]; + let mut log = LogRecord { + attributes: original.clone(), + ..Default::default() + }; + + assert!(!promote_microsoft_common_schema_log(&mut log)); + assert_eq!(log.attributes, original); + assert!(log.body.is_none()); + } + + #[test] + fn rejects_missing_or_non_log_type_name() { + let mut missing_type = LogRecord { + attributes: vec![ + KeyValue::new("__csver__", AnyValue::new_int(0x400)), + 
KeyValue::new("PartB.body", AnyValue::new_string("body")), + ], + ..Default::default() + }; + let mut wrong_type = LogRecord { + attributes: vec![ + KeyValue::new("__csver__", AnyValue::new_int(0x400)), + KeyValue::new("PartB._typeName", AnyValue::new_string("Span")), + KeyValue::new("PartB.body", AnyValue::new_string("body")), + ], + ..Default::default() + }; + + assert!(!promote_microsoft_common_schema_log(&mut missing_type)); + assert!(!promote_microsoft_common_schema_log(&mut wrong_type)); + assert!(missing_type.body.is_none()); + assert!(wrong_type.body.is_none()); + } + + #[test] + fn malformed_time_preserves_existing_timestamp() { + let mut log = LogRecord { + time_unix_nano: 123, + attributes: vec![ + KeyValue::new("__csver__", AnyValue::new_int(0x400)), + KeyValue::new("PartB._typeName", AnyValue::new_string("Log")), + KeyValue::new("PartA.time", AnyValue::new_string("not-a-time")), + ], + ..Default::default() + }; + + assert!(promote_microsoft_common_schema_log(&mut log)); + assert_eq!(log.time_unix_nano, 123); + } + + #[test] + fn pre_epoch_time_preserves_existing_timestamp() { + let mut log = LogRecord { + time_unix_nano: 123, + attributes: vec![ + KeyValue::new("__csver__", AnyValue::new_int(0x400)), + KeyValue::new("PartB._typeName", AnyValue::new_string("Log")), + KeyValue::new("PartA.time", AnyValue::new_string("1969-12-31T23:59:59Z")), + ], + ..Default::default() + }; + + assert!(promote_microsoft_common_schema_log(&mut log)); + assert_eq!(log.time_unix_nano, 123); + } + + #[test] + fn malformed_trace_and_span_ids_fall_back_to_attributes() { + let mut log = LogRecord { + attributes: vec![ + KeyValue::new("__csver__", AnyValue::new_int(0x400)), + KeyValue::new("PartB._typeName", AnyValue::new_string("Log")), + KeyValue::new("PartA.ext_dt_traceId", AnyValue::new_string("short")), + KeyValue::new("PartA.ext_dt_spanId", AnyValue::new_string("not-hex!")), + ], + ..Default::default() + }; + + assert!(promote_microsoft_common_schema_log(&mut log)); + 
assert!(log.trace_id.is_empty()); + assert!(log.span_id.is_empty()); + assert_eq!(attr(&log, "trace.id").and_then(any_str), Some("short")); + assert_eq!(attr(&log, "span.id").and_then(any_str), Some("not-hex!")); + } + + #[test] + fn part_b_name_takes_precedence_over_part_a_name() { + let mut log = LogRecord { + attributes: vec![ + KeyValue::new("__csver__", AnyValue::new_int(0x400)), + KeyValue::new("PartB._typeName", AnyValue::new_string("Log")), + KeyValue::new("PartB.name", AnyValue::new_string("PartBName")), + KeyValue::new("PartA.name", AnyValue::new_string("PartAName")), + ], + ..Default::default() + }; + + assert!(promote_microsoft_common_schema_log(&mut log)); + assert_eq!(log.event_name, "PartBName"); + } + + #[test] + fn part_b_event_id_takes_precedence_over_part_c_event_id() { + let mut log = LogRecord { + attributes: vec![ + KeyValue::new("__csver__", AnyValue::new_int(0x400)), + KeyValue::new("PartB._typeName", AnyValue::new_string("Log")), + KeyValue::new("PartC.eventId", AnyValue::new_int(1)), + KeyValue::new("PartB.eventId", AnyValue::new_int(2)), + ], + ..Default::default() + }; + + assert!(promote_microsoft_common_schema_log(&mut log)); + assert_eq!(attr(&log, "eventId").and_then(any_int), Some(2)); + assert_eq!( + log.attributes + .iter() + .filter(|attr| attr.key == "eventId") + .count(), + 1 + ); + } + + #[test] + fn severity_number_is_clamped_to_otlp_range() { + let mut low = LogRecord { + attributes: vec![ + KeyValue::new("__csver__", AnyValue::new_int(0x400)), + KeyValue::new("PartB._typeName", AnyValue::new_string("Log")), + KeyValue::new("PartB.severityNumber", AnyValue::new_int(-10)), + ], + ..Default::default() + }; + let mut high = LogRecord { + attributes: vec![ + KeyValue::new("__csver__", AnyValue::new_int(0x400)), + KeyValue::new("PartB._typeName", AnyValue::new_string("Log")), + KeyValue::new("PartB.severityNumber", AnyValue::new_int(99)), + ], + ..Default::default() + }; + + assert!(promote_microsoft_common_schema_log(&mut low)); 
+ assert!(promote_microsoft_common_schema_log(&mut high)); + assert_eq!(low.severity_number, 1); + assert_eq!(high.severity_number, 24); + } + + #[test] + fn promote_logs_reports_whether_any_record_changed() { + let mut logs = LogsData { + resource_logs: vec![ResourceLogs { + scope_logs: vec![ScopeLogs { + log_records: vec![ + LogRecord { + attributes: vec![KeyValue::new( + "PartB.body", + AnyValue::new_string("not cs"), + )], + ..Default::default() + }, + LogRecord { + attributes: vec![ + KeyValue::new("__csver__", AnyValue::new_int(0x400)), + KeyValue::new("PartB._typeName", AnyValue::new_string("Log")), + KeyValue::new("PartB.body", AnyValue::new_string("cs")), + ], + ..Default::default() + }, + ], + ..Default::default() + }], + ..Default::default() + }], + }; + + assert!(promote_microsoft_common_schema_logs(&mut logs)); + let records = &logs.resource_logs[0].scope_logs[0].log_records; + assert_eq!(records[0].attributes[0].key, "PartB.body"); + assert_eq!(records[1].body.as_ref().and_then(any_str), Some("cs")); + + let mut non_cs = LogsData { + resource_logs: vec![ResourceLogs { + scope_logs: vec![ScopeLogs { + log_records: vec![LogRecord { + attributes: vec![KeyValue::new( + "PartB.body", + AnyValue::new_string("not cs"), + )], + ..Default::default() + }], + ..Default::default() + }], + ..Default::default() + }], + }; + + assert!(!promote_microsoft_common_schema_logs(&mut non_cs)); + } +} diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/processors/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/processors/mod.rs index 4e011d3a28..d8e80eebdc 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/processors/mod.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/processors/mod.rs @@ -1,6 +1,10 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +/// Microsoft Common Schema log promotion processor +#[cfg(feature = "microsoft-common-schema-processor")] +pub mod microsoft_common_schema_processor; + /// Condense Attributes 
processor #[cfg(feature = "condense-attributes-processor")] pub mod condense_attributes_processor; diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/receivers/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/mod.rs new file mode 100644 index 0000000000..9d04d04d61 --- /dev/null +++ b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/mod.rs @@ -0,0 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +/// Linux userevents receiver. +#[cfg(feature = "userevents-receiver")] +pub mod userevents_receiver; diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/README.md b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/README.md new file mode 100644 index 0000000000..05c361b6ba --- /dev/null +++ b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/README.md @@ -0,0 +1,496 @@ + + +# Linux Userevents Receiver + +**URN:** `urn:otel:receiver:userevents` + +This receiver ingests Linux +[`user_events`](https://docs.kernel.org/trace/user_events.html) tracepoints +through `perf_event_open` and converts them into OTAP logs for downstream +processing. + +> **Note:** This receiver is vendor-neutral. It performs Linux `user_events` +> collection and structural decoding only; schema-specific mappings such as +> Microsoft Common Schema should be modeled outside the receiver. + +It follows the OTAP Dataflow thread-per-core model: + +- one receiver instance per pipeline thread +- one perf session per assigned CPU +- one bounded drain loop per receiver +- no shared hot-path state across pipeline threads + +## Architecture + +`user_events` writes enter Linux tracing as user-space tracepoint events. The +kernel stores trace records in per-CPU ring buffers; each receiver instance +drains the ring for the CPU/pipeline core it owns, decodes the tracepoint +record, and emits OTAP logs into that same pipeline. 
+ +```text ++------------------------------- Same Linux Host -------------------------------+ +| | +| App thread(s) | +| | | +| | user_events tracepoint writes | +| v | +| +---------------------+ | +| | Linux tracepoint | | +| | / perf subsystem | | +| +----------+----------+ | +| | (per-CPU fan-out; writer's runtime CPU selects the ring) | +| +--------+--------+---------+--- ... ---+ | +| v v v v | +| +---------+ +---------+ +---------+ +---------+ | +| | CPU 0 | | CPU 1 | | CPU 2 | | CPU N | | +| | perf | | perf | | perf | | perf | lockless | +| | ring | | ring | | ring | | ring | per-CPU | +| +----+----+ +----+----+ +----+----+ +----+----+ kernel buffers | +| | | | | | ++-------|---------------|-------------|-------------|---------------------------+ + | | | | + v v v v + +----------+ +----------+ +----------+ +----------+ + | pipeline | | pipeline | | pipeline | | pipeline | df_engine + | core 0 | | core 1 | | core 2 | | core N | user space + | | | | | | | | + | receiver | | receiver | | receiver | | receiver | one receiver + | decode | | decode | | decode | | decode | instance per + | batch | | batch | | batch | | batch | pipeline core + | export | | export | | export | | export | + +----------+ +----------+ +----------+ +----------+ +``` + +At a high level the Linux tracing ring buffer is a per-CPU producer/reader +structure: writers append on the CPU where they execute, and readers can drain +the per-CPU buffers independently. The receiver consumes those buffers via +`perf_event_open` / `one_collect`; it does not implement the kernel ring-buffer +algorithm itself. + +## CPU Pinning and Coverage + +**Placement rule:** a pipeline assigned to core `K` opens the perf ring for +CPU `K` only. If an application thread writes a `user_events` record while the +kernel has scheduled it on CPU `X`, that record lands in the CPU `X` ring and +is only visible to the receiver instance that owns CPU `X`. 
No other receiver +instance sees it, and there is no cross-CPU aggregation layer in this receiver. + +```text + user_events write on CPU K + | + v + CPU K perf ring + | + v + receiver pipeline pinned to CPU K + + No other receiver pipeline reads CPU K's ring. +``` + +For an 8-core host, coverage looks like this: + +```text + CPU: 0 1 2 3 4 5 6 7 + | | | | | | | | + ring: R0 R1 R2 R3 R4 R5 R6 R7 + ^ ^ ^ ^ ^ ^ ^ ^ + | | | | | | | | + pipeline: P0 P1 P2 P3 P4 P5 P6 P7 + + P0 reads only R0. P1 reads only R1. ... P7 reads only R7. +``` + +If a CPU has no corresponding pipeline, writes on that CPU are not collected by +this receiver deployment. They are not seen and then discarded; no receiver is +attached to that CPU's ring. The engine does not emit a metric for these +missing records because they are never observed by the receiver; they are lost +silently. + +| Configuration | Writer pinned? | Result | +|----------------------------------|----------------|------------------------------------------------------------| +| `--num-cores 1` | No | Racy; writes on CPUs 1..N not collected by this receiver | +| `--num-cores 1` | `taskset -c 0` | Reliable single-core capture | +| `--core-id-range 0-3` | No | Writes on CPUs 4..N not collected by this receiver | +| `--core-id-range 0-3` | Pinned to 0..3 | Reliable within the range | +| `--num-cores 0` (all host CPUs) | No | Complete host-wide coverage | + +For production, choose the engine's `--num-cores` / `--core-id-range` settings +carefully. Prefer full coverage (`--num-cores 0`) unless you intentionally want +to sample only a subset of CPUs; for development/testing, a pinned writer on a +single-core engine keeps the moving parts minimal. + +## NUMA Locality + +The receiver is **designed** to preserve NUMA locality under the df_engine +thread-per-core model. This is a locality design, not a hard guarantee - the +bullets below describe the intended behavior and the conditions under which it +can be weakened. 
+ +Design properties that support locality: + +- The controller pins each pipeline thread to its assigned CPU **before** + building the pipeline (`core_affinity::set_for_current`, logged as + `core_affinity.set_failed` on failure). Under Linux's default first-touch + policy, allocations made by that pipeline thread afterwards generally land + on the CPU's local NUMA node. +- This receiver opens the perf ring only for `pipeline.core_id()` - i.e. its + own pinned CPU - so ring reads happen from the thread pinned to the same CPU + that the receiver is configured to drain. +- No receiver hot-path state crosses pipeline threads. Decoded records, Arrow + builders, per-record payload buffers, decoded attribute strings, the metric + set, and the admission state are all thread-local (`!Send`). + +On a multi-socket host with `--num-cores 0`, the intended result is one +pipeline per CPU, each reading and processing its own ring on the local NUMA +node: + +```text + df_engine --num-cores 0 2-socket, 8-CPU host + + +----- NUMA node 0 -----+ +----- NUMA node 1 -----+ + | CPU 0 1 2 3 | | CPU 4 5 6 7 | + | ring ring ring ring | | ring ring ring ring | + | ^ ^ ^ ^ | | ^ ^ ^ ^ | + | | | | | | | | | | | | + | pipe pipe pipe pipe | | pipe pipe pipe pipe | + | core core core core | | core core core core | + | 0 1 2 3 | | 4 5 6 7 | + +-----------------------+ +-----------------------+ + + Writes on CPUs on node 0 are read and processed by pipelines on node 0; + writes on node 1 are read and processed by pipelines on node 1. +``` + +Each `user_events` write enters the ring of the CPU on which the writer +executes, and only a pipeline pinned to that CPU drains that ring. With one +pipeline per covered CPU and successful CPU affinity, collection does not +intentionally require cross-NUMA reads; each pipeline drains the ring for its +own CPU. + +### What weakens locality + +This is a locality-preserving design, not a hard guarantee. 
Locality can be +weakened if: + +- CPU affinity cannot be set. This can happen when the requested CPU is outside + the process cgroup/cpuset, the CPU is offline, or a container/seccomp policy + blocks affinity changes. The controller logs `core_affinity.set_failed` and + continues unpinned. +- The process is started with a NUMA policy that overrides first-touch + placement, such as `numactl --interleave=all`. +- The receiver is changed to read perf rings for CPUs other than its assigned + pipeline core. Today `session.rs` intentionally constructs + `cpu_ids = vec![pipeline.core_id()]`. + +## Platform Support + +This receiver is **Linux-only**. + +It does **not** work on macOS or Windows because `user_events` is a Linux +kernel tracing feature. Windows support would require a separate ETW receiver. + +## Current Scope + +Current implementation supports: + +- single-tracepoint and multi-tracepoint configuration +- `tracefs`, which decodes standard Linux tracefs fields into typed log + attributes +- `event_header`, which decodes EventHeader self-describing fields into typed + log attributes + +## Configuration + +You can configure the receiver in one of two ways. + +### Single Tracepoint Shorthand + +Use this when one receiver should listen to one tracepoint. + +```yaml +nodes: + ingest: + type: receiver:userevents + config: + tracepoint: "user_events:myprovider_L2K1" + format: + type: tracefs + session: + per_cpu_buffer_size: 1048576 # bytes + late_registration: + enabled: true + poll_interval_ms: 100 + drain: + max_records_per_turn: 1024 + max_bytes_per_turn: 1048576 + max_drain_ns: 2ms # duration string: 2ms, 500us, 1000000ns + batching: + max_size: 512 + max_duration: 50ms + overflow: + on_downstream_full: drop +``` + +### Multiple Tracepoints + +Use this when one receiver should listen to several tracepoints. 
+ +```yaml +nodes: + ingest: + type: receiver:userevents + config: + subscriptions: + - tracepoint: "user_events:myprovider_L2K1" + format: + type: tracefs + - tracepoint: "user_events:app_L2K1" + format: + type: event_header + session: + per_cpu_buffer_size: 1048576 # bytes +``` + +Exactly one of `tracepoint` or `subscriptions` must be configured. +`tracefs` is the default `format.type`. + +`session.wakeup_watermark` exists as a reserved configuration field for future +one_collect wakeup support, but is currently ignored. + +TODO: Wire `session.wakeup_watermark` into the perf ring setup once +`one_collect` exposes wakeup/readiness and watermark configuration for +tracepoint sessions. + +### Configuration Reference + +| Field | Default | Description | +| --- | --- | --- | +| `tracepoint` | none | Single tracepoint shorthand. Must use `user_events:`. Mutually exclusive with `subscriptions`. | +| `subscriptions` | none | List of tracepoints. Each entry must use `user_events:`. | +| `format.type` | `tracefs` | Decode format. Supported values: `tracefs`, `event_header`. | +| `session.per_cpu_buffer_size` | `1048576` | Requested per-CPU perf ring size in bytes. Rounded by the underlying perf/ring setup. | +| `session.wakeup_watermark` | `262144` | Reserved for future one_collect wakeup support; currently ignored. | +| `session.late_registration.enabled` | `false` | When true, keep retrying if the tracepoint is not registered yet. | +| `session.late_registration.poll_interval_ms` | `1000` | Retry interval for late tracepoint registration. | +| `drain.max_records_per_turn` | `1024` | Maximum records popped from the receiver's pending queue per drain turn. | +| `drain.max_bytes_per_turn` | `1048576` | Maximum payload bytes popped per drain turn. | +| `drain.max_drain_ns` | `2ms` | Total drain-turn budget. Accepts duration strings such as `2ms`, `500us`, or `1000000ns`; must be greater than zero. 
|
+| `batching.max_size` | `512` | Flush once this many logs are buffered in the current Arrow batch. |
+| `batching.max_duration` | `50ms` | Flush interval for partially-filled batches. |
+| `overflow.on_downstream_full` | `drop` | Drop the batch if downstream is full; blocking the perf drain loop is intentionally avoided. |
+
+### Tracepoint Naming
+
+Tracepoints must be configured with the `user_events:` group prefix. The receiver
+rejects other groups because collection always uses the Linux `user_events`
+tracefs group:
+
+```text
+user_events:<provider>_L<level>K<keyword>
+```
+
+EventHeader-style names such as `<provider>_L<level>K<keyword>` are accepted,
+but the generic receiver does not interpret the level or keyword as OTLP
+severity.
+
+## Decode
+
+### `tracefs`
+
+Decodes standard Linux tracepoint fields using the tracefs `format` metadata
+exposed for each registered `user_events` tracepoint. Common tracepoint fields
+such as `common_pid` are skipped; producer-declared fields are emitted as typed
+log attributes. After a successful decode, the receiver forwards the decoded
+OTAP log record; it does not forward the original raw tracepoint sample bytes.
+Unknown static fields may be preserved as per-field base64 string attributes.
+
+### `event_header`
+
+Decodes EventHeader self-describing fields into typed log attributes. Nested
+EventHeader structs are flattened with dot-separated attribute names. If an
+EventHeader payload cannot be decoded, the raw user payload is preserved in the
+`linux.userevents.payload_base64` attribute.
+
+## Output Shape
+
+The receiver emits OTAP logs. Structural data is represented as flat log
+attributes with source types preserved where possible (`Int`/`Bool`/`Double`/
+`Str`). The typed `event_name` field is set to the configured tracepoint name,
+and `time_unix_nano` uses the perf sample timestamp. Schema-specific promotion
+to typed OTLP fields is intentionally left to processors. 
The original raw +tracepoint sample is not part of the normal output contract after structural +decode succeeds. + +The receiver intentionally does **not** emit receiver-internal +transport/diagnostic fields such as tracepoint name, provider name, +EventHeader level/keyword, CPU, PID/TID, sample id, payload size, body +encoding, or decode mode. These describe the receiver itself rather than the +application payload; surfacing them as OTLP log attributes would pollute +downstream backends with receiver implementation details. + +## Receiver Internals + +The per-pipeline-thread pipeline inside the receiver has four stages, each +bounded by a specific config field. This diagram shows where the `drain.*`, +`batching.*`, `overflow.*`, and memory-pressure knobs take effect: + +```text + perf ring (per-CPU, kernel-owned) + | + | parse_for_duration(max_drain_ns / 2) // drain budget + | pop up to max_records_per_turn / max_bytes_per_turn + v + +---------------------+ + | drain (bounded) | drain.max_records_per_turn + | | drain.max_bytes_per_turn + | | drain.max_drain_ns (split: parse / pop) + +----------+----------+ + | + v + +---------------------+ tracefs fields -> typed attributes + | decode | EventHeader fields -> typed attributes + +----------+----------+ + | + v + +---------------------+ + | Arrow batch builder | append one log per record + | | flush when len >= batching.max_size + | | flush on batching.max_duration tick + +----------+----------+ + | + | flush_batch -> effect_handler.try_send_message(...) 
+ v + +---------------------+ ok -> forwarded_samples += n, flushed_batches++ + | downstream channel | full -> dropped_downstream_full += n + | | (overflow.on_downstream_full = drop) + +---------------------+ memory pressure (should_shed_ingress): + records -> dropped_memory_pressure++ + buffered batch on ctrl event -> drop_batch +``` + +Notes: + +- The drain budget is split in half between parsing the kernel ring and popping + the receiver's pending queue; `drain.max_drain_ns` must be non-zero or config + validation rejects it. +- Memory pressure takes effect at two points: per-record during the pop loop + (records are counted toward `dropped_memory_pressure` instead of being + appended to the batch) and on ctrl events where any already-buffered batch is + dropped rather than flushed. +- `overflow.on_downstream_full = drop` is currently the only mode; the perf + drain loop is never blocked on downstream. + +## Runtime Behavior + +### Backpressure + +`user_events` perf rings cannot be backpressured like a socket. For that +reason, the receiver does not block the perf drain loop when downstream is +full; it drops already-drained batches and reports them as +`dropped_downstream_full`. Separately, if the kernel/perf ring overruns before +the receiver drains it, that loss is reported by the perf path and counted as +`lost_perf_samples`. + +TODO: Plumb corrupt perf event and corrupt perf buffer counters once +`one_collect` exposes them. + +### Memory Pressure + +When process-wide memory pressure indicates ingress shedding, the receiver +drops buffered batches rather than blocking on downstream flush. + +### Late Registration + +If `late_registration.enabled` is true, the receiver will keep retrying +tracepoint attachment until the producer has registered the tracepoint. + +If late registration is disabled and any configured tracepoint is missing or +invalid, session startup fails and the receiver does not start. 
With multiple +subscriptions, one missing tracepoint fails the whole session. + +TODO: Add mid-stream session recovery once the underlying collection layer +exposes enough error classification to distinguish recoverable session loss from +fatal data, permission, or configuration errors. + +### Shutdown and Drain + +On `DrainIngress`, the receiver performs one final bounded perf-ring drain +before flushing or dropping the in-memory Arrow batch according to memory +pressure. On immediate shutdown, the receiver reports terminal state without an +extra drain/flush; use graceful drain when minimizing buffered data loss matters. + +## Metrics + +The receiver reports these counters under `userevents.receiver.metrics`: + +| Metric | Meaning | +| --- | --- | +| `received_samples` | Perf samples drained from the kernel/perf path. | +| `forwarded_samples` | Log records successfully forwarded downstream. | +| `dropped_downstream_full` | Batches dropped because the downstream channel was full. | +| `dropped_memory_pressure` | Records or batches dropped because process memory pressure requested ingress shedding. | +| `dropped_no_subscription` | Samples that did not map to a configured subscription index. This should normally stay zero. | +| `lost_perf_samples` | Lost sample count reported by the perf ring. | +| `late_registration_retries` | Late-registration retry attempts while waiting for tracepoints. | +| `sessions_started` | Receiver perf sessions successfully opened. | +| `flushed_batches` | Arrow log batches flushed downstream. | + +TODO: Add metrics for corrupt perf events and corrupt perf buffers once the +underlying collection layer exposes those counters. 
+
+## Linux Requirements
+
+This receiver requires all of the following on the host:
+
+- Linux kernel **6.4 or later**, built with `CONFIG_USER_EVENTS=y`
+- tracefs available, typically under `/sys/kernel/tracing`
+- permission to read tracefs metadata
+- permission to use `perf_event_open` for the configured tracepoints
+
+`user_events` was merged in Linux 6.4; earlier kernels do not expose
+`user_events_data` / `user_events_status` in tracefs. Distro support also
+depends on whether `CONFIG_USER_EVENTS` is enabled in the shipped kernel.
+Distributions that ship it enabled by default:
+
+- **Ubuntu 23.10** and later
+- **Azure Linux 3.0** and later
+
+Other distributions may work on any 6.4+ kernel built with
+`CONFIG_USER_EVENTS=y`, but this is not exhaustively verified.
+
+The exact permission model depends on the host kernel and security settings.
+
+## Docker
+
+Does this receiver work in Docker? Sometimes, but not automatically.
+
+It can work in Docker **only if the host kernel supports `user_events`** and
+the container is given access to the host tracing and perf facilities. 
+ +Important implications: + +- containers share the **host kernel** +- Docker on native Linux may work +- Docker Desktop on macOS or Windows does **not** make this a macOS or Windows + feature; it only works if the Linux VM kernel behind Docker Desktop supports + `user_events` and the necessary interfaces are exposed into the container + +In practice, for Docker you usually need some combination of: + +- access to `/sys/kernel/tracing` +- relaxed `perf_event_open` restrictions or appropriate privileges +- permission to write to `user_events_data` if the producer runs in-container + +For reliable testing, prefer: + +- native Linux first +- then privileged or carefully configured Linux containers +- not macOS as the host runtime + +## Testing + +Recommended test layers: + +- unit tests for tracefs structural decoding and EventHeader payload handling +- Linux-only receiver integration tests using a real kernel tracepoint +- pipeline-level schema mapping tests in the processor that owns that schema diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/arrow_records_encoder.rs b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/arrow_records_encoder.rs new file mode 100644 index 0000000000..f5b50a9df7 --- /dev/null +++ b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/arrow_records_encoder.rs @@ -0,0 +1,252 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Arrow encoding for Linux userevents logs. 
+ +use chrono::Utc; +use otap_df_pdata::encode::Result; +use otap_df_pdata::encode::record::{ + attributes::StrKeysAttributesRecordBatchBuilder, logs::LogsRecordBatchBuilder, +}; +use otap_df_pdata::otap::{Logs, OtapArrowRecords}; +use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType; +use otap_df_pdata::schema::{SpanId, TraceId}; + +use super::decoder::{DecodedAttrValue, DecodedUsereventsRecord}; + +/// Builder for creating Arrow record batches from decoded userevents messages. +pub(super) struct ArrowRecordsBuilder { + curr_log_id: u16, + logs: LogsRecordBatchBuilder, + log_attrs: StrKeysAttributesRecordBatchBuilder, +} + +impl Default for ArrowRecordsBuilder { + fn default() -> Self { + Self::new() + } +} + +impl ArrowRecordsBuilder { + /// Creates a new builder. + #[must_use] + pub(super) fn new() -> Self { + Self { + curr_log_id: 0, + logs: LogsRecordBatchBuilder::new(), + log_attrs: StrKeysAttributesRecordBatchBuilder::::new(), + } + } + + /// Returns the number of buffered log records. + #[must_use] + pub(super) const fn len(&self) -> u16 { + self.curr_log_id + } + + /// Returns true when the builder is empty. + #[must_use] + pub(super) const fn is_empty(&self) -> bool { + self.curr_log_id == 0 + } + + /// Appends a decoded userevents record. 
+ pub(super) fn append(&mut self, record: DecodedUsereventsRecord) { + self.logs.append_time_unix_nano(record.time_unix_nano); + self.logs.body.append_str(record.body.as_bytes()); + self.logs.append_severity_number(record.severity_number); + self.logs + .append_severity_text(record.severity_text.as_deref().map(str::as_bytes)); + self.logs.append_id(Some(self.curr_log_id)); + self.logs.append_flags(record.flags); + self.logs + .append_event_name(record.event_name.as_deref().map(str::as_bytes)); + _ = self + .logs + .append_trace_id(record.trace_id.as_ref() as Option<&TraceId>); + _ = self + .logs + .append_span_id(record.span_id.as_ref() as Option<&SpanId>); + + // Receiver-internal transport/diagnostic fields are intentionally not + // represented on decoded records or emitted as downstream attributes: + // the Ingestion backend treats OTLP log attributes as backend columns, + // so surfacing per-record diagnostics there would pollute the + // application schema. + for (key, value) in record.attributes { + self.log_attrs.append_key(key.as_ref()); + match value { + DecodedAttrValue::Str(s) => { + self.log_attrs.any_values_builder.append_str(s.as_bytes()) + } + DecodedAttrValue::Int(i) => self.log_attrs.any_values_builder.append_int(i), + DecodedAttrValue::Bool(b) => self.log_attrs.any_values_builder.append_bool(b), + DecodedAttrValue::Double(d) => self.log_attrs.any_values_builder.append_double(d), + } + self.log_attrs.append_parent_id(&self.curr_log_id); + } + + self.curr_log_id += 1; + } + + /// Builds the Arrow records from the buffered userevents logs. 
+ pub(super) fn build(mut self) -> Result { + let log_record_count = self.curr_log_id.into(); + + self.logs.resource.append_id_n(0, log_record_count); + self.logs + .resource + .append_schema_url_n(None, log_record_count); + self.logs + .resource + .append_dropped_attributes_count_n(0, log_record_count); + + self.logs.scope.append_id_n(0, log_record_count); + self.logs.scope.append_name_n(None, log_record_count); + self.logs.scope.append_version_n(None, log_record_count); + self.logs + .scope + .append_dropped_attributes_count_n(0, log_record_count); + + let observed_time = Utc::now().timestamp_nanos_opt().unwrap_or(0); + self.logs + .append_observed_time_unix_nano_n(observed_time, log_record_count); + self.logs.append_schema_url_n(None, log_record_count); + self.logs + .append_dropped_attributes_count_n(0, log_record_count); + + let mut otap_batch = OtapArrowRecords::Logs(Logs::default()); + otap_batch.set(ArrowPayloadType::Logs, self.logs.finish()?)?; + + let log_attrs_rb = self.log_attrs.finish()?; + if log_attrs_rb.num_rows() > 0 { + otap_batch.set(ArrowPayloadType::LogAttrs, log_attrs_rb)?; + } + + Ok(otap_batch) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::receivers::userevents_receiver::decoder::{ + DecodedAttrValue, DecodedUsereventsRecord, + }; + use arrow::array::{ + Array, AsArray, DictionaryArray, Int32Array, StringArray, StructArray, UInt32Array, + }; + use arrow::datatypes::{TimestampNanosecondType, UInt8Type, UInt16Type}; + + #[test] + fn build_creates_logs_and_attributes_batches() { + let mut builder = ArrowRecordsBuilder::new(); + builder.append(DecodedUsereventsRecord { + time_unix_nano: 1234, + body: "QUJD".to_owned(), + event_name: Some("example-event".to_owned()), + severity_number: Some(17), + severity_text: Some("ERROR".into()), + flags: None, + trace_id: None, + span_id: None, + attributes: vec![( + "user_name".into(), + DecodedAttrValue::Str("example".to_owned()), + )], + }); + + let batch = builder.build().expect("build 
succeeds"); + let logs_rb = batch + .get(ArrowPayloadType::Logs) + .expect("logs batch present"); + let attrs_rb = batch + .get(ArrowPayloadType::LogAttrs) + .expect("attrs batch present"); + + assert_eq!(logs_rb.num_rows(), 1); + // Only caller/application attributes are emitted downstream. Receiver + // transport diagnostics are intentionally not encoded as log attrs. + assert_eq!(attrs_rb.num_rows(), 1); + + let time_col = logs_rb + .column_by_name("time_unix_nano") + .expect("time column"); + let time_values = time_col.as_primitive::(); + assert_eq!(time_values.value(0), 1234); + + let body_col = logs_rb.column_by_name("body").expect("body column"); + let body_struct = body_col + .as_any() + .downcast_ref::() + .expect("body struct"); + let body_dict = body_struct + .column_by_name("str") + .expect("body string field") + .as_any() + .downcast_ref::>() + .expect("body string dictionary"); + let body_values = body_dict + .values() + .as_any() + .downcast_ref::() + .expect("body string values"); + let body_idx = body_dict.keys().value(0) as usize; + assert_eq!(body_values.value(body_idx), "QUJD"); + + let severity_col = logs_rb + .column_by_name("severity_number") + .expect("severity number column") + .as_any() + .downcast_ref::>() + .expect("severity dictionary"); + let severity_values = severity_col + .values() + .as_any() + .downcast_ref::() + .expect("severity values"); + let severity_idx = severity_col.keys().value(0) as usize; + assert_eq!(severity_values.value(severity_idx), 17); + + assert!( + logs_rb.column_by_name("flags").is_none(), + "all-null flags column should be omitted" + ); + + let parent_col = attrs_rb + .column_by_name("parent_id") + .expect("parent id column") + .as_primitive::(); + for row in 0..attrs_rb.num_rows() { + assert_eq!(parent_col.value(row), 0); + } + } + + #[test] + fn build_preserves_non_null_flags() { + let mut builder = ArrowRecordsBuilder::new(); + builder.append(DecodedUsereventsRecord { + time_unix_nano: 1234, + body: 
"text".to_owned(), + event_name: Some("example-event".to_owned()), + severity_number: Some(9), + severity_text: Some("INFO".into()), + flags: Some(1), + trace_id: None, + span_id: None, + attributes: vec![], + }); + + let batch = builder.build().expect("build succeeds"); + let logs_rb = batch + .get(ArrowPayloadType::Logs) + .expect("logs batch present"); + let flags_col = logs_rb + .column_by_name("flags") + .expect("flags column") + .as_any() + .downcast_ref::() + .expect("flags values"); + assert_eq!(flags_col.value(0), 1); + } +} diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/decoder.rs b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/decoder.rs new file mode 100644 index 0000000000..ddd7ef80ff --- /dev/null +++ b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/decoder.rs @@ -0,0 +1,704 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Decoding helpers for Linux userevents samples. +//! +//! The receiver handles two wire/layout layers: +//! +//! ```text +//! Linux tracepoint sample +//! +----------------------+----------------------------------+ +//! | common_* fields | producer-defined data | +//! | tracefs metadata | tracefs fields or EventHeader | +//! +----------------------+----------------------------------+ +//! 0 user_data_offset +//! ``` +//! +//! In `tracefs` mode, the producer-defined data is decoded with the field +//! offsets, sizes, locations, and type names from the tracefs `format` file. +//! This is the standard Linux tracepoint model used by perf/ftrace. After a +//! successful tracefs decode, downstream processors receive the decoded OTAP log +//! attributes, not the original raw tracepoint sample bytes. Unknown static +//! fields may be preserved as per-field base64 string attributes. +//! +//! In `event_header` mode, the producer-defined data is decoded as an +//! EventHeader payload: +//! +//! ```text +//! 
EventHeader payload +//! +-------------+----------------------+------------------+ +//! | EventHeader | extension block(s) | event data | +//! | 8 bytes | metadata, activity | field values | +//! +-------------+----------------------+------------------+ +//! +//! Metadata extension +//! +----------------------+-------------------------------+ +//! | event name, NUL | field metadata blocks | +//! +----------------------+-------------------------------+ +//! +//! Field metadata block +//! +----------------------+----------+----------+----------+ +//! | field name, NUL | encoding | format? | tag? | +//! +----------------------+----------+----------+----------+ +//! ``` +//! +//! EventHeader is decoded structurally and vendor-neutrally here. Structs are +//! flattened into dot-separated attribute names, e.g. `PartB.body` or +//! `Request.path`, but this module does not attach semantic meaning to any +//! particular struct or field name. Schema-specific interpretation, such as +//! Microsoft Common Schema `PartA`/`PartB`/`PartC` promotion, belongs in a +//! processor. If EventHeader decoding fails, only the user payload region is +//! preserved as `linux.userevents.payload_base64`. + +use std::borrow::Cow; + +use base64::Engine as _; +use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; +use tracepoint_decode::{ + EventHeaderEnumeratorContext, EventHeaderEnumeratorState, EventHeaderItemInfo, +}; + +use super::FormatConfig; +use super::session::{RawUsereventsRecord, TracefsField, TracefsFieldLocation}; + +// FieldEncoding raw values (from eventheader_types, avoids a new dep) +const ENC_VALUE8: u8 = 2; +const ENC_VALUE16: u8 = 3; +const ENC_VALUE32: u8 = 4; +const ENC_VALUE64: u8 = 5; +const ENC_STRING_LENGTH16_CHAR8: u8 = 10; + +// FieldFormat raw values +const FMT_SIGNED_INT: u8 = 2; +const FMT_BOOLEAN: u8 = 7; +const FMT_FLOAT: u8 = 8; + +/// Typed attribute value carried on a decoded userevents record. 
+#[derive(Debug, Clone, PartialEq)] +pub(super) enum DecodedAttrValue { + Str(String), + Int(i64), + Bool(bool), + Double(f64), +} + +impl PartialEq for DecodedAttrValue { + fn eq(&self, other: &str) -> bool { + matches!(self, Self::Str(s) if s == other) + } +} + +impl PartialEq<&str> for DecodedAttrValue { + fn eq(&self, other: &&str) -> bool { + matches!(self, Self::Str(s) if s == *other) + } +} + +/// A decoded userevents record ready for Arrow encoding. +#[derive(Debug, Clone, PartialEq)] +pub(super) struct DecodedUsereventsRecord { + /// Event timestamp in Unix epoch nanoseconds. + pub time_unix_nano: i64, + /// Log body string. + pub body: String, + /// Optional promoted event name for the typed OTLP log field. + pub event_name: Option, + /// Optional severity number. + pub severity_number: Option, + /// Optional severity text. + pub severity_text: Option>, + /// Optional OTLP log flags. + pub flags: Option, + /// Optional W3C trace id (16 bytes). + pub trace_id: Option<[u8; 16]>, + /// Optional W3C span id (8 bytes). + pub span_id: Option<[u8; 8]>, + /// Additional structured attributes, preserving the source typed value. 
+ pub attributes: Vec<(Cow<'static, str>, DecodedAttrValue)>, +} + +impl DecodedUsereventsRecord { + pub(super) fn from_raw( + tracepoint: &str, + value: RawUsereventsRecord, + format: &FormatConfig, + ) -> Self { + match format { + FormatConfig::Tracefs => Self::from_tracefs(tracepoint, value), + FormatConfig::EventHeader => Self::from_eventheader(tracepoint, value), + } + } + + fn base_record( + tracepoint: &str, + value: RawUsereventsRecord, + attributes: Vec<(Cow<'static, str>, DecodedAttrValue)>, + ) -> Self { + Self { + time_unix_nano: i64::try_from(value.timestamp_unix_nano).unwrap_or(i64::MAX), + body: String::new(), + event_name: Some(tracepoint.to_owned()), + severity_number: None, + severity_text: None, + flags: None, + trace_id: None, + span_id: None, + attributes, + } + } + + fn from_tracefs(tracepoint: &str, value: RawUsereventsRecord) -> Self { + let mut attributes = Vec::with_capacity(value.fields.len()); + for field in value.fields.iter() { + if field.name.starts_with("common_") { + continue; + } + if let Some(decoded) = decode_tracefs_field(field, &value.event_data) { + attributes.push((Cow::Owned(field.name.clone()), decoded)); + } + } + Self::base_record(tracepoint, value, attributes) + } + + fn from_eventheader(tracepoint: &str, value: RawUsereventsRecord) -> Self { + let payload = value + .event_data + .get(value.user_data_offset..) 
+ .unwrap_or(value.event_data.as_slice()); + let attributes = decode_eventheader_attrs(tracepoint, payload); + Self::base_record(tracepoint, value, attributes) + } +} + +fn decode_tracefs_field(field: &TracefsField, event_data: &[u8]) -> Option { + let bytes = tracefs_field_bytes(field, event_data)?; + let type_name = field.type_name.trim(); + if matches!( + field.location, + TracefsFieldLocation::StaticString + | TracefsFieldLocation::DynRelative + | TracefsFieldLocation::DynAbsolute + ) || (type_name == "char" && field.size != 1) + { + return std::str::from_utf8(bytes) + .ok() + .map(|s| DecodedAttrValue::Str(s.trim_end_matches('\0').to_owned())); + } + + match (type_name, bytes.len()) { + ("bool" | "_Bool", 1) => Some(DecodedAttrValue::Bool(bytes[0] != 0)), + ("char" | "signed char" | "s8" | "__s8" | "int8_t", 1) => { + Some(DecodedAttrValue::Int(i64::from(bytes[0] as i8))) + } + ("unsigned char" | "u8" | "__u8" | "uint8_t", 1) => { + Some(DecodedAttrValue::Int(i64::from(bytes[0]))) + } + ("short" | "signed short" | "s16" | "__s16" | "int16_t", 2) => Some(DecodedAttrValue::Int( + i64::from(i16::from_ne_bytes(bytes.try_into().ok()?)), + )), + ("unsigned short" | "u16" | "__u16" | "uint16_t", 2) => Some(DecodedAttrValue::Int( + i64::from(u16::from_ne_bytes(bytes.try_into().ok()?)), + )), + ("int" | "signed int" | "s32" | "__s32" | "int32_t", 4) => Some(DecodedAttrValue::Int( + i64::from(i32::from_ne_bytes(bytes.try_into().ok()?)), + )), + ("unsigned int" | "u32" | "__u32" | "uint32_t", 4) => Some(DecodedAttrValue::Int( + i64::from(u32::from_ne_bytes(bytes.try_into().ok()?)), + )), + ( + "long" | "signed long" | "long long" | "signed long long" | "s64" | "__s64" | "int64_t", + 8, + ) => Some(DecodedAttrValue::Int(i64::from_ne_bytes( + bytes.try_into().ok()?, + ))), + ("unsigned long" | "unsigned long long" | "u64" | "__u64" | "uint64_t", 8) => { + Some(DecodedAttrValue::Int( + i64::try_from(u64::from_ne_bytes(bytes.try_into().ok()?)).unwrap_or(i64::MAX), + )) + } + 
("float", 4) => Some(DecodedAttrValue::Double(f64::from(f32::from_ne_bytes( + bytes.try_into().ok()?, + )))), + ("double", 8) => Some(DecodedAttrValue::Double(f64::from_ne_bytes( + bytes.try_into().ok()?, + ))), + _ => Some(DecodedAttrValue::Str(BASE64_STANDARD.encode(bytes))), + } +} + +fn tracefs_field_bytes<'a>(field: &TracefsField, event_data: &'a [u8]) -> Option<&'a [u8]> { + match field.location { + TracefsFieldLocation::Static => { + let end = field.offset.checked_add(field.size)?; + event_data.get(field.offset..end) + } + TracefsFieldLocation::StaticString => event_data.get(field.offset..).map(until_nul), + TracefsFieldLocation::StaticUtf16String => { + let end = field.offset.checked_add(field.size)?; + event_data.get(field.offset..end) + } + TracefsFieldLocation::StaticLenPrefixArray => { + let len_end = field.offset.checked_add(2)?; + let len_bytes = event_data.get(field.offset..len_end)?; + let len = usize::from(u16::from_ne_bytes(len_bytes.try_into().ok()?)); + let bytes_len = len.checked_mul(field.size)?; + let end = len_end.checked_add(bytes_len)?; + event_data.get(len_end..end) + } + TracefsFieldLocation::DynAbsolute | TracefsFieldLocation::DynRelative => { + let loc_end = field.offset.checked_add(field.size)?; + let loc_bytes = event_data.get(field.offset..loc_end)?; + if loc_bytes.len() != 4 { + return None; + } + let loc = u32::from_ne_bytes(loc_bytes.try_into().ok()?); + let len = (loc >> 16) as usize; + let mut start = (loc & 0xFFFF) as usize; + if field.location == TracefsFieldLocation::DynRelative { + start = start.checked_add(loc_end)?; + } + let end = start.checked_add(len)?; + event_data.get(start..end).map(until_nul) + } + } +} + +fn until_nul(bytes: &[u8]) -> &[u8] { + bytes + .iter() + .position(|byte| *byte == 0) + .map_or(bytes, |end| &bytes[..end]) +} + +fn decode_eventheader_attrs( + tracepoint: &str, + payload: &[u8], +) -> Vec<(Cow<'static, str>, DecodedAttrValue)> { + let mut attrs = Vec::new(); + let mut context = 
EventHeaderEnumeratorContext::new(); + let Ok(mut en) = context.enumerate_with_name_and_data( + tracepoint, + payload, + EventHeaderEnumeratorContext::MOVE_NEXT_LIMIT_DEFAULT, + ) else { + attrs.push(( + Cow::Borrowed("linux.userevents.payload_base64"), + DecodedAttrValue::Str(BASE64_STANDARD.encode(payload)), + )); + return attrs; + }; + + let mut prefix: Vec = Vec::new(); + while en.move_next() { + match en.state() { + EventHeaderEnumeratorState::StructBegin => { + if let Ok(name) = std::str::from_utf8(en.item_info().name_bytes()) { + prefix.push(name.to_owned()); + } + } + EventHeaderEnumeratorState::StructEnd => { + let _ = prefix.pop(); + } + EventHeaderEnumeratorState::Value => { + let item = en.item_info(); + let Ok(name) = std::str::from_utf8(item.name_bytes()) else { + continue; + }; + if let Some(value) = item_as_any_scalar_value(&item) { + let key = if prefix.is_empty() { + name.to_owned() + } else { + format!("{}.{}", prefix.join("."), name) + }; + attrs.push((Cow::Owned(key), value)); + } + } + EventHeaderEnumeratorState::Error | EventHeaderEnumeratorState::AfterLastItem => break, + _ => {} + } + } + attrs +} + +fn item_as_any_scalar_value(item: &EventHeaderItemInfo<'_>) -> Option { + let enc = item.metadata().encoding().without_flags().as_int(); + let fmt = item.metadata().format().as_int(); + match enc { + e if e == ENC_STRING_LENGTH16_CHAR8 => std::str::from_utf8(item.value().bytes()) + .ok() + .map(|s| DecodedAttrValue::Str(s.to_owned())), + e if e == ENC_VALUE8 => { + if fmt == FMT_BOOLEAN { + Some(DecodedAttrValue::Bool(item.value().to_u8(0) != 0)) + } else if fmt == FMT_SIGNED_INT { + Some(DecodedAttrValue::Int( + i64::from(item.value().to_u8(0) as i8), + )) + } else { + Some(DecodedAttrValue::Int(i64::from(item.value().to_u8(0)))) + } + } + e if e == ENC_VALUE16 => { + if fmt == FMT_SIGNED_INT { + Some(DecodedAttrValue::Int(i64::from(item.value().to_i16(0)))) + } else { + Some(DecodedAttrValue::Int(i64::from(item.value().to_u16(0)))) + } + } + 
e if e == ENC_VALUE32 => { + if fmt == FMT_FLOAT { + Some(DecodedAttrValue::Double(f64::from(f32::from_bits( + item.value().to_u32(0), + )))) + } else if fmt == FMT_BOOLEAN { + Some(DecodedAttrValue::Bool(item.value().to_u32(0) != 0)) + } else if fmt == FMT_SIGNED_INT { + Some(DecodedAttrValue::Int(i64::from(item.value().to_i32(0)))) + } else { + Some(DecodedAttrValue::Int(i64::from(item.value().to_u32(0)))) + } + } + e if e == ENC_VALUE64 => { + if fmt == FMT_FLOAT { + Some(DecodedAttrValue::Double(f64::from_bits( + item.value().to_u64(0), + ))) + } else if fmt == FMT_SIGNED_INT { + Some(DecodedAttrValue::Int(item.value().to_i64(0))) + } else { + Some(DecodedAttrValue::Int( + i64::try_from(item.value().to_u64(0)).unwrap_or(i64::MAX), + )) + } + } + _ => None, + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + + const ENC_STRUCT: u8 = 1; + const FMT_DEFAULT: u8 = 0; + + fn raw_record(fields: Vec, event_data: Vec) -> RawUsereventsRecord { + RawUsereventsRecord { + subscription_index: 0, + timestamp_unix_nano: 42, + event_data, + user_data_offset: 8, + fields: Arc::from(fields), + } + } + + fn field( + name: &str, + type_name: &str, + location: TracefsFieldLocation, + offset: usize, + size: usize, + ) -> TracefsField { + TracefsField { + name: name.to_owned(), + type_name: type_name.to_owned(), + location, + offset, + size, + } + } + + fn decode_tracefs(fields: Vec, event_data: Vec) -> DecodedUsereventsRecord { + DecodedUsereventsRecord::from_raw( + "user_events:my_event", + raw_record(fields, event_data), + &FormatConfig::Tracefs, + ) + } + + fn attr<'a>(decoded: &'a DecodedUsereventsRecord, key: &str) -> Option<&'a DecodedAttrValue> { + decoded + .attributes + .iter() + .find_map(|(attr_key, value)| (attr_key == key).then_some(value)) + } + + fn add_value_field_meta(meta: &mut Vec, name: &str, encoding: u8, format: u8) { + meta.extend_from_slice(name.as_bytes()); + meta.push(0); + meta.push(encoding | 0x80); + meta.push(format); + } + + 
fn add_string_field_meta(meta: &mut Vec, name: &str) { + meta.extend_from_slice(name.as_bytes()); + meta.push(0); + meta.push(ENC_STRING_LENGTH16_CHAR8); + } + + fn add_struct_meta(meta: &mut Vec, name: &str, field_count: u8) { + meta.extend_from_slice(name.as_bytes()); + meta.push(0); + meta.push(0x80 | ENC_STRUCT); + meta.push(field_count); + } + + fn add_string_data(data: &mut Vec, value: &str) { + data.extend_from_slice(&(value.len() as u16).to_le_bytes()); + data.extend_from_slice(value.as_bytes()); + } + + fn build_eventheader_payload( + event_name: &str, + level: u8, + meta_fields: &[u8], + data: &[u8], + ) -> Vec { + let mut payload = vec![0x07, 0, 0, 0, 0, 0, 0, level]; + let mut meta = Vec::new(); + meta.extend_from_slice(event_name.as_bytes()); + meta.push(0); + meta.extend_from_slice(meta_fields); + + payload.extend_from_slice(&(meta.len() as u16).to_le_bytes()); + payload.extend_from_slice(&1u16.to_le_bytes()); + payload.extend_from_slice(&meta); + payload.extend_from_slice(data); + payload + } + + #[test] + fn tracefs_decodes_standard_fields_as_attributes() { + let mut event_data = vec![0; 8]; + event_data.extend_from_slice(&200u32.to_ne_bytes()); + event_data.extend_from_slice(b"/checkout\0"); + let fields = vec![ + field("common_pid", "int", TracefsFieldLocation::Static, 4, 4), + field("status", "u32", TracefsFieldLocation::Static, 8, 4), + field( + "endpoint", + "char", + TracefsFieldLocation::StaticString, + 12, + 0, + ), + ]; + + let decoded = decode_tracefs(fields, event_data); + + assert_eq!(decoded.event_name.as_deref(), Some("user_events:my_event")); + assert_eq!(decoded.body, ""); + assert_eq!(decoded.attributes.len(), 2); + assert_eq!(decoded.attributes[0].0, "status"); + assert_eq!(decoded.attributes[0].1, DecodedAttrValue::Int(200)); + assert_eq!(decoded.attributes[1].0, "endpoint"); + assert_eq!(decoded.attributes[1].1, "/checkout"); + } + + #[test] + fn tracefs_decodes_scalar_field_types() { + let mut event_data = vec![0; 8]; + 
event_data.push(1); + event_data.extend_from_slice(&(-42i32).to_ne_bytes()); + event_data.extend_from_slice(&123u64.to_ne_bytes()); + event_data.extend_from_slice(&2.5f64.to_ne_bytes()); + let fields = vec![ + field("ok", "bool", TracefsFieldLocation::Static, 8, 1), + field("delta", "s32", TracefsFieldLocation::Static, 9, 4), + field("count", "u64", TracefsFieldLocation::Static, 13, 8), + field("duration_ms", "double", TracefsFieldLocation::Static, 21, 8), + ]; + + let decoded = decode_tracefs(fields, event_data); + + assert_eq!(attr(&decoded, "ok"), Some(&DecodedAttrValue::Bool(true))); + assert_eq!(attr(&decoded, "delta"), Some(&DecodedAttrValue::Int(-42))); + assert_eq!(attr(&decoded, "count"), Some(&DecodedAttrValue::Int(123))); + assert_eq!( + attr(&decoded, "duration_ms"), + Some(&DecodedAttrValue::Double(2.5)) + ); + } + + #[test] + fn tracefs_decodes_dynamic_relative_string() { + let mut event_data = vec![0; 8]; + let value = b"/dynamic\0"; + let location = (value.len() as u32) << 16; + event_data.extend_from_slice(&location.to_ne_bytes()); + event_data.extend_from_slice(value); + let fields = vec![field( + "endpoint", + "char", + TracefsFieldLocation::DynRelative, + 8, + 4, + )]; + + let decoded = decode_tracefs(fields, event_data); + + assert_eq!( + attr(&decoded, "endpoint"), + Some(&DecodedAttrValue::Str("/dynamic".to_owned())) + ); + } + + #[test] + fn tracefs_preserves_unknown_static_bytes_as_base64() { + let mut event_data = vec![0; 8]; + event_data.extend_from_slice(&[0xde, 0xad, 0xbe, 0xef]); + let fields = vec![field( + "opaque", + "struct opaque", + TracefsFieldLocation::Static, + 8, + 4, + )]; + + let decoded = decode_tracefs(fields, event_data); + + assert_eq!( + attr(&decoded, "opaque"), + Some(&DecodedAttrValue::Str("3q2+7w==".to_owned())) + ); + } + + #[test] + fn tracefs_skips_fields_outside_event_data() { + let fields = vec![field( + "missing", + "u32", + TracefsFieldLocation::Static, + 128, + 4, + )]; + + let decoded = 
decode_tracefs(fields, vec![0; 8]); + + assert!(decoded.attributes.is_empty()); + } + + #[test] + fn eventheader_decodes_microsoft_common_schema_shape_as_flattened_attributes() { + let mut meta = Vec::new(); + let mut data = Vec::new(); + + add_value_field_meta(&mut meta, "__csver__", ENC_VALUE32, FMT_DEFAULT); + data.extend_from_slice(&0x400u32.to_le_bytes()); + + add_struct_meta(&mut meta, "PartA", 6); + add_string_field_meta(&mut meta, "time"); + add_string_data(&mut data, "2024-06-15T12:00:00Z"); + add_string_field_meta(&mut meta, "ext_dt_traceId"); + add_string_data(&mut data, "0102030405060708090a0b0c0d0e0f10"); + add_string_field_meta(&mut meta, "ext_dt_spanId"); + add_string_data(&mut data, "a1b2c3d4e5f60718"); + add_value_field_meta(&mut meta, "ext_dt_traceFlags", ENC_VALUE8, FMT_DEFAULT); + data.push(1); + add_string_field_meta(&mut meta, "ext_cloud_role"); + add_string_data(&mut data, "checkout"); + add_string_field_meta(&mut meta, "ext_cloud_roleInstance"); + add_string_data(&mut data, "instance-1"); + + add_struct_meta(&mut meta, "PartB", 6); + add_string_field_meta(&mut meta, "_typeName"); + add_string_data(&mut data, "Log"); + add_string_field_meta(&mut meta, "body"); + add_string_data(&mut data, "hello"); + add_string_field_meta(&mut meta, "name"); + add_string_data(&mut data, "CheckoutFailure"); + add_value_field_meta(&mut meta, "severityNumber", ENC_VALUE32, FMT_SIGNED_INT); + data.extend_from_slice(&17i32.to_le_bytes()); + add_string_field_meta(&mut meta, "severityText"); + add_string_data(&mut data, "ERROR"); + add_value_field_meta(&mut meta, "eventId", ENC_VALUE64, FMT_SIGNED_INT); + data.extend_from_slice(&42i64.to_le_bytes()); + + add_struct_meta(&mut meta, "PartC", 1); + add_value_field_meta(&mut meta, "status", ENC_VALUE32, FMT_SIGNED_INT); + data.extend_from_slice(&500i32.to_le_bytes()); + + let payload = build_eventheader_payload("Log", 4, &meta, &data); + let mut event_data = vec![0; 8]; + event_data.extend_from_slice(&payload); + let 
decoded = DecodedUsereventsRecord::from_raw( + "user_events:myprovider_L4K1", + raw_record(Vec::new(), event_data), + &FormatConfig::EventHeader, + ); + + assert_eq!( + attr(&decoded, "__csver__"), + Some(&DecodedAttrValue::Int(0x400)) + ); + assert_eq!( + attr(&decoded, "PartA.ext_dt_traceId"), + Some(&DecodedAttrValue::Str( + "0102030405060708090a0b0c0d0e0f10".to_owned() + )) + ); + assert_eq!( + attr(&decoded, "PartA.time"), + Some(&DecodedAttrValue::Str("2024-06-15T12:00:00Z".to_owned())) + ); + assert_eq!( + attr(&decoded, "PartA.ext_dt_spanId"), + Some(&DecodedAttrValue::Str("a1b2c3d4e5f60718".to_owned())) + ); + assert_eq!( + attr(&decoded, "PartA.ext_dt_traceFlags"), + Some(&DecodedAttrValue::Int(1)) + ); + assert_eq!( + attr(&decoded, "PartA.ext_cloud_role"), + Some(&DecodedAttrValue::Str("checkout".to_owned())) + ); + assert_eq!( + attr(&decoded, "PartA.ext_cloud_roleInstance"), + Some(&DecodedAttrValue::Str("instance-1".to_owned())) + ); + assert_eq!( + attr(&decoded, "PartB._typeName"), + Some(&DecodedAttrValue::Str("Log".to_owned())) + ); + assert_eq!( + attr(&decoded, "PartB.body"), + Some(&DecodedAttrValue::Str("hello".to_owned())) + ); + assert_eq!( + attr(&decoded, "PartB.name"), + Some(&DecodedAttrValue::Str("CheckoutFailure".to_owned())) + ); + assert_eq!( + attr(&decoded, "PartB.severityNumber"), + Some(&DecodedAttrValue::Int(17)) + ); + assert_eq!( + attr(&decoded, "PartB.severityText"), + Some(&DecodedAttrValue::Str("ERROR".to_owned())) + ); + assert_eq!( + attr(&decoded, "PartB.eventId"), + Some(&DecodedAttrValue::Int(42)) + ); + assert_eq!( + attr(&decoded, "PartC.status"), + Some(&DecodedAttrValue::Int(500)) + ); + } + + #[test] + fn eventheader_invalid_payload_is_preserved_as_attribute() { + let decoded = DecodedUsereventsRecord::from_raw( + "user_events:my_event", + raw_record(Vec::new(), vec![0, 1, 2]), + &FormatConfig::EventHeader, + ); + + assert_eq!(decoded.body, ""); + assert_eq!(decoded.attributes.len(), 1); + 
assert_eq!(decoded.attributes[0].0, "linux.userevents.payload_base64"); + } +} diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/metrics.rs b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/metrics.rs new file mode 100644 index 0000000000..3c78af6485 --- /dev/null +++ b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/metrics.rs @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Metrics for the Linux userevents receiver. + +use otap_df_telemetry::instrument::Counter; +use otap_df_telemetry_macros::metric_set; + +/// Internal telemetry for the Linux userevents receiver. +#[metric_set(name = "userevents.receiver.metrics")] +#[derive(Debug, Default, Clone)] +pub(super) struct UsereventsReceiverMetrics { + /// Number of perf samples received from the kernel ring. + #[metric(unit = "{item}")] + pub received_samples: Counter, + /// Number of perf samples forwarded downstream. + #[metric(unit = "{item}")] + pub forwarded_samples: Counter, + /// Number of samples dropped because the downstream channel was full. + #[metric(unit = "{item}")] + pub dropped_downstream_full: Counter, + /// Number of samples dropped due to process-wide memory pressure. + #[metric(unit = "{item}")] + pub dropped_memory_pressure: Counter, + /// Number of samples dropped because no matching subscription was found. + #[metric(unit = "{item}")] + pub dropped_no_subscription: Counter, + /// Number of lost samples reported by the perf ring. + #[metric(unit = "{item}")] + pub lost_perf_samples: Counter, + /// Number of late-registration retries attempted while waiting for tracepoints. + #[metric(unit = "{event}")] + pub late_registration_retries: Counter, + /// Number of receiver sessions successfully started. + #[metric(unit = "{event}")] + pub sessions_started: Counter, + /// Number of Arrow batches flushed downstream. 
+ #[metric(unit = "{event}")] + pub flushed_batches: Counter, +} diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/mod.rs new file mode 100644 index 0000000000..845726f2f5 --- /dev/null +++ b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/mod.rs @@ -0,0 +1,912 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(not(target_os = "linux"), allow(dead_code, unused_imports))] + +//! Linux userevents receiver. + +mod arrow_records_encoder; +mod decoder; +mod metrics; +mod one_collect_adapter; +mod session; + +use std::cell::RefCell; +use std::rc::Rc; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use linkme::distributed_slice; +use otap_df_config::node::NodeUserConfig; +use otap_df_engine::config::ReceiverConfig; +use otap_df_engine::context::PipelineContext; +use otap_df_engine::memory_limiter::LocalReceiverAdmissionState; +use otap_df_engine::node::NodeId; +use otap_df_engine::receiver::ReceiverWrapper; +use otap_df_engine::terminal_state::TerminalState; +use otap_df_engine::{ + MessageSourceLocalEffectHandlerExtension, ReceiverFactory, + error::{Error, ReceiverErrorKind, format_error_sources}, + local::receiver as local, +}; +use otap_df_otap::OTAP_RECEIVER_FACTORIES; +use otap_df_otap::pdata::OtapPdata; +use otap_df_telemetry::metrics::MetricSet; +use serde::Deserialize; +use serde_json::Value; + +use self::arrow_records_encoder::ArrowRecordsBuilder; +use self::decoder::DecodedUsereventsRecord; +use self::metrics::UsereventsReceiverMetrics; +#[cfg(target_os = "linux")] +use self::session::SessionInitError; +use self::session::UsereventsSession; +#[cfg(target_os = "linux")] +use otap_df_engine::control::NodeControlMsg; +#[cfg(target_os = "linux")] +use otap_df_telemetry::{otel_info, otel_warn}; +#[cfg(target_os = "linux")] +use 
tokio::time::{self, MissedTickBehavior}; + +const DEFAULT_PER_CPU_BUFFER_SIZE: usize = 1024 * 1024; +const DEFAULT_WAKEUP_WATERMARK: usize = 256 * 1024; +const DEFAULT_MAX_RECORDS_PER_TURN: usize = 1024; +const DEFAULT_MAX_BYTES_PER_TURN: usize = 1024 * 1024; +const DEFAULT_MAX_DRAIN_NS: Duration = Duration::from_millis(2); +const DEFAULT_BATCH_MAX_SIZE: u16 = 512; +const DEFAULT_BATCH_MAX_DURATION: Duration = Duration::from_millis(50); +const DEFAULT_LATE_REGISTRATION_POLL: Duration = Duration::from_secs(1); + +/// URN for the Linux userevents receiver. +/// +/// The receiver identity is vendor-neutral: it collects Linux `user_events` +/// and structurally decodes records into OTAP logs. +pub const USEREVENTS_RECEIVER_URN: &str = "urn:otel:receiver:userevents"; + +#[derive(Debug, Clone, Default, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)] +enum FormatConfig { + /// Decode the sample using the Linux tracefs `format` metadata registered + /// for the tracepoint. + /// + /// This is the standard Linux user_events/tracepoint shape used by tools + /// such as perf and ftrace: field names, offsets, sizes, and C-like type + /// names come from tracefs, while values come from the raw sample bytes. + #[default] + Tracefs, + /// Decode the user payload as an EventHeader self-describing event. + /// + /// EventHeader is still decoded structurally and vendor-neutrally here: the + /// receiver flattens EventHeader structs into `Struct.field` attributes but + /// does not attach any meaning to names such as `PartA`, `PartB`, or + /// `PartC`. Schema-specific interpretation belongs in processors. 
+ EventHeader, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] +struct SubscriptionConfig { + tracepoint: String, + #[serde(default)] + format: FormatConfig, +} + +#[derive(Debug, Deserialize, Clone, Default)] +#[serde(deny_unknown_fields)] +struct LateRegistrationConfig { + #[serde(default)] + enabled: bool, + #[serde(default = "default_late_registration_poll_ms")] + poll_interval_ms: u64, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +struct SessionConfig { + #[serde(default = "default_per_cpu_buffer_size")] + per_cpu_buffer_size: usize, + #[expect( + dead_code, + reason = "reserved for future one-collect wakeup watermark support" + )] + // TODO: Wire this into the perf ring setup once one_collect exposes + // wakeup/readiness and watermark configuration for tracepoint sessions. + #[serde(default = "default_wakeup_watermark")] + wakeup_watermark: usize, + #[serde(default)] + late_registration: LateRegistrationConfig, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +struct DrainConfig { + #[serde(default = "default_max_records_per_turn")] + max_records_per_turn: usize, + #[serde(default = "default_max_bytes_per_turn")] + max_bytes_per_turn: usize, + #[serde(default = "default_max_drain_ns")] + #[serde(with = "humantime_serde")] + max_drain_ns: Duration, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +struct BatchConfig { + // NOTE: These batches are in-memory only. Unlike a network receiver such as + // syslog, there is typically no practical sender-side retry or replay path + // for user_events payloads after this process crashes. Syslog transport + // also does not generally provide per-message acknowledgements, but some + // senders/agents may buffer and retry on reconnect; user_events producers + // normally cannot. Larger `max_size` / `max_duration` values therefore + // increase the amount of irrecoverable data that can be lost before a flush. 
+ // TODO: Revisit these batching tradeoffs if we add durable buffering or + // upstream retry/replay support for this ingestion path. + #[serde(default = "default_batch_max_size")] + max_size: u16, + #[serde(default = "default_batch_max_duration")] + #[serde(with = "humantime_serde")] + max_duration: Duration, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] +enum OverflowMode { + Drop, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +struct OverflowConfig { + #[serde(default = "default_overflow_mode")] + on_downstream_full: OverflowMode, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +struct Config { + #[serde(default)] + tracepoint: Option, + #[serde(default)] + format: Option, + #[serde(default)] + subscriptions: Option>, + #[serde(default)] + session: Option, + #[serde(default)] + drain: Option, + #[serde(default)] + batching: Option, + #[serde(default)] + overflow: Option, +} + +struct UsereventsReceiver { + subscriptions: Vec, + session: SessionConfig, + drain: DrainConfig, + batching: BatchConfig, + overflow: OverflowConfig, + cpu_id: usize, + metrics: Rc>>, + admission_state: LocalReceiverAdmissionState, +} + +impl UsereventsReceiver { + fn from_config( + pipeline: PipelineContext, + config: &Value, + ) -> Result { + let config: Config = serde_json::from_value(config.clone()).map_err(|e| { + otap_df_config::error::Error::InvalidUserConfig { + error: e.to_string(), + } + })?; + + let subscriptions = Self::normalize_subscriptions(&config)?; + let session = config.session.clone().unwrap_or(SessionConfig { + per_cpu_buffer_size: default_per_cpu_buffer_size(), + wakeup_watermark: default_wakeup_watermark(), + late_registration: LateRegistrationConfig { + enabled: false, + poll_interval_ms: default_late_registration_poll_ms(), + }, + }); + let drain = config.drain.clone().unwrap_or(DrainConfig { + max_records_per_turn: default_max_records_per_turn(), + max_bytes_per_turn: 
default_max_bytes_per_turn(), + max_drain_ns: default_max_drain_ns(), + }); + Self::validate_drain(&drain)?; + let batching = config.batching.clone().unwrap_or(BatchConfig { + max_size: default_batch_max_size(), + max_duration: default_batch_max_duration(), + }); + Self::validate_batching(&batching)?; + let overflow = config.overflow.clone().unwrap_or(OverflowConfig { + on_downstream_full: default_overflow_mode(), + }); + + Ok(Self { + subscriptions, + session, + drain, + batching, + overflow, + cpu_id: pipeline.core_id(), + metrics: Rc::new(RefCell::new( + pipeline.register_metrics::(), + )), + admission_state: LocalReceiverAdmissionState::from_process_state( + &pipeline.memory_pressure_state(), + ), + }) + } + + fn normalize_subscriptions( + config: &Config, + ) -> Result, otap_df_config::error::Error> { + match (&config.tracepoint, &config.subscriptions) { + (Some(_), Some(_)) => Err(otap_df_config::error::Error::InvalidUserConfig { + error: "configure either `tracepoint` or `subscriptions`, not both".to_owned(), + }), + (None, None) => Err(otap_df_config::error::Error::InvalidUserConfig { + error: "userevents receiver requires either `tracepoint` or `subscriptions`" + .to_owned(), + }), + (Some(tracepoint), None) => { + Self::validate_tracepoint(tracepoint)?; + Ok(vec![SubscriptionConfig { + tracepoint: tracepoint.clone(), + format: config.format.clone().unwrap_or_default(), + }]) + } + (None, Some(subscriptions)) if subscriptions.is_empty() => { + Err(otap_df_config::error::Error::InvalidUserConfig { + error: "userevents receiver requires at least one subscription".to_owned(), + }) + } + (None, Some(subscriptions)) => { + for subscription in subscriptions { + Self::validate_tracepoint(&subscription.tracepoint)?; + } + Ok(subscriptions.clone()) + } + } + } + + fn validate_tracepoint(tracepoint: &str) -> Result<(), otap_df_config::error::Error> { + let Some((group, event_name)) = tracepoint.split_once(':') else { + return 
Err(otap_df_config::error::Error::InvalidUserConfig { + error: format!( + "userevents receiver tracepoint `{tracepoint}` must use `user_events:`" + ), + }); + }; + if group != "user_events" || event_name.is_empty() { + return Err(otap_df_config::error::Error::InvalidUserConfig { + error: format!( + "userevents receiver tracepoint `{tracepoint}` must use `user_events:`" + ), + }); + } + Ok(()) + } + + fn validate_drain(drain: &DrainConfig) -> Result<(), otap_df_config::error::Error> { + if drain.max_records_per_turn == 0 { + return Err(otap_df_config::error::Error::InvalidUserConfig { + error: "userevents receiver `drain.max_records_per_turn` must be greater than zero" + .to_owned(), + }); + } + if drain.max_bytes_per_turn == 0 { + return Err(otap_df_config::error::Error::InvalidUserConfig { + error: "userevents receiver `drain.max_bytes_per_turn` must be greater than zero" + .to_owned(), + }); + } + if drain.max_drain_ns.is_zero() { + return Err(otap_df_config::error::Error::InvalidUserConfig { + error: "userevents receiver `drain.max_drain_ns` must be greater than zero; \ + a zero budget would starve either parsing or the pending-queue \ + pop phase under continuous load" + .to_owned(), + }); + } + Ok(()) + } + + fn validate_batching(batching: &BatchConfig) -> Result<(), otap_df_config::error::Error> { + if batching.max_duration.is_zero() { + return Err(otap_df_config::error::Error::InvalidUserConfig { + error: "userevents receiver `batching.max_duration` must be greater than zero" + .to_owned(), + }); + } + Ok(()) + } +} + +fn drop_batch( + metrics: &Rc>>, + builder: &mut ArrowRecordsBuilder, +) { + let dropped = u64::from(builder.len()); + if dropped == 0 { + return; + } + + *builder = ArrowRecordsBuilder::new(); + metrics.borrow_mut().dropped_memory_pressure.add(dropped); +} + +async fn flush_batch( + effect_handler: &local::EffectHandler, + metrics: &Rc>>, + builder: &mut ArrowRecordsBuilder, + overflow_mode: &OverflowMode, +) -> Result<(), Error> { + if 
builder.is_empty() { + return Ok(()); + } + + let payload = std::mem::take(builder) + .build() + .map_err(|error| Error::ReceiverError { + receiver: effect_handler.receiver_id(), + kind: ReceiverErrorKind::Transport, + error: "failed to build userevents Arrow batch".to_owned(), + source_detail: format_error_sources(&error), + })?; + + let item_count = payload.num_items() as u64; + let pdata = OtapPdata::new_todo_context(payload.into()); + match effect_handler.try_send_message_with_source_node(pdata) { + Ok(()) => { + let mut guard = metrics.borrow_mut(); + guard.forwarded_samples.add(item_count); + guard.flushed_batches.inc(); + Ok(()) + } + Err(otap_df_engine::error::TypedError::ChannelSendError( + otap_df_channel::error::SendError::Full(_), + )) => { + match overflow_mode { + OverflowMode::Drop => { + metrics.borrow_mut().dropped_downstream_full.add(item_count); + } + } + Ok(()) + } + Err(error) => Err(error.into()), + } +} + +#[allow(unsafe_code)] +#[distributed_slice(OTAP_RECEIVER_FACTORIES)] +/// Declares the Linux userevents receiver as a local receiver factory. 
+pub static USEREVENTS_RECEIVER: ReceiverFactory = ReceiverFactory { + name: USEREVENTS_RECEIVER_URN, + create: |pipeline: PipelineContext, + node: NodeId, + node_config: Arc, + receiver_config: &ReceiverConfig| { + Ok(ReceiverWrapper::local( + UsereventsReceiver::from_config(pipeline, &node_config.config)?, + node, + node_config, + receiver_config, + )) + }, + wiring_contract: otap_df_engine::wiring_contract::WiringContract::UNRESTRICTED, + validate_config: otap_df_config::validation::validate_typed_config::, +}; + +#[async_trait(?Send)] +impl local::Receiver for UsereventsReceiver { + async fn start( + self: Box, + _ctrl_chan: local::ControlChannel, + effect_handler: local::EffectHandler, + ) -> Result { + #[cfg(not(target_os = "linux"))] + { + let _ = self; + let _ = _ctrl_chan; + return Err(Error::ReceiverError { + receiver: effect_handler.receiver_id(), + kind: ReceiverErrorKind::Configuration, + error: "userevents receiver is supported only on Linux".to_owned(), + source_detail: String::new(), + }); + } + + #[cfg(target_os = "linux")] + { + let mut ctrl_chan = _ctrl_chan; + let telemetry_timer_handle = effect_handler + .start_periodic_telemetry(Duration::from_secs(1)) + .await?; + let batch_cfg = self.batching.clone(); + let session_cfg = self.session.clone(); + let drain_cfg = self.drain.clone(); + let overflow_mode = self.overflow.on_downstream_full.clone(); + + let mut flush_interval = time::interval(batch_cfg.max_duration); + flush_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + let retry_period = + Duration::from_millis(session_cfg.late_registration.poll_interval_ms); + let mut retry_interval = time::interval(retry_period.max(Duration::from_millis(1))); + retry_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + let mut session: Option = None; + let mut builder = ArrowRecordsBuilder::new(); + let mut drained_records = Vec::with_capacity(drain_cfg.max_records_per_turn); + let node_name = 
effect_handler.receiver_id().name.as_ref().to_owned(); + otel_info!( + "userevents_receiver.start", + node = node_name.as_str(), + pipeline_core = self.cpu_id, + message = "Linux userevents receiver started" + ); + + loop { + tokio::select! { + biased; + + ctrl = ctrl_chan.recv() => { + match ctrl { + Ok(NodeControlMsg::CollectTelemetry { mut metrics_reporter }) => { + let mut metrics = self.metrics.borrow_mut(); + let _ = metrics_reporter.report(&mut metrics); + } + Ok(NodeControlMsg::MemoryPressureChanged { update }) => { + self.admission_state.apply(update); + } + Ok(NodeControlMsg::DrainIngress { deadline, .. }) => { + let _ = telemetry_timer_handle.cancel().await; + if let Some(session) = session.as_mut() { + if Instant::now() < deadline { + let drain_stats = session + .drain_once(&drain_cfg, &mut drained_records) + .map_err(|error| Error::ReceiverError { + receiver: effect_handler.receiver_id(), + kind: ReceiverErrorKind::Transport, + error: "failed to drain Linux userevents perf ring during ingress drain".to_owned(), + source_detail: format_error_sources(&error), + })?; + + let received_samples = drain_stats.received_samples; + let mut dropped_memory_pressure: u64 = 0; + let dropped_no_subscription = + drain_stats.dropped_no_subscription; + for raw in drained_records.drain(..) 
{ + if self.admission_state.should_shed_ingress() { + dropped_memory_pressure += 1; + continue; + } + + let subscription_index = raw.subscription_index; + let subscription = &self.subscriptions[subscription_index]; + + let decoded = DecodedUsereventsRecord::from_raw( + subscription.tracepoint.as_str(), + raw, + &subscription.format, + ); + builder.append(decoded); + if builder.len() >= batch_cfg.max_size { + flush_batch(&effect_handler, &self.metrics, &mut builder, &overflow_mode).await?; + } + } + + let mut metrics = self.metrics.borrow_mut(); + if drain_stats.lost_samples > 0 { + metrics.lost_perf_samples.add(drain_stats.lost_samples); + } + if received_samples > 0 { + metrics.received_samples.add(received_samples); + } + if dropped_memory_pressure > 0 { + metrics + .dropped_memory_pressure + .add(dropped_memory_pressure); + } + if dropped_no_subscription > 0 { + metrics + .dropped_no_subscription + .add(dropped_no_subscription); + } + } + } + if self.admission_state.should_shed_ingress() { + drop_batch(&self.metrics, &mut builder); + } else { + flush_batch(&effect_handler, &self.metrics, &mut builder, &overflow_mode).await?; + } + effect_handler.notify_receiver_drained().await?; + let snapshot = self.metrics.borrow().snapshot(); + return Ok(TerminalState::new(deadline, [snapshot])); + } + Ok(NodeControlMsg::Shutdown { deadline, .. 
}) => { + let _ = telemetry_timer_handle.cancel().await; + let snapshot = self.metrics.borrow().snapshot(); + return Ok(TerminalState::new(deadline, [snapshot])); + } + Err(error) => return Err(Error::ChannelRecvError(error)), + _ => {} + } + } + + _ = flush_interval.tick() => { + if self.admission_state.should_shed_ingress() { + drop_batch(&self.metrics, &mut builder); + } else { + flush_batch(&effect_handler, &self.metrics, &mut builder, &overflow_mode).await?; + } + } + + _ = retry_interval.tick(), if session.is_none() && session_cfg.late_registration.enabled => { + self.metrics.borrow_mut().late_registration_retries.inc(); + match UsereventsSession::open(&self.subscriptions, &session_cfg, self.cpu_id) { + Ok(opened) => { + self.metrics.borrow_mut().sessions_started.inc(); + otel_info!( + "userevents_receiver.session_opened", + node = node_name.as_str(), + subscriptions = opened.subscription_count(), + message = "Opened Linux userevents perf session" + ); + session = Some(opened); + } + Err(SessionInitError::MissingTracepoint(tracepoint)) => { + otel_warn!( + "userevents_receiver.tracepoint_pending", + node = node_name.as_str(), + tracepoint = tracepoint.as_str(), + message = "Waiting for late tracepoint registration" + ); + } + Err(error) => { + let source_detail = { + let nested = format_error_sources(&error); + if nested.is_empty() { + error.to_string() + } else { + format!("{}: {}", error, nested) + } + }; + return Err(Error::ReceiverError { + receiver: effect_handler.receiver_id(), + kind: ReceiverErrorKind::Configuration, + error: "failed to open Linux userevents perf session".to_owned(), + source_detail, + }); + } + } + } + + drained = async { + let session = session + .as_mut() + .expect("userevents session branch is gated by is_some()"); + session.drain_ready(&drain_cfg, &mut drained_records).await + }, if session.is_some() => { + // TODO: Reopen the session for recoverable mid-stream + // collection failures once one_collect exposes typed + // error 
classification. Today drain errors are reported + // as terminal transport failures. + let drain_stats = drained.map_err(|error| Error::ReceiverError { + receiver: effect_handler.receiver_id(), + kind: ReceiverErrorKind::Transport, + error: "failed to drain Linux userevents perf ring".to_owned(), + source_detail: format_error_sources(&error), + })?; + + let received_samples = drain_stats.received_samples; + let mut dropped_memory_pressure: u64 = 0; + let dropped_no_subscription = drain_stats.dropped_no_subscription; + for raw in drained_records.drain(..) { + if self.admission_state.should_shed_ingress() { + dropped_memory_pressure += 1; + continue; + } + + let subscription_index = raw.subscription_index; + let subscription = &self.subscriptions[subscription_index]; + + let decoded = DecodedUsereventsRecord::from_raw( + subscription.tracepoint.as_str(), + raw, + &subscription.format, + ); + builder.append(decoded); + if builder.len() >= batch_cfg.max_size { + flush_batch(&effect_handler, &self.metrics, &mut builder, &overflow_mode).await?; + } + } + + let mut metrics = self.metrics.borrow_mut(); + if drain_stats.lost_samples > 0 { + metrics.lost_perf_samples.add(drain_stats.lost_samples); + } + if received_samples > 0 { + metrics.received_samples.add(received_samples); + } + if dropped_memory_pressure > 0 { + metrics.dropped_memory_pressure.add(dropped_memory_pressure); + } + if dropped_no_subscription > 0 { + metrics.dropped_no_subscription.add(dropped_no_subscription); + } + } + } + + if session.is_none() && !session_cfg.late_registration.enabled { + match UsereventsSession::open(&self.subscriptions, &session_cfg, self.cpu_id) { + Ok(opened) => { + self.metrics.borrow_mut().sessions_started.inc(); + session = Some(opened); + } + Err(error) => { + return Err(Error::ReceiverError { + receiver: effect_handler.receiver_id(), + kind: ReceiverErrorKind::Configuration, + error: "failed to open Linux userevents perf session".to_owned(), + source_detail: 
format_error_sources(&error), + }); + } + } + } + } + } + } +} + +const fn default_per_cpu_buffer_size() -> usize { + DEFAULT_PER_CPU_BUFFER_SIZE +} + +const fn default_wakeup_watermark() -> usize { + DEFAULT_WAKEUP_WATERMARK +} + +const fn default_late_registration_poll_ms() -> u64 { + DEFAULT_LATE_REGISTRATION_POLL.as_millis() as u64 +} + +const fn default_max_records_per_turn() -> usize { + DEFAULT_MAX_RECORDS_PER_TURN +} + +const fn default_max_bytes_per_turn() -> usize { + DEFAULT_MAX_BYTES_PER_TURN +} + +const fn default_max_drain_ns() -> Duration { + DEFAULT_MAX_DRAIN_NS +} + +const fn default_batch_max_size() -> u16 { + DEFAULT_BATCH_MAX_SIZE +} + +const fn default_batch_max_duration() -> Duration { + DEFAULT_BATCH_MAX_DURATION +} + +const fn default_overflow_mode() -> OverflowMode { + OverflowMode::Drop +} + +#[cfg(all(test, target_os = "linux"))] +mod linux_integration_tests { + use super::*; + + #[test] + #[ignore = "requires a Linux runner with a pre-registered user_events tracepoint"] + fn session_open_smoke_test_for_pre_registered_tracepoint() { + let tracepoint = std::env::var("OTAP_DF_USEREVENTS_TEST_TRACEPOINT").expect( + "set OTAP_DF_USEREVENTS_TEST_TRACEPOINT to a pre-registered user_events tracepoint", + ); + + let config = SessionConfig { + per_cpu_buffer_size: default_per_cpu_buffer_size(), + wakeup_watermark: default_wakeup_watermark(), + late_registration: LateRegistrationConfig::default(), + }; + let subscriptions = vec![SubscriptionConfig { + tracepoint, + format: FormatConfig::Tracefs, + }]; + + let session = UsereventsSession::open(&subscriptions, &config, 0) + .expect("open a perf session for the registered tracepoint"); + assert_eq!(session.subscription_count(), 1); + } +} + +#[cfg(test)] +mod config_tests { + use super::*; + + #[test] + fn normalize_single_tracepoint_shorthand() { + let config = Config { + tracepoint: Some("user_events:example_L2K1".to_owned()), + format: Some(FormatConfig::Tracefs), + subscriptions: None, + session: 
None, + drain: None, + batching: None, + overflow: None, + }; + + let normalized = + UsereventsReceiver::normalize_subscriptions(&config).expect("normalized subscriptions"); + assert_eq!(normalized.len(), 1); + assert_eq!(normalized[0].tracepoint, "user_events:example_L2K1"); + assert!(matches!(normalized[0].format, FormatConfig::Tracefs)); + } + + #[test] + fn normalize_subscriptions_list() { + let config = Config { + tracepoint: None, + format: None, + subscriptions: Some(vec![SubscriptionConfig { + tracepoint: "user_events:example_L5K1".to_owned(), + format: FormatConfig::Tracefs, + }]), + session: None, + drain: None, + batching: None, + overflow: None, + }; + + let normalized = + UsereventsReceiver::normalize_subscriptions(&config).expect("normalized subscriptions"); + assert_eq!(normalized.len(), 1); + assert!(matches!(normalized[0].format, FormatConfig::Tracefs)); + } + + #[test] + fn normalize_rejects_both_tracepoint_and_subscriptions() { + let config = Config { + tracepoint: Some("user_events:example_L2K1".to_owned()), + format: Some(FormatConfig::Tracefs), + subscriptions: Some(vec![SubscriptionConfig { + tracepoint: "user_events:example_L3K1".to_owned(), + format: FormatConfig::Tracefs, + }]), + session: None, + drain: None, + batching: None, + overflow: None, + }; + + let error = + UsereventsReceiver::normalize_subscriptions(&config).expect_err("config rejected"); + assert!( + error + .to_string() + .contains("either `tracepoint` or `subscriptions`") + ); + } + + #[test] + fn normalize_rejects_non_userevents_tracepoint_group() { + let config = Config { + tracepoint: Some("foo:example_L2K1".to_owned()), + format: Some(FormatConfig::Tracefs), + subscriptions: None, + session: None, + drain: None, + batching: None, + overflow: None, + }; + + let error = + UsereventsReceiver::normalize_subscriptions(&config).expect_err("config rejected"); + assert!( + error.to_string().contains("user_events:"), + "unexpected error: {error}" + ); + } + + #[test] + fn 
normalize_rejects_tracepoint_without_group() { + let config = Config { + tracepoint: Some("example_L2K1".to_owned()), + format: Some(FormatConfig::Tracefs), + subscriptions: None, + session: None, + drain: None, + batching: None, + overflow: None, + }; + + let error = + UsereventsReceiver::normalize_subscriptions(&config).expect_err("config rejected"); + assert!( + error.to_string().contains("user_events:"), + "unexpected error: {error}" + ); + } + + #[test] + fn validate_drain_rejects_zero_max_drain_ns() { + let drain = DrainConfig { + max_records_per_turn: default_max_records_per_turn(), + max_bytes_per_turn: default_max_bytes_per_turn(), + max_drain_ns: Duration::ZERO, + }; + let error = UsereventsReceiver::validate_drain(&drain).expect_err("zero rejected"); + assert!( + error.to_string().contains("max_drain_ns"), + "unexpected error: {error}" + ); + } + + #[test] + fn validate_drain_rejects_zero_max_records_per_turn() { + let drain = DrainConfig { + max_records_per_turn: 0, + max_bytes_per_turn: default_max_bytes_per_turn(), + max_drain_ns: default_max_drain_ns(), + }; + let error = UsereventsReceiver::validate_drain(&drain).expect_err("zero rejected"); + assert!( + error.to_string().contains("max_records_per_turn"), + "unexpected error: {error}" + ); + } + + #[test] + fn validate_drain_rejects_zero_max_bytes_per_turn() { + let drain = DrainConfig { + max_records_per_turn: default_max_records_per_turn(), + max_bytes_per_turn: 0, + max_drain_ns: default_max_drain_ns(), + }; + let error = UsereventsReceiver::validate_drain(&drain).expect_err("zero rejected"); + assert!( + error.to_string().contains("max_bytes_per_turn"), + "unexpected error: {error}" + ); + } + + #[test] + fn validate_drain_accepts_nonzero_values() { + let drain = DrainConfig { + max_records_per_turn: default_max_records_per_turn(), + max_bytes_per_turn: default_max_bytes_per_turn(), + max_drain_ns: Duration::from_millis(2), + }; + UsereventsReceiver::validate_drain(&drain).expect("non-zero 
accepted"); + } + + #[test] + fn validate_batching_rejects_zero_max_duration() { + let batching = BatchConfig { + max_size: default_batch_max_size(), + max_duration: Duration::ZERO, + }; + let error = UsereventsReceiver::validate_batching(&batching).expect_err("zero rejected"); + assert!( + error.to_string().contains("max_duration"), + "unexpected error: {error}" + ); + } + + #[test] + fn validate_batching_accepts_nonzero_max_duration() { + let batching = BatchConfig { + max_size: default_batch_max_size(), + max_duration: default_batch_max_duration(), + }; + UsereventsReceiver::validate_batching(&batching).expect("non-zero accepted"); + } +} diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/one_collect_adapter.rs b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/one_collect_adapter.rs new file mode 100644 index 0000000000..550075a33e --- /dev/null +++ b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/one_collect_adapter.rs @@ -0,0 +1,408 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(not(target_os = "linux"), allow(dead_code, unused_imports))] + +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::io; +use std::rc::Rc; +use std::sync::Arc; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +#[cfg(target_os = "linux")] +use one_collect::event::LocationType; +#[cfg(target_os = "linux")] +use one_collect::helpers::exporting::ExportMachine; +#[cfg(target_os = "linux")] +use one_collect::perf_event::{PerfSession, RingBufBuilder, RingBufSessionBuilder}; +#[cfg(target_os = "linux")] +use one_collect::tracefs::TraceFS; + +use super::session::{TracefsField, TracefsFieldLocation}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct CollectedEvent { + pub timestamp_unix_nano: u64, + pub event_data: Vec, + pub user_data_offset: usize, + pub fields: Arc<[TracefsField]>, + pub source: EventSource, 
+} + +#[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub(crate) enum EventSource { + UserEvents(UserEventsSource), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct UserEventsSource { + pub subscription_index: usize, +} + +#[derive(Debug)] +pub(crate) struct CollectedDrain { + pub events: Vec, + pub lost_samples: u64, +} + +#[derive(Debug, Clone)] +pub(crate) struct UserEventsSubscription { + pub tracepoint: String, +} + +#[derive(Debug, Clone)] +pub(crate) struct UserEventsSessionConfig { + pub per_cpu_buffer_size: usize, + pub cpu_ids: Vec, +} + +#[cfg(target_os = "linux")] +#[derive(Debug, Clone, Copy)] +struct PerfTimeAnchor { + unix_nano: u64, + perf_nano: u64, +} + +#[cfg(target_os = "linux")] +impl PerfTimeAnchor { + fn capture() -> Self { + let before = ExportMachine::qpc_time(); + let unix_nano = current_time_unix_nano(); + let after = ExportMachine::qpc_time(); + + Self { + unix_nano, + perf_nano: before.saturating_add(after.saturating_sub(before) / 2), + } + } + + fn sample_perf_time_to_unix_nano(self, sample_perf_time: u64) -> u64 { + if sample_perf_time <= self.perf_nano { + self.unix_nano + .saturating_sub(self.perf_nano - sample_perf_time) + } else { + self.unix_nano + .saturating_add(sample_perf_time - self.perf_nano) + } + } +} + +#[derive(Debug)] +pub(crate) enum CollectInitError { + MissingTracepoint(String), + InvalidTracepoint(String), + Io(io::Error), +} + +impl std::fmt::Display for CollectInitError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::MissingTracepoint(name) => write!(f, "tracepoint `{name}` is not registered"), + Self::InvalidTracepoint(name) => write!(f, "tracepoint `{name}` is invalid"), + Self::Io(err) => err.fmt(f), + } + } +} + +impl std::error::Error for CollectInitError {} + +#[cfg(target_os = "linux")] +pub(crate) struct OneCollectUserEventsSession { + session: PerfSession, + pending: Rc>>, + lost_samples: Rc>, + subscription_count: usize, +} + 
+#[cfg(target_os = "linux")] +impl OneCollectUserEventsSession { + pub(crate) fn open( + subscriptions: &[UserEventsSubscription], + config: &UserEventsSessionConfig, + ) -> Result { + if subscriptions.is_empty() { + return Err(CollectInitError::InvalidTracepoint( + "at least one tracepoint subscription is required".to_owned(), + )); + } + if config.cpu_ids.is_empty() { + return Err(CollectInitError::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "at least one target CPU is required for one_collect user_events collection", + ))); + } + + let page_count = page_count(config.per_cpu_buffer_size); + let mut builder = RingBufSessionBuilder::new() + .with_page_count(page_count) + .with_tracepoint_events(RingBufBuilder::for_tracepoint()); + for cpu in &config.cpu_ids { + let cpu = u16::try_from(*cpu).map_err(|_| { + CollectInitError::Io(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "target CPU id `{cpu}` exceeds supported u16 range for one_collect user_events collection" + ), + )) + })?; + builder = builder.with_target_cpu(cpu); + } + + let mut session = builder.build().map_err(CollectInitError::Io)?; + session.set_read_timeout(Duration::from_millis(0)); + + let pending = Rc::new(RefCell::new(VecDeque::new())); + let lost_samples = Rc::new(Cell::new(0u64)); + let time_field = session.time_data_ref(); + // TODO: Prefer a one_collect-owned sample-time to realtime conversion API + // once the session exposes one. 
+ let tracefs = TraceFS::open().map_err(CollectInitError::Io)?; + let time_anchor = PerfTimeAnchor::capture(); + + for (subscription_index, subscription) in subscriptions.iter().enumerate() { + let (group, event_name) = subscription.tracepoint.split_once(':').ok_or_else(|| { + CollectInitError::InvalidTracepoint(subscription.tracepoint.clone()) + })?; + if group != "user_events" { + return Err(CollectInitError::InvalidTracepoint( + subscription.tracepoint.clone(), + )); + } + + let mut event = match tracefs.find_event("user_events", event_name) { + Ok(event) => event, + Err(error) => match error.kind() { + io::ErrorKind::NotFound => { + return Err(CollectInitError::MissingTracepoint( + subscription.tracepoint.clone(), + )); + } + io::ErrorKind::PermissionDenied => { + return Err(CollectInitError::Io(io::Error::new( + io::ErrorKind::PermissionDenied, + format!( + "tracepoint `{}` is registered but tracefs metadata is not readable; run df_engine with elevated privileges or relax tracefs read permissions", + subscription.tracepoint + ), + ))); + } + _ => return Err(CollectInitError::Io(error)), + }, + }; + + let fields = Arc::<[TracefsField]>::from( + event + .format() + .fields() + .iter() + .map(|field| TracefsField { + name: field.name.clone(), + type_name: field.type_name.clone(), + location: match field.location { + LocationType::Static => TracefsFieldLocation::Static, + LocationType::StaticString => TracefsFieldLocation::StaticString, + LocationType::DynRelative => TracefsFieldLocation::DynRelative, + LocationType::DynAbsolute => TracefsFieldLocation::DynAbsolute, + LocationType::StaticLenPrefixArray => { + TracefsFieldLocation::StaticLenPrefixArray + } + LocationType::StaticUTF16String => { + TracefsFieldLocation::StaticUtf16String + } + }, + offset: field.offset, + size: field.size, + }) + .collect::>(), + ); + let user_data_offset = fields + .iter() + .find(|field| !field.name.starts_with("common_")) + .map_or(0, |field| field.offset); + + let event_pending = 
Rc::clone(&pending); + let event_time_field = time_field.clone(); + let event_fields = Arc::clone(&fields); + + event.add_callback(move |data| { + let full_data = data.full_data(); + let timestamp = event_time_field + .try_get_u64(full_data) + .map(|sample_time| time_anchor.sample_perf_time_to_unix_nano(sample_time)) + .unwrap_or_else(current_time_unix_nano); + + event_pending.borrow_mut().push_back(CollectedEvent { + timestamp_unix_nano: timestamp, + event_data: data.event_data().to_vec(), + user_data_offset, + fields: Arc::clone(&event_fields), + source: EventSource::UserEvents(UserEventsSource { subscription_index }), + }); + Ok(()) + }); + + session.add_event(event).map_err(CollectInitError::Io)?; + } + + register_lost_callbacks(&mut session, Rc::clone(&lost_samples)); + session.enable().map_err(CollectInitError::Io)?; + + Ok(Self { + session, + pending, + lost_samples, + subscription_count: subscriptions.len(), + }) + } + + pub(crate) fn subscription_count(&self) -> usize { + self.subscription_count + } + + pub(crate) fn drain( + &mut self, + max_records: usize, + max_bytes: usize, + max_drain_ns: Duration, + ) -> io::Result { + // `max_drain_ns` is the total intended wall-time budget for this drain + // turn. We split it into two reserved phases so neither parsing nor + // popping can starve the other: + // + // * parse phase: at most `max_drain_ns / 2`. Bounding parse avoids + // the `parse_all()` failure mode where a saturated ring keeps + // returning records indefinitely and starves the pop loop. + // * pop phase: whatever time remains until `started + max_drain_ns`. + // Because parse is capped at half the budget, pop is guaranteed + // at least `max_drain_ns / 2` of wall-time before the deadline. + // + // Forward-progress rule: if the parse phase somehow exhausts the + // entire budget (e.g. 
the OS scheduler delays us) and we would + // otherwise return zero events while `pending` is non-empty, we pop + // exactly one record (subject to record/byte caps) so the queue + // drains monotonically across turns under continuous load. + let started = Instant::now(); + let parse_budget = max_drain_ns / 2; + self.session + .parse_for_duration(parse_budget) + .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?; + + let deadline = started + max_drain_ns; + let mut drained_bytes = 0usize; + let pending_len = self.pending.borrow().len(); + // TODO(perf): Avoid allocating this temporary events Vec on every drain + // turn. The session wrapper immediately moves these records into its + // caller-owned RawUsereventsRecord buffer, so the adapter should + // eventually drain directly into a reusable caller-provided output + // buffer once the adapter/session ownership boundary is revisited. + let mut events = Vec::with_capacity(max_records.min(pending_len)); + let mut pending = self.pending.borrow_mut(); + + while let Some(front) = pending.front() { + if events.len() >= max_records || Instant::now() >= deadline { + break; + } + + let front_len = front.event_data.len(); + let next_bytes = drained_bytes.saturating_add(front_len); + if next_bytes > max_bytes && !events.is_empty() { + break; + } + + drained_bytes = next_bytes; + if let Some(event) = pending.pop_front() { + events.push(event); + } + } + + // Forward-progress guarantee: ensure at least one event drains per + // turn when the queue is non-empty, even if the parse phase blew + // past the deadline before the pop loop could run. 
+ if events.is_empty() && !pending.is_empty() && max_records > 0 { + if let Some(front) = pending.front() { + let front_len = front.event_data.len(); + if front_len <= max_bytes { + if let Some(event) = pending.pop_front() { + events.push(event); + } + } + } + } + + Ok(CollectedDrain { + events, + lost_samples: self.lost_samples.replace(0), + }) + } +} + +#[cfg(target_os = "linux")] +fn register_lost_callbacks(session: &mut PerfSession, lost_samples: Rc>) { + let lost_field = session.lost_event().format().get_field_ref("lost"); + let lost_counter = Rc::clone(&lost_samples); + session.lost_event().add_callback(move |data| { + let lost = lost_field + .and_then(|field| data.format().try_get_u64(field, data.event_data())) + .unwrap_or(1); + lost_counter.set(lost_counter.get().saturating_add(lost)); + Ok(()) + }); + + let lost_samples_field = session.lost_samples_event().format().get_field_ref("lost"); + session.lost_samples_event().add_callback(move |data| { + let lost = lost_samples_field + .and_then(|field| data.format().try_get_u64(field, data.event_data())) + .unwrap_or(1); + lost_samples.set(lost_samples.get().saturating_add(lost)); + Ok(()) + }); +} + +#[cfg(target_os = "linux")] +fn page_count(per_cpu_buffer_size: usize) -> usize { + let page_size = one_collect::os::linux::system_page_size() as usize; + let rounded = per_cpu_buffer_size.max(page_size).next_power_of_two(); + (rounded / page_size).max(1) +} + +#[cfg(target_os = "linux")] +fn current_time_unix_nano() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_nanos().min(u128::from(u64::MAX)) as u64) + .unwrap_or_default() +} + +#[cfg(not(target_os = "linux"))] +pub(crate) struct OneCollectUserEventsSession; + +#[cfg(not(target_os = "linux"))] +impl OneCollectUserEventsSession { + pub(crate) fn open( + _subscriptions: &[UserEventsSubscription], + _config: &UserEventsSessionConfig, + ) -> Result { + Err(CollectInitError::Io(io::Error::new( + io::ErrorKind::Unsupported, + 
"one_collect user_events collection is supported only on Linux", + ))) + } + + pub(crate) fn subscription_count(&self) -> usize { + 0 + } + + pub(crate) fn drain( + &mut self, + _max_records: usize, + _max_bytes: usize, + _max_drain_ns: Duration, + ) -> io::Result { + Ok(CollectedDrain { + events: Vec::new(), + lost_samples: 0, + }) + } +} diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/session.rs b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/session.rs new file mode 100644 index 0000000000..c58606a4fb --- /dev/null +++ b/rust/otap-dataflow/crates/contrib-nodes/src/receivers/userevents_receiver/session.rs @@ -0,0 +1,319 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(not(target_os = "linux"), allow(dead_code, unused_imports))] + +//! Session wrapper for Linux userevents collection. + +#[cfg(target_os = "linux")] +mod imp { + use std::io; + use std::time::Duration; + + use tokio::time; + + use super::super::one_collect_adapter::{ + CollectInitError, EventSource, OneCollectUserEventsSession, UserEventsSessionConfig, + UserEventsSubscription, + }; + + use super::super::{DrainConfig, SessionConfig, SubscriptionConfig}; + + #[derive(Debug, Clone, PartialEq, Eq)] + pub(crate) enum TracefsFieldLocation { + Static, + StaticString, + DynRelative, + DynAbsolute, + StaticLenPrefixArray, + StaticUtf16String, + } + + #[derive(Debug, Clone, PartialEq, Eq)] + pub(crate) struct TracefsField { + pub name: String, + pub type_name: String, + pub location: TracefsFieldLocation, + pub offset: usize, + pub size: usize, + } + + #[derive(Debug, Clone, PartialEq, Eq)] + pub(crate) struct RawUsereventsRecord { + pub subscription_index: usize, + pub timestamp_unix_nano: u64, + /// Full raw tracepoint sample bytes as delivered by the perf session. 
+ /// + /// Conceptually, for normal tracefs decoding this is: + /// + /// ```text + /// +----------------------+-------------------------------+ + /// | common_* fields | user-declared tracefs fields | + /// | fixed trace metadata | producer payload fields | + /// +----------------------+-------------------------------+ + /// 0 user_data_offset + /// ``` + /// + /// For EventHeader tracepoints, the user-declared region starts with + /// the EventHeader bytes: + /// + /// ```text + /// +----------------------+------------------------------------------+ + /// | common_* fields | EventHeader + extensions + event payload | + /// +----------------------+------------------------------------------+ + /// 0 user_data_offset + /// ``` + /// + /// The receiver keeps these layers separate: tracefs metadata tells us + /// where the user region begins, and `FormatConfig` decides whether the + /// user region is decoded as standard tracefs fields or as EventHeader. + pub event_data: Vec, + /// Byte offset from the start of `event_data` to the first non-common + /// tracefs field. This is the beginning of the producer-defined payload + /// region for EventHeader decoding. + pub user_data_offset: usize, + /// Tracefs field metadata for the tracepoint. These fields come from + /// the tracefs `format` file, not from the sample payload itself. 
+ pub fields: std::sync::Arc<[TracefsField]>, + } + + #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] + pub(crate) struct SessionDrainStats { + pub received_samples: u64, + pub dropped_no_subscription: u64, + pub lost_samples: u64, + } + + #[derive(Debug)] + pub(crate) enum SessionInitError { + MissingTracepoint(String), + InvalidTracepoint(String), + Io(io::Error), + } + + impl std::fmt::Display for SessionInitError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::MissingTracepoint(name) => { + write!(f, "tracepoint `{name}` is not registered") + } + Self::InvalidTracepoint(name) => write!(f, "tracepoint `{name}` is invalid"), + Self::Io(err) => err.fmt(f), + } + } + } + + impl std::error::Error for SessionInitError {} + + impl From for SessionInitError { + fn from(value: io::Error) -> Self { + Self::Io(value) + } + } + + pub(crate) struct UsereventsSession { + inner: OneCollectUserEventsSession, + } + + impl UsereventsSession { + fn drain_once_impl( + &mut self, + config: &DrainConfig, + out: &mut Vec, + ) -> io::Result { + let drained = self.inner.drain( + config.max_records_per_turn, + config.max_bytes_per_turn, + config.max_drain_ns, + )?; + + out.clear(); + out.reserve(drained.events.len()); + let mut stats = SessionDrainStats { + received_samples: drained.events.len() as u64, + dropped_no_subscription: 0, + lost_samples: drained.lost_samples, + }; + for event in drained.events { + let EventSource::UserEvents(source) = event.source; + if source.subscription_index >= self.inner.subscription_count() { + stats.dropped_no_subscription += 1; + continue; + } + out.push(RawUsereventsRecord { + subscription_index: source.subscription_index, + timestamp_unix_nano: event.timestamp_unix_nano, + event_data: event.event_data, + user_data_offset: event.user_data_offset, + fields: event.fields, + }); + } + + Ok(stats) + } + + pub(crate) fn open( + subscriptions: &[SubscriptionConfig], + config: &SessionConfig, + cpu_id: 
usize, + ) -> Result { + let subscriptions = subscriptions + .iter() + .map(|subscription| UserEventsSubscription { + tracepoint: subscription.tracepoint.clone(), + }) + .collect::>(); + let config = UserEventsSessionConfig { + per_cpu_buffer_size: config.per_cpu_buffer_size, + // Open the perf ring for this pipeline's pinned CPU only. + // Keeping ring reads on the same CPU as the pipeline thread + // preserves the NUMA-locality design documented in the + // receiver README; do not widen this to "all CPUs" without + // revisiting that contract. + cpu_ids: vec![cpu_id], + }; + + let inner = + OneCollectUserEventsSession::open(&subscriptions, &config).map_err(|error| { + match error { + CollectInitError::MissingTracepoint(tracepoint) => { + SessionInitError::MissingTracepoint(tracepoint) + } + CollectInitError::InvalidTracepoint(tracepoint) => { + SessionInitError::InvalidTracepoint(tracepoint) + } + CollectInitError::Io(error) => SessionInitError::Io(error), + } + })?; + + Ok(Self { inner }) + } + + pub(crate) async fn drain_ready( + &mut self, + config: &DrainConfig, + out: &mut Vec, + ) -> io::Result { + let poll_interval = Duration::from_millis(1); + + loop { + let stats = self.drain_once_impl(config, out)?; + if !out.is_empty() || stats.lost_samples > 0 { + return Ok(stats); + } + + // TODO: Replace this fixed sleep with an event-driven wakeup once + // one-collect exposes a waitable/readiness API for PerfSession. 
+ // Tracking issue: https://github.com/microsoft/one-collect/issues/254 + time::sleep(poll_interval).await; + } + } + + pub(crate) fn drain_once( + &mut self, + config: &DrainConfig, + out: &mut Vec, + ) -> io::Result { + self.drain_once_impl(config, out) + } + + pub(crate) fn subscription_count(&self) -> usize { + self.inner.subscription_count() + } + } +} + +#[cfg(not(target_os = "linux"))] +mod imp { + use std::io; + + use super::super::{DrainConfig, SessionConfig, SubscriptionConfig}; + + #[derive(Debug, Clone, PartialEq, Eq)] + pub(crate) enum TracefsFieldLocation { + Static, + StaticString, + DynRelative, + DynAbsolute, + StaticLenPrefixArray, + StaticUtf16String, + } + + #[derive(Debug, Clone, PartialEq, Eq)] + pub(crate) struct TracefsField { + pub name: String, + pub type_name: String, + pub location: TracefsFieldLocation, + pub offset: usize, + pub size: usize, + } + + #[derive(Debug, Clone, PartialEq, Eq)] + pub(crate) struct RawUsereventsRecord { + pub subscription_index: usize, + pub timestamp_unix_nano: u64, + pub event_data: Vec, + pub user_data_offset: usize, + pub fields: std::sync::Arc<[TracefsField]>, + } + + #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] + pub(crate) struct SessionDrainStats { + pub received_samples: u64, + pub dropped_no_subscription: u64, + pub lost_samples: u64, + } + + #[derive(Debug)] + pub(crate) enum SessionInitError { + Unsupported, + } + + impl std::fmt::Display for SessionInitError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Unsupported => write!(f, "userevents sessions are supported only on Linux"), + } + } + } + + impl std::error::Error for SessionInitError {} + + pub(crate) struct UsereventsSession; + + impl UsereventsSession { + pub(crate) fn open( + _subscriptions: &[SubscriptionConfig], + _config: &SessionConfig, + _cpu_id: usize, + ) -> Result { + Err(SessionInitError::Unsupported) + } + + pub(crate) async fn drain_ready( + &mut self, + _config: 
&DrainConfig, + out: &mut Vec, + ) -> io::Result { + out.clear(); + Ok(SessionDrainStats::default()) + } + + pub(crate) fn drain_once( + &mut self, + _config: &DrainConfig, + out: &mut Vec, + ) -> io::Result { + out.clear(); + Ok(SessionDrainStats::default()) + } + + pub(crate) fn subscription_count(&self) -> usize { + 0 + } + } +} + +pub(super) use imp::{ + RawUsereventsRecord, SessionInitError, TracefsField, TracefsFieldLocation, UsereventsSession, +}; diff --git a/rust/otap-dataflow/crates/engine/Cargo.toml b/rust/otap-dataflow/crates/engine/Cargo.toml index 9305c47688..17722508dd 100644 --- a/rust/otap-dataflow/crates/engine/Cargo.toml +++ b/rust/otap-dataflow/crates/engine/Cargo.toml @@ -55,8 +55,5 @@ async-channel = { workspace = true } tikv-jemalloc-ctl = { workspace = true, optional = true } tikv-jemalloc-sys = { workspace = true, optional = true } -[dev-dependencies] -otap-df-engine = { workspace = true, features = ["test-utils"] } - [target.'cfg(not(windows))'.dev-dependencies] tikv-jemallocator = { workspace = true } diff --git a/rust/otap-dataflow/crates/otap/Cargo.toml b/rust/otap-dataflow/crates/otap/Cargo.toml index 44c17df362..b2047ef7e1 100644 --- a/rust/otap-dataflow/crates/otap/Cargo.toml +++ b/rust/otap-dataflow/crates/otap/Cargo.toml @@ -115,8 +115,6 @@ rustls = { workspace = true, features = ["ring"] } tokio-rustls.workspace = true rustls-pki-types.workspace = true tokio-stream.workspace = true -otap-df-contrib-nodes = { workspace = true } -otap-df-core-nodes = { workspace = true, features = ["dev-tools"] } weaver_common.workspace = true otap-test-tls-certs.workspace = true otap-df-engine = { workspace = true, features = ["test-utils"] }