Skip to content

Commit d6a38f4

Browse files
committed
Introduce tests for load-balanced channel.
1 parent ac3d375 commit d6a38f4

5 files changed

Lines changed: 275 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-util/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ rust_test_suite(
137137
"@crates//:mock_instant",
138138
"@crates//:opentelemetry",
139139
"@crates//:opentelemetry-http",
140+
"@crates//:opentelemetry_sdk",
140141
"@crates//:parking_lot",
141142
"@crates//:pretty_assertions",
142143
"@crates//:rand",
@@ -163,10 +164,13 @@ rust_test(
163164
],
164165
deps = [
165166
"@crates//:http-body-util",
167+
"@crates//:opentelemetry-proto",
168+
"@crates//:opentelemetry_sdk",
166169
"@crates//:pretty_assertions",
167170
"@crates//:rand",
168171
"@crates//:serde_json",
169172
"@crates//:tempfile",
173+
"@crates//:tokio-stream",
170174
"@crates//:tracing-test",
171175
],
172176
)

nativelink-util/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,13 @@ walkdir = { version = "2.5.0", default-features = false }
9898

9999
[dev-dependencies]
100100
nativelink-macro = { path = "../nativelink-macro" }
101+
opentelemetry-proto = { version = "0.29.0", features = [
102+
"gen-tonic",
103+
"metrics",
104+
] }
105+
opentelemetry_sdk = { version = "0.29.0", features = [
106+
"testing"
107+
] }
101108

102109
axum = { version = "0.8.3", default-features = false }
103110
http-body-util = { version = "0.1.3", default-features = false }

