Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changesets/feat_jc_rhai_duration_metric.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
### Emit `apollo.router.operations.rhai.duration` histogram metric for Rhai script callbacks

A new `apollo.router.operations.rhai.duration` histogram metric (unit: `s`, value type: `f64`) is now emitted for every Rhai script callback execution across all pipeline stages. This mirrors the existing `apollo.router.operations.coprocessor.duration` metric.

Attributes on each datapoint:
- `rhai.stage` — the pipeline stage (e.g. `RouterRequest`, `SubgraphResponse`)
- `rhai.succeeded` — `true` if the callback returned without throwing
- `rhai.is_deferred` — present only on response stages: `true` for `@defer` and subscription data chunks, `false` for the primary or initial response. Request stages omit this attribute.

By [@theJC](https://github.com/theJC) in https://github.com/apollographql/router/pull/9072
195 changes: 170 additions & 25 deletions apollo-router/src/plugins/rhai/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::fmt;
use std::ops::ControlFlow;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
use futures::future::ready;
Expand All @@ -21,6 +22,7 @@ use rhai::Scope;
use rhai::Shared;
use schemars::JsonSchema;
use serde::Deserialize;
use strum::Display;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt;
Expand All @@ -38,6 +40,27 @@ mod engine;

pub(crate) const RHAI_SPAN_NAME: &str = "rhai_plugin";

/// Pipeline stage at which a Rhai script callback was invoked.
///
/// The derived `Display` output of a variant is what `record_rhai_execution` stores in the
/// `rhai.stage` attribute of the `apollo.router.operations.rhai.duration` metric
/// (for example `RouterRequest`).
#[derive(Clone, Copy, Debug, Display)]
enum RhaiStage {
/// Request callback attached to the router service.
RouterRequest,
/// Response callback attached to the router service.
RouterResponse,
/// Request callback attached to the supergraph service.
SupergraphRequest,
/// Response callback attached to the supergraph service.
SupergraphResponse,
/// Request callback attached to the execution service.
ExecutionRequest,
/// Response callback attached to the execution service.
ExecutionResponse,
/// Request callback attached to a subgraph service.
SubgraphRequest,
/// Response callback attached to a subgraph service.
SubgraphResponse,
}

/// Whether a Rhai callback is executing a primary response chunk or a streaming response chunk
/// (such as with `@defer`red data).
///
/// `execute` converts this into the boolean `rhai.is_deferred` metric attribute by comparing
/// against `ResponseChunk::Stream`, so `Stream` is recorded as `true` and `Primary` as `false`.
#[derive(Clone, Copy, Debug, Display, PartialEq, Eq)]
enum ResponseChunk {
/// The first (or only) response passed to the callback.
Primary,
/// A subsequent chunk of a streamed response.
Stream,
}

mod execution;
mod router;
mod subgraph;
Expand Down Expand Up @@ -202,7 +225,7 @@ pub(crate) enum ServiceStep {

// Actually use the checkpoint function so that we can shortcut requests which fail
macro_rules! gen_map_request {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
fn rhai_service_span() -> impl Fn(&$base::Request) -> tracing::Span + Clone {
move |_request: &$base::Request| {
Expand All @@ -217,8 +240,13 @@ macro_rules! gen_map_request {
.instrument(rhai_service_span())
.checkpoint(move |request: $base::Request| {
let shared_request = Shared::new(Mutex::new(Some(request)));
let result: Result<Dynamic, Box<EvalAltResult>> =
execute(&$rhai_service, &$callback, (shared_request.clone(),));
let result: Result<Dynamic, Box<EvalAltResult>> = execute(
&$rhai_service,
$stage,
None,
&$callback,
(shared_request.clone(),),
);
if let Err(error) = result {
let error_details = process_error(error);
if error_details.body.is_none() {
Expand All @@ -241,7 +269,7 @@ macro_rules! gen_map_request {

// Actually use the checkpoint function so that we can shortcut requests which fail
macro_rules! gen_map_router_deferred_request {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
fn rhai_service_span() -> impl Fn(&$base::Request) -> tracing::Span + Clone {
move |_request: &$base::Request| {
Expand All @@ -254,7 +282,7 @@ macro_rules! gen_map_router_deferred_request {
}
ServiceBuilder::new()
.instrument(rhai_service_span())
.checkpoint( move |chunked_request: $base::Request| {
.checkpoint(move |chunked_request: $base::Request| {
// we split the request stream into headers+first body chunk, then a stream of chunks
// for which we will implement mapping later
let $base::Request { router_request, context } = chunked_request;
Expand All @@ -268,7 +296,7 @@ macro_rules! gen_map_router_deferred_request {
),
};
let shared_request = Shared::new(Mutex::new(Some(request)));
let result = execute(&$rhai_service, &$callback, (shared_request.clone(),));
let result = execute(&$rhai_service, $stage, None, &$callback, (shared_request.clone(),));

if let Err(error) = result {
let error_details = process_error(error);
Expand Down Expand Up @@ -312,6 +340,8 @@ macro_rules! gen_map_router_deferred_request {

let result = execute(
&rhai_service,
$stage,
None,
&callback,
(shared_request.clone(),),
);
Expand Down Expand Up @@ -351,13 +381,18 @@ macro_rules! gen_map_router_deferred_request {
}

macro_rules! gen_map_response {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
service
.map_response(move |response: $base::Response| {
let shared_response = Shared::new(Mutex::new(Some(response)));
let result: Result<Dynamic, Box<EvalAltResult>> =
execute(&$rhai_service, &$callback, (shared_response.clone(),));
let result: Result<Dynamic, Box<EvalAltResult>> = execute(
&$rhai_service,
$stage,
Some(ResponseChunk::Primary),
&$callback,
(shared_response.clone(),),
);

if let Err(error) = result {
let error_details = process_error(error);
Expand Down Expand Up @@ -386,7 +421,7 @@ macro_rules! gen_map_response {
// I can't easily unify the macros because the router response processing is quite different to
// other service in terms of payload.
macro_rules! gen_map_router_deferred_response {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
BoxService::new(service.and_then(
|mapped_response: $base::Response| async move {
Expand All @@ -405,10 +440,14 @@ macro_rules! gen_map_router_deferred_response {
};
let shared_response = Shared::new(Mutex::new(Some(response)));

let result =
execute(&$rhai_service, &$callback, (shared_response.clone(),));
let result = execute(
&$rhai_service,
$stage,
Some(ResponseChunk::Primary),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is for the deferred response, shouldn't this be ResponseChunk::Stream?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the macro name means the response can have deferred chunks, but it doesn't have to. An @defer response is made up of a primary response + streaming deferred chunks.

&$callback,
(shared_response.clone(),),
);
if let Err(error) = result {

let error_details = process_error(error);
if error_details.body.is_none() {
tracing::error!("map_request callback failed: {error_details:#?}");
Expand Down Expand Up @@ -451,6 +490,8 @@ macro_rules! gen_map_router_deferred_response {

let result = execute(
&rhai_service,
$stage,
Some(ResponseChunk::Stream),
&callback,
(shared_response.clone(),),
);
Expand Down Expand Up @@ -492,7 +533,7 @@ macro_rules! gen_map_router_deferred_response {
}

macro_rules! gen_map_deferred_response {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
BoxService::new(service.and_then(
|mapped_response: $base::Response| async move {
Expand Down Expand Up @@ -525,8 +566,13 @@ macro_rules! gen_map_deferred_response {
};
let shared_response = Shared::new(Mutex::new(Some(response)));

let result =
execute(&$rhai_service, &$callback, (shared_response.clone(),));
let result = execute(
&$rhai_service,
$stage,
Some(ResponseChunk::Primary),
&$callback,
(shared_response.clone(),),
);
if let Err(error) = result {
let error_details = process_error(error);
if error_details.body.is_none() {
Expand Down Expand Up @@ -561,6 +607,8 @@ macro_rules! gen_map_deferred_response {

let result = execute(
&rhai_service,
$stage,
Some(ResponseChunk::Stream),
&callback,
(shared_response.clone(),),
);
Expand Down Expand Up @@ -606,33 +654,81 @@ impl ServiceStep {
/// Wrap the service held by this step so that `callback` is run against incoming requests.
///
/// Each arm expands the request macro for its service type and hands it the matching
/// `RhaiStage`, which the macro forwards to `execute` so the recorded metric is attributed
/// to the right pipeline stage.
fn map_request(&mut self, rhai_service: RhaiService, callback: FnPtr) {
match self {
// The router service has its own macro; the other three share `gen_map_request!`.
ServiceStep::Router(service) => {
gen_map_router_deferred_request!(router, service, rhai_service, callback);
gen_map_router_deferred_request!(
router,
service,
rhai_service,
callback,
RhaiStage::RouterRequest
);
}
ServiceStep::Supergraph(service) => {
gen_map_request!(supergraph, service, rhai_service, callback);
gen_map_request!(
supergraph,
service,
rhai_service,
callback,
RhaiStage::SupergraphRequest
);
}
ServiceStep::Execution(service) => {
gen_map_request!(execution, service, rhai_service, callback);
gen_map_request!(
execution,
service,
rhai_service,
callback,
RhaiStage::ExecutionRequest
);
}
ServiceStep::Subgraph(service) => {
gen_map_request!(subgraph, service, rhai_service, callback);
gen_map_request!(
subgraph,
service,
rhai_service,
callback,
RhaiStage::SubgraphRequest
);
}
}
}

/// Wrap the service held by this step so that `callback` is run against outgoing responses.
///
/// Each arm expands the response macro for its service type and hands it the matching
/// `RhaiStage`, which the macro forwards to `execute` together with a `ResponseChunk`.
fn map_response(&mut self, rhai_service: RhaiService, callback: FnPtr) {
match self {
// The router service has its own deferred-response macro.
ServiceStep::Router(service) => {
gen_map_router_deferred_response!(router, service, rhai_service, callback);
gen_map_router_deferred_response!(
router,
service,
rhai_service,
callback,
RhaiStage::RouterResponse
);
}
// Supergraph and execution share `gen_map_deferred_response!`, which reports the first
// execution as `ResponseChunk::Primary` and the later one as `ResponseChunk::Stream`.
ServiceStep::Supergraph(service) => {
gen_map_deferred_response!(supergraph, service, rhai_service, callback);
gen_map_deferred_response!(
supergraph,
service,
rhai_service,
callback,
RhaiStage::SupergraphResponse
);
}
ServiceStep::Execution(service) => {
gen_map_deferred_response!(execution, service, rhai_service, callback);
gen_map_deferred_response!(
execution,
service,
rhai_service,
callback,
RhaiStage::ExecutionResponse
);
}
// Subgraph responses use the plain response macro, which always reports
// `ResponseChunk::Primary`.
ServiceStep::Subgraph(service) => {
gen_map_response!(subgraph, service, rhai_service, callback);
gen_map_response!(
subgraph,
service,
rhai_service,
callback,
RhaiStage::SubgraphResponse
);
}
}
}
Expand Down Expand Up @@ -702,18 +798,67 @@ fn process_error(error: Box<EvalAltResult>) -> ErrorDetails {
error_details
}

/// Execute a Rhai callback for a pipeline service stage.
///
/// Emits a metric recording the time spent executing the Rhai script.
///
/// * `rhai_service` - supplies the engine, AST and shared scope the callback runs against.
/// * `stage` - pipeline stage the callback belongs to; forwarded to the metric.
/// * `chunk` - `None` for request callbacks; for response callbacks, whether this is the
///   primary response or a streamed chunk.
/// * `callback` - the Rhai function pointer to invoke.
/// * `args` - arguments forwarded to the callback.
///
/// Returns the callback's result unchanged. The metric is recorded whether the result is
/// `Ok` or `Err`; the outcome is passed along as the `succeeded` flag.
fn execute(
rhai_service: &RhaiService,
stage: RhaiStage,
chunk: Option<ResponseChunk>,
callback: &FnPtr,
args: impl FuncArgs,
) -> Result<Dynamic, Box<EvalAltResult>> {
if callback.is_curried() {
// The clock starts before the branch, so in the non-curried path the measured time also
// includes acquiring the scope lock.
let start = Instant::now();

// Curried function pointers are invoked directly through `FnPtr::call`; otherwise the
// function is looked up by name and run with the service's shared scope.
let result = if callback.is_curried() {
callback.call(&rhai_service.engine, &rhai_service.ast, args)
} else {
let mut guard = rhai_service.scope.lock();
rhai_service
.engine
.call_fn(&mut guard, &rhai_service.ast, callback.fn_name(), args)
};

let duration = start.elapsed();

// `None` stays `None` (the attribute is then omitted); `Stream` becomes `true` and
// `Primary` becomes `false`.
record_rhai_execution(
stage,
duration,
result.is_ok(),
chunk.map(|chunk| chunk == ResponseChunk::Stream),
);

result
}

/// Record one `apollo.router.operations.rhai.duration` histogram datapoint.
///
/// * `stage` - stringified with `to_string` and recorded as the `rhai.stage` attribute.
/// * `duration` - elapsed time, converted to fractional seconds before recording.
/// * `succeeded` - recorded as the `rhai.succeeded` attribute.
/// * `is_deferred` - when `Some`, recorded as the `rhai.is_deferred` attribute; when `None`
///   the attribute is left off the datapoint.
fn record_rhai_execution(
stage: RhaiStage,
duration: Duration,
succeeded: bool,
is_deferred: Option<bool>,
) {
let duration = duration.as_secs_f64();
let stage = stage.to_string();

// Two separate macro invocations so that `rhai.is_deferred` can be omitted entirely
// instead of being given a placeholder value.
if let Some(is_deferred) = is_deferred {
f64_histogram_with_unit!(
"apollo.router.operations.rhai.duration",
"Time spent executing a Rhai script callback, in seconds",
"s",
duration,
"rhai.stage" = stage,
"rhai.succeeded" = succeeded,
"rhai.is_deferred" = is_deferred
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious what the utility of the rhai.is_deferred attribute is; I wouldn't expect a significant difference and am not sure what you'd do if it were different?

Either way, if you start seeing an increase in duration, I'd think you'd have to dig through spans to find the outliers.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair question, and we don't have it for the coprocessor metrics. @theJC do you miss the absence of .is_deferred on coprocessors?

Copy link
Copy Markdown
Contributor

@theJC theJC Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet 😄 - only a couple of our clients have started to use @defer and we have not had cases where we needed to diagnose and distinguish and have metrics specifically surrounding those yet.

It wouldn't break my heart y'all wanted to simplify and eject that complexity/concern at this point.

);
} else {
f64_histogram_with_unit!(
"apollo.router.operations.rhai.duration",
"Time spent executing a Rhai script callback, in seconds",
"s",
duration,
"rhai.stage" = stage,
"rhai.succeeded" = succeeded
);
}
}

Expand Down
Loading
Loading