Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ fn bench_otap_logs_view(c: &mut Criterion) {
b.iter(|| {
// 1. Convert OTAP -> OTLP bytes
let mut logs_encoder = LogsProtoBytesEncoder::new();
let mut buffer = ProtoBuffer::new();
let mut buffer = ProtoBuffer::default();
let mut input_clone = input.clone();
logs_encoder
.encode(&mut input_clone, &mut buffer)
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/benchmarks/benches/otlp_bytes/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ fn otap_to_bytes_logs(c: &mut Criterion) {

let logs = create_logs_data();
let otap_batch = otlp_to_otap(&OtlpProtoMessage::Logs(logs));
let mut proto_buffer = ProtoBuffer::new();
let mut proto_buffer = ProtoBuffer::default();
let mut encoder = LogsProtoBytesEncoder::new();

_ = group.bench_function("proto encode", |b| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ where
}
}
BenchOp::EncodeProto => {
let mut buf = ProtoBuffer::new();
let mut buf = ProtoBuffer::default();
let mut encoder = DirectLogRecordEncoder::new(&mut buf);

for _ in 0..self.iterations {
Expand Down
14 changes: 14 additions & 0 deletions rust/otap-dataflow/crates/config/src/conversion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Options for protocol conversion.
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

/// Options for protocol conversion.
///
/// Currently empty; intended as a placeholder for future per-conversion
/// configuration (e.g., size limits) to be threaded through pipeline config.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Default)]
#[serde(deny_unknown_fields)]
pub struct ConversionOptions {}
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::hash::Hash;
pub mod byte_units;
/// Config URI providers for resolving configuration from file:, env:, or bare paths.
pub mod config_provider;
pub mod conversion;
pub mod engine;
/// Environment variable substitution for raw config text.
pub mod env_substitution;
Expand Down Expand Up @@ -50,6 +51,8 @@ pub use topic::{
/// Validation helpers for node configuration.
pub mod validation;

pub use conversion::ConversionOptions;

/// Signal types
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SignalType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use otap_df_engine::local::exporter::{EffectHandler, Exporter};
use otap_df_engine::message::{ExporterInbox, Message};
use otap_df_engine::node::NodeId;
use otap_df_engine::terminal_state::TerminalState;
use otap_df_pdata::TryIntoWithOptions;
use otap_df_pdata::otlp::OtlpProtoBytes;
// Zero-copy view import (currently unused, for future optimization)
// use otap_df_pdata::views::otap::OtapLogsView;
Expand Down Expand Up @@ -508,7 +509,7 @@ impl GenevaExporter {

let otlp_bytes: OtlpProtoBytes =
OtapPayload::OtapArrowRecords(OtapArrowRecords::Logs(otap_records))
.try_into()
.try_into_with_default()
.map_err(|e| {
self.metrics.conversion_errors.inc();
format!("Failed to convert OTAP to OTLP: {:?}", e)
Expand Down Expand Up @@ -558,7 +559,7 @@ impl GenevaExporter {

let otlp_bytes: OtlpProtoBytes =
OtapPayload::OtapArrowRecords(OtapArrowRecords::Traces(otap_records))
.try_into()
.try_into_with_default()
.map_err(|e| {
self.metrics.conversion_errors.inc();
format!("Failed to convert OTAP to OTLP: {:?}", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use otap_df_engine::message::Message;
use otap_df_engine::node::NodeId;
use otap_df_engine::process_duration::ComputeDuration;
use otap_df_engine::processor::ProcessorWrapper;
use otap_df_pdata::TryIntoWithOptions;
use otap_df_pdata::encode::record::attributes::StrKeysAttributesRecordBatchBuilder;
use otap_df_pdata::otlp::attributes::AttributeValueType;
use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType;
Expand Down Expand Up @@ -623,7 +624,7 @@ impl local::Processor<OtapPdata> for CondenseAttributesProcessor {
OtapPayload::empty(signal)
};

let mut records: OtapArrowRecords = payload.try_into()?;
let mut records: OtapArrowRecords = payload.try_into_with_default()?;

let input_items = records.num_items() as u64;

Expand Down Expand Up @@ -758,7 +759,8 @@ mod condense_tests {
let out = ctx.drain_pdata().await;
let (_, first) = out.into_iter().next().expect("one output").into_parts();

let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp");
let otlp_bytes: OtlpProtoBytes =
first.try_into_with_default().expect("convert to otlp");
let bytes = match otlp_bytes {
OtlpProtoBytes::ExportLogsRequest(b) => b,
_ => panic!("unexpected otlp variant"),
Expand Down Expand Up @@ -880,7 +882,8 @@ mod condense_tests {
let mut bytes = BytesMut::new();
input.encode(&mut bytes).expect("encode input");
let payload: OtapPayload = OtlpProtoBytes::ExportLogsRequest(bytes.freeze()).into();
let mut records: OtapArrowRecords = payload.try_into().expect("convert to records");
let mut records: OtapArrowRecords =
payload.try_into_with_default().expect("convert to records");

let before_batch = records
.get(ArrowPayloadType::LogAttrs)
Expand Down Expand Up @@ -1292,7 +1295,9 @@ mod condense_tests {

let out = ctx.drain_pdata().await;
let (_, first_payload) = out.into_iter().next().expect("one output").into_parts();
let otlp_bytes: OtlpProtoBytes = first_payload.try_into().expect("convert to otlp");
let otlp_bytes: OtlpProtoBytes = first_payload
.try_into_with_default()
.expect("convert to otlp");
let bytes = match otlp_bytes {
OtlpProtoBytes::ExportLogsRequest(b) => b,
_ => panic!("unexpected otlp variant"),
Expand Down Expand Up @@ -1334,8 +1339,9 @@ mod condense_tests {

let out = ctx.drain_pdata().await;
let (_, second_payload) = out.into_iter().next().expect("one output").into_parts();
let otlp_bytes: OtlpProtoBytes =
second_payload.try_into().expect("convert to otlp");
let otlp_bytes: OtlpProtoBytes = second_payload
.try_into_with_default()
.expect("convert to otlp");
let bytes = match otlp_bytes {
OtlpProtoBytes::ExportLogsRequest(b) => b,
_ => panic!("unexpected otlp variant"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use otap_df_engine::{
message::Message,
process_duration::ComputeDuration,
};
use otap_df_pdata::TryIntoWithOptions;
use otap_df_pdata::{OtapPayload, OtlpProtoBytes};

/// URN identifier for the processor
Expand Down Expand Up @@ -129,7 +130,7 @@ impl RecordsetKqlProcessor {

// Extract context and payload, convert to OTLP bytes
let (ctx, payload) = data.into_parts();
let otlp_bytes: OtlpProtoBytes = payload.try_into()?;
let otlp_bytes: OtlpProtoBytes = payload.try_into_with_default()?;

// Process based on signal type (timed).
let result = effect_handler.timed(&self.compute_duration, || match otlp_bytes {
Expand Down Expand Up @@ -389,7 +390,8 @@ mod tests {
let out = ctx.drain_pdata().await;
let first = out.into_iter().next().expect("one output").payload();

let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp");
let otlp_bytes: OtlpProtoBytes =
first.try_into_with_default().expect("convert to otlp");
let bytes = match otlp_bytes {
OtlpProtoBytes::ExportLogsRequest(b) => b,
_ => panic!("unexpected otlp variant"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ use otap_df_engine::node::NodeId;
use otap_df_engine::processor::ProcessorWrapper;
use otap_df_pdata::OtapArrowRecords;
use otap_df_pdata::OtapPayload;
use otap_df_pdata::TryFromWithOptions;
#[cfg(test)]
use otap_df_pdata::TryIntoWithOptions;
use otap_df_pdata::otlp::OtlpProtoBytes;
use otap_df_pdata::views::otap::OtapLogsView;
use otap_df_pdata::views::otlp::bytes::logs::RawLogsData;
Expand Down Expand Up @@ -508,7 +511,7 @@ impl local::Processor<OtapPdata> for ResourceValidatorProcessor {
// Metrics/Traces Arrow views not yet available - convert to OTLP
// TODO: Implement OtapMetricsView/OtapTracesView to avoid clone + conversion
SignalType::Metrics | SignalType::Traces => {
match OtlpProtoBytes::try_from(arrow_records.clone()) {
match OtlpProtoBytes::try_from_with_default(arrow_records.clone()) {
Ok(OtlpProtoBytes::ExportMetricsRequest(bytes)) => {
let data = RawMetricsData::new(bytes.as_ref());
self.validate_metrics(&data, &allowed_values)
Expand Down Expand Up @@ -914,7 +917,9 @@ mod tests {
fn create_arrow_logs_with_resource(attrs: Vec<KeyValue>) -> OtapArrowRecords {
let logs_bytes = create_logs_request_with_resource(attrs);
let otlp_bytes = OtlpProtoBytes::ExportLogsRequest(logs_bytes);
otlp_bytes.try_into().expect("Failed to convert to Arrow")
otlp_bytes
.try_into_with_default()
.expect("Failed to convert to Arrow")
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use otap_df_otap::OTAP_EXPORTER_FACTORIES;
use otap_df_otap::metrics::ExporterPDataMetrics;
use otap_df_otap::pdata::OtapPdata;
use otap_df_pdata::Producer;
use otap_df_pdata::TryIntoWithOptions;
use otap_df_pdata::encode::producer::ProducerOptions;
use otap_df_pdata::otap::OtapArrowRecords;
use otap_df_pdata::proto::opentelemetry::arrow::v1::{BatchArrowRecords, BatchStatus};
Expand Down Expand Up @@ -466,7 +467,7 @@ impl local::Exporter<OtapPdata> for OTAPExporter {
self.pdata_metrics.inc_consumed(signal_type);
let payload = pdata.take_payload();

let message: OtapArrowRecords = match payload.try_into() {
let message: OtapArrowRecords = match payload.try_into_with_default() {
Ok(m) => m,
Err(e) => {
self.pdata_metrics.inc_failed(signal_type);
Expand Down Expand Up @@ -936,6 +937,7 @@ mod tests {
test_node,
};
use otap_df_otap::compression::CompressionMethod;
use otap_df_pdata::TryIntoWithOptions;
use otap_df_pdata::otap::OtapArrowRecords;
use otap_df_pdata::proto::opentelemetry::arrow::v1::{
ArrowPayloadType, BatchArrowRecords, BatchStatus,
Expand Down Expand Up @@ -1025,7 +1027,7 @@ mod tests {
.expect("Timed out waiting for message")
.expect("No message received")
.payload()
.try_into()
.try_into_with_default()
.expect("Could convert pdata to OTAPData");

// Assert that the message received is what the exporter sent
Expand All @@ -1039,7 +1041,7 @@ mod tests {
.expect("Timed out waiting for message")
.expect("No message received")
.payload()
.try_into()
.try_into_with_default()
.expect("Could convert pdata to OTAPData");
let _expected_logs_message =
create_otap_batch(LOG_BATCH_ID, ArrowPayloadType::Logs);
Expand All @@ -1051,7 +1053,7 @@ mod tests {
.expect("Timed out waiting for message")
.expect("No message received")
.payload()
.try_into()
.try_into_with_default()
.expect("Could convert pdata to OTAPData");

let _expected_trace_message =
Expand Down Expand Up @@ -1584,7 +1586,7 @@ mod tests {
let pdata = OtapPdata::new_default(log_message.into());
let payload = pdata.clone();
batches_tx
.send((pdata, payload.payload().try_into().unwrap()))
.send((pdata, payload.payload().try_into_with_default().unwrap()))
.await
.unwrap();
// Drop sender so the function exits after processing
Expand Down Expand Up @@ -1644,7 +1646,7 @@ mod tests {
let pdata = OtapPdata::new_default(log_message.into());
let payload = pdata.clone();
batches_tx
.send((pdata, payload.payload().try_into().unwrap()))
.send((pdata, payload.payload().try_into_with_default().unwrap()))
.await
.unwrap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use otap_df_engine::node::NodeId;
use otap_df_engine::terminal_state::TerminalState;
use otap_df_engine::wiring_contract::WiringContract;
use otap_df_engine::{ConsumerEffectHandlerExtension, ExporterFactory};
#[cfg(test)]
use otap_df_pdata::TryIntoWithOptions;
use otap_df_pdata::otlp::logs::LogsProtoBytesEncoder;
use otap_df_pdata::otlp::metrics::MetricsProtoBytesEncoder;
use otap_df_pdata::otlp::traces::TracesProtoBytesEncoder;
Expand Down Expand Up @@ -245,7 +247,7 @@ impl Exporter<OtapPdata> for OtlpHttpExporter {
let mut logs_proto_encoder = LogsProtoBytesEncoder::new();
let mut metrics_proto_encoder = MetricsProtoBytesEncoder::new();
let mut traces_proto_encoder = TracesProtoBytesEncoder::new();
let mut proto_buffer = ProtoBuffer::with_capacity(8 * 1024);
let mut proto_buffer = ProtoBuffer::default();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jmacd Sorry didn't get my review in before merging. Is this change potentially a small perf regression since we're going from a pre-allocation via with_capacity to default?


loop {
// Opportunistically drain completions before we park on a recv.
Expand Down Expand Up @@ -1217,7 +1219,7 @@ mod test {
match pdata.signal_type() {
SignalType::Logs => {
let pdata: OtlpProtoBytes =
pdata.take_payload().try_into().unwrap();
pdata.take_payload().try_into_with_default().unwrap();
let pdata_decoded = LogsData::decode(pdata.as_bytes()).unwrap();
assert_equivalent(
&[OtlpProtoMessage::Logs(pdata_decoded)],
Expand All @@ -1226,7 +1228,7 @@ mod test {
}
SignalType::Metrics => {
let pdata: OtlpProtoBytes =
pdata.take_payload().try_into().unwrap();
pdata.take_payload().try_into_with_default().unwrap();
let pdata_decoded = MetricsData::decode(pdata.as_bytes()).unwrap();
assert_equivalent(
&[OtlpProtoMessage::Metrics(pdata_decoded)],
Expand All @@ -1235,7 +1237,7 @@ mod test {
}
SignalType::Traces => {
let pdata: OtlpProtoBytes =
pdata.take_payload().try_into().unwrap();
pdata.take_payload().try_into_with_default().unwrap();
let pdata_decoded = TracesData::decode(pdata.as_bytes()).unwrap();
assert_equivalent(
&[OtlpProtoMessage::Traces(pdata_decoded)],
Expand Down Expand Up @@ -1878,7 +1880,7 @@ mod test {
match pdata.signal_type() {
SignalType::Logs => {
let pdata: OtlpProtoBytes =
pdata.take_payload().try_into().unwrap();
pdata.take_payload().try_into_with_default().unwrap();
let pdata_decoded = LogsData::decode(pdata.as_bytes()).unwrap();
assert_equivalent(
&[OtlpProtoMessage::Logs(pdata_decoded)],
Expand All @@ -1887,7 +1889,7 @@ mod test {
}
SignalType::Metrics => {
let pdata: OtlpProtoBytes =
pdata.take_payload().try_into().unwrap();
pdata.take_payload().try_into_with_default().unwrap();
let pdata_decoded = MetricsData::decode(pdata.as_bytes()).unwrap();
assert_equivalent(
&[OtlpProtoMessage::Metrics(pdata_decoded)],
Expand All @@ -1896,7 +1898,7 @@ mod test {
}
SignalType::Traces => {
let pdata: OtlpProtoBytes =
pdata.take_payload().try_into().unwrap();
pdata.take_payload().try_into_with_default().unwrap();
let pdata_decoded = TracesData::decode(pdata.as_bytes()).unwrap();
assert_equivalent(
&[OtlpProtoMessage::Traces(pdata_decoded)],
Expand Down Expand Up @@ -1998,23 +2000,26 @@ mod test {
// ensure we got back all the signals we expected, from the correct servers.

let mut pdata = logs_pdata_rx.recv().await.unwrap();
let otlp_bytes: OtlpProtoBytes = pdata.take_payload().try_into().unwrap();
let otlp_bytes: OtlpProtoBytes =
pdata.take_payload().try_into_with_default().unwrap();
let pdata_decoded = LogsData::decode(otlp_bytes.as_bytes()).unwrap();
assert_equivalent(
&[OtlpProtoMessage::Logs(pdata_decoded)],
&[OtlpProtoMessage::Logs(logs_batch.clone())],
);

let mut pdata = metrics_pdata_rx.recv().await.unwrap();
let otlp_bytes: OtlpProtoBytes = pdata.take_payload().try_into().unwrap();
let otlp_bytes: OtlpProtoBytes =
pdata.take_payload().try_into_with_default().unwrap();
let pdata_decoded = MetricsData::decode(otlp_bytes.as_bytes()).unwrap();
assert_equivalent(
&[OtlpProtoMessage::Metrics(pdata_decoded)],
&[OtlpProtoMessage::Metrics(metrics_batch.clone())],
);

let mut pdata = traces_pdata_rx.recv().await.unwrap();
let otlp_bytes: OtlpProtoBytes = pdata.take_payload().try_into().unwrap();
let otlp_bytes: OtlpProtoBytes =
pdata.take_payload().try_into_with_default().unwrap();
let pdata_decoded = TracesData::decode(otlp_bytes.as_bytes()).unwrap();
assert_equivalent(
&[OtlpProtoMessage::Traces(pdata_decoded)],
Expand Down Expand Up @@ -2088,8 +2093,10 @@ mod test {
server_cancellation_token.cancel();

// assert the pdata sent was the pdata received
let pdata: OtlpProtoBytes =
pdatas_received[0].take_payload().try_into().unwrap();
let pdata: OtlpProtoBytes = pdatas_received[0]
.take_payload()
.try_into_with_default()
.unwrap();
let pdata_decoded = LogsData::decode(pdata.as_bytes()).unwrap();
assert_equivalent(
&[OtlpProtoMessage::Logs(pdata_decoded)],
Expand Down
Loading
Loading