Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changesets/feat_jc_rhai_duration_metric.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
### Emit `apollo.router.operations.rhai.duration` histogram metric for Rhai script callbacks

A new `apollo.router.operations.rhai.duration` histogram metric (unit: `s`, value type: `f64`) is now emitted for every Rhai script callback execution across all pipeline stages. This mirrors the existing `apollo.router.operations.coprocessor.duration` metric.

Attributes on each datapoint:
- `rhai.stage` — the pipeline stage (e.g. `RouterRequest`, `SubgraphResponse`)
- `rhai.succeeded` — `true` if the callback returned without throwing
Comment thread
goto-bus-stop marked this conversation as resolved.
Outdated
- `rhai.is_deferred` — present on response stages. `true` for `@defer` and subscription data chunks, `false` for the primary or initial response.

By [@theJC](https://github.com/theJC) in https://github.com/apollographql/router/pull/9072
195 changes: 170 additions & 25 deletions apollo-router/src/plugins/rhai/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::fmt;
use std::ops::ControlFlow;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
use futures::future::ready;
Expand All @@ -21,6 +22,7 @@ use rhai::Scope;
use rhai::Shared;
use schemars::JsonSchema;
use serde::Deserialize;
use strum::Display;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt;
Expand All @@ -38,6 +40,27 @@ mod engine;

pub(crate) const RHAI_SPAN_NAME: &str = "rhai_plugin";

/// Pipeline stage at which a Rhai script callback was invoked.
#[derive(Clone, Copy, Debug, Display)]
enum RhaiStage {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be replaced with just using PipelineStep (from services/external.rs)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, maybe. I consider services/external to really be part of the coprocessor plugin (even though it lives in src/services).

I could move PipelineStep elsewhere, like in services/mod.rs, and use it in both places..?

RouterRequest,
RouterResponse,
SupergraphRequest,
SupergraphResponse,
ExecutionRequest,
ExecutionResponse,
SubgraphRequest,
SubgraphResponse,
}

/// Whether a Rhai callback is executing a primary response chunk or a streaming response chunk
/// (such as with `@defer`red data).
#[derive(Clone, Copy, Debug, Display, PartialEq, Eq)]
enum ResponseChunk {
Primary,
Stream,
}

mod execution;
mod router;
mod subgraph;
Expand Down Expand Up @@ -202,7 +225,7 @@ pub(crate) enum ServiceStep {

// Actually use the checkpoint function so that we can shortcut requests which fail
macro_rules! gen_map_request {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
fn rhai_service_span() -> impl Fn(&$base::Request) -> tracing::Span + Clone {
move |_request: &$base::Request| {
Expand All @@ -217,8 +240,13 @@ macro_rules! gen_map_request {
.instrument(rhai_service_span())
.checkpoint(move |request: $base::Request| {
let shared_request = Shared::new(Mutex::new(Some(request)));
let result: Result<Dynamic, Box<EvalAltResult>> =
execute(&$rhai_service, &$callback, (shared_request.clone(),));
let result: Result<Dynamic, Box<EvalAltResult>> = execute(
&$rhai_service,
$stage,
None,
&$callback,
(shared_request.clone(),),
);
if let Err(error) = result {
let error_details = process_error(error);
if error_details.body.is_none() {
Expand All @@ -241,7 +269,7 @@ macro_rules! gen_map_request {

// Actually use the checkpoint function so that we can shortcut requests which fail
macro_rules! gen_map_router_deferred_request {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
fn rhai_service_span() -> impl Fn(&$base::Request) -> tracing::Span + Clone {
move |_request: &$base::Request| {
Expand All @@ -254,7 +282,7 @@ macro_rules! gen_map_router_deferred_request {
}
ServiceBuilder::new()
.instrument(rhai_service_span())
.checkpoint( move |chunked_request: $base::Request| {
.checkpoint(move |chunked_request: $base::Request| {
// we split the request stream into headers+first body chunk, then a stream of chunks
// for which we will implement mapping later
let $base::Request { router_request, context } = chunked_request;
Expand All @@ -268,7 +296,7 @@ macro_rules! gen_map_router_deferred_request {
),
};
let shared_request = Shared::new(Mutex::new(Some(request)));
let result = execute(&$rhai_service, &$callback, (shared_request.clone(),));
let result = execute(&$rhai_service, $stage, None, &$callback, (shared_request.clone(),));

if let Err(error) = result {
let error_details = process_error(error);
Expand Down Expand Up @@ -312,6 +340,8 @@ macro_rules! gen_map_router_deferred_request {

let result = execute(
&rhai_service,
$stage,
BodyChunk::Stream,
Comment thread
goto-bus-stop marked this conversation as resolved.
Outdated
&callback,
(shared_request.clone(),),
);
Expand Down Expand Up @@ -351,13 +381,18 @@ macro_rules! gen_map_router_deferred_request {
}

macro_rules! gen_map_response {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
service
.map_response(move |response: $base::Response| {
let shared_response = Shared::new(Mutex::new(Some(response)));
let result: Result<Dynamic, Box<EvalAltResult>> =
execute(&$rhai_service, &$callback, (shared_response.clone(),));
let result: Result<Dynamic, Box<EvalAltResult>> = execute(
&$rhai_service,
$stage,
Some(ResponseChunk::Primary),
&$callback,
(shared_response.clone(),),
);

if let Err(error) = result {
let error_details = process_error(error);
Expand Down Expand Up @@ -386,7 +421,7 @@ macro_rules! gen_map_response {
// I can't easily unify the macros because the router response processing is quite different to
// other service in terms of payload.
macro_rules! gen_map_router_deferred_response {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
BoxService::new(service.and_then(
|mapped_response: $base::Response| async move {
Expand All @@ -405,10 +440,14 @@ macro_rules! gen_map_router_deferred_response {
};
let shared_response = Shared::new(Mutex::new(Some(response)));

let result =
execute(&$rhai_service, &$callback, (shared_response.clone(),));
let result = execute(
&$rhai_service,
$stage,
Some(ResponseChunk::Primary),
Comment thread
goto-bus-stop marked this conversation as resolved.
Outdated
&$callback,
(shared_response.clone(),),
);
if let Err(error) = result {

let error_details = process_error(error);
if error_details.body.is_none() {
tracing::error!("map_request callback failed: {error_details:#?}");
Expand Down Expand Up @@ -451,6 +490,8 @@ macro_rules! gen_map_router_deferred_response {

let result = execute(
&rhai_service,
$stage,
Some(ResponseChunk::Stream),
&callback,
(shared_response.clone(),),
);
Expand Down Expand Up @@ -492,7 +533,7 @@ macro_rules! gen_map_router_deferred_response {
}

macro_rules! gen_map_deferred_response {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
BoxService::new(service.and_then(
|mapped_response: $base::Response| async move {
Expand Down Expand Up @@ -525,8 +566,13 @@ macro_rules! gen_map_deferred_response {
};
let shared_response = Shared::new(Mutex::new(Some(response)));

let result =
execute(&$rhai_service, &$callback, (shared_response.clone(),));
let result = execute(
&$rhai_service,
$stage,
Some(ResponseChunk::Primary),
&$callback,
(shared_response.clone(),),
);
if let Err(error) = result {
let error_details = process_error(error);
if error_details.body.is_none() {
Expand Down Expand Up @@ -561,6 +607,8 @@ macro_rules! gen_map_deferred_response {

let result = execute(
&rhai_service,
$stage,
Some(ResponseChunk::Stream),
&callback,
(shared_response.clone(),),
);
Expand Down Expand Up @@ -606,33 +654,81 @@ impl ServiceStep {
fn map_request(&mut self, rhai_service: RhaiService, callback: FnPtr) {
match self {
ServiceStep::Router(service) => {
gen_map_router_deferred_request!(router, service, rhai_service, callback);
gen_map_router_deferred_request!(
router,
service,
rhai_service,
callback,
RhaiStage::RouterRequest
);
}
ServiceStep::Supergraph(service) => {
gen_map_request!(supergraph, service, rhai_service, callback);
gen_map_request!(
supergraph,
service,
rhai_service,
callback,
RhaiStage::SupergraphRequest
);
}
ServiceStep::Execution(service) => {
gen_map_request!(execution, service, rhai_service, callback);
gen_map_request!(
execution,
service,
rhai_service,
callback,
RhaiStage::ExecutionRequest
);
}
ServiceStep::Subgraph(service) => {
gen_map_request!(subgraph, service, rhai_service, callback);
gen_map_request!(
subgraph,
service,
rhai_service,
callback,
RhaiStage::SubgraphRequest
);
}
}
}

fn map_response(&mut self, rhai_service: RhaiService, callback: FnPtr) {
match self {
ServiceStep::Router(service) => {
gen_map_router_deferred_response!(router, service, rhai_service, callback);
gen_map_router_deferred_response!(
router,
service,
rhai_service,
callback,
RhaiStage::RouterResponse
);
}
ServiceStep::Supergraph(service) => {
gen_map_deferred_response!(supergraph, service, rhai_service, callback);
gen_map_deferred_response!(
supergraph,
service,
rhai_service,
callback,
RhaiStage::SupergraphResponse
);
}
ServiceStep::Execution(service) => {
gen_map_deferred_response!(execution, service, rhai_service, callback);
gen_map_deferred_response!(
execution,
service,
rhai_service,
callback,
RhaiStage::ExecutionResponse
);
}
ServiceStep::Subgraph(service) => {
gen_map_response!(subgraph, service, rhai_service, callback);
gen_map_response!(
subgraph,
service,
rhai_service,
callback,
RhaiStage::SubgraphResponse
);
}
}
}
Expand Down Expand Up @@ -702,18 +798,67 @@ fn process_error(error: Box<EvalAltResult>) -> ErrorDetails {
error_details
}

/// Execute a Rhai callback for a pipeline service stage.
///
/// Emits a metric recording the time spent executing the Rhai script.
fn execute(
rhai_service: &RhaiService,
stage: RhaiStage,
chunk: Option<ResponseChunk>,
callback: &FnPtr,
args: impl FuncArgs,
) -> Result<Dynamic, Box<EvalAltResult>> {
if callback.is_curried() {
let start = Instant::now();

let result = if callback.is_curried() {
callback.call(&rhai_service.engine, &rhai_service.ast, args)
} else {
let mut guard = rhai_service.scope.lock();
rhai_service
.engine
.call_fn(&mut guard, &rhai_service.ast, callback.fn_name(), args)
};

let duration = start.elapsed();

record_rhai_execution(
stage,
duration,
result.is_ok(),
chunk.map(|chunk| chunk == ResponseChunk::Stream),
);

result
}

fn record_rhai_execution(
stage: RhaiStage,
duration: Duration,
succeeded: bool,
is_deferred: Option<bool>,
) {
let duration = duration.as_secs_f64();
let stage = stage.to_string();

if let Some(is_deferred) = is_deferred {
f64_histogram_with_unit!(
"apollo.router.operations.rhai.duration",
"Time spent executing a Rhai script callback, in seconds",
"s",
duration,
"rhai.stage" = stage,
"rhai.succeeded" = succeeded,
"rhai.is_deferred" = is_deferred
Comment thread
goto-bus-stop marked this conversation as resolved.
Outdated
);
} else {
f64_histogram_with_unit!(
"apollo.router.operations.rhai.duration",
"Time spent executing a Rhai script callback, in seconds",
"s",
duration,
"rhai.stage" = stage,
"rhai.succeeded" = succeeded
);
}
}

Expand Down
Loading
Loading