Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions apollo-router/src/axum_factory/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ use crate::services::RouterResponse;
use crate::services::SupergraphResponse;
use crate::services::layers::static_page::home_page_content;
use crate::services::layers::static_page::sandbox_page_content;
use crate::services::new_service::ServiceFactory;
use crate::services::router;
use crate::services::router::pipeline_handle::PipelineRef;
use crate::test_harness::http_client;
Expand Down Expand Up @@ -144,20 +143,10 @@ struct TestRouterFactory {
inner: MockRouterServiceType,
}

impl ServiceFactory<router::Request> for TestRouterFactory {
type Service = MockRouterServiceType;

fn create(&self) -> Self::Service {
self.inner.clone()
}
}

impl RouterFactory for TestRouterFactory {
type RouterService = MockRouterServiceType;

type Future = <<TestRouterFactory as ServiceFactory<router::Request>>::Service as Service<
router::Request,
>>::Future;
fn create(&self) -> router::BoxCloneService {
self.inner.clone().boxed_clone()
}

fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
MultiMap::new()
Expand Down
16 changes: 14 additions & 2 deletions apollo-router/src/layers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ pub trait ServiceBuilderExt<L>: Sized {

/// Decide if processing should continue or not, and if not allow returning of a response.
/// Unlike checkpoint it is possible to perform async operations in the callback. However
/// this requires that the service is `Clone`. This can be achieved using `.buffered()`.
/// the resulting service requires `S: Clone`. Since `BoxCloneService` is already `Clone`,
/// a `.buffered()` call is no longer needed when wrapping a `BoxCloneService`.
///
/// This is useful for things like authentication where you need to make an external call to
/// check if a request should proceed or not.
Expand Down Expand Up @@ -158,7 +159,18 @@ pub trait ServiceBuilderExt<L>: Sized {

/// Adds a buffer to the service stack with a default size.
///
/// This is useful for making services `Clone` and `Send`
/// The buffer spawns a dedicated worker task and queues requests in an in-memory channel.
/// The primary reasons to include a buffer are:
///
/// - **Backpressure**: callers block (rather than failing immediately) when the inner
/// service is busy processing previous requests.
/// - **`LoadShed` / `ConcurrencyLimit` / `RateLimit` interaction**: these layers
/// signal overload by returning `Poll::Pending` from `poll_ready`. A buffer placed
/// *before* them absorbs that pending state and prevents Tokio's cooperative-scheduling
/// budget from causing spurious `Overloaded` responses.
///
/// Now that pipeline services are `BoxCloneService`, a buffer is **no longer needed
/// merely to make a service `Clone` or `Send`**.
///
/// # Examples
///
Expand Down
2 changes: 2 additions & 0 deletions apollo-router/src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,8 @@ macro_rules! register_private_plugin {
/// Handler represents a [`Plugin`] endpoint.
#[derive(Clone)]
pub(crate) struct Handler {
// BoxCloneService is Send but not Sync. The buffer's handle (Arc + Sender) is Sync,
// which is required for axum's route_service handler to be usable across threads.
service: UnconstrainedBuffer<
router::Request,
<router::BoxCloneService as Service<router::Request>>::Future,
Expand Down
6 changes: 1 addition & 5 deletions apollo-router/src/plugins/cache/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ use crate::graphql::Error;
use crate::json_ext::Object;
use crate::json_ext::Path;
use crate::json_ext::PathElement;
use crate::layers::ServiceBuilderExt;
use crate::plugin::PluginInit;
use crate::plugin::PluginPrivate;
use crate::plugins::authorization::CacheKeyMetadata;
Expand Down Expand Up @@ -407,10 +406,7 @@ impl PluginPrivate for EntityCache {
response
})
.service(CacheService {
service: ServiceBuilder::new()
.buffered()
.service(service)
.boxed_clone(),
service,
entity_type: self.entity_type.clone(),
name: name.to_string(),
storage,
Expand Down
7 changes: 1 addition & 6 deletions apollo-router/src/plugins/cache/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ use http::header;
use parking_lot::Mutex;
use serde_json_bytes::Value;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt;
use tower_service::Service;

use super::entity::REPRESENTATIONS;
use super::entity::Ttl;
use super::entity::hash_query;
use super::entity::hash_vary_headers;
use crate::layers::ServiceBuilderExt;
use crate::services::subgraph;
use crate::spec::TYPENAME;

Expand All @@ -32,10 +30,7 @@ impl CacheMetricsService {
separate_per_type: bool,
) -> subgraph::BoxCloneService {
tower::util::BoxCloneService::new(CacheMetricsService {
service: ServiceBuilder::new()
.buffered()
.service(service)
.boxed_clone(),
service,
name: Arc::new(name),
counter: Some(Arc::new(Mutex::new(CacheCounter::new(
ttl.map(|t| t.0).unwrap_or_else(|| Duration::from_secs(60)),
Expand Down
11 changes: 4 additions & 7 deletions apollo-router/src/plugins/cache/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::MockedSubgraphs;
use crate::TestHarness;
use crate::cache::redis::RedisCacheStorage;
use crate::plugin::test::MockSubgraph;
use crate::plugin::test::MockSubgraphService;
use crate::plugins::cache::entity::CONTEXT_CACHE_KEYS;
use crate::plugins::cache::entity::CacheKeyContext;
use crate::plugins::cache::entity::CacheKeysContext;
Expand Down Expand Up @@ -964,12 +963,10 @@ async fn no_data() {
.extra_private_plugin(entity_cache)
.subgraph_hook(|name, service| {
if name == "orga" {
let mut subgraph = MockSubgraphService::new();
subgraph
.expect_call()
.times(1)
.returning(move |_req: subgraph::Request| Err("orga not found".into()));
subgraph.boxed_clone()
tower::service_fn(|_req: subgraph::Request| async {
Err::<subgraph::Response, _>("orga not found".into())
})
.boxed_clone()
Comment on lines +966 to +969
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to double check this change (and the similar one in response_cache) is OK as an expectation is being removed

} else {
service
}
Expand Down
2 changes: 1 addition & 1 deletion apollo-router/src/plugins/connectors/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use crate::metrics::FutureMetricsExt;
use crate::plugins::connectors::tests::req_asserts::Plan;
use crate::plugins::telemetry::consts::CONNECT_SPAN_NAME;
use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
use crate::router_factory::RouterFactory;
use crate::router_factory::RouterSuperServiceFactory;
use crate::router_factory::YamlRouterFactory;
use crate::services::new_service::ServiceFactory;
use crate::services::router::Request;
use crate::services::supergraph;
use crate::uplink::license_enforcement::LicenseState;
Expand Down
1 change: 0 additions & 1 deletion apollo-router/src/plugins/coprocessor/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ impl ConnectorStage {
.instrument(external_service_span())
.option_layer(request_layer)
.option_layer(response_layer)
.buffered()
.service(service)
.boxed_clone()
}
Expand Down
1 change: 0 additions & 1 deletion apollo-router/src/plugins/coprocessor/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ impl ExecutionStage {
.instrument(external_service_span())
.option_layer(request_layer)
.option_layer(response_layer)
.buffered() // XXX: Added during backpressure fixing
.service(service)
.boxed_clone()
}
Expand Down
2 changes: 0 additions & 2 deletions apollo-router/src/plugins/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,6 @@ impl RouterStage {
.instrument(external_service_span())
.option_layer(request_layer)
.option_layer(response_layer)
.buffered() // XXX: Added during backpressure fixing
.service(service)
.boxed_clone()
}
Expand Down Expand Up @@ -921,7 +920,6 @@ impl SubgraphStage {
.instrument(external_service_span())
.option_layer(request_layer)
.option_layer(response_layer)
.buffered() // XXX: Added during backpressure fixing
.service(service)
.boxed_clone()
}
Expand Down
1 change: 0 additions & 1 deletion apollo-router/src/plugins/coprocessor/supergraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ impl SupergraphStage {
.instrument(external_service_span())
.option_layer(request_layer)
.option_layer(response_layer)
.buffered() // XXX: Added during backpressure fixing
.service(service)
.boxed_clone()
}
Expand Down
3 changes: 0 additions & 3 deletions apollo-router/src/plugins/file_uploads/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ impl PluginPrivate for FileUploadsPlugin {
}
.boxed()
})
.buffered()
.service(service)
.boxed_clone()
}
Expand Down Expand Up @@ -108,7 +107,6 @@ impl PluginPrivate for FileUploadsPlugin {
}
.boxed()
})
.buffered()
.service(service)
.boxed_clone()
}
Expand Down Expand Up @@ -149,7 +147,6 @@ impl PluginPrivate for FileUploadsPlugin {
.map(|req| Ok(ControlFlow::Continue(req)))
.boxed()
})
.buffered()
.service(service)
.boxed_clone()
}
Expand Down
2 changes: 2 additions & 0 deletions apollo-router/src/plugins/license_enforcement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ impl PluginPrivate for LicenseEnforcement {
}

fn router_service(&self, service: router::BoxCloneService) -> router::BoxCloneService {
// Buffer required before load_shed() for correct cooperative-scheduling behaviour
// (see ServiceBuilderExt::buffered).
ServiceBuilder::new()
.buffered()
.map_future_with_request_data(
Expand Down
11 changes: 10 additions & 1 deletion apollo-router/src/plugins/limits/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,22 @@ where
}
}

#[derive(Clone)]
pub(crate) struct RequestBodyLimit<Body, S> {
_phantom: std::marker::PhantomData<Body>,
inner: S,
initial_limit: usize,
}

impl<Body, S: Clone> Clone for RequestBodyLimit<Body, S> {
fn clone(&self) -> Self {
Self {
_phantom: std::marker::PhantomData,
inner: self.inner.clone(),
initial_limit: self.initial_limit,
}
}
}

impl<Body, S> RequestBodyLimit<Body, S>
where
S: Service<http::request::Request<super::limited::Limited<Body>>>,
Expand Down
1 change: 0 additions & 1 deletion apollo-router/src/plugins/limits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ impl Plugin for LimitsPlugin {

fn router_service(&self, service: router::BoxCloneService) -> router::BoxCloneService {
ServiceBuilder::new()
.buffered()
.map_future_with_request_data(
|r: &router::Request| r.context.clone(),
|ctx, f| async { Self::map_error_to_graphql(f.await, ctx) },
Expand Down
6 changes: 1 addition & 5 deletions apollo-router/src/plugins/response_cache/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use crate::graphql::Error;
use crate::json_ext::Object;
use crate::json_ext::Path;
use crate::json_ext::PathElement;
use crate::layers::ServiceBuilderExt;
use crate::plugin::PluginInit;
use crate::plugin::PluginPrivate;
use crate::plugins::authorization::CacheKeyMetadata;
Expand Down Expand Up @@ -454,10 +453,7 @@ impl PluginPrivate for ResponseCache {
response
})
.service(CacheService {
service: ServiceBuilder::new()
.buffered()
.service(service)
.boxed_clone(),
service,
entity_type: self.entity_type.clone(),
name: name.to_string(),
storage: self.storage.clone(),
Expand Down
11 changes: 4 additions & 7 deletions apollo-router/src/plugins/response_cache/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::configuration::subgraph::SubgraphConfiguration;
use crate::graphql;
use crate::metrics::FutureMetricsExt;
use crate::plugin::test::MockSubgraph;
use crate::plugin::test::MockSubgraphService;
use crate::plugins::response_cache::debugger::CacheKeysContext;
use crate::plugins::response_cache::invalidation::InvalidationRequest;
use crate::plugins::response_cache::invalidation_endpoint::SubgraphInvalidationConfig;
Expand Down Expand Up @@ -2622,12 +2621,10 @@ async fn no_data() {
.extra_private_plugin(response_cache)
.subgraph_hook(|name, service| {
if name == "orga" {
let mut subgraph = MockSubgraphService::new();
subgraph
.expect_call()
.times(1)
.returning(move |_req: subgraph::Request| Err("orga not found".into()));
subgraph.boxed_clone()
tower::service_fn(|_req: subgraph::Request| async {
Err::<subgraph::Response, _>("orga not found".into())
})
.boxed_clone()
} else {
service
}
Expand Down
18 changes: 14 additions & 4 deletions apollo-router/src/plugins/traffic_shaping/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,10 @@ impl PluginPrivate for TrafficShaping {

fn router_service(&self, service: router::BoxCloneService) -> router::BoxCloneService {
// NB: consider each triplet (map_future_with_request_data, load_shed, layer) as a unit of
// behavior
// behavior.
// The outer buffer before the first load_shed() is required for correct cooperative-
// scheduling behaviour: without it, Tokio's budget can cause poll_ready to return
// Pending spuriously and the load_shed would emit false Overloaded errors.
ServiceBuilder::new()
.buffered()
.map_future_with_request_data(
Expand Down Expand Up @@ -433,6 +436,10 @@ impl PluginPrivate for TrafficShaping {
.clone()
});

// Outer buffer: required before load_shed() for correct cooperative-scheduling
// behaviour (see router_service above for the full explanation).
// Inner buffer (below): provides a backpressure surface so that RateLimitLayer
// can return poll_ready Pending and LoadShed will actually shed that load.
ServiceBuilder::new()
.buffered()
.map_future_with_request_data(
Expand Down Expand Up @@ -514,6 +521,9 @@ impl PluginPrivate for TrafficShaping {
.clone()
});

// Outer buffer: required before load_shed() for correct cooperative-scheduling
// behaviour (see router_service above for the full explanation).
// Inner buffer (below): provides the backpressure surface for RateLimitLayer.
ServiceBuilder::new()
.buffered()
.map_future_with_request_data(
Expand Down Expand Up @@ -803,9 +813,9 @@ mod test {
builder = builder.with_plugins(plugins);

let builder = builder
.with_subgraph_service("accounts", account_service.clone())
.with_subgraph_service("reviews", review_service.clone())
.with_subgraph_service("products", product_service.clone());
.with_subgraph_service("accounts", account_service.boxed_clone())
.with_subgraph_service("reviews", review_service.boxed_clone())
.with_subgraph_service("products", product_service.boxed_clone());

let supergraph_creator = builder.build().await.expect("should build");

Expand Down
11 changes: 5 additions & 6 deletions apollo-router/src/query_planner/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ use crate::services::FetchRequest;
use crate::services::fetch;
use crate::services::fetch::ErrorMapping;
use crate::services::fetch::SubscriptionRequest;
use crate::services::fetch_service::FetchServiceFactory;
use crate::services::new_service::ServiceFactory;
use crate::services::fetch_service::FetchService;
use crate::spec::Query;
use crate::spec::Schema;

Expand All @@ -55,7 +54,7 @@ impl QueryPlan {
pub(crate) async fn execute<'a>(
&self,
context: &'a Context,
service_factory: &'a Arc<FetchServiceFactory>,
service_factory: &'a FetchService,
// The original supergraph request is used to populate variable values and for plugin
// features like propagating headers or subgraph telemetry based on supergraph request
// values.
Expand Down Expand Up @@ -117,7 +116,7 @@ impl QueryPlan {
// holds the query plan executon arguments that do not change between calls
pub(crate) struct ExecutionParameters<'a> {
pub(crate) context: &'a Context,
pub(crate) service_factory: &'a Arc<FetchServiceFactory>,
pub(crate) service_factory: &'a FetchService,
pub(crate) schema: &'a Arc<Schema>,
pub(crate) subgraph_schemas: &'a Arc<SubgraphSchemas>,
pub(crate) supergraph_request: &'a Arc<http::Request<Request>>,
Expand Down Expand Up @@ -239,7 +238,7 @@ impl PlanNode {
&None,
) {
Some(variables) => {
let service = parameters.service_factory.create();
let service = parameters.service_factory.clone();
let request = fetch::Request::Subscription(
SubscriptionRequest::builder()
.context(parameters.context.clone())
Expand Down Expand Up @@ -295,7 +294,7 @@ impl PlanNode {
) {
Some(variables) => {
let paths = variables.inverted_paths.clone();
let service = parameters.service_factory.create();
let service = parameters.service_factory.clone();
let request = fetch::Request::Fetch(
FetchRequest::builder()
.context(parameters.context.clone())
Expand Down
Loading