nativelink-util/src/telemetry.rs

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,3 +383,217 @@ impl<S> tower::Layer<S> for OtlpLayer {
383383
OtlpMiddleware::new(service, self.identity_required)
384384
}
385385
}
386+
387+
#[cfg(test)]
388+
mod tests {
389+
use core::convert::Infallible;
390+
use core::future::Future;
391+
use core::pin::Pin;
392+
use core::task::{Context, Poll};
393+
use std::sync::Arc;
394+
395+
use nativelink_macro::nativelink_test;
396+
use opentelemetry::metrics::MeterProvider as _;
397+
use opentelemetry_proto::tonic::collector::metrics::v1::{
398+
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
399+
};
400+
use opentelemetry_sdk::metrics::SdkMeterProvider;
401+
use parking_lot::Mutex;
402+
403+
use super::*;
404+
405+
// Serialize env-var mutations across tests: env vars are process-global,
406+
// so concurrent writes would be a data race on Unix.
407+
//
408+
// The guard is always scoped so it drops before any `.await`, which
409+
// satisfies `clippy::await_holding_lock`.
410+
// NOTE(review): the lock is therefore released before the awaited call that
411+
// follows each write, so tests may still interleave there -- TODO confirm.
412+
static ENV_LOCK: Mutex<()> = Mutex::new(());
413+
414+
// Checks that `maybe_load_balanced_channel()` resolves to `None` once
// `NL_OTEL_ENDPOINT` has been removed from the process environment.
#[nativelink_test("crate")]
415+
async fn channel_absent_when_env_not_set() {
416+
// The inner block bounds the lifetime of `_guard`: the lock is released
// at the closing brace, before the `.await` below.
{
417+
let _guard = ENV_LOCK.lock();
418+
// SAFETY: ENV_LOCK serializes all env-var writes in this test module.
419+
unsafe { env::remove_var(NL_OTEL_ENDPOINT) };
420+
}
421+
// NOTE(review): no lock is held while the call below runs, so another test
// in this module could set the variable in between -- TODO confirm.
assert!(
422+
maybe_load_balanced_channel().await.is_none(),
423+
"Expected None when {NL_OTEL_ENDPOINT} is unset"
424+
);
425+
}
426+
427+
// Checks that `maybe_load_balanced_channel()` resolves to `Some(..)` while
// `NL_OTEL_ENDPOINT` holds an `http://host:port` URL.
#[nativelink_test("crate")]
428+
async fn channel_present_when_valid_endpoint_set() {
429+
// Set the variable under the lock; the guard drops at the end of the block.
{
430+
let _guard = ENV_LOCK.lock();
431+
// SAFETY: ENV_LOCK serializes all env-var writes in this test module.
432+
unsafe { env::set_var(NL_OTEL_ENDPOINT, "http://localhost:4317") };
433+
}
434+
// NOTE(review): nothing in this test listens on that port, so the channel is
// presumably created without connecting eagerly -- TODO confirm.
let result = maybe_load_balanced_channel().await;
435+
// Remove the variable again before asserting, so a failing assertion does
// not leave it set for the remaining tests.
{
436+
let _guard = ENV_LOCK.lock();
437+
// SAFETY: ENV_LOCK serializes all env-var writes in this test module.
438+
unsafe { env::remove_var(NL_OTEL_ENDPOINT) };
439+
}
440+
assert!(
441+
result.is_some(),
442+
"Expected Some(channel) when {NL_OTEL_ENDPOINT} points to a valid URL"
443+
);
444+
}
445+
446+
// A minimal in-process OTLP metrics collector backed by tonic 0.13.
447+
//
448+
// `opentelemetry-proto`'s generated `MetricsServiceServer` depends on tonic
449+
// 0.12, which is incompatible with the tonic 0.13 used by the rest of the
450+
// crate. To avoid the version conflict we implement the gRPC service
451+
// manually: re-use only the prost message types from `opentelemetry-proto`
452+
// and wire up the routing with tonic 0.13 primitives directly.
453+
#[derive(Clone)]
454+
struct TestMetricsService {
455+
// Every export request decoded by `call` is appended here. The `Arc` lets
// the test keep its own handle and inspect the collected requests.
received: Arc<Mutex<Vec<ExportMetricsServiceRequest>>>,
456+
}
457+
458+
// Request path that `TestMetricsService::call` compares against to decide
// whether an incoming request is a metrics export. It is the service name
// used in the `NamedService` impl followed by `/Export`.
const METRICS_EXPORT_PATH: &str =
459+
"/opentelemetry.proto.collector.metrics.v1.MetricsService/Export";
460+
461+
// Hand-written `tower::Service` for the test collector. Requests whose path
// equals `METRICS_EXPORT_PATH` are decoded and recorded in `received`; every
// other request is answered with an empty body carrying a gRPC
// `Unimplemented` status header.
impl tower::Service<hyper::http::Request<tonic::body::Body>> for TestMetricsService {
462+
type Response = Response<tonic::body::Body>;
463+
// `call` only ever returns `Ok(..)`, so the error type is uninhabited.
type Error = Infallible;
464+
type Future =
465+
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
466+
467+
// Always reports ready: this service never applies backpressure.
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
468+
Poll::Ready(Ok(()))
469+
}
470+
471+
// Routes a single request by comparing its URI path with
// `METRICS_EXPORT_PATH`.
fn call(&mut self, req: hyper::http::Request<tonic::body::Body>) -> Self::Future {
472+
// Clone the shared handle so the returned future owns it and can be
// `'static`.
let received = self.received.clone();
473+
Box::pin(async move {
474+
if req.uri().path() == METRICS_EXPORT_PATH {
475+
// Unary handler: store the decoded request, reply with a default
// (empty) response.
let export_svc = tower::service_fn(
476+
move |request: tonic::Request<ExportMetricsServiceRequest>| {
477+
let received = received.clone();
478+
async move {
479+
received.lock().push(request.into_inner());
480+
Ok::<tonic::Response<ExportMetricsServiceResponse>, tonic::Status>(
481+
tonic::Response::new(ExportMetricsServiceResponse::default()),
482+
)
483+
}
484+
},
485+
);
486+
// ProstCodec<T, U>: Encode = T (sent to client),
487+
// Decode = U (received from client).
488+
let mut grpc = tonic::server::Grpc::new(tonic::codec::ProstCodec::<
489+
ExportMetricsServiceResponse,
490+
ExportMetricsServiceRequest,
491+
>::default());
492+
Ok(grpc.unary(export_svc, req).await)
493+
} else {
494+
// Unknown path: empty body plus a `grpc-status` header set to
// `Unimplemented`.
let mut resp = Response::new(tonic::body::Body::empty());
495+
resp.headers_mut().insert(
496+
tonic::Status::GRPC_STATUS,
497+
// `Code` in the outer scope is nativelink_error::Code,
498+
// so the `tonic::` prefix is necessary here.
499+
#[expect(
500+
unused_qualifications,
501+
reason = "tonic::Code differs from the nativelink_error::Code in outer scope"
502+
)]
503+
(tonic::Code::Unimplemented as i32).into(),
504+
);
505+
Ok(resp)
506+
}
507+
})
508+
}
509+
}
510+
511+
// Gives the hand-written service a name so it can be passed to
// `Server::builder().add_service(..)` in the test below. The string is the
// service portion of `METRICS_EXPORT_PATH` (without the leading `/` and the
// trailing `/Export`).
impl tonic::server::NamedService for TestMetricsService {
512+
const NAME: &'static str = "opentelemetry.proto.collector.metrics.v1.MetricsService";
513+
}
514+
515+
// End-to-end check of the load-balanced channel: starts `TestMetricsService`
// on a local listener, points `NL_OTEL_ENDPOINT` at it, exports one counter
// through the channel returned by `maybe_load_balanced_channel()`, and
// asserts that the collector received a metric named `test.pushes`.
#[expect(
516+
clippy::disallowed_methods,
517+
reason = "`tokio::test` uses `tokio::runtime::Builder::new_multi_thread` and \
518+
`tokio::runtime::Runtime::block_on` internally"
519+
)]
520+
// NOTE(review): the multi-threaded flavor is presumably required because
// `force_flush()` is called synchronously below while the server task must
// keep making progress -- TODO confirm.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
521+
async fn metrics_pushed_via_load_balanced_channel() {
522+
// Bind to port 0 and read the assigned port back from the listener.
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
523+
.await
524+
.expect("bind test listener");
525+
let port = listener.local_addr().expect("local_addr").port();
526+
let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
527+
528+
// Shared sink: the service pushes into it, the assertions below read it.
let received: Arc<Mutex<Vec<ExportMetricsServiceRequest>>> = Arc::new(Mutex::new(vec![]));
529+
let svc = TestMetricsService {
530+
received: received.clone(),
531+
};
532+
533+
// Serve the collector in the background; the serve result is deliberately
// discarded.
crate::background_spawn!("otlp_test_server", async move {
534+
let _ = tonic::transport::Server::builder()
535+
.add_service(svc)
536+
.serve_with_incoming(incoming)
537+
.await;
538+
});
539+
540+
// Point the endpoint variable at the test server; the guard drops at the
// end of the block, before the `.await` that follows.
{
541+
let _guard = ENV_LOCK.lock();
542+
// SAFETY: ENV_LOCK serializes all env-var writes in this test module.
543+
unsafe { env::set_var(NL_OTEL_ENDPOINT, format!("http://127.0.0.1:{port}")) };
544+
}
545+
let channel = maybe_load_balanced_channel()
546+
.await
547+
.expect("channel should be Some when NL_OTEL_ENDPOINT is set");
548+
// The channel has been obtained, so the variable is removed again right away.
{
549+
let _guard = ENV_LOCK.lock();
550+
// SAFETY: ENV_LOCK serializes all env-var writes in this test module.
551+
unsafe { env::remove_var(NL_OTEL_ENDPOINT) };
552+
}
553+
554+
// Build a gRPC metric exporter on top of the channel under test.
let exporter = MetricExporter::builder()
555+
.with_tonic()
556+
.with_channel(channel.into())
557+
.with_protocol(Protocol::Grpc)
558+
.build()
559+
.expect("MetricExporter");
560+
561+
let meter_provider = SdkMeterProvider::builder()
562+
.with_periodic_exporter(exporter)
563+
.build();
564+
565+
// Record a single measurement on a counter named `test.pushes`.
let meter = meter_provider.meter("nativelink_test");
566+
let counter = meter
567+
.u64_counter("test.pushes")
568+
.with_description("Counter pushed through the load-balanced channel")
569+
.build();
570+
counter.add(42, &[KeyValue::new("operation", "cache_read")]);
571+
572+
// Flush explicitly so the export has happened before the assertions run.
meter_provider
573+
.force_flush()
574+
.expect("force_flush should succeed");
575+
576+
// Scope the `received` guard so it is released before `shutdown()` runs.
{
577+
let batches = received.lock();
578+
assert!(
579+
!batches.is_empty(),
580+
"Expected at least one OTLP ExportMetricsServiceRequest at the test server"
581+
);
582+
583+
// Flatten request -> resource metrics -> scope metrics -> metrics and
// keep only the names; the recorded value (42) is not asserted.
let metric_names: Vec<&str> = batches
584+
.iter()
585+
.flat_map(|req| &req.resource_metrics)
586+
.flat_map(|rm| &rm.scope_metrics)
587+
.flat_map(|sm| &sm.metrics)
588+
.map(|m| m.name.as_str())
589+
.collect();
590+
591+
assert!(
592+
metric_names.contains(&"test.pushes"),
593+
"Counter 'test.pushes' should have been delivered via gRPC, got: {metric_names:?}"
594+
);
595+
}
596+
597+
meter_provider.shutdown().expect("shutdown");
598+
}
599+
}

