Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 121 additions & 71 deletions apollo-router/src/query_planner/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;

use apollo_compiler::ExecutableDocument;
use apollo_compiler::ast;
use apollo_compiler::collections::HashMap;
use apollo_compiler::validation::Valid;
use apollo_federation::query_plan::requires_selection;
use apollo_federation::query_plan::serializable_document::SerializableDocument;
Expand Down Expand Up @@ -230,15 +230,30 @@ impl Variables {
return None;
}

let representations = Value::Array(Vec::from_iter(values));
let representations: Vec<Value> = Vec::from_iter(values);
let contextual_arguments = match subgraph_context.as_mut() {
Some(context) => {
context.add_variables_and_get_args(&mut variables, inverted_contexts)
}
None => None,
};

variables.insert("representations", representations);
if let Some(ref ctx_args) = contextual_arguments {
// Per-context singleton representation arrays: each `representations_<ctx>`
// holds only the single representation needed for that context.
for (rep_idx, ctxs) in ctx_args.inverted_contexts.iter().enumerate() {
for &ctx_idx in ctxs {
let key = format!("representations_{ctx_idx}");
let val = Value::Array(vec![representations[rep_idx].clone()]);
variables.insert(key, val);
}
}
} else {
variables.insert(
"representations",
Value::Array(representations),
);
}
Some(Variables {
variables,
inverted_paths,
Expand Down Expand Up @@ -308,48 +323,37 @@ fn remap_entity_error(error: &Error, values_path: &Path, entity_error_path: &Pat
}

enum AliasedErrorHandling {
Remap(Vec<Path>),
Remap(Path),
Ignore,
Fallback,
}

/// Determine how to handle potentially aliased error paths.
/// - If `path` is accepted, converts it from subgraph response into the supergraph response paths
/// - If `path` is accepted, converts it from subgraph response into the supergraph response path
/// and returns `Remap`.
/// - If `path` is from a wrong context path, returns `Ignore`.
/// - If none of them applies, returns `Fallback`.
///
/// With singleton representations, error paths have the form
/// `["_entities_<context>", 0, "field", ...]` where the index is always 0.
/// Uses a pre-built `context_to_path` map for O(1) lookup per error.
fn aliased_error_handling(
path: &Path,
inverted_paths: &[Vec<Path>],
inverted_contexts: Option<&Vec<Vec<usize>>>,
context_to_path: Option<&HashMap<usize, Path>>,
) -> AliasedErrorHandling {
match (path.0.first(), path.0.get(1), inverted_contexts) {
match (path.0.first(), path.0.get(1), context_to_path) {
(
Some(json_ext::PathElement::Key(alias, None)),
Some(json_ext::PathElement::Index(entity_index)),
Some(inverted_contexts),
Some(json_ext::PathElement::Index(_)),
Some(context_to_path),
) if alias.starts_with("_entities_") => {
match (
inverted_paths.get(*entity_index),
inverted_contexts.get(*entity_index),
) {
(Some(paths), Some(contexts)) => {
let matching_paths: Vec<Path> = paths
.iter()
.zip(contexts.iter())
.filter_map(|(values_path, context_index)| {
let expected_alias = format!("_entities_{context_index}");
(alias == &expected_alias).then(|| values_path.clone())
})
.collect();

if matching_paths.is_empty() {
AliasedErrorHandling::Ignore
} else {
AliasedErrorHandling::Remap(matching_paths)
}
}
_ => AliasedErrorHandling::Ignore,
// Parse context index from the alias name "_entities_<ctx>"
let Some(context_index) = alias["_entities_".len()..].parse::<usize>().ok() else {
return AliasedErrorHandling::Fallback;
};
match context_to_path.get(&context_index) {
Some(p) => AliasedErrorHandling::Remap(p.clone()),
None => AliasedErrorHandling::Ignore,
}
}
_ => AliasedErrorHandling::Fallback,
Expand Down Expand Up @@ -475,6 +479,20 @@ impl FetchNode {
current_dir.clone()
};

// Pre-build a mapping from context index to its supergraph path so that
// aliased error remapping is O(1) per error instead of a linear scan
// through inverted_contexts. Each context index maps to exactly one path.
let context_to_path: Option<HashMap<usize, Path>> =
inverted_contexts.as_ref().map(|inv_contexts| {
let mut map = HashMap::new();
for (paths, contexts) in inverted_paths.iter().zip(inv_contexts.iter()) {
for (path, &ctx_idx) in paths.iter().zip(contexts.iter()) {
map.insert(ctx_idx, path.clone());
}
}
map
});

let mut errors: Vec<Error> = vec![];
for mut error in response.errors {
// the locations correspond to the subgraph query and cannot be linked to locations
Expand Down Expand Up @@ -502,13 +520,10 @@ impl FetchNode {
} else {
match aliased_error_handling(
path,
&inverted_paths,
inverted_contexts.as_ref(),
context_to_path.as_ref(),
) {
AliasedErrorHandling::Remap(values_paths) => {
for values_path in values_paths {
errors.push(remap_entity_error(&error, &values_path, path))
}
AliasedErrorHandling::Remap(values_path) => {
errors.push(remap_entity_error(&error, &values_path, path))
}
AliasedErrorHandling::Ignore => {}
AliasedErrorHandling::Fallback => {
Expand Down Expand Up @@ -548,14 +563,13 @@ impl FetchNode {
let mut value = Value::default();
let mut saw_aliases = false;

// Every `_entities_<i>` response array must match with the `inverted_paths` &
// `inverted_contexts` vectors.
// - For each `inverted_paths` entry, the correct context index (`i`) is determined by the
// corresponding `inverted_contexts` entry.
for (index, (paths, contexts)) in inverted_paths
// Every `_entities_<i>` response array is a singleton containing
// only the representation needed for that context. For each
// `inverted_paths` entry, the correct context index (`i`) is
// determined by the corresponding `inverted_contexts` entry.
for (paths, contexts) in inverted_paths
.into_iter()
.zip(inverted_contexts.unwrap_or_default().into_iter())
.enumerate()
{
for (path, context) in paths.into_iter().zip(contexts.into_iter()) {
let alias = format!("_entities_{context}");
Expand All @@ -569,7 +583,8 @@ impl FetchNode {
return (Value::Null, errors);
};

let Some(mut entity) = array.into_iter().nth(index) else {
// Each alias's representations array is a singleton.
let Some(mut entity) = array.into_iter().next() else {
continue;
};

Expand Down Expand Up @@ -1120,7 +1135,7 @@ mod tests {
}

#[test]
fn entity_fetch_aliased_entities_uses_diagonal_entries() {
fn entity_fetch_aliased_entities_uses_singleton_representations() {
let schema = test_schema();
let node = make_fetch_node(make_requires());
let current_dir = Path(vec![key("users"), flatten()]);
Expand All @@ -1130,23 +1145,62 @@ mod tests {
vec![Path(vec![key("users"), index(2)])],
];
let inverted_contexts = vec![vec![0], vec![1], vec![2]];
// Each context's representations array is a singleton.
let response = graphql::Response::builder()
.data(json!({
"_entities_0": [
{"name": "Alice-0"},
{"name": "Bob-0"},
{"name": "Charlie-0"}
],
"_entities_1": [
{"name": "Alice-1"},
{"name": "Bob-1"},
{"name": "Charlie-1"}
],
"_entities_2": [
{"name": "Alice-2"},
{"name": "Bob-2"},
{"name": "Charlie-2"}
]
"_entities_0": [{"name": "Alice-0"}],
"_entities_1": [{"name": "Bob-1"}],
"_entities_2": [{"name": "Charlie-2"}]
}))
.build();

let (value, errors) = node.response_at_path(
&schema,
&current_dir,
inverted_paths,
Some(inverted_contexts),
response,
false,
);

assert!(errors.is_empty());
let arr = value
.as_object()
.unwrap()
.get("users")
.unwrap()
.as_array()
.unwrap();
assert_eq!(arr[0], json!({"name": "Alice-0"}));
assert_eq!(arr[1], json!({"name": "Bob-1"}));
assert_eq!(arr[2], json!({"name": "Charlie-2"}));
}

#[test]
fn entity_fetch_aliased_entities_dedup_uses_singleton_representations() {
// Test with deduplication: 4 entities collapse into 3 representations.
// Entity 0 -> rep 0, context 0
// Entity 1 -> rep 1, context 1
// Entity 2 -> rep 2, context 2
// Entity 3 -> rep 2, context 3 (shares representation with entity 2)
let schema = test_schema();
let node = make_fetch_node(make_requires());
let current_dir = Path(vec![key("users"), flatten()]);
let inverted_paths = vec![
vec![Path(vec![key("users"), index(0)])],
vec![Path(vec![key("users"), index(1)])],
vec![
Path(vec![key("users"), index(2)]),
Path(vec![key("users"), index(3)]),
],
];
let inverted_contexts = vec![vec![0], vec![1], vec![2, 3]];
let response = graphql::Response::builder()
.data(json!({
"_entities_0": [{"name": "Alice-0"}],
"_entities_1": [{"name": "Bob-1"}],
"_entities_2": [{"name": "Charlie-2"}],
"_entities_3": [{"name": "Charlie-3"}]
}))
.build();

Expand All @@ -1170,10 +1224,11 @@ mod tests {
assert_eq!(arr[0], json!({"name": "Alice-0"}));
assert_eq!(arr[1], json!({"name": "Bob-1"}));
assert_eq!(arr[2], json!({"name": "Charlie-2"}));
assert_eq!(arr[3], json!({"name": "Charlie-3"}));
}

#[test]
fn entity_fetch_error_with_aliased_entities_path_only_maps_diagonal_entry() {
fn entity_fetch_error_with_aliased_entities_singleton_representations() {
let schema = test_schema();
let node = make_fetch_node(make_requires());
let current_dir = Path(vec![key("users"), flatten()]);
Expand All @@ -1182,23 +1237,18 @@ mod tests {
vec![Path(vec![key("users"), index(1)])],
];
let inverted_contexts = vec![vec![0], vec![1]];
// With singleton representations, error indices are always 0.
let response = graphql::Response::builder()
.data(json!({
"_entities_0": [null, null],
"_entities_1": [null, null]
"_entities_0": [null],
"_entities_1": [null]
}))
.error(
graphql::Error::builder()
.message("off diagonal")
.message("error on context 1")
.path(Path(vec![key("_entities_1"), index(0), key("name")]))
.build(),
)
.error(
graphql::Error::builder()
.message("diagonal")
.path(Path(vec![key("_entities_1"), index(1), key("name")]))
.build(),
)
.build();

let (_, errors) = node.response_at_path(
Expand All @@ -1211,7 +1261,7 @@ mod tests {
);

assert_eq!(errors.len(), 1);
assert_eq!(errors[0].message, "diagonal");
assert_eq!(errors[0].message, "error on context 1");
assert_eq!(
errors[0].path.as_ref().unwrap(),
&Path(vec![key("users"), index(1), key("name")])
Expand Down
15 changes: 14 additions & 1 deletion apollo-router/src/query_planner/subgraph_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ fn transform_operation(
let mut selections: Vec<Selection> = vec![];
let mut new_variables: Vec<Node<VariableDefinition>> = vec![];
operation.variables.iter().for_each(|v| {
if arguments.contains(v.name.as_str()) {
if v.name.as_str() == "representations" || arguments.contains(v.name.as_str()) {
// Clone both contextual arguments and the representations variable
// into per-context copies (e.g. representations_0, representations_1, ...).
for i in 0..*count {
new_variables.push(Node::new(VariableDefinition {
name: Name::new_unchecked(&format!("{}_{}", v.name.as_str(), i)),
Expand Down Expand Up @@ -301,6 +303,17 @@ fn transform_operation(
op = field_selection.name
)));

// Rename $representations to $representations_i for this alias
for arg in cfs.arguments.iter_mut() {
let arg = arg.make_mut();
if let Some(v) = arg.value.as_variable() {
if v.as_str() == "representations" {
arg.value = Node::new(ast::Value::Variable(Name::new_unchecked(&format!(
"representations_{i}"
))));
}
}
}
transform_field_arguments(&mut cfs.arguments, arguments, i);
transform_selection_set(&mut cfs.selection_set, arguments, i);
selections.push(Selection::Field(cloned));
Expand Down