Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ itoa = "1.0.15"
ryu = "1.0.20"
indexmap = "2.10.0"
bumpalo = "3.19.0"
tokio-retry2 = "0.6.0"

[dev-dependencies]
subgraphs = { path = "../../bench/subgraphs" }
Expand Down
5 changes: 3 additions & 2 deletions lib/executor/src/executors/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use bytes::Bytes;
use http::HeaderMap;
use http::{HeaderMap, StatusCode};

use crate::execution::plan::ClientRequestDetails;

#[async_trait]
pub trait SubgraphExecutor {
async fn execute<'a>(
&self,
execution_request: HttpExecutionRequest<'a>,
execution_request: &'a HttpExecutionRequest<'a>,
) -> HttpExecutionResponse;
fn to_boxed_arc<'a>(self) -> Arc<Box<dyn SubgraphExecutor + Send + Sync + 'a>>
where
Expand Down Expand Up @@ -38,4 +38,5 @@ pub struct HttpExecutionRequest<'a> {
pub struct HttpExecutionResponse {
pub body: Bytes,
pub headers: HeaderMap,
pub status: StatusCode,
}
11 changes: 8 additions & 3 deletions lib/executor/src/executors/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,20 @@ impl HTTPSubgraphExecutor {
impl SubgraphExecutor for HTTPSubgraphExecutor {
async fn execute<'a>(
&self,
execution_request: HttpExecutionRequest<'a>,
execution_request: &'a HttpExecutionRequest<'a>,
) -> HttpExecutionResponse {
let body = match self.build_request_body(&execution_request) {
let body = match self.build_request_body(execution_request) {
Ok(body) => body,
Err(e) => {
return HttpExecutionResponse {
body: error_to_graphql_bytes(&self.endpoint, e),
headers: Default::default(),
status: http::StatusCode::BAD_REQUEST,
}
}
};

let mut headers = execution_request.headers;
let mut headers = execution_request.headers.clone();
self.header_map.iter().for_each(|(key, value)| {
headers.insert(key, value.clone());
});
Expand All @@ -184,10 +185,12 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
Ok(shared_response) => HttpExecutionResponse {
body: shared_response.body,
headers: shared_response.headers,
status: shared_response.status,
},
Err(e) => HttpExecutionResponse {
body: error_to_graphql_bytes(&self.endpoint, e),
headers: Default::default(),
status: http::StatusCode::BAD_REQUEST,
},
};
}
Expand Down Expand Up @@ -223,10 +226,12 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
Ok(shared_response) => HttpExecutionResponse {
body: shared_response.body.clone(),
headers: shared_response.headers.clone(),
status: shared_response.status,
},
Err(e) => HttpExecutionResponse {
body: error_to_graphql_bytes(&self.endpoint, e.clone()),
headers: Default::default(),
status: http::StatusCode::BAD_REQUEST,
},
}
}
Expand Down
14 changes: 8 additions & 6 deletions lib/executor/src/executors/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@
use tokio::sync::{OnceCell, Semaphore};

use crate::{
executors::{

Check warning on line 18 in lib/executor/src/executors/map.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/executors/map.rs
common::{
HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc,
},
dedupe::{ABuildHasher, SharedResponse},
error::SubgraphExecutorError,
http::HTTPSubgraphExecutor,
timeout::TimeoutExecutor,
}, dedupe::{ABuildHasher, SharedResponse}, error::SubgraphExecutorError, http::HTTPSubgraphExecutor, retry::RetryExecutor, timeout::TimeoutExecutor
},
response::graphql_error::GraphQLError,
};
Expand Down Expand Up @@ -50,7 +46,7 @@
execution_request: HttpExecutionRequest<'a>,
) -> HttpExecutionResponse {
match self.inner.get(subgraph_name) {
Some(executor) => executor.execute(execution_request).await,
Some(executor) => executor.execute(&execution_request).await,
None => {
let graphql_error: GraphQLError = format!(
"Subgraph executor not found for subgraph: {}",
Expand All @@ -67,6 +63,7 @@
HttpExecutionResponse {
body: buffer.freeze(),
headers: Default::default(),
status: http::StatusCode::BAD_REQUEST,
}
}
}
Expand Down Expand Up @@ -144,8 +141,13 @@
if let Some(timeout_config) = &config_arc.timeout {
executor = TimeoutExecutor::try_new(endpoint_uri, timeout_config, executor)?
.to_boxed_arc();
}

Check warning on line 144 in lib/executor/src/executors/map.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/executors/map.rs

if config_arc.max_retries > 0 {
executor = RetryExecutor::new(executor, &config_arc)
.to_boxed_arc();
}

Ok((subgraph_name, executor))
})
.collect::<Result<HashMap<_, _>, SubgraphExecutorError>>()?;
Expand Down
1 change: 1 addition & 0 deletions lib/executor/src/executors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod common;
pub mod dedupe;
pub mod error;

