Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
16eba29
add OTLP http exporter
albertlockett Feb 18, 2026
abf8289
finished unit test for OTLP export
albertlockett Feb 19, 2026
084f7a2
lots of debugging trying to figure out how I screwed up Ack/Nack
albertlockett Feb 19, 2026
10cbed3
added tests for nack on non-200 and connect refuse
albertlockett Feb 19, 2026
ad5c4c7
added test for partial success
albertlockett Feb 19, 2026
f328b7c
add test for invalid decoding
albertlockett Feb 19, 2026
c64b7fd
finished tests aside from metrics
albertlockett Feb 19, 2026
87c1400
fix clippies and tests
albertlockett Feb 19, 2026
1ef4231
cleanup TODOs
albertlockett Feb 19, 2026
04a69e0
document replace_bytes method
albertlockett Feb 19, 2026
750298a
cargo fmt
albertlockett Feb 19, 2026
7620115
changed urn to use otel_http with underscore character
albertlockett Feb 20, 2026
c36df31
avoid copying OTLP bytes request for body
albertlockett Feb 20, 2026
1a7db4b
enforce maximum size of response body
albertlockett Feb 20, 2026
dba527f
Merge branch 'main' into albert/1145
lalitb Feb 20, 2026
03166d9
PR feedback about dropping invalid OTAP batch on nack
albertlockett Feb 20, 2026
185a2e0
add Accept header to request
albertlockett Feb 20, 2026
348f9f3
generic from replace_bytes
albertlockett Feb 20, 2026
bd8459d
validate endpoint in constructor
albertlockett Feb 20, 2026
68be325
add musl gcc to fix perf test build
albertlockett Feb 20, 2026
3bb9a33
better handling for non-retryable errors
albertlockett Feb 20, 2026
4479371
Removed useless assertion
albertlockett Feb 20, 2026
4ad24a4
remove uneeded assertion
albertlockett Feb 20, 2026
9ca950a
Update rust/otap-dataflow/crates/otap/src/otlp_http/client_settings.rs
albertlockett Feb 20, 2026
de60ec3
PR feedback
albertlockett Feb 20, 2026
d502fcb
remove comments
albertlockett Feb 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions rust/otap-dataflow/crates/otap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ weaver_semconv.workspace = true
weaver_resolved_schema.workspace = true
weaver_resolver.workspace = true
rand.workspace = true
reqwest = { workspace = true, features = ["rustls", "stream"] }
Comment thread
lalitb marked this conversation as resolved.
zip.workspace = true
tower = { workspace = true }
hyper-util = { workspace = true }
Expand All @@ -86,7 +87,6 @@ data_engine_recordset = { workspace = true, optional = true }

# Azure Monitor Exporter dependencies
azure_identity = { workspace = true, optional = true }
reqwest = { workspace = true, optional = true, features = ["rustls"] }
azure_core = { workspace = true, optional = true, features = ["reqwest"] }
ahash = { workspace = true, optional = true }
sysinfo = { workspace = true, optional = true }
Expand Down Expand Up @@ -115,7 +115,6 @@ azure-monitor-exporter = [
"experimental-exporters",
"dep:opentelemetry-proto",
"dep:azure_identity",
"dep:reqwest",
"dep:azure_core",
"dep:ahash",
"dep:sysinfo",
Expand Down
2 changes: 2 additions & 0 deletions rust/otap-dataflow/crates/otap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub mod otlp_receiver;
/// Implementation of OTLP exporter that implements the exporter trait
pub mod otlp_exporter;

pub mod otlp_http_exporter;

/// Batch processor
pub mod batch_processor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,19 +368,19 @@ impl Default for GrpcClientSettings {
}
}

