Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changesets/feat_jc_rhai_duration_metric.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
### Emit `apollo.router.operations.rhai.duration` histogram metric for Rhai script callbacks

A new `apollo.router.operations.rhai.duration` histogram metric (unit: `s`, value type: `f64`) is now emitted for every Rhai script callback execution across all pipeline stages. This mirrors the existing `apollo.router.operations.coprocessor.duration` metric.

Attributes on each datapoint:
- `rhai.stage` — the pipeline stage (e.g. `RouterRequest`, `SubgraphResponse`)
- `rhai.succeeded` — `true` if the callback returned without throwing
Comment thread
goto-bus-stop marked this conversation as resolved.
Outdated

By [@theJC](https://github.com/theJC) in https://github.com/apollographql/router/pull/9072
161 changes: 135 additions & 26 deletions apollo-router/src/plugins/rhai/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::fmt;
use std::ops::ControlFlow;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
use futures::future::ready;
Expand All @@ -21,6 +22,7 @@ use rhai::Scope;
use rhai::Shared;
use schemars::JsonSchema;
use serde::Deserialize;
use strum::Display;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt;
Expand All @@ -38,6 +40,19 @@ mod engine;

pub(crate) const RHAI_SPAN_NAME: &str = "rhai_plugin";

/// Pipeline stage at which a Rhai script callback was invoked.
#[derive(Clone, Copy, Debug, Display)]
enum RhaiStage {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be replaced with just using PipelineStep (from services/external.rs)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, maybe. I consider services/external to really be part of the coprocessor plugin (even though it lives in src/services).

I could move PipelineStep elsewhere, like in services/mod.rs, and use it in both places..?

RouterRequest,
RouterResponse,
SupergraphRequest,
SupergraphResponse,
ExecutionRequest,
ExecutionResponse,
SubgraphRequest,
SubgraphResponse,
}

mod execution;
mod router;
mod subgraph;
Expand Down Expand Up @@ -202,7 +217,7 @@ pub(crate) enum ServiceStep {

// Actually use the checkpoint function so that we can shortcut requests which fail
macro_rules! gen_map_request {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
fn rhai_service_span() -> impl Fn(&$base::Request) -> tracing::Span + Clone {
move |_request: &$base::Request| {
Expand All @@ -217,8 +232,12 @@ macro_rules! gen_map_request {
.instrument(rhai_service_span())
.checkpoint(move |request: $base::Request| {
let shared_request = Shared::new(Mutex::new(Some(request)));
let result: Result<Dynamic, Box<EvalAltResult>> =
execute(&$rhai_service, &$callback, (shared_request.clone(),));
let result: Result<Dynamic, Box<EvalAltResult>> = execute(
&$rhai_service,
$stage,
&$callback,
(shared_request.clone(),),
);
if let Err(error) = result {
let error_details = process_error(error);
if error_details.body.is_none() {
Expand All @@ -241,7 +260,7 @@ macro_rules! gen_map_request {

// Actually use the checkpoint function so that we can shortcut requests which fail
macro_rules! gen_map_router_deferred_request {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
fn rhai_service_span() -> impl Fn(&$base::Request) -> tracing::Span + Clone {
move |_request: &$base::Request| {
Expand All @@ -254,7 +273,7 @@ macro_rules! gen_map_router_deferred_request {
}
ServiceBuilder::new()
.instrument(rhai_service_span())
.checkpoint( move |chunked_request: $base::Request| {
.checkpoint(move |chunked_request: $base::Request| {
// we split the request stream into headers+first body chunk, then a stream of chunks
// for which we will implement mapping later
let $base::Request { router_request, context } = chunked_request;
Expand All @@ -268,7 +287,7 @@ macro_rules! gen_map_router_deferred_request {
),
};
let shared_request = Shared::new(Mutex::new(Some(request)));
let result = execute(&$rhai_service, &$callback, (shared_request.clone(),));
let result = execute(&$rhai_service, $stage, &$callback, (shared_request.clone(),));

if let Err(error) = result {
let error_details = process_error(error);
Expand Down Expand Up @@ -312,6 +331,7 @@ macro_rules! gen_map_router_deferred_request {

let result = execute(
&rhai_service,
$stage,
&callback,
(shared_request.clone(),),
);
Expand Down Expand Up @@ -351,13 +371,17 @@ macro_rules! gen_map_router_deferred_request {
}

macro_rules! gen_map_response {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
service
.map_response(move |response: $base::Response| {
let shared_response = Shared::new(Mutex::new(Some(response)));
let result: Result<Dynamic, Box<EvalAltResult>> =
execute(&$rhai_service, &$callback, (shared_response.clone(),));
let result: Result<Dynamic, Box<EvalAltResult>> = execute(
&$rhai_service,
$stage,
&$callback,
(shared_response.clone(),),
);

if let Err(error) = result {
let error_details = process_error(error);
Expand Down Expand Up @@ -386,7 +410,7 @@ macro_rules! gen_map_response {
// I can't easily unify the macros because the router response processing is quite different to
// other service in terms of payload.
macro_rules! gen_map_router_deferred_response {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
BoxService::new(service.and_then(
|mapped_response: $base::Response| async move {
Expand All @@ -405,10 +429,14 @@ macro_rules! gen_map_router_deferred_response {
};
let shared_response = Shared::new(Mutex::new(Some(response)));

let result =
execute(&$rhai_service, &$callback, (shared_response.clone(),));
if let Err(error) = result {
let result = execute(
&$rhai_service,
$stage,

&$callback,
(shared_response.clone(),),
);
if let Err(error) = result {
let error_details = process_error(error);
if error_details.body.is_none() {
tracing::error!("map_request callback failed: {error_details:#?}");
Expand Down Expand Up @@ -451,6 +479,7 @@ macro_rules! gen_map_router_deferred_response {

let result = execute(
&rhai_service,
$stage,
&callback,
(shared_response.clone(),),
);
Expand Down Expand Up @@ -492,7 +521,7 @@ macro_rules! gen_map_router_deferred_response {
}

macro_rules! gen_map_deferred_response {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident, $stage: expr) => {
$borrow.replace(|service| {
BoxService::new(service.and_then(
|mapped_response: $base::Response| async move {
Expand Down Expand Up @@ -525,8 +554,13 @@ macro_rules! gen_map_deferred_response {
};
let shared_response = Shared::new(Mutex::new(Some(response)));

let result =
execute(&$rhai_service, &$callback, (shared_response.clone(),));
let result = execute(
&$rhai_service,
$stage,

&$callback,
(shared_response.clone(),),
);
if let Err(error) = result {
let error_details = process_error(error);
if error_details.body.is_none() {
Expand Down Expand Up @@ -561,6 +595,7 @@ macro_rules! gen_map_deferred_response {

let result = execute(
&rhai_service,
$stage,
&callback,
(shared_response.clone(),),
);
Expand Down Expand Up @@ -606,33 +641,81 @@ impl ServiceStep {
fn map_request(&mut self, rhai_service: RhaiService, callback: FnPtr) {
match self {
ServiceStep::Router(service) => {
gen_map_router_deferred_request!(router, service, rhai_service, callback);
gen_map_router_deferred_request!(
router,
service,
rhai_service,
callback,
RhaiStage::RouterRequest
);
}
ServiceStep::Supergraph(service) => {
gen_map_request!(supergraph, service, rhai_service, callback);
gen_map_request!(
supergraph,
service,
rhai_service,
callback,
RhaiStage::SupergraphRequest
);
}
ServiceStep::Execution(service) => {
gen_map_request!(execution, service, rhai_service, callback);
gen_map_request!(
execution,
service,
rhai_service,
callback,
RhaiStage::ExecutionRequest
);
}
ServiceStep::Subgraph(service) => {
gen_map_request!(subgraph, service, rhai_service, callback);
gen_map_request!(
subgraph,
service,
rhai_service,
callback,
RhaiStage::SubgraphRequest
);
}
}
}

fn map_response(&mut self, rhai_service: RhaiService, callback: FnPtr) {
match self {
ServiceStep::Router(service) => {
gen_map_router_deferred_response!(router, service, rhai_service, callback);
gen_map_router_deferred_response!(
router,
service,
rhai_service,
callback,
RhaiStage::RouterResponse
);
}
ServiceStep::Supergraph(service) => {
gen_map_deferred_response!(supergraph, service, rhai_service, callback);
gen_map_deferred_response!(
supergraph,
service,
rhai_service,
callback,
RhaiStage::SupergraphResponse
);
}
ServiceStep::Execution(service) => {
gen_map_deferred_response!(execution, service, rhai_service, callback);
gen_map_deferred_response!(
execution,
service,
rhai_service,
callback,
RhaiStage::ExecutionResponse
);
}
ServiceStep::Subgraph(service) => {
gen_map_response!(subgraph, service, rhai_service, callback);
gen_map_response!(
subgraph,
service,
rhai_service,
callback,
RhaiStage::SubgraphResponse
);
}
}
}
Expand Down Expand Up @@ -702,19 +785,45 @@ fn process_error(error: Box<EvalAltResult>) -> ErrorDetails {
error_details
}

/// Execute a Rhai callback for a pipeline service stage.
///
/// Emits a metric recording the time spent executing the Rhai script.
fn execute(
rhai_service: &RhaiService,
stage: RhaiStage,
callback: &FnPtr,
args: impl FuncArgs,
) -> Result<Dynamic, Box<EvalAltResult>> {
if callback.is_curried() {
let start = Instant::now();

let result = if callback.is_curried() {
callback.call(&rhai_service.engine, &rhai_service.ast, args)
} else {
let mut guard = rhai_service.scope.lock();
rhai_service
.engine
.call_fn(&mut guard, &rhai_service.ast, callback.fn_name(), args)
}
};

let duration = start.elapsed();

record_rhai_execution(stage, duration, result.is_ok());

result
}

fn record_rhai_execution(stage: RhaiStage, duration: Duration, succeeded: bool) {
let duration = duration.as_secs_f64();
let stage = stage.to_string();

f64_histogram_with_unit!(
"apollo.router.operations.rhai.duration",
"Time spent executing a Rhai script callback, in seconds",
"s",
duration,
"rhai.stage" = stage,
"rhai.succeeded" = succeeded
);
}

register_plugin!("apollo", "rhai", Rhai);
Expand Down
Loading
Loading