diff --git a/apollo-router/src/layers/async_checkpoint.rs b/apollo-router/src/layers/async_checkpoint.rs index f2cda0a9d9..3013b5a8df 100644 --- a/apollo-router/src/layers/async_checkpoint.rs +++ b/apollo-router/src/layers/async_checkpoint.rs @@ -345,7 +345,7 @@ mod async_checkpoint_tests { let mut service_stack = ServiceBuilder::new() .checkpoint_async(|req: ExecutionRequest| async { Ok(ControlFlow::Continue(req)) }) - .buffered() + .buffered("service_stack", vec![]) .service(execution_service); let request = ExecutionRequest::fake_builder().build(); diff --git a/apollo-router/src/layers/instrumented_load_shed.rs b/apollo-router/src/layers/instrumented_load_shed.rs new file mode 100644 index 0000000000..9ce0c31262 --- /dev/null +++ b/apollo-router/src/layers/instrumented_load_shed.rs @@ -0,0 +1,175 @@ +//! A wrapper around [`LoadShed`] that increments a counter every time load +//! is shed, reporting it as the `apollo.router.shaping.shed` metric. +//! +//! [`InstrumentedLoadShedLayer`] is a replication of Tower's [`LoadShedLayer`] +//! that produces an [`InstrumentedLoadShed`] service. +//! [`InstrumentedLoadShed`] is a thin wrapper around Tower's [`LoadShed`], +//! and [`InstrumentedResponseFuture`] is a thin wrapper around Tower's +//! [`ResponseFuture`]. +//! +//! ## Design trade-offs +//! +//! Similar to how [`UnconstrainedBuffer`] simplifies its instrumentation by +//! counting requests from the moment they enter the queue to the moment they +//! complete (allowing for a `bound + 1` count), [`InstrumentedLoadShed`] also +//! makes compromises in the name of simplicity. +//! +//! In order to avoid re-implementing the [`LoadShed`] service — which could +//! get out of sync with Tower's implementation and future changes — +//! [`InstrumentedResponseFuture`] only counts shedding upon [`Future::poll`]. +//! This is because Tower's [`ResponseFuture`] does not expose `ResponseState`, +//! which could otherwise be used to determine whether shedding happened +//! 
without having to poll the future. +//! +//! The advantage of this model is that only sheds that would be observed by a +//! caller are reported. In other words, requests that were shed but whose +//! futures were dropped without ever being polled do not count toward the +//! shedding metric. +//! +//! [`LoadShedLayer`]: tower::load_shed::LoadShedLayer +//! [`LoadShed`]: LoadShed +//! [`ResponseFuture`]: ResponseFuture +//! [`Future::poll`]: Future::poll +//! [`UnconstrainedBuffer`]: super::unconstrained_buffer::UnconstrainedBuffer + +use std::fmt; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use opentelemetry::KeyValue; +use pin_project_lite::pin_project; +use tower::BoxError; +use tower::Layer; +use tower::load_shed::LoadShed; +use tower::load_shed::error::Overloaded; +use tower::load_shed::future::ResponseFuture; +use tower_service::Service; + +/// Adds an instrumented [`LoadShed`] layer to a service. +/// +/// See the module documentation for more details. +#[derive(Clone, Debug)] +pub struct InstrumentedLoadShedLayer { + name: String, + attributes: Vec<KeyValue>, +} + +impl InstrumentedLoadShedLayer { + /// Creates a new [`InstrumentedLoadShedLayer`] with the provided `name` and `attributes`. + pub fn new(name: impl Into<String>, attributes: Vec<KeyValue>) -> Self { + Self { + name: name.into(), + attributes, + } + } +} + +impl<S> Layer<S> for InstrumentedLoadShedLayer { + type Service = InstrumentedLoadShed<S>; + + fn layer(&self, service: S) -> Self::Service { + InstrumentedLoadShed::new(self.name.clone(), service, self.attributes.clone()) + } +} + +/// A wrapper around [`LoadShed`] that counts +/// shedding events upon [`Future::poll`]. +/// +/// See the module documentation for more details. 
+#[derive(Debug)] +pub struct InstrumentedLoadShed<S> { + inner: LoadShed<S>, + attributes: Vec<KeyValue>, +} + +impl<S> InstrumentedLoadShed<S> { + fn new(name: impl Into<String>, inner: S, mut attributes: Vec<KeyValue>) -> Self { + attributes.push(KeyValue::new("layer.service.name", name.into())); + InstrumentedLoadShed { + inner: LoadShed::new(inner), + attributes, + } + } +} + +impl<S, Req> Service<Req> for InstrumentedLoadShed<S> +where + S: Service<Req>, + S::Error: Into<BoxError>, +{ + type Response = S::Response; + type Error = BoxError; + type Future = InstrumentedResponseFuture<S::Future>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Req) -> Self::Future { + InstrumentedResponseFuture::new(self.inner.call(req), self.attributes.clone()) + } +} + +impl<S: Clone> Clone for InstrumentedLoadShed<S> { + fn clone(&self) -> Self { + InstrumentedLoadShed { + inner: self.inner.clone(), + attributes: self.attributes.clone(), + } + } +} + +pin_project! { + /// A wrapper around Tower's [`ResponseFuture`] + /// that increments the `apollo.router.shaping.shed` counter when shedding + /// is observed during polling. 
+ pub struct InstrumentedResponseFuture<F> { + #[pin] + inner: ResponseFuture<F>, + attributes: Vec<KeyValue>, + } +} + +impl<F> InstrumentedResponseFuture<F> { + fn new(inner: ResponseFuture<F>, attributes: Vec<KeyValue>) -> Self { + InstrumentedResponseFuture { inner, attributes } + } +} + +impl<F, T, E> Future for InstrumentedResponseFuture<F> +where + F: Future<Output = Result<T, E>>, + E: Into<BoxError>, +{ + type Output = Result<T, BoxError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let attributes = self.attributes.clone(); + match self.project().inner.poll(cx) { + Poll::Ready(Ok(res)) => Poll::Ready(Ok(res)), + Poll::Ready(Err(err)) if err.is::<Overloaded>() => { + u64_counter_with_unit!( + "apollo.router.shaping.shed", + "Number of times that load was shed", + "{shed}", + 1, + attributes + ); + Poll::Ready(Err(err.into())) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), + Poll::Pending => Poll::Pending, + } + } +} + +impl<F> fmt::Debug for InstrumentedResponseFuture<F> +where + // bounds for future-proofing... + F: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("InstrumentedResponseFuture").finish() + } +} diff --git a/apollo-router/src/layers/mod.rs b/apollo-router/src/layers/mod.rs index 6cf4e0edb5..c38a1ff036 100644 --- a/apollo-router/src/layers/mod.rs +++ b/apollo-router/src/layers/mod.rs @@ -3,6 +3,7 @@ use std::future::Future; use std::ops::ControlFlow; +use opentelemetry::KeyValue; use tower::BoxError; use tower::ServiceBuilder; use tower::layer::util::Stack; @@ -15,6 +16,7 @@ use crate::Context; use crate::graphql; use crate::layers::async_checkpoint::AsyncCheckpointLayer; use crate::layers::instrument::InstrumentLayer; +use crate::layers::instrumented_load_shed::InstrumentedLoadShedLayer; use crate::layers::map_future_with_request_data::MapFutureWithRequestDataLayer; use crate::layers::map_future_with_request_data::MapFutureWithRequestDataService; use crate::layers::sync_checkpoint::CheckpointLayer; @@ -23,6 +25,7 @@ use crate::services::supergraph; pub mod async_checkpoint; pub 
mod instrument; +pub mod instrumented_load_shed; pub mod map_first_graphql_response; pub mod map_future_with_request_data; pub mod sync_checkpoint; @@ -174,7 +177,18 @@ pub trait ServiceBuilderExt: Sized { /// .service(service); /// # } /// ``` - fn buffered(self) -> ServiceBuilder, L>>; + fn buffered( + self, + name: impl Into, + attributes: Vec, + ) -> ServiceBuilder, L>>; + + /// Adds load shedding to the service stack. + fn instrumented_load_shed( + self, + name: impl Into, + attributes: Vec, + ) -> ServiceBuilder>; /// Place a span around the request. /// @@ -332,8 +346,24 @@ impl ServiceBuilderExt for ServiceBuilder { ServiceBuilder::layer(self, layer) } - fn buffered(self) -> ServiceBuilder, L>> { - self.layer(UnconstrainedBufferLayer::new(DEFAULT_BUFFER_SIZE)) + fn buffered( + self, + name: impl Into, + attributes: Vec, + ) -> ServiceBuilder, L>> { + self.layer(UnconstrainedBufferLayer::new( + name.into(), + DEFAULT_BUFFER_SIZE, + attributes, + )) + } + + fn instrumented_load_shed( + self, + name: impl Into, + attributes: Vec, + ) -> ServiceBuilder> { + self.layer(InstrumentedLoadShedLayer::new(name, attributes)) } } diff --git a/apollo-router/src/layers/unconstrained_buffer.rs b/apollo-router/src/layers/unconstrained_buffer.rs index 526dda8a88..ee6468d558 100644 --- a/apollo-router/src/layers/unconstrained_buffer.rs +++ b/apollo-router/src/layers/unconstrained_buffer.rs @@ -57,18 +57,25 @@ use std::marker::PhantomData; use std::task::Context; use std::task::Poll; +use opentelemetry::KeyValue; use tower::BoxError; use tower::Layer; use tower::buffer::Buffer; use tower::buffer::future::ResponseFuture; use tower_service::Service; +use crate::metrics::UpDownCounterGuard; + /// Adds a [coop unconstrained](tokio::task::unconstrained) [`Buffer`] layer to a service. /// /// See the module documentation for more details. -#[derive(Clone, Copy)] +#[derive(Clone)] pub struct UnconstrainedBufferLayer { + /// Name of the buffer layer, used for metrics. 
+ name: String, bound: usize, + /// Buffer attributes, used for metrics. + attributes: Vec, _p: PhantomData, } @@ -79,9 +86,11 @@ impl UnconstrainedBufferLayer { /// backpressure is applied to callers. /// /// See [`Buffer::new`] for guidance on choosing a `bound`. - pub const fn new(bound: usize) -> Self { + pub fn new(name: impl Into, bound: usize, attributes: Vec) -> Self { UnconstrainedBufferLayer { + name: name.into(), bound, + attributes, _p: PhantomData, } } @@ -97,13 +106,19 @@ where type Service = UnconstrainedBuffer; fn layer(&self, service: S) -> Self::Service { - UnconstrainedBuffer::new(service, self.bound) + UnconstrainedBuffer::new( + self.name.clone(), + service, + self.bound, + self.attributes.clone(), + ) } } impl fmt::Debug for UnconstrainedBufferLayer { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("UnconstrainedBufferLayer") + .field("name", &self.name) .field("bound", &self.bound) .finish() } @@ -122,7 +137,9 @@ impl fmt::Debug for UnconstrainedBufferLayer { pub struct UnconstrainedBuffer { /// The inner [`Buffer`] layer, which wraps the actual service and is responsible for /// buffering requests. - inner: Buffer, + inner: Buffer, F>, + /// Buffer attributes, used for metrics. + attributes: Vec, } impl UnconstrainedBuffer @@ -130,16 +147,22 @@ where F: 'static, { /// Creates a new `UnconstrainedBuffer` with the specified service and buffer capacity. 
- pub fn new(service: S, bound: usize) -> Self + pub fn new( + name: impl Into, + service: S, + bound: usize, + mut attributes: Vec, + ) -> Self where S: Service + Send + 'static, F: Send, S::Error: Into + Send + Sync, Req: Send + 'static, { - let inner = Buffer::new(service, bound); - - Self { inner } + let inner = Buffer::new(GaugedRequestService(service), bound); + attributes.push(KeyValue::new("layer.service.name", name.into())); + attributes.push(KeyValue::new("buffer.capacity", bound as i64)); + Self { inner, attributes } } } @@ -162,7 +185,23 @@ where } fn call(&mut self, request: Req) -> Self::Future { - self.inner.call(request) + // Tracks the whole buffer pipeline from the moment the request is enqueued + // to the moment it's fully processed by the Worker. + // This means that at any given time, the counter represents the number of messages + // currently in the buffer + 1 if there's a message being processed by the Worker. + // In that scenario, it's completely possible to have `bound + 1` messages in flight, + // this is a tradeoff we accept to have a guard that automatically decrements on `drop`. + // To only track the number of messages in the buffer itself, we need to manually + // decrement the counter in upon the first `poll_ready` call in the inner service. However, + // that method doesn't know what request is going to be processed next. + let counter = i64_up_down_counter_with_unit!( + "apollo.router.buffer.messages", + "Number of messages currently in the buffer", + "{message}", + 1, + self.attributes + ); + self.inner.call(GaugedRequest(request, counter)) } } @@ -174,10 +213,51 @@ where fn clone(&self) -> Self { Self { inner: self.inner.clone(), + attributes: self.attributes.clone(), } } } +/// A wrapper around the request that holds an [`UpDownCounterGuard`] to track the number of +/// messages in the buffer. 
+#[derive(Debug)] +struct GaugedRequest(S, UpDownCounterGuard); + +/// A wrapper around the inner service that accepts a [`GaugedRequest`] and forwards the inner request +/// to the actual service. +/// This is necessary because the [`Buffer`] layer operates on [`GaugedRequest`] instead of the +/// original request type, so we need to convert back before calling the inner service. +#[derive(Debug)] +struct GaugedRequestService(S); + +impl Service> for GaugedRequestService +where + S: Service, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx) + } + + fn call(&mut self, request: GaugedRequest) -> Self::Future { + // Drop the `UpDownCounterGuard` after the request is processed by the inner service. + + // When the Buffer's Worker picks up a message, it first calls `poll_ready` on the inner + // service, if the service reports `Ready`, `call` is called. + // Here we can either: + // 1. drop immediately to signal that the message is no longer in the + // buffer queue, which is not entirely correct as it's dequeued before + // `poll_ready` is even called, or + // 2. Keep the guard until the message is fully processed by the Worker, + // which is more intuitive. 
+ let GaugedRequest(request, _guard) = request; + self.0.call(request) + } +} + #[cfg(test)] mod tests { use std::future::poll_fn; diff --git a/apollo-router/src/plugin/mod.rs b/apollo-router/src/plugin/mod.rs index bab3a4985d..b99d228a33 100644 --- a/apollo-router/src/plugin/mod.rs +++ b/apollo-router/src/plugin/mod.rs @@ -919,7 +919,9 @@ pub(crate) struct Handler { impl Handler { pub(crate) fn new(service: router::BoxService) -> Self { Self { - service: ServiceBuilder::new().buffered().service(service), + service: ServiceBuilder::new() + .buffered("handler", vec![]) + .service(service), } } } diff --git a/apollo-router/src/plugins/cache/entity.rs b/apollo-router/src/plugins/cache/entity.rs index f3c39de3c6..217aea9ca4 100644 --- a/apollo-router/src/plugins/cache/entity.rs +++ b/apollo-router/src/plugins/cache/entity.rs @@ -405,7 +405,13 @@ impl PluginPrivate for EntityCache { }) .service(CacheService { service: ServiceBuilder::new() - .buffered() + .buffered( + "subgraph", + vec![ + opentelemetry::KeyValue::new("subgraph.name", name.clone()), + opentelemetry::KeyValue::new("plugin.name", "entity_cache"), + ], + ) .service(service) .boxed_clone(), entity_type: self.entity_type.clone(), diff --git a/apollo-router/src/plugins/cache/metrics.rs b/apollo-router/src/plugins/cache/metrics.rs index 0e3197c218..eae0a6c50b 100644 --- a/apollo-router/src/plugins/cache/metrics.rs +++ b/apollo-router/src/plugins/cache/metrics.rs @@ -33,7 +33,10 @@ impl CacheMetricsService { ) -> subgraph::BoxService { tower::util::BoxService::new(CacheMetricsService { service: ServiceBuilder::new() - .buffered() + .buffered( + "cache_metrics", + vec![opentelemetry::KeyValue::new("subgraph.name", name.clone())], + ) .service(service) .boxed_clone(), name: Arc::new(name), diff --git a/apollo-router/src/plugins/coprocessor/connector.rs b/apollo-router/src/plugins/coprocessor/connector.rs index 7650bef6ff..a2d0e2c908 100644 --- a/apollo-router/src/plugins/coprocessor/connector.rs +++ 
b/apollo-router/src/plugins/coprocessor/connector.rs @@ -160,6 +160,7 @@ impl ConnectorStage { let response_layer = (self.response != Default::default()).then_some({ let response_config = self.response.clone(); let coprocessor_url = response_config.url.clone().unwrap_or(default_url); + let service_name = service_name.clone(); MapFutureWithRequestDataLayer::new( |req: &request_service::Request| req.context.clone(), @@ -215,7 +216,13 @@ impl ConnectorStage { .instrument(external_service_span()) .option_layer(request_layer) .option_layer(response_layer) - .buffered() + .buffered( + "connector", + vec![ + opentelemetry::KeyValue::new("source.name", service_name.clone()), + opentelemetry::KeyValue::new("plugin.name", "coprocessor"), + ], + ) .service(service) .boxed() } diff --git a/apollo-router/src/plugins/coprocessor/execution.rs b/apollo-router/src/plugins/coprocessor/execution.rs index 483bc9ef24..3de318df37 100644 --- a/apollo-router/src/plugins/coprocessor/execution.rs +++ b/apollo-router/src/plugins/coprocessor/execution.rs @@ -172,7 +172,10 @@ impl ExecutionStage { .instrument(external_service_span()) .option_layer(request_layer) .option_layer(response_layer) - .buffered() // XXX: Added during backpressure fixing + .buffered( + "execution", + vec![opentelemetry::KeyValue::new("plugin.name", "coprocessor")], + ) // XXX: Added during backpressure fixing .service(service) .boxed() } diff --git a/apollo-router/src/plugins/coprocessor/mod.rs b/apollo-router/src/plugins/coprocessor/mod.rs index c8eda60ef0..b5bbcb2f05 100644 --- a/apollo-router/src/plugins/coprocessor/mod.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -781,7 +781,10 @@ impl RouterStage { .instrument(external_service_span()) .option_layer(request_layer) .option_layer(response_layer) - .buffered() // XXX: Added during backpressure fixing + .buffered( + "router", + vec![opentelemetry::KeyValue::new("plugin.name", "coprocessor")], + ) // XXX: Added during backpressure fixing .service(service) 
.boxed() } @@ -864,6 +867,7 @@ impl SubgraphStage { let response_layer = (self.response != Default::default()).then_some({ let response_config = self.response.clone(); let coprocessor_url = response_config.url.clone().unwrap_or(default_url); + let service_name = service_name.clone(); MapFutureLayer::new(move |fut| { let http_client = http_client.clone(); @@ -913,7 +917,13 @@ impl SubgraphStage { .instrument(external_service_span()) .option_layer(request_layer) .option_layer(response_layer) - .buffered() // XXX: Added during backpressure fixing + .buffered( + "subgraph", + vec![ + opentelemetry::KeyValue::new("subgraph.name", service_name.clone()), + opentelemetry::KeyValue::new("plugin.name", "coprocessor"), + ], + ) // XXX: Added during backpressure fixing .service(service) .boxed() } diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index c02da230f7..448b3c21e6 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -175,7 +175,10 @@ impl SupergraphStage { .instrument(external_service_span()) .option_layer(request_layer) .option_layer(response_layer) - .buffered() // XXX: Added during backpressure fixing + .buffered( + "supergraph", + vec![opentelemetry::KeyValue::new("plugin.name", "coprocessor")], + ) // XXX: Added during backpressure fixing .service(service) .boxed() } diff --git a/apollo-router/src/plugins/file_uploads/mod.rs b/apollo-router/src/plugins/file_uploads/mod.rs index efb94ef018..165485ff1e 100644 --- a/apollo-router/src/plugins/file_uploads/mod.rs +++ b/apollo-router/src/plugins/file_uploads/mod.rs @@ -80,7 +80,10 @@ impl PluginPrivate for FileUploadsPlugin { } .boxed() }) - .buffered() + .buffered( + "router", + vec![opentelemetry::KeyValue::new("plugin.name", "file_uploads")], + ) .service(service) .boxed() } @@ -105,7 +108,10 @@ impl PluginPrivate for FileUploadsPlugin { } .boxed() }) - .buffered() + 
.buffered( + "supergraph", + vec![opentelemetry::KeyValue::new("plugin.name", "file_uploads")], + ) .service(service) .boxed() } @@ -133,7 +139,7 @@ impl PluginPrivate for FileUploadsPlugin { fn subgraph_service( &self, - _subgraph_name: &str, + subgraph_name: &str, service: subgraph::BoxService, ) -> subgraph::BoxService { if !self.enabled { @@ -146,7 +152,13 @@ impl PluginPrivate for FileUploadsPlugin { .map(|req| Ok(ControlFlow::Continue(req))) .boxed() }) - .buffered() + .buffered( + "subgraph", + vec![ + opentelemetry::KeyValue::new("subgraph.name", subgraph_name.to_string()), + opentelemetry::KeyValue::new("plugin.name", "file_uploads"), + ], + ) .service(service) .boxed() } diff --git a/apollo-router/src/plugins/license_enforcement/mod.rs b/apollo-router/src/plugins/license_enforcement/mod.rs index 7b1e3d17bc..3009fa0801 100644 --- a/apollo-router/src/plugins/license_enforcement/mod.rs +++ b/apollo-router/src/plugins/license_enforcement/mod.rs @@ -88,7 +88,9 @@ impl PluginPrivate for LicenseEnforcement { } }, ) - .load_shed() + .instrumented_load_shed("router", vec![ + opentelemetry::KeyValue::new("plugin.name", "license_enforcement"), + ]) .option_layer( self.tps .as_ref() diff --git a/apollo-router/src/plugins/response_cache/plugin.rs b/apollo-router/src/plugins/response_cache/plugin.rs index fcf24575da..6bccdd1738 100644 --- a/apollo-router/src/plugins/response_cache/plugin.rs +++ b/apollo-router/src/plugins/response_cache/plugin.rs @@ -448,7 +448,13 @@ impl PluginPrivate for ResponseCache { }) .service(CacheService { service: ServiceBuilder::new() - .buffered() + .buffered( + "subgraph", + vec![ + opentelemetry::KeyValue::new("subgraph.name", name.to_string()), + opentelemetry::KeyValue::new("plugin.name", "response_cache"), + ], + ) .service(service) .boxed_clone(), entity_type: self.entity_type.clone(), diff --git a/apollo-router/src/plugins/test/mod.rs b/apollo-router/src/plugins/test/mod.rs index 7504983cce..2f444a457a 100644 --- 
a/apollo-router/src/plugins/test/mod.rs +++ b/apollo-router/src/plugins/test/mod.rs @@ -459,6 +459,7 @@ mod test_for_harness { use super::*; use crate::Context; use crate::graphql; + use crate::layers::ServiceBuilderExt; use crate::metrics::FutureMetricsExt; use crate::plugin::Plugin; use crate::services::router; @@ -483,7 +484,13 @@ mod test_for_harness { fn router_service(&self, service: BoxService) -> BoxService { ServiceBuilder::new() - .load_shed() + .instrumented_load_shed( + "router", + vec![opentelemetry::KeyValue::new( + "plugin.name", + "my_test_plugin", + )], + ) .concurrency_limit(1) .service(service) .boxed() diff --git a/apollo-router/src/plugins/traffic_shaping/mod.rs b/apollo-router/src/plugins/traffic_shaping/mod.rs index 235f80fb75..8cebf93617 100644 --- a/apollo-router/src/plugins/traffic_shaping/mod.rs +++ b/apollo-router/src/plugins/traffic_shaping/mod.rs @@ -327,6 +327,20 @@ impl PluginPrivate for TrafficShaping { } fn router_service(&self, service: router::BoxService) -> router::BoxService { + let concurrency_limit = self.config.router.as_ref().and_then(|router| { + router + .concurrency_limit + .as_ref() + .map(|limit| ConcurrencyLimitLayer::new(*limit)) + }); + + let rate_limit = self.config.router.as_ref().and_then(|router| { + router + .global_rate_limit + .as_ref() + .map(|limit| RateLimitLayer::new(limit.capacity.into(), limit.interval)) + }); + // NB: consider each triplet (map_future_with_request_data, load_shed, layer) as a unit of // behavior ServiceBuilder::new() @@ -346,7 +360,13 @@ impl PluginPrivate for TrafficShaping { } }, ) - .load_shed() + .instrumented_load_shed( + "router", + vec![ + opentelemetry::KeyValue::new("plugin.name", "traffic_shaping"), + opentelemetry::KeyValue::new("layer.chain.name", "timeout"), + ], + ) .layer(TimeoutLayer::new( self.config .router @@ -370,13 +390,20 @@ impl PluginPrivate for TrafficShaping { } }, ) - .load_shed() - .option_layer(self.config.router.as_ref().and_then(|router| { - router - 
.concurrency_limit - .as_ref() - .map(|limit| ConcurrencyLimitLayer::new(*limit)) - })) + .instrumented_load_shed( + "router", + std::iter::once(opentelemetry::KeyValue::new( + "plugin.name", + "traffic_shaping", + )) + .chain( + concurrency_limit + .is_some() + .then(|| opentelemetry::KeyValue::new("layer.chain.name", "timeout")), + ) + .collect(), + ) + .option_layer(concurrency_limit) .map_future_with_request_data( |req: &router::Request| req.context.clone(), move |ctx, future| async { @@ -393,13 +420,20 @@ impl PluginPrivate for TrafficShaping { } }, ) - .load_shed() - .option_layer(self.config.router.as_ref().and_then(|router| { - router - .global_rate_limit - .as_ref() - .map(|limit| RateLimitLayer::new(limit.capacity.into(), limit.interval)) - })) + .instrumented_load_shed( + "router", + std::iter::once(opentelemetry::KeyValue::new( + "plugin.name", + "traffic_shaping", + )) + .chain( + rate_limit + .is_some() + .then(|| opentelemetry::KeyValue::new("layer.chain.name", "rate_limit")), + ) + .collect(), + ) + .option_layer(rate_limit) .service(service) .boxed() } @@ -458,7 +492,20 @@ impl PluginPrivate for TrafficShaping { } }, ) - .load_shed() + .instrumented_load_shed( + "subgraph", + vec![ + opentelemetry::KeyValue::new("plugin.name", "traffic_shaping"), + opentelemetry::KeyValue::new("subgraph.name", name.to_string()), + // This layer covers Timeout, Optional RateLimit, and Optional Query Dedup. + // We use a different name for the chain to be able to differentiate it in + // the metrics. 
+ opentelemetry::KeyValue::new( + "layer.chain.name", + "shaping", + ), + ], + ) .layer(TimeoutLayer::new( config.shaping.timeout.unwrap_or(DEFAULT_TIMEOUT), )) @@ -477,7 +524,13 @@ impl PluginPrivate for TrafficShaping { } req }) - .buffered() + .buffered( + "subgraph", + vec![ + opentelemetry::KeyValue::new("plugin.name", "traffic_shaping"), + opentelemetry::KeyValue::new("subgraph.name", name.to_string()), + ] + ) .service(service) .boxed() } else { @@ -539,7 +592,20 @@ impl PluginPrivate for TrafficShaping { } }, ) - .load_shed() + .instrumented_load_shed( + "connector", + vec![ + opentelemetry::KeyValue::new("plugin.name", "traffic_shaping"), + opentelemetry::KeyValue::new("connector.source", source_name.clone()), + // This layer covers Timeout, Optional RateLimit, and Compression. + // We use a different name for the chain to be able to differentiate it in + // the metrics. + opentelemetry::KeyValue::new( + "layer.chain.name", + "shaping", + ), + ], + ) .layer(TimeoutLayer::new( config.timeout.unwrap_or(DEFAULT_TIMEOUT), )) @@ -552,7 +618,12 @@ impl PluginPrivate for TrafficShaping { } req }) - .buffered() + .buffered("connector", + vec![ + opentelemetry::KeyValue::new("source.name", source_name.to_string()), + opentelemetry::KeyValue::new("plugin.name", "traffic_shaping"), + ] + ) .service(service) .boxed() } else { diff --git a/apollo-router/src/services/connector/request_service.rs b/apollo-router/src/services/connector/request_service.rs index c6ad5013f4..5cce5d7f16 100644 --- a/apollo-router/src/services/connector/request_service.rs +++ b/apollo-router/src/services/connector/request_service.rs @@ -163,6 +163,7 @@ impl ConnectorRequestServiceFactory { let mut map = HashMap::with_capacity(connector_sources.len()); for source in connector_sources.iter() { let service = UnconstrainedBuffer::new( + "factory", plugins .iter() .rev() @@ -175,6 +176,10 @@ impl ConnectorRequestServiceFactory { ) .boxed(), DEFAULT_BUFFER_SIZE, + vec![ + 
KeyValue::new("factory.name", "connector_request_service"), + KeyValue::new("source.name", source.clone()), + ], ); map.insert(source.clone(), service); } diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs index 65910a5e48..e2002e671f 100644 --- a/apollo-router/src/services/router/service.rs +++ b/apollo-router/src/services/router/service.rs @@ -117,8 +117,10 @@ impl RouterService { query_analysis_layer: QueryAnalysisLayer, batching: Batching, ) -> Self { - let supergraph_service: supergraph::BoxCloneService = - ServiceBuilder::new().buffered().service(sgb).boxed_clone(); + let supergraph_service: supergraph::BoxCloneService = ServiceBuilder::new() + .buffered("router", vec![]) + .service(sgb) + .boxed_clone(); RouterService { apq_layer: Arc::new(apq_layer), @@ -915,6 +917,7 @@ impl RouterCreator { // NOTE: This is the start of the router pipeline (router_service) let sb = UnconstrainedBuffer::new( + "factory", ServiceBuilder::new() .layer(static_page.clone()) .service( @@ -926,6 +929,7 @@ impl RouterCreator { ) .boxed(), DEFAULT_BUFFER_SIZE, + vec![KeyValue::new("factory.name", "router_creator")], ); Ok(Self { diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index 752b0fb4d1..2b98104669 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -1157,7 +1157,14 @@ impl SubgraphServiceFactory { .service(maker.make()) .boxed(); let service = ServiceBuilder::new() - .layer(UnconstrainedBufferLayer::new(DEFAULT_BUFFER_SIZE)) + .layer(UnconstrainedBufferLayer::new( + "factory", + DEFAULT_BUFFER_SIZE, + vec![ + KeyValue::new("factory.name", "subgraph_service_factory"), + KeyValue::new("subgraph.name", name.clone()), + ], + )) .service( plugins .iter() diff --git a/apollo-router/src/services/supergraph/service.rs b/apollo-router/src/services/supergraph/service.rs index 34d29969b4..e1cb54225f 100644 --- 
a/apollo-router/src/services/supergraph/service.rs +++ b/apollo-router/src/services/supergraph/service.rs @@ -590,7 +590,7 @@ impl PluggableSupergraphServiceBuilder { .layer(SubscriptionExecutionLayer::new( configuration.notify.clone(), )) - .buffered() + .buffered("execution", vec![]) .service(execution_service_factory.create()) .boxed_clone(); @@ -605,6 +605,7 @@ impl PluggableSupergraphServiceBuilder { AllowOnlyHttpPostMutationsLayer::default().layer(supergraph_service); let sb = UnconstrainedBuffer::new( + "factory", ServiceBuilder::new() .layer(content_negotiation::SupergraphLayer::default()) .service( @@ -617,6 +618,7 @@ impl PluggableSupergraphServiceBuilder { ) .boxed(), DEFAULT_BUFFER_SIZE, + vec![KeyValue::new("factory.name", "supergraph_creator")], ); Ok(SupergraphCreator { diff --git a/examples/async-auth/rust/src/allow_client_id_from_file.rs b/examples/async-auth/rust/src/allow_client_id_from_file.rs index 39a0efe247..c2e95051c2 100644 --- a/examples/async-auth/rust/src/allow_client_id_from_file.rs +++ b/examples/async-auth/rust/src/allow_client_id_from_file.rs @@ -150,7 +150,7 @@ impl Plugin for AllowClientIdFromFile { // or ControlFlow::Break(response) with a crafted response if we don't want the request to go through. ServiceBuilder::new() .checkpoint_async(handler) - .buffered() + .buffered("supergraph", vec![]) .service(service) .boxed() }