const fn default_concurrency_limit() -> usize {
pub(crate) const fn default_concurrency_limit() -> usize {
256
}

const fn default_connect_timeout() -> Duration {
pub(crate) const fn default_connect_timeout() -> Duration {
Duration::from_secs(3)
}

const fn default_tcp_nodelay() -> bool {
pub(crate) const fn default_tcp_nodelay() -> bool {
true
}

const fn default_tcp_keepalive() -> Option<Duration> {
pub(crate) const fn default_tcp_keepalive() -> Option<Duration> {
Some(Duration::from_secs(45))
}

Expand Down
24 changes: 12 additions & 12 deletions rust/otap-dataflow/crates/otap/src/otlp_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct Config {
pub max_in_flight: usize,
}

const fn default_max_in_flight() -> usize {
pub(crate) const fn default_max_in_flight() -> usize {
5
}

Expand Down Expand Up @@ -225,7 +225,7 @@ impl Exporter<OtapPdata> for OTLPExporter {

match msg {
Message::Control(NodeControlMsg::Shutdown { deadline, .. }) => {
otel_info!("otlp.exporter.shutdown");
otel_info!("otlp.exporter.grpc.shutdown");
debug_assert!(
pending_msg.is_none(),
"pending message should have been drained before shutdown"
Expand Down Expand Up @@ -480,7 +480,7 @@ async fn dispatch_otap_export<Enc, Fut, MakeFuture>(
proto_buffer: &mut ProtoBuffer,
encoder: &mut Enc,
make_future: MakeFuture,
inflight: &mut InFlightExports<Fut>,
inflight: &mut InFlightExports<Fut, CompletedExport>,
failed_counter: &mut Counter<u64>,
effect_handler: &EffectHandler<OtapPdata>,
) where
Expand Down Expand Up @@ -597,37 +597,37 @@ fn make_export_future(
}

/// FIFO-ish wrapper around the in-flight export RPCs.
struct InFlightExports<Fut>
pub(crate) struct InFlightExports<Fut, Output>
where
Fut: Future<Output = CompletedExport>,
Fut: Future<Output = Output>,
{
futures: FuturesUnordered<Fut>,
}

impl<Fut> InFlightExports<Fut>
impl<Fut, Output> InFlightExports<Fut, Output>
where
Fut: Future<Output = CompletedExport>,
Fut: Future<Output = Output>,
{
fn new() -> Self {
pub(crate) fn new() -> Self {
Self {
futures: FuturesUnordered::new(),
}
}

fn len(&self) -> usize {
pub(crate) fn len(&self) -> usize {
self.futures.len()
}

fn is_empty(&self) -> bool {
pub(crate) fn is_empty(&self) -> bool {
self.futures.is_empty()
}

fn push(&mut self, future: Fut) {
pub(crate) fn push(&mut self, future: Fut) {
self.futures.push(future);
}

/// Returns a future that resolves once the next export finishes.
fn next_completion(&mut self) -> impl Future<Output = Option<CompletedExport>> + '_ {
pub(crate) fn next_completion(&mut self) -> impl Future<Output = Option<Output>> + '_ {
self.futures.next()
}
}
Expand Down
20 changes: 16 additions & 4 deletions rust/otap-dataflow/crates/otap/src/otlp_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ use crate::tls_utils::build_tls_acceptor;
#[cfg(feature = "experimental-tls")]
use otap_df_config::tls::TlsServerConfig;

pub mod client_settings;

/// OTLP protobuf content type
const PROTOBUF_CONTENT_TYPE: &str = "application/x-protobuf";
pub(crate) const PROTOBUF_CONTENT_TYPE: &str = "application/x-protobuf";

/// Settings for the OTLP/HTTP server.
#[derive(Debug, Deserialize, Clone)]
Expand Down Expand Up @@ -321,11 +323,20 @@ fn internal_error() -> Response<Full<Bytes>> {
rpc_status_response(StatusCode::INTERNAL_SERVER_ERROR, 13, "internal error")
}

/// OTLP HTTP path for logs endpoint
pub const LOGS_PATH: &str = "/v1/logs";

/// OTLP HTTP path for metrics endpoint
pub const METRICS_PATH: &str = "/v1/metrics";

/// OTLP HTTP path for traces endpoint
pub const TRACES_PATH: &str = "/v1/traces";

fn map_path_to_signal(path: &str) -> Option<SignalType> {
match path {
"/v1/logs" => Some(SignalType::Logs),
"/v1/metrics" => Some(SignalType::Metrics),
"/v1/traces" => Some(SignalType::Traces),
LOGS_PATH => Some(SignalType::Logs),
METRICS_PATH => Some(SignalType::Metrics),
TRACES_PATH => Some(SignalType::Traces),
_ => None,
}
}
Expand Down Expand Up @@ -681,6 +692,7 @@ impl HttpHandler {
} else {
None
};

if self
.effect_handler
.send_message_with_source_node(pdata)
Expand Down
90 changes: 90 additions & 0 deletions rust/otap-dataflow/crates/otap/src/otlp_http/client_settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Shared configuration for HTTP-based clients.

use reqwest::ClientBuilder;
use serde::Deserialize;
use std::time::Duration;
use tower::limit::ConcurrencyLimitLayer;

use crate::otap_grpc::client_settings::{
default_concurrency_limit, default_connect_timeout, default_tcp_keepalive, default_tcp_nodelay,
};

/// Common configuration shared across HTTP clients.
#[derive(Debug, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct HttpClientSettings {
/// Maximum number of concurrent in-flight requests allowed by the transport stack.
#[serde(default = "default_concurrency_limit")]
pub concurrency_limit: usize,

/// Timeout for establishing TCP connections.
///
/// When a proxy is configured, this timeout covers the entire connection process,
/// including the TCP connection to the proxy and the HTTP CONNECT handshake.
#[serde(default = "default_connect_timeout", with = "humantime_serde")]
pub connect_timeout: Duration,

/// Whether to enable `TCP_NODELAY`.
#[serde(default = "default_tcp_nodelay")]
pub tcp_nodelay: bool,

/// TCP keepalive timeout for outbound connections.
#[serde(default = "default_tcp_keepalive", with = "humantime_serde")]
pub tcp_keepalive: Option<Duration>,

/// Interval between TCP keepalive probes once keepalive is active.
#[serde(default, with = "humantime_serde")]
pub tcp_keepalive_interval: Option<Duration>,

/// Timeout for HTTP requests. If not specified, no timeout is applied.
#[serde(default, with = "humantime_serde")]
pub timeout: Option<Duration>,
}

impl HttpClientSettings {
/// Returns a non-zero concurrency limit.
#[must_use]
pub fn effective_concurrency_limit(&self) -> usize {
self.concurrency_limit.max(1)
}

/// Returns a configured client-bulder
pub fn client_builder(&self) -> ClientBuilder {
let mut client_builder = ClientBuilder::new()
.connect_timeout(self.connect_timeout)
.tcp_nodelay(self.tcp_nodelay)
.connector_layer(ConcurrencyLimitLayer::new(
self.effective_concurrency_limit(),
));

if let Some(tcp_keepalive) = self.tcp_keepalive {
client_builder = client_builder.tcp_keepalive(tcp_keepalive);
}

if let Some(tcp_keepalive_interval) = self.tcp_keepalive_interval {
client_builder = client_builder.tcp_keepalive_interval(tcp_keepalive_interval)
}

if let Some(timeout) = self.timeout {
client_builder = client_builder.timeout(timeout)
}

client_builder
}
}

impl Default for HttpClientSettings {
fn default() -> Self {
Self {
concurrency_limit: default_concurrency_limit(),
connect_timeout: default_connect_timeout(),
tcp_nodelay: default_tcp_nodelay(),
tcp_keepalive: default_tcp_keepalive(),
tcp_keepalive_interval: None,
timeout: None,
}
}
}
42 changes: 42 additions & 0 deletions rust/otap-dataflow/crates/otap/src/otlp_http_exporter/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

use std::num::NonZeroUsize;

use serde::Deserialize;

use crate::otlp_exporter::default_max_in_flight;
use crate::otlp_http::client_settings::HttpClientSettings;

/// Configuration for OTLP HTTP Exporter
#[derive(Debug, Deserialize)]
pub struct Config {
/// Configuration for the HTTP client that will be used by this exporter
pub http: HttpClientSettings,

/// The endpoint to which the exporter will send OTLP HTTP requests. This should include the
/// scheme, host and port, but not the paths (/v1/logs) as these will be appended to requests
/// automatically for each batch of signals depending on the signal type.
///
/// Example: "http://localhost:4318" or "https://otel-collector:4318"
pub endpoint: String,
Comment thread
albertlockett marked this conversation as resolved.

/// Maximum allowed size for the body of OTLP HTTP responses. This is used to prevent unbounded
/// memory usage when receiving responses from the OTLP server. If a response exceeds this size,
/// the exporter will consider processing of the batch to be unsuccessful. default = 10 MiB
#[serde(default = "default_max_response_body_length")]
pub max_response_body_length: usize,

/// The size of the pool of HTTP clients to use for sending export requests. This allows for
/// multiple concurrent connections to the OTLP server, which can help with load balancing when
/// there are multiple receiver instances running on the same port (using SO_REUSEPORT).
pub client_pool_size: NonZeroUsize,

/// Maximum number of concurrent in-flight export requests.
#[serde(default = "default_max_in_flight")]
pub max_in_flight: usize,
}

fn default_max_response_body_length() -> usize {
10 * 1024 * 1024
}
Loading
Loading