diff --git a/.changesets/feat_jc_rhai_duration_metric.md b/.changesets/feat_jc_rhai_duration_metric.md new file mode 100644 index 0000000000..ce7f3b138b --- /dev/null +++ b/.changesets/feat_jc_rhai_duration_metric.md @@ -0,0 +1,9 @@ +### Emit `apollo.router.operations.rhai.duration` histogram metric for Rhai script callbacks + +A new `apollo.router.operations.rhai.duration` histogram metric (unit: `s`, value type: `f64`) is now emitted for every Rhai script callback execution across all pipeline stages. This mirrors the existing `apollo.router.operations.coprocessor.duration` metric. + +Attributes on each datapoint: +- `rhai.stage` — the pipeline stage (e.g. `RouterRequest`, `SubgraphResponse`) +- `rhai.succeeded` — `true` if the callback returned without throwing + +By [@theJC](https://github.com/theJC) in https://github.com/apollographql/router/pull/9072 diff --git a/apollo-router/src/plugins/rhai/mod.rs b/apollo-router/src/plugins/rhai/mod.rs index b227b4ca53..b91d6e48a8 100644 --- a/apollo-router/src/plugins/rhai/mod.rs +++ b/apollo-router/src/plugins/rhai/mod.rs @@ -4,6 +4,7 @@ use std::fmt; use std::ops::ControlFlow; use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; use futures::StreamExt; use futures::future::ready; @@ -21,6 +22,7 @@ use rhai::Scope; use rhai::Shared; use schemars::JsonSchema; use serde::Deserialize; +use strum::Display; use tower::BoxError; use tower::ServiceBuilder; use tower::ServiceExt; @@ -38,6 +40,19 @@ mod engine; pub(crate) const RHAI_SPAN_NAME: &str = "rhai_plugin"; +/// Pipeline stage of a Rhai callback; the variant name is emitted as the `rhai.stage` attribute.
+#[derive(Clone, Copy, Debug, Display)] +enum RhaiStage { + RouterRequest, + RouterResponse, + SupergraphRequest, + SupergraphResponse, + ExecutionRequest, + ExecutionResponse, + SubgraphRequest, + SubgraphResponse, +} + mod execution; mod router; mod subgraph; @@ -202,7 +217,7 @@ pub(crate) enum ServiceStep { // Actually use the checkpoint function so that we can shortcut requests which fail macro_rules! gen_map_request { - ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { + ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => { $borrow.replace(|service| { fn rhai_service_span() -> impl Fn(&$base::Request) -> tracing::Span + Clone { move |_request: &$base::Request| { @@ -217,8 +232,12 @@ macro_rules! gen_map_request { .instrument(rhai_service_span()) .checkpoint(move |request: $base::Request| { let shared_request = Shared::new(Mutex::new(Some(request))); - let result: Result> = - execute(&$rhai_service, &$callback, (shared_request.clone(),)); + let result: Result> = execute( + &$rhai_service, + $stage, + &$callback, + (shared_request.clone(),), + ); if let Err(error) = result { let error_details = process_error(error); if error_details.body.is_none() { @@ -241,7 +260,7 @@ macro_rules! gen_map_request { // Actually use the checkpoint function so that we can shortcut requests which fail macro_rules! gen_map_router_deferred_request { - ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { + ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => { $borrow.replace(|service| { fn rhai_service_span() -> impl Fn(&$base::Request) -> tracing::Span + Clone { move |_request: &$base::Request| { @@ -254,7 +273,7 @@ macro_rules! 
gen_map_router_deferred_request { } ServiceBuilder::new() .instrument(rhai_service_span()) - .checkpoint( move |chunked_request: $base::Request| { + .checkpoint(move |chunked_request: $base::Request| { // we split the request stream into headers+first body chunk, then a stream of chunks // for which we will implement mapping later let $base::Request { router_request, context } = chunked_request; @@ -268,7 +287,7 @@ macro_rules! gen_map_router_deferred_request { ), }; let shared_request = Shared::new(Mutex::new(Some(request))); - let result = execute(&$rhai_service, &$callback, (shared_request.clone(),)); + let result = execute(&$rhai_service, $stage, &$callback, (shared_request.clone(),)); if let Err(error) = result { let error_details = process_error(error); @@ -312,6 +331,7 @@ macro_rules! gen_map_router_deferred_request { let result = execute( &rhai_service, + $stage, &callback, (shared_request.clone(),), ); @@ -351,13 +371,17 @@ macro_rules! gen_map_router_deferred_request { } macro_rules! gen_map_response { - ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { + ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => { $borrow.replace(|service| { service .map_response(move |response: $base::Response| { let shared_response = Shared::new(Mutex::new(Some(response))); - let result: Result> = - execute(&$rhai_service, &$callback, (shared_response.clone(),)); + let result: Result> = execute( + &$rhai_service, + $stage, + &$callback, + (shared_response.clone(),), + ); if let Err(error) = result { let error_details = process_error(error); @@ -386,7 +410,7 @@ macro_rules! gen_map_response { // I can't easily unify the macros because the router response processing is quite different to // other service in terms of payload. macro_rules! 
gen_map_router_deferred_response { - ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { + ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => { $borrow.replace(|service| { BoxService::new(service.and_then( |mapped_response: $base::Response| async move { @@ -405,10 +429,14 @@ macro_rules! gen_map_router_deferred_response { }; let shared_response = Shared::new(Mutex::new(Some(response))); - let result = - execute(&$rhai_service, &$callback, (shared_response.clone(),)); - if let Err(error) = result { + let result = execute( + &$rhai_service, + $stage, + &$callback, + (shared_response.clone(),), + ); + if let Err(error) = result { let error_details = process_error(error); if error_details.body.is_none() { tracing::error!("map_request callback failed: {error_details:#?}"); @@ -451,6 +479,7 @@ macro_rules! gen_map_router_deferred_response { let result = execute( &rhai_service, + $stage, &callback, (shared_response.clone(),), ); @@ -492,7 +521,7 @@ macro_rules! gen_map_router_deferred_response { } macro_rules! gen_map_deferred_response { - ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => { + ($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => { $borrow.replace(|service| { BoxService::new(service.and_then( |mapped_response: $base::Response| async move { @@ -525,8 +554,13 @@ macro_rules! gen_map_deferred_response { }; let shared_response = Shared::new(Mutex::new(Some(response))); - let result = - execute(&$rhai_service, &$callback, (shared_response.clone(),)); + let result = execute( + &$rhai_service, + $stage, + + &$callback, + (shared_response.clone(),), + ); if let Err(error) = result { let error_details = process_error(error); if error_details.body.is_none() { @@ -561,6 +595,7 @@ macro_rules! 
gen_map_deferred_response { let result = execute( &rhai_service, + $stage, &callback, (shared_response.clone(),), ); @@ -606,16 +641,40 @@ impl ServiceStep { fn map_request(&mut self, rhai_service: RhaiService, callback: FnPtr) { match self { ServiceStep::Router(service) => { - gen_map_router_deferred_request!(router, service, rhai_service, callback); + gen_map_router_deferred_request!( + router, + service, + rhai_service, + callback, + RhaiStage::RouterRequest + ); } ServiceStep::Supergraph(service) => { - gen_map_request!(supergraph, service, rhai_service, callback); + gen_map_request!( + supergraph, + service, + rhai_service, + callback, + RhaiStage::SupergraphRequest + ); } ServiceStep::Execution(service) => { - gen_map_request!(execution, service, rhai_service, callback); + gen_map_request!( + execution, + service, + rhai_service, + callback, + RhaiStage::ExecutionRequest + ); } ServiceStep::Subgraph(service) => { - gen_map_request!(subgraph, service, rhai_service, callback); + gen_map_request!( + subgraph, + service, + rhai_service, + callback, + RhaiStage::SubgraphRequest + ); } } } @@ -623,16 +682,40 @@ impl ServiceStep { fn map_response(&mut self, rhai_service: RhaiService, callback: FnPtr) { match self { ServiceStep::Router(service) => { - gen_map_router_deferred_response!(router, service, rhai_service, callback); + gen_map_router_deferred_response!( + router, + service, + rhai_service, + callback, + RhaiStage::RouterResponse + ); } ServiceStep::Supergraph(service) => { - gen_map_deferred_response!(supergraph, service, rhai_service, callback); + gen_map_deferred_response!( + supergraph, + service, + rhai_service, + callback, + RhaiStage::SupergraphResponse + ); } ServiceStep::Execution(service) => { - gen_map_deferred_response!(execution, service, rhai_service, callback); + gen_map_deferred_response!( + execution, + service, + rhai_service, + callback, + RhaiStage::ExecutionResponse + ); } ServiceStep::Subgraph(service) => { - gen_map_response!(subgraph, 
service, rhai_service, callback); + gen_map_response!( + subgraph, + service, + rhai_service, + callback, + RhaiStage::SubgraphResponse + ); } } } @@ -702,19 +785,45 @@ fn process_error(error: Box) -> ErrorDetails { error_details } +/// Execute a Rhai callback for a pipeline service stage and return its result unchanged. +/// +/// Records the elapsed time, including any wait for the scope lock, with stage and success. fn execute( rhai_service: &RhaiService, + stage: RhaiStage, callback: &FnPtr, args: impl FuncArgs, ) -> Result> { - if callback.is_curried() { + let start = Instant::now(); + + let result = if callback.is_curried() { callback.call(&rhai_service.engine, &rhai_service.ast, args) } else { let mut guard = rhai_service.scope.lock(); rhai_service .engine .call_fn(&mut guard, &rhai_service.ast, callback.fn_name(), args) - } + }; + + let duration = start.elapsed(); + + record_rhai_execution(stage, duration, result.is_ok()); + + result +} + +fn record_rhai_execution(stage: RhaiStage, duration: Duration, succeeded: bool) { + let duration = duration.as_secs_f64(); + let stage = stage.to_string(); + + f64_histogram_with_unit!( + "apollo.router.operations.rhai.duration", + "Time spent executing a Rhai script callback, in seconds", + "s", + duration, + "rhai.stage" = stage, + "rhai.succeeded" = succeeded + ); } register_plugin!("apollo", "rhai", Rhai); diff --git a/apollo-router/src/plugins/rhai/tests.rs b/apollo-router/src/plugins/rhai/tests.rs index 4e015cd0b7..04973688cf 100644 --- a/apollo-router/src/plugins/rhai/tests.rs +++ b/apollo-router/src/plugins/rhai/tests.rs @@ -4,6 +4,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::SystemTime; +use futures::StreamExt; use http::HeaderMap; use http::HeaderValue; use http::Method; @@ -31,6 +32,7 @@ use crate::graphql; use crate::graphql::Error; use crate::graphql::Request; use crate::http_ext; +use crate::metrics::FutureMetricsExt; use crate::plugin::DynPlugin; use crate::plugin::test::MockExecutionService; use crate::plugin::test::MockRouterService; @@
-48,6 +50,7 @@ use crate::services::ExecutionRequest; use crate::services::SubgraphRequest; use crate::services::SupergraphRequest; use crate::services::SupergraphResponse; +use crate::services::router; use crate::test_harness::tracing_test; // There is a lot of repetition in these tests, so I've tried to reduce that with these two @@ -1179,3 +1182,286 @@ async fn test_complex_property_chain() { .await .expect("test failed - complex property chains should work"); } + +#[tokio::test] +async fn test_rhai_metric_router_request() { + async { + let mut mock_service = MockRouterService::new(); + mock_service + .expect_call() + .times(1) + .returning(move |req: router::Request| { + Ok(router::Response::fake_builder() + .context(req.context) + .build() + .unwrap()) + }); + + let dyn_plugin: Box = crate::plugin::plugins() + .find(|factory| factory.name == "apollo.rhai") + .expect("Plugin not found") + .create_instance_without_schema( + &serde_json::Value::from_str( + r#"{"scripts":"tests/fixtures", "main":"test_metrics.rhai"}"#, + ) + .unwrap(), + ) + .await + .unwrap(); + let mut router_service = dyn_plugin.router_service(BoxService::new(mock_service)); + let req = router::Request::fake_builder().build().unwrap(); + let _ = router_service.ready().await.unwrap().call(req).await; + + assert_histogram_count!( + "apollo.router.operations.rhai.duration", + 1, + "rhai.stage" = "RouterRequest", + "rhai.succeeded" = true + ); + } + .with_metrics() + .await; +} + +#[tokio::test] +async fn test_rhai_metric_supergraph_request() { + async { + let mut mock_service = MockSupergraphService::new(); + mock_service + .expect_call() + .times(1) + .returning(move |req: SupergraphRequest| { + Ok(SupergraphResponse::fake_builder() + .context(req.context) + .build() + .unwrap()) + }); + + let dyn_plugin: Box = crate::plugin::plugins() + .find(|factory| factory.name == "apollo.rhai") + .expect("Plugin not found") + .create_instance_without_schema( + &serde_json::Value::from_str( + 
r#"{"scripts":"tests/fixtures", "main":"test_metrics.rhai"}"#, + ) + .unwrap(), + ) + .await + .unwrap(); + let mut router_service = dyn_plugin.supergraph_service(BoxService::new(mock_service)); + let req = SupergraphRequest::fake_builder().build().unwrap(); + let _ = router_service.ready().await.unwrap().call(req).await; + + assert_histogram_count!( + "apollo.router.operations.rhai.duration", + 1, + "rhai.stage" = "SupergraphRequest", + "rhai.succeeded" = true + ); + } + .with_metrics() + .await; +} + +#[tokio::test] +async fn test_rhai_metric_subgraph_request() { + async { + let mut mock_service = crate::plugin::test::MockSubgraphService::new(); + mock_service + .expect_call() + .times(1) + .returning(move |req: SubgraphRequest| { + Ok(subgraph::Response::fake_builder() + .context(req.context) + .build()) + }); + + let dyn_plugin: Box = crate::plugin::plugins() + .find(|factory| factory.name == "apollo.rhai") + .expect("Plugin not found") + .create_instance_without_schema( + &serde_json::Value::from_str( + r#"{"scripts":"tests/fixtures", "main":"test_metrics.rhai"}"#, + ) + .unwrap(), + ) + .await + .unwrap(); + let mut router_service = dyn_plugin.subgraph_service("test", BoxService::new(mock_service)); + let req = SubgraphRequest::fake_builder().build(); + let _ = router_service.ready().await.unwrap().call(req).await; + + assert_histogram_count!( + "apollo.router.operations.rhai.duration", + 1, + "rhai.stage" = "SubgraphRequest", + "rhai.succeeded" = true + ); + } + .with_metrics() + .await; +} + +#[tokio::test] +async fn test_rhai_metric_failed_callback() { + async { + let mut mock_service = MockSupergraphService::new(); + // The request callback in test_metrics_fail.rhai throws, so the mock may never be called + mock_service + .expect_call() + .times(0..=1) + .returning(move |req: SupergraphRequest| { + Ok(SupergraphResponse::fake_builder() + .context(req.context) + .build() + .unwrap()) + }); + + let dyn_plugin: Box = crate::plugin::plugins() +
.find(|factory| factory.name == "apollo.rhai") + .expect("Plugin not found") + .create_instance_without_schema( + &serde_json::Value::from_str( + r#"{"scripts":"tests/fixtures", "main":"test_metrics_fail.rhai"}"#, + ) + .unwrap(), + ) + .await + .unwrap(); + let mut router_service = dyn_plugin.supergraph_service(BoxService::new(mock_service)); + let req = SupergraphRequest::fake_builder().build().unwrap(); + let _ = router_service.ready().await.unwrap().call(req).await; + + assert_histogram_count!( + "apollo.router.operations.rhai.duration", + 1, + "rhai.stage" = "SupergraphRequest", + "rhai.succeeded" = false + ); + } + .with_metrics() + .await; +} + +#[tokio::test] +async fn test_rhai_metric_no_callback_no_emission() { + async { + let mut mock_service = MockSupergraphService::new(); + mock_service.expect_call().never(); + + let dyn_plugin: Box = crate::plugin::plugins() + .find(|factory| factory.name == "apollo.rhai") + .expect("Plugin not found") + .create_instance_without_schema( + &serde_json::Value::from_str( + r#"{"scripts":"tests/fixtures", "main":"test_metrics_empty.rhai"}"#, + ) + .unwrap(), + ) + .await + .unwrap(); + // No supergraph_service callback registered — plugin returns original service unchanged + // and no metric is emitted + let _service = dyn_plugin.supergraph_service(BoxService::new(mock_service)); + + assert_histogram_not_exists!( + "apollo.router.operations.rhai.duration", + f64, + "rhai.stage" = "SupergraphRequest" + ); + } + .with_metrics() + .await; +} + +#[tokio::test] +async fn test_rhai_metric_subgraph_response() { + async { + let mut mock_service = MockSubgraphService::new(); + mock_service + .expect_call() + .times(1) + .returning(move |req: SubgraphRequest| { + Ok(subgraph::Response::fake_builder() + .context(req.context) + .build()) + }); + + let dyn_plugin: Box = crate::plugin::plugins() + .find(|factory| factory.name == "apollo.rhai") + .expect("Plugin not found") + .create_instance_without_schema( + 
&serde_json::Value::from_str( + r#"{"scripts":"tests/fixtures", "main":"test_metrics_response.rhai"}"#, + ) + .unwrap(), + ) + .await + .unwrap(); + let mut router_service = dyn_plugin.subgraph_service("test", BoxService::new(mock_service)); + let req = SubgraphRequest::fake_builder().build(); + let _ = router_service.ready().await.unwrap().call(req).await; + + assert_histogram_count!( + "apollo.router.operations.rhai.duration", + 1, + "rhai.stage" = "SubgraphResponse", + "rhai.succeeded" = true + ); + } + .with_metrics() + .await; +} + +#[tokio::test] +async fn test_rhai_metric_deferred_response_causes_multiple_executions() { + async { + let ctx = Context::default(); + let deferred_response = SupergraphResponse::fake_stream_builder() + .responses(vec![ + graphql::Response::builder().build(), + graphql::Response::builder().build(), + ]) + .context(ctx.clone()) + .build() + .unwrap(); + + let mut mock_service = MockSupergraphService::new(); + mock_service + .expect_call() + .times(1) + .return_once(move |_req: SupergraphRequest| Ok(deferred_response)); + + let dyn_plugin: Box = crate::plugin::plugins() + .find(|factory| factory.name == "apollo.rhai") + .expect("Plugin not found") + .create_instance_without_schema( + &serde_json::Value::from_str( + r#"{"scripts":"tests/fixtures", "main":"test_metrics_response.rhai"}"#, + ) + .unwrap(), + ) + .await + .unwrap(); + let mut router_service = dyn_plugin.supergraph_service(BoxService::new(mock_service)); + let req = SupergraphRequest::fake_builder().build().unwrap(); + let resp = router_service + .ready() + .await + .unwrap() + .call(req) + .await + .unwrap(); + // Drive the response stream to completion so the deferred-chunk metric fires. 
+ let _chunks: Vec<_> = resp.response.into_body().collect().await; + + assert_histogram_count!( + "apollo.router.operations.rhai.duration", + 2, + "rhai.stage" = "SupergraphResponse", + "rhai.succeeded" = true + ); + } + .with_metrics() + .await; +} diff --git a/apollo-router/tests/fixtures/test_metrics.rhai b/apollo-router/tests/fixtures/test_metrics.rhai new file mode 100644 index 0000000000..97d21b7978 --- /dev/null +++ b/apollo-router/tests/fixtures/test_metrics.rhai @@ -0,0 +1,20 @@ +fn router_service(service) { + let request_callback = |request| { + request + }; + service.map_request(request_callback); +} + +fn supergraph_service(service) { + let request_callback = |request| { + request + }; + service.map_request(request_callback); +} + +fn subgraph_service(service, name) { + let request_callback = |request| { + request + }; + service.map_request(request_callback); +} diff --git a/apollo-router/tests/fixtures/test_metrics_empty.rhai b/apollo-router/tests/fixtures/test_metrics_empty.rhai new file mode 100644 index 0000000000..5e96bfb2ea --- /dev/null +++ b/apollo-router/tests/fixtures/test_metrics_empty.rhai @@ -0,0 +1 @@ +// No service callbacks registered diff --git a/apollo-router/tests/fixtures/test_metrics_fail.rhai b/apollo-router/tests/fixtures/test_metrics_fail.rhai new file mode 100644 index 0000000000..d95661358d --- /dev/null +++ b/apollo-router/tests/fixtures/test_metrics_fail.rhai @@ -0,0 +1,6 @@ +fn supergraph_service(service) { + let request_callback = |request| { + throw "intentional test failure"; + }; + service.map_request(request_callback); +} diff --git a/apollo-router/tests/fixtures/test_metrics_response.rhai b/apollo-router/tests/fixtures/test_metrics_response.rhai new file mode 100644 index 0000000000..b79da4ea45 --- /dev/null +++ b/apollo-router/tests/fixtures/test_metrics_response.rhai @@ -0,0 +1,13 @@ +fn subgraph_service(service, name) { + let response_callback = |response| { + response + }; + service.map_response(response_callback); 
+} + +fn supergraph_service(service) { + let response_callback = |response| { + response + }; + service.map_response(response_callback); +} diff --git a/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx b/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx index 1a41e0f3ed..28c9fa5814 100644 --- a/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx +++ b/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx @@ -158,6 +158,12 @@ The `apollo.router.cache.redis.errors` metric also includes an `error_type` attr - `apollo.router.operations.coprocessor.duration` - Time spent waiting for the coprocessor to answer, in seconds. - `coprocessor.stage`: string (`RouterRequest`, `RouterResponse`, `SubgraphRequest`, `SubgraphResponse`) +## Rhai + +- `apollo.router.operations.rhai.duration` - Time spent executing a Rhai script callback, in seconds. + - `rhai.stage`: string (`RouterRequest`, `RouterResponse`, `SupergraphRequest`, `SupergraphResponse`, `ExecutionRequest`, `ExecutionResponse`, `SubgraphRequest`, `SubgraphResponse`) + - `rhai.succeeded`: bool (`true` if the callback returned without throwing) + ## Performance - `apollo_router_schema_load_duration` - Time spent loading the schema in seconds.