@@ -383,3 +383,217 @@ impl<S> tower::Layer<S> for OtlpLayer {
383383 OtlpMiddleware :: new ( service, self . identity_required )
384384 }
385385}
386+
387+ #[ cfg( test) ]
388+ mod tests {
389+ use core:: convert:: Infallible ;
390+ use core:: future:: Future ;
391+ use core:: pin:: Pin ;
392+ use core:: task:: { Context , Poll } ;
393+ use std:: sync:: Arc ;
394+
395+ use nativelink_macro:: nativelink_test;
396+ use opentelemetry:: metrics:: MeterProvider as _;
397+ use opentelemetry_proto:: tonic:: collector:: metrics:: v1:: {
398+ ExportMetricsServiceRequest , ExportMetricsServiceResponse ,
399+ } ;
400+ use opentelemetry_sdk:: metrics:: SdkMeterProvider ;
401+ use parking_lot:: Mutex ;
402+
403+ use super :: * ;
404+
405+ // Serialize env-var mutations across tests: env vars are process-global,
406+ // so concurrent writes would be a data race on Unix.
407+ //
408+ // Serialize env-var mutations across tests: env vars are process-global,
409+ // so concurrent writes would be a data race on Unix. The guard is always
410+ // scoped so it drops before any `.await`, which satisfies
411+ // `clippy::await_holding_lock`.
412+ static ENV_LOCK : Mutex < ( ) > = Mutex :: new ( ( ) ) ;
413+
414+ #[ nativelink_test( "crate" ) ]
415+ async fn channel_absent_when_env_not_set ( ) {
416+ {
417+ let _guard = ENV_LOCK . lock ( ) ;
418+ // SAFETY: ENV_LOCK serializes all env-var writes in this test module.
419+ unsafe { env:: remove_var ( NL_OTEL_ENDPOINT ) } ;
420+ }
421+ assert ! (
422+ maybe_load_balanced_channel( ) . await . is_none( ) ,
423+ "Expected None when {NL_OTEL_ENDPOINT} is unset"
424+ ) ;
425+ }
426+
427+ #[ nativelink_test( "crate" ) ]
428+ async fn channel_present_when_valid_endpoint_set ( ) {
429+ {
430+ let _guard = ENV_LOCK . lock ( ) ;
431+ // SAFETY: ENV_LOCK serializes all env-var writes in this test module.
432+ unsafe { env:: set_var ( NL_OTEL_ENDPOINT , "http://localhost:4317" ) } ;
433+ }
434+ let result = maybe_load_balanced_channel ( ) . await ;
435+ {
436+ let _guard = ENV_LOCK . lock ( ) ;
437+ // SAFETY: ENV_LOCK serializes all env-var writes in this test module.
438+ unsafe { env:: remove_var ( NL_OTEL_ENDPOINT ) } ;
439+ }
440+ assert ! (
441+ result. is_some( ) ,
442+ "Expected Some(channel) when {NL_OTEL_ENDPOINT} points to a valid URL"
443+ ) ;
444+ }
445+
446+ // A minimal in-process OTLP metrics collector backed by tonic 0.13.
447+ //
448+ // `opentelemetry-proto`'s generated `MetricsServiceServer` depends on tonic
449+ // 0.12, which is incompatible with the tonic 0.13 used by the rest of the
450+ // crate. To avoid the version conflict we implement the gRPC service
451+ // manually: re-use only the prost message types from `opentelemetry-proto`
452+ // and wire up the routing with tonic 0.13 primitives directly.
453+ #[ derive( Clone ) ]
454+ struct TestMetricsService {
455+ received : Arc < Mutex < Vec < ExportMetricsServiceRequest > > > ,
456+ }
457+
458+ const METRICS_EXPORT_PATH : & str =
459+ "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export" ;
460+
461+ impl tower:: Service < hyper:: http:: Request < tonic:: body:: Body > > for TestMetricsService {
462+ type Response = Response < tonic:: body:: Body > ;
463+ type Error = Infallible ;
464+ type Future =
465+ Pin < Box < dyn Future < Output = Result < Self :: Response , Self :: Error > > + Send + ' static > > ;
466+
467+ fn poll_ready ( & mut self , _cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > {
468+ Poll :: Ready ( Ok ( ( ) ) )
469+ }
470+
471+ fn call ( & mut self , req : hyper:: http:: Request < tonic:: body:: Body > ) -> Self :: Future {
472+ let received = self . received . clone ( ) ;
473+ Box :: pin ( async move {
474+ if req. uri ( ) . path ( ) == METRICS_EXPORT_PATH {
475+ let export_svc = tower:: service_fn (
476+ move |request : tonic:: Request < ExportMetricsServiceRequest > | {
477+ let received = received. clone ( ) ;
478+ async move {
479+ received. lock ( ) . push ( request. into_inner ( ) ) ;
480+ Ok :: < tonic:: Response < ExportMetricsServiceResponse > , tonic:: Status > (
481+ tonic:: Response :: new ( ExportMetricsServiceResponse :: default ( ) ) ,
482+ )
483+ }
484+ } ,
485+ ) ;
486+ // ProstCodec<T, U>: Encode = T (sent to client),
487+ // Decode = U (received from client).
488+ let mut grpc = tonic:: server:: Grpc :: new ( tonic:: codec:: ProstCodec :: <
489+ ExportMetricsServiceResponse ,
490+ ExportMetricsServiceRequest ,
491+ > :: default ( ) ) ;
492+ Ok ( grpc. unary ( export_svc, req) . await )
493+ } else {
494+ let mut resp = Response :: new ( tonic:: body:: Body :: empty ( ) ) ;
495+ resp. headers_mut ( ) . insert (
496+ tonic:: Status :: GRPC_STATUS ,
497+ // `Code` in the outer scope is nativelink_error::Code,
498+ // so the `tonic::` prefix is necessary here.
499+ #[ expect(
500+ unused_qualifications,
501+ reason = "tonic::Code differs from the nativelink_error::Code in outer scope"
502+ ) ]
503+ ( tonic:: Code :: Unimplemented as i32 ) . into ( ) ,
504+ ) ;
505+ Ok ( resp)
506+ }
507+ } )
508+ }
509+ }
510+
511+ impl tonic:: server:: NamedService for TestMetricsService {
512+ const NAME : & ' static str = "opentelemetry.proto.collector.metrics.v1.MetricsService" ;
513+ }
514+
515+ #[ expect(
516+ clippy:: disallowed_methods,
517+ reason = "`tokio::test` uses `tokio::runtime::Builder::new_multi_thread` and \
518+ `tokio::runtime::Runtime::block_on` internally"
519+ ) ]
520+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 2 ) ]
521+ async fn metrics_pushed_via_load_balanced_channel ( ) {
522+ let listener = tokio:: net:: TcpListener :: bind ( "127.0.0.1:0" )
523+ . await
524+ . expect ( "bind test listener" ) ;
525+ let port = listener. local_addr ( ) . expect ( "local_addr" ) . port ( ) ;
526+ let incoming = tokio_stream:: wrappers:: TcpListenerStream :: new ( listener) ;
527+
528+ let received: Arc < Mutex < Vec < ExportMetricsServiceRequest > > > = Arc :: new ( Mutex :: new ( vec ! [ ] ) ) ;
529+ let svc = TestMetricsService {
530+ received : received. clone ( ) ,
531+ } ;
532+
533+ crate :: background_spawn!( "otlp_test_server" , async move {
534+ let _ = tonic:: transport:: Server :: builder( )
535+ . add_service( svc)
536+ . serve_with_incoming( incoming)
537+ . await ;
538+ } ) ;
539+
540+ {
541+ let _guard = ENV_LOCK . lock ( ) ;
542+ // SAFETY: ENV_LOCK serializes all env-var writes in this test module.
543+ unsafe { env:: set_var ( NL_OTEL_ENDPOINT , format ! ( "http://127.0.0.1:{port}" ) ) } ;
544+ }
545+ let channel = maybe_load_balanced_channel ( )
546+ . await
547+ . expect ( "channel should be Some when NL_OTEL_ENDPOINT is set" ) ;
548+ {
549+ let _guard = ENV_LOCK . lock ( ) ;
550+ // SAFETY: ENV_LOCK serializes all env-var writes in this test module.
551+ unsafe { env:: remove_var ( NL_OTEL_ENDPOINT ) } ;
552+ }
553+
554+ let exporter = MetricExporter :: builder ( )
555+ . with_tonic ( )
556+ . with_channel ( channel. into ( ) )
557+ . with_protocol ( Protocol :: Grpc )
558+ . build ( )
559+ . expect ( "MetricExporter" ) ;
560+
561+ let meter_provider = SdkMeterProvider :: builder ( )
562+ . with_periodic_exporter ( exporter)
563+ . build ( ) ;
564+
565+ let meter = meter_provider. meter ( "nativelink_test" ) ;
566+ let counter = meter
567+ . u64_counter ( "test.pushes" )
568+ . with_description ( "Counter pushed through the load-balanced channel" )
569+ . build ( ) ;
570+ counter. add ( 42 , & [ KeyValue :: new ( "operation" , "cache_read" ) ] ) ;
571+
572+ meter_provider
573+ . force_flush ( )
574+ . expect ( "force_flush should succeed" ) ;
575+
576+ {
577+ let batches = received. lock ( ) ;
578+ assert ! (
579+ !batches. is_empty( ) ,
580+ "Expected at least one OTLP ExportMetricsServiceRequest at the test server"
581+ ) ;
582+
583+ let metric_names: Vec < & str > = batches
584+ . iter ( )
585+ . flat_map ( |req| & req. resource_metrics )
586+ . flat_map ( |rm| & rm. scope_metrics )
587+ . flat_map ( |sm| & sm. metrics )
588+ . map ( |m| m. name . as_str ( ) )
589+ . collect ( ) ;
590+
591+ assert ! (
592+ metric_names. contains( & "test.pushes" ) ,
593+ "Counter 'test.pushes' should have been delivered via gRPC, got: {metric_names:?}"
594+ ) ;
595+ }
596+
597+ meter_provider. shutdown ( ) . expect ( "shutdown" ) ;
598+ }
599+ }
0 commit comments