nativelink-util/tests/telemetry_test.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ use hyper::{Request, Response, StatusCode, Uri};
88
use nativelink_macro::nativelink_test;
99
use nativelink_util::telemetry::ClientHeaders;
1010
use opentelemetry::baggage::BaggageExt;
11+
use opentelemetry::metrics::MeterProvider as _;
1112
use opentelemetry::{Context, KeyValue, global};
1213
use opentelemetry_http::HeaderExtractor as OTELHeaderExtractor;
14+
use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider};
1315
use tonic::body::Body;
1416
use tower::{Service, ServiceBuilder, ServiceExt};
1517
use tracing::{debug, error, warn};
@@ -153,3 +155,43 @@ async fn oltp_logs_with_headers() -> Result<(), Box<dyn core::error::Error>> {
153155

154156
Ok(())
155157
}
158+
159+
// Records two measurements on a counter, flushes the provider, and asserts
// that the in-memory exporter holds a metric named `test.operations`.
//
// Returns `Err` (failing the test) if `force_flush`, `get_finished_metrics`
// or `shutdown` reports an error.
#[nativelink_test]
160+
async fn metrics_are_tracked() -> Result<(), Box<dyn core::error::Error>> {
161+
// The exporter is cloned into the reader so this test keeps a handle for
// reading back what was exported.
let exporter = InMemoryMetricExporter::default();
162+
let meter_provider = SdkMeterProvider::builder()
163+
.with_reader(PeriodicReader::builder(exporter.clone()).build())
164+
.build();
165+
166+
let meter = meter_provider.meter("nativelink_test");
167+
let counter = meter
168+
.u64_counter("test.operations")
169+
.with_description("Total test operations")
170+
.build();
171+
172+
// Two data points on the same counter, distinguished by the `operation`
// attribute.
counter.add(3, &[KeyValue::new("operation", "cache_read")]);
173+
counter.add(7, &[KeyValue::new("operation", "cache_write")]);
174+
175+
// Flush explicitly so the exporter is populated before it is inspected.
meter_provider.force_flush()?;
176+
177+
let finished = exporter.get_finished_metrics()?;
178+
assert!(
179+
!finished.is_empty(),
180+
"Expected at least one ResourceMetrics batch after force_flush"
181+
);
182+
183+
// Only metric names are collected; the recorded values (3 and 7) are not
// asserted.
let recorded_names: Vec<&str> = finished
184+
.iter()
185+
.flat_map(|rm| &rm.scope_metrics)
186+
.flat_map(|sm| &sm.metrics)
187+
.map(|m| m.name.as_ref())
188+
.collect();
189+
190+
assert!(
191+
recorded_names.contains(&"test.operations"),
192+
"Counter 'test.operations' should appear in exported metrics, got: {recorded_names:?}"
193+
);
194+
195+
meter_provider.shutdown()?;
196+
Ok(())
197+
}

0 commit comments

Comments
 (0)