Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ mimalloc = { version = "0.1.47", features = ["override"] }
moka = { version = "0.12.10", features = ["future"] }
ulid = "1.2.1"
ntex = { version = "2", features = ["tokio"] }
arc-swap = "1.7.1"
12 changes: 10 additions & 2 deletions bin/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod logger;
mod pipeline;
mod shared_state;
mod supergraph;
mod supergraph_mgr;

use std::sync::Arc;

Expand All @@ -11,6 +12,7 @@ use crate::{
logger::configure_logging,
pipeline::graphql_request_handler,
shared_state::RouterSharedState,
supergraph_mgr::SupergraphManager,
};

use hive_router_config::load_config;
Expand All @@ -22,9 +24,12 @@ use ntex::{
async fn graphql_endpoint_handler(
mut request: HttpRequest,
body_bytes: Bytes,
supergraph_manager: web::types::State<Arc<SupergraphManager>>,
app_state: web::types::State<Arc<RouterSharedState>>,
) -> impl web::Responder {
graphql_request_handler(&mut request, body_bytes, app_state.get_ref()).await
let supergraph = supergraph_manager.current();

graphql_request_handler(&mut request, body_bytes, &supergraph, app_state.get_ref()).await
}

pub async fn router_entrypoint() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -33,11 +38,14 @@ pub async fn router_entrypoint() -> Result<(), Box<dyn std::error::Error>> {
configure_logging(&router_config.log);

let addr = router_config.http.address();
let shared_state = RouterSharedState::new(router_config).await?;

let supergraph_manager = Arc::new(SupergraphManager::new_from_config(&router_config).await?);
let shared_state = Arc::new(RouterSharedState::new(router_config));

web::HttpServer::new(move || {
web::App::new()
.state(shared_state.clone())
.state(supergraph_manager.clone())
.route("/graphql", web::to(graphql_endpoint_handler))
.route("/health", web::to(health_check_handler))
.default_service(web::to(landing_page_handler))
Expand Down
6 changes: 3 additions & 3 deletions bin/router/src/pipeline/coerce_variables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing::{error, trace, warn};
use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, PipelineErrorVariant};
use crate::pipeline::execution_request::ExecutionRequest;
use crate::pipeline::normalize::GraphQLNormalizationPayload;
use crate::shared_state::RouterSharedState;
use crate::supergraph_mgr::SupergraphData;

#[derive(Clone, Debug)]
pub struct CoerceVariablesPayload {
Expand All @@ -21,7 +21,7 @@ pub struct CoerceVariablesPayload {
#[inline]
pub fn coerce_request_variables(
req: &HttpRequest,
app_state: &Arc<RouterSharedState>,
supergraph: &Arc<SupergraphData>,
execution_params: ExecutionRequest,
normalized_operation: &Arc<GraphQLNormalizationPayload>,
) -> Result<CoerceVariablesPayload, PipelineError> {
Expand All @@ -38,7 +38,7 @@ pub fn coerce_request_variables(
match collect_variables(
&normalized_operation.operation_for_plan,
execution_params.variables,
&app_state.schema_metadata,
&supergraph.metadata,
) {
Ok(values) => {
trace!(
Expand Down
8 changes: 5 additions & 3 deletions bin/router/src/pipeline/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::pipeline::coerce_variables::CoerceVariablesPayload;
use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, PipelineErrorVariant};
use crate::pipeline::normalize::GraphQLNormalizationPayload;
use crate::shared_state::RouterSharedState;
use crate::supergraph_mgr::SupergraphData;
use hive_router_plan_executor::execute_query_plan;
use hive_router_plan_executor::execution::plan::QueryPlanExecutionContext;
use hive_router_plan_executor::introspection::resolve::IntrospectionContext;
Expand All @@ -25,6 +26,7 @@ enum ExposeQueryPlanMode {
#[inline]
pub async fn execute_plan(
req: &mut HttpRequest,
supergraph: &Arc<SupergraphData>,
app_state: &Arc<RouterSharedState>,
normalized_payload: &Arc<GraphQLNormalizationPayload>,
query_plan_payload: &Arc<QueryPlan>,
Expand Down Expand Up @@ -57,8 +59,8 @@ pub async fn execute_plan(

let introspection_context = IntrospectionContext {
query: normalized_payload.operation_for_introspection.as_ref(),
schema: &app_state.planner.consumer_schema.document,
metadata: &app_state.schema_metadata,
schema: &supergraph.planner.consumer_schema.document,
metadata: &supergraph.metadata,
};

execute_query_plan(QueryPlanExecutionContext {
Expand All @@ -68,7 +70,7 @@ pub async fn execute_plan(
extensions,
introspection_context: &introspection_context,
operation_type_name: normalized_payload.root_type_name,
executors: &app_state.subgraph_executor_map,
executors: &supergraph.subgraph_executor_map,
})
.await
.map(Bytes::from)
Expand Down
14 changes: 10 additions & 4 deletions bin/router/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
validation::validate_operation_with_cache,
},
shared_state::RouterSharedState,
supergraph_mgr::SupergraphData,
};

pub mod coerce_variables;
Expand All @@ -43,6 +44,7 @@ static GRAPHIQL_HTML: &str = include_str!("../../static/graphiql.html");
pub async fn graphql_request_handler(
req: &mut HttpRequest,
body_bytes: Bytes,
supergraph: &Arc<SupergraphData>,
state: &Arc<RouterSharedState>,
) -> impl web::Responder {
if req.method() == Method::GET && req.accepts_content_type(*TEXT_HTML_CONTENT_TYPE) {
Expand All @@ -51,7 +53,7 @@ pub async fn graphql_request_handler(
.body(GRAPHIQL_HTML);
}

match execute_pipeline(req, body_bytes, state).await {
match execute_pipeline(req, body_bytes, supergraph, state).await {
Ok(response_bytes) => {
let response_content_type: &'static HeaderValue =
if req.accepts_content_type(*APPLICATION_GRAPHQL_RESPONSE_JSON_STR) {
Expand All @@ -72,23 +74,26 @@ pub async fn graphql_request_handler(
pub async fn execute_pipeline(
req: &mut HttpRequest,
body_bytes: Bytes,
supergraph: &Arc<SupergraphData>,
state: &Arc<RouterSharedState>,
) -> Result<Bytes, PipelineError> {
let execution_request = get_execution_request(req, body_bytes).await?;
let parser_payload = parse_operation_with_cache(req, state, &execution_request).await?;
validate_operation_with_cache(req, state, &parser_payload).await?;
validate_operation_with_cache(req, supergraph, state, &parser_payload).await?;

let progressive_override_ctx = request_override_context()?;
let normalize_payload =
normalize_request_with_cache(req, state, &execution_request, &parser_payload).await?;
normalize_request_with_cache(req, supergraph, state, &execution_request, &parser_payload)
.await?;
let variable_payload =
coerce_request_variables(req, state, execution_request, &normalize_payload)?;
coerce_request_variables(req, supergraph, execution_request, &normalize_payload)?;

let query_plan_cancellation_token =
CancellationToken::with_timeout(state.router_config.query_planner.timeout);

let query_plan_payload = plan_operation_with_cache(
req,
supergraph,
state,
&normalize_payload,
&progressive_override_ctx,
Expand All @@ -98,6 +103,7 @@ pub async fn execute_pipeline(

let execution_result = execute_plan(
req,
supergraph,
state,
&normalize_payload,
&query_plan_payload,
Expand Down
6 changes: 4 additions & 2 deletions bin/router/src/pipeline/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, Pipel
use crate::pipeline::execution_request::ExecutionRequest;
use crate::pipeline::parser::GraphQLParserPayload;
use crate::shared_state::RouterSharedState;
use crate::supergraph_mgr::SupergraphData;
use tracing::{error, trace};

#[derive(Debug)]
Expand All @@ -26,6 +27,7 @@ pub struct GraphQLNormalizationPayload {
#[inline]
pub async fn normalize_request_with_cache(
req: &HttpRequest,
supergraph: &Arc<SupergraphData>,
app_state: &Arc<RouterSharedState>,
execution_params: &ExecutionRequest,
parser_payload: &GraphQLParserPayload,
Expand All @@ -51,7 +53,7 @@ pub async fn normalize_request_with_cache(
Ok(payload)
}
None => match normalize_operation(
&app_state.planner.supergraph,
&supergraph.planner.supergraph,
&parser_payload.parsed_operation,
execution_params.operation_name.as_deref(),
) {
Expand All @@ -64,7 +66,7 @@ pub async fn normalize_request_with_cache(

let operation = doc.operation;
let (root_type_name, projection_plan) =
FieldProjectionPlan::from_operation(&operation, &app_state.schema_metadata);
FieldProjectionPlan::from_operation(&operation, &supergraph.metadata);
let partitioned_operation = partition_operation(operation);

let payload = GraphQLNormalizationPayload {
Expand Down
6 changes: 4 additions & 2 deletions bin/router/src/pipeline/query_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, Pipel
use crate::pipeline::normalize::GraphQLNormalizationPayload;
use crate::pipeline::progressive_override::{RequestOverrideContext, StableOverrideContext};
use crate::shared_state::RouterSharedState;
use crate::supergraph_mgr::SupergraphData;
use hive_router_query_planner::planner::plan_nodes::QueryPlan;
use hive_router_query_planner::utils::cancellation::CancellationToken;
use ntex::web::HttpRequest;
Expand All @@ -13,13 +14,14 @@ use xxhash_rust::xxh3::Xxh3;
#[inline]
pub async fn plan_operation_with_cache(
req: &HttpRequest,
supergraph: &Arc<SupergraphData>,
app_state: &Arc<RouterSharedState>,
normalized_operation: &Arc<GraphQLNormalizationPayload>,
request_override_context: &RequestOverrideContext,
cancellation_token: &CancellationToken,
) -> Result<Arc<QueryPlan>, PipelineError> {
let stable_override_context =
StableOverrideContext::new(&app_state.planner.supergraph, request_override_context);
StableOverrideContext::new(&supergraph.planner.supergraph, request_override_context);

let filtered_operation_for_plan = &normalized_operation.operation_for_plan;
let plan_cache_key =
Expand All @@ -37,7 +39,7 @@ pub async fn plan_operation_with_cache(
}));
}

app_state
supergraph
.planner
.plan_from_normalized_operation(
filtered_operation_for_plan,
Expand Down
4 changes: 3 additions & 1 deletion bin/router/src/pipeline/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ use std::sync::Arc;
use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, PipelineErrorVariant};
use crate::pipeline::parser::GraphQLParserPayload;
use crate::shared_state::RouterSharedState;
use crate::supergraph_mgr::SupergraphData;
use graphql_tools::validation::validate::validate;
use ntex::web::HttpRequest;
use tracing::{error, trace};

#[inline]
pub async fn validate_operation_with_cache(
req: &HttpRequest,
supergraph: &Arc<SupergraphData>,
app_state: &Arc<RouterSharedState>,
parser_payload: &GraphQLParserPayload,
) -> Result<(), PipelineError> {
let consumer_schema_ast = &app_state.planner.consumer_schema.document;
let consumer_schema_ast = &supergraph.planner.consumer_schema.document;

let validation_result = match app_state
.validate_cache
Expand Down
53 changes: 5 additions & 48 deletions bin/router/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,72 +2,29 @@ use std::sync::Arc;

use graphql_tools::validation::{utils::ValidationError, validate::ValidationPlan};
use hive_router_config::HiveRouterConfig;
use hive_router_plan_executor::{
executors::error::SubgraphExecutorError,
introspection::schema::{SchemaMetadata, SchemaWithMetadata},
SubgraphExecutorMap,
};
use hive_router_query_planner::{
planner::{plan_nodes::QueryPlan, Planner, PlannerError},
state::supergraph_state::SupergraphState,
utils::parsing::parse_schema,
};
use hive_router_query_planner::planner::plan_nodes::QueryPlan;
use moka::future::Cache;

use crate::{
pipeline::normalize::GraphQLNormalizationPayload,
supergraph::{base::LoadSupergraphError, resolve_from_config},
};
use crate::pipeline::normalize::GraphQLNormalizationPayload;

pub struct RouterSharedState {
pub schema_metadata: SchemaMetadata,
pub planner: Planner,
pub validation_plan: ValidationPlan,
pub subgraph_executor_map: SubgraphExecutorMap,
pub plan_cache: Cache<u64, Arc<QueryPlan>>,
pub validate_cache: Cache<u64, Arc<Vec<ValidationError>>>,
pub parse_cache: Cache<u64, Arc<graphql_parser::query::Document<'static, String>>>,
pub normalize_cache: Cache<u64, Arc<GraphQLNormalizationPayload>>,
pub router_config: HiveRouterConfig,
}

#[derive(Debug, thiserror::Error)]
pub enum RouterSharedStateError {
#[error("Failed to load supergraph: {0}")]
SupergraphInitFailure(#[from] LoadSupergraphError),
#[error("Failed to init planner: {0}")]
PlannerInitError(#[from] PlannerError),
#[error("Failed to init executor: {0}")]
ExecutorInitError(#[from] SubgraphExecutorError),
}

impl RouterSharedState {
pub async fn new(router_config: HiveRouterConfig) -> Result<Arc<Self>, RouterSharedStateError> {
let mut supergraph_source_loader = resolve_from_config(&router_config.supergraph).await?;
supergraph_source_loader.reload().await?;
let supergraph_sdl = supergraph_source_loader
.current()
.expect("supergraph should be available after a successful reload");
let parsed_supergraph_sdl = parse_schema(supergraph_sdl);
let supergraph_state = SupergraphState::new(&parsed_supergraph_sdl);
let planner = Planner::new_from_supergraph(&parsed_supergraph_sdl)?;
let schema_metadata = planner.consumer_schema.schema_metadata();

let subgraph_executor_map = SubgraphExecutorMap::from_http_endpoint_map(
supergraph_state.subgraph_endpoint_map,
router_config.traffic_shaping.clone(),
)?;

Ok(Arc::new(Self {
schema_metadata,
planner,
pub fn new(router_config: HiveRouterConfig) -> Self {
Self {
validation_plan: graphql_tools::validation::rules::default_rules_validation_plan(),
subgraph_executor_map,
plan_cache: moka::future::Cache::new(1000),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All those caches are supergraph specific so I think RouterSharedState is already the state which lasts with the supergraph.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I plan to have a shared_state.next(new_state) that will take care of the caches as well 🫡

validate_cache: moka::future::Cache::new(1000),
parse_cache: moka::future::Cache::new(1000),
normalize_cache: moka::future::Cache::new(1000),
router_config,
}))
}
}
}
2 changes: 1 addition & 1 deletion bin/router/src/supergraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub mod hive;

pub async fn resolve_from_config(
config: &SupergraphSource,
) -> Result<Box<dyn SupergraphLoader>, LoadSupergraphError> {
) -> Result<Box<dyn SupergraphLoader + Send + Sync>, LoadSupergraphError> {
debug!("Resolving supergraph from config: {:?}", config);

match config {
Expand Down
Loading
Loading