Check warning on line 3 in lib/executor/src/executors/mod.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/executors/mod.rs
pub mod http;
pub mod map;
pub mod timeout;
pub mod retry;
55 changes: 55 additions & 0 deletions lib/executor/src/executors/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use async_trait::async_trait;
use hive_router_config::traffic_shaping::TrafficShapingExecutorConfig;
use tokio_retry2::{strategy::ExponentialBackoff, Retry, RetryError};

use crate::executors::common::{
HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc,
};

pub struct RetryExecutor {
pub executor: SubgraphExecutorBoxedArc,
pub strategy: std::iter::Take<ExponentialBackoff>,
}

impl RetryExecutor {
pub fn new(executor: SubgraphExecutorBoxedArc, config: &TrafficShapingExecutorConfig) -> Self {
let retry_delay_as_millis = config.retry_delay.as_millis();
let strategy = ExponentialBackoff::from_millis(retry_delay_as_millis as u64)
.factor(config.retry_factor)
.max_delay(config.retry_delay)
.take(config.max_retries + 1); // to account for the initial attempt
Self { executor, strategy }
}
}

#[async_trait]
impl SubgraphExecutor for RetryExecutor {
async fn execute<'a>(
&self,
execution_request: &'a HttpExecutionRequest<'a>,
) -> HttpExecutionResponse {
let action = async move || {
let result = self.executor.execute(execution_request).await;
if result.status.is_success() {
Ok(result)
} else {
let retry_after_header = result
.headers
.get("retry-after")
.and_then(|value| value.to_str().ok())
.and_then(|s| s.parse::<u64>().ok());
let retry_after = retry_after_header.map(std::time::Duration::from_secs);
Err(RetryError::Transient {
err: result,
retry_after,
})
}
};
let result = Retry::spawn(self.strategy.clone(), action).await;

match result {
Ok(response) => response,
Err(response) => response,
}
}
}
8 changes: 5 additions & 3 deletions lib/executor/src/executors/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl TimeoutExecutor {
impl SubgraphExecutor for TimeoutExecutor {
async fn execute<'a>(
&self,
execution_request: HttpExecutionRequest<'a>,
execution_request: &'a HttpExecutionRequest<'a>,
) -> HttpExecutionResponse {
let timeout = self.get_timeout_duration(execution_request.client_request);
let execution = self.executor.execute(execution_request);
Expand All @@ -158,6 +158,7 @@ impl SubgraphExecutor for TimeoutExecutor {
SubgraphExecutorError::RequestTimeout(timeout),
),
headers: Default::default(),
status: http::StatusCode::GATEWAY_TIMEOUT,
},
}
} else {
Expand Down Expand Up @@ -191,11 +192,12 @@ mod tests {
impl SubgraphExecutor for MockExecutor {
async fn execute<'a>(
&self,
_execution_request: HttpExecutionRequest<'a>,
_execution_request: &'a HttpExecutionRequest<'a>,
) -> HttpExecutionResponse {
HttpExecutionResponse {
body: Default::default(),
headers: Default::default(),
status: http::StatusCode::OK,
}
}
}
Expand Down Expand Up @@ -402,7 +404,7 @@ mod tests {
};

println!("Sending request to executor with 5s timeout...");
let response = timeout_executor.execute(execution_request).await;
let response = timeout_executor.execute(&execution_request).await;

println!("Received response from executor.");
assert!(
Expand Down
24 changes: 24 additions & 0 deletions lib/router-config/src/traffic_shaping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@
/// ```
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timeout: Option<SubgraphTimeoutConfig>,

Check warning on line 54 in lib/router-config/src/traffic_shaping.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/router-config/src/traffic_shaping.rs
#[serde(default = "default_max_retries")]
pub max_retries: usize,

#[serde(deserialize_with = "humantime_serde", default = "default_retry_delay")]
pub retry_delay: Duration,

#[serde(default = "default_retry_factor")]
pub retry_factor: u64,
}

#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
Expand All @@ -74,6 +83,9 @@
pool_idle_timeout_seconds: default_pool_idle_timeout_seconds(),
dedupe_enabled: default_dedupe_enabled(),
timeout: None,
max_retries: 0,
retry_delay: default_retry_delay(),
retry_factor: default_retry_factor(),
}
}
}
Expand All @@ -99,3 +111,15 @@
fn default_dedupe_enabled() -> bool {
true
}

fn default_max_retries() -> usize {
0
}

fn default_retry_delay() -> Duration {
Duration::from_secs(1)
}

fn default_retry_factor() -> u64 {

Check warning on line 123 in lib/router-config/src/traffic_shaping.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/router-config/src/traffic_shaping.rs
1
}
Loading