Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
56d8ea3
test: PoC proving recursive entity resolution works across subgraphs
lennyburdette Apr 9, 2026
9cd669c
feat(connectors): remove circular reference validation
lennyburdette Apr 9, 2026
90735fe
test(connectors): expansion test for circular reference schema
lennyburdette Apr 9, 2026
2affa43
feat(federation): add connectedSelection argument to @join__field
lennyburdette Apr 9, 2026
f99e28c
feat(query_graph): restricted copy nodes for connectedSelection
lennyburdette Apr 9, 2026
6a0c54b
fix: gate connectedSelection behind join v0.6
lennyburdette Apr 9, 2026
9bcf1f8
feat(connectors): emit connectedSelection for recursive types
lennyburdette Apr 9, 2026
694eb20
test: comprehensive tests for connector circular references
lennyburdette Apr 9, 2026
dd831fc
chore: clean up dead code from disabled circular reference check
lennyburdette Apr 9, 2026
beadd65
feat(connectors): generalize connectedSelection for indirect cycles
lennyburdette Apr 9, 2026
c75f809
test: end-to-end test from connector expansion through query planning
lennyburdette Apr 9, 2026
8b2ab05
test: router integration test for circular connector references
lennyburdette Apr 9, 2026
8c35fb4
fix: clippy collapsible-if and rustfmt formatting
lennyburdette Apr 9, 2026
8b0c5d6
ci: retrigger macOS test (flaky failure, passes locally + linux)
lennyburdette Apr 9, 2026
486f6cf
fix(connectors): emit full nested FieldSet in connectedSelection
lennyburdette Apr 10, 2026
6731b6a
fix: clippy unnecessary_filter_map lint
lennyburdette Apr 10, 2026
62ed67d
refactor: add ProvidesCopy enum to scope connector graph changes
lennyburdette Apr 10, 2026
5eef634
feat(connectors): validate nested entity key fields in connectedSelec…
lennyburdette Apr 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 309 additions & 0 deletions apollo-federation/src/connectors/expand/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,27 @@ use std::collections::HashSet;
use std::sync::Arc;

use apollo_compiler::Name;
use apollo_compiler::Node;
use apollo_compiler::Schema;
use apollo_compiler::ast::Argument;
use apollo_compiler::ast::Value;
use apollo_compiler::collections::HashMap;
use apollo_compiler::name;
use apollo_compiler::schema::ExtendedType;
use apollo_compiler::validation::Valid;
use carryover::carryover_directives;
use indexmap::IndexMap;
use itertools::Itertools;
use multimap::MultiMap;
use shape::ShapeCase;

use crate::ApiSchemaOptions;
use crate::Supergraph;
use crate::ValidFederationSubgraph;
use crate::connectors::ConnectSpec;
use crate::connectors::Connector;
use crate::error::FederationError;
use crate::link::join_spec_definition::JOIN_CONNECTED_SELECTION_ARGUMENT_NAME;
use crate::merge::merge_subgraphs;
use crate::schema::FederationSchema;
use crate::subgraph::Subgraph;
Expand Down Expand Up @@ -128,6 +135,13 @@ pub fn expand_connectors(
.map(|(connector, sub)| (sub.name.into(), connector))
.collect();

// Add connectedSelection to @join__field for recursive connector types
add_connected_selections(
supergraph.schema.schema(),
&mut new_supergraph,
&connectors_by_service_name,
)?;

let labels_by_service_name = connectors_by_service_name
.iter()
.map(|(service_name, connector)| (service_name.clone(), connector.label.0.clone()))
Expand Down Expand Up @@ -159,6 +173,301 @@ fn contains_connectors(link: &ConnectLink, subgraph: &ValidFederationSubgraph) -
})
}

/// Add `connectedSelection` to `@join__field` directives for entity resolver connectors.
///
/// For any field-level entity resolver connector (not on Query/Mutation root types),
/// this function annotates the corresponding `@join__field` directive with the connector's
/// selection field names so the query graph builder can create restricted copy nodes.
/// This handles both direct cycles (User.friends: [User]) and indirect cycles
/// (Track.modules: [Module], Module.track: Track).
fn add_connected_selections(
original_schema: &Valid<Schema>,
supergraph: &mut FederationSchema,
connectors: &IndexMap<Arc<str>, Connector>,
) -> Result<(), FederationError> {
// Build service_name -> join__Graph enum value name mapping
let service_name_to_enum_value = build_service_name_to_enum_value(supergraph.schema())?;

// Collect (type_name, field_name, enum_value, connected_selection_str) tuples
let mut annotations: Vec<(Name, Name, Name, String)> = Vec::new();

for (service_name, connector) in connectors {
// Only field-level connectors can be recursive
let Some(parent_type_name) = connector.id.directive.parent_type_name() else {
continue;
};

// Only non-root entity resolvers need connectedSelection.
// Root connectors (Query/Mutation) and non-entity-resolver connectors are skipped.
if connector.entity_resolver.is_none()
|| connector.id.directive.on_root_type(original_schema)
{
continue;
}

let Some(graph_enum_value) = service_name_to_enum_value.get(service_name.as_ref()) else {
continue;
};

// The connector's selection field names are the connected selection
let shape = connector.selection.shape();
if let Some(selection_str) = extract_shape_as_field_set(&shape) {
let field_def = connector
.id
.directive
.field_definition(original_schema)
.ok_or_else(|| {
FederationError::internal("field definition not found for connector")
})?;
let return_type_name = field_def.ty.inner_named_type();

// Validate that the selection includes key fields at every level
// where an entity type appears. This ensures the restricted copy
// node has the key fields needed for entity resolution.
validate_entity_keys_in_shape(
original_schema,
&shape,
return_type_name,
&connector.id.directive.coordinate(),
&mut Vec::new(),
)?;

annotations.push((
parent_type_name.clone(),
field_def.name.clone(),
graph_enum_value.clone(),
selection_str,
));
}
}

if annotations.is_empty() {
return Ok(());
}

// Apply annotations to the supergraph schema
let schema = supergraph.schema_mut();

// Ensure the @join__field directive definition declares `connectedSelection`.
// The expansion may produce join/v0.5 SDL which lacks this argument, but we
// need it so the expanded supergraph passes GraphQL validation.
if let Some(join_field_def) = schema.directive_definitions.get_mut(&name!("join__field")) {
let already_has_arg = join_field_def
.arguments
.iter()
.any(|a| a.name == JOIN_CONNECTED_SELECTION_ARGUMENT_NAME);
if !already_has_arg {
use apollo_compiler::ast::InputValueDefinition;
use apollo_compiler::ast::Type;
let join_field_def = join_field_def.make_mut();
join_field_def
.arguments
.push(Node::new(InputValueDefinition {
description: None,
name: JOIN_CONNECTED_SELECTION_ARGUMENT_NAME,
ty: Node::new(Type::Named(name!("join__FieldSet"))),
default_value: None,
directives: Default::default(),
}));
}
}

for (type_name, field_name, enum_value, selection_str) in annotations {
let Some(ExtendedType::Object(obj)) = schema.types.get_mut(&type_name) else {
continue;
};
let obj = obj.make_mut();
let Some(field) = obj.fields.get_mut(&field_name) else {
continue;
};
let field = field.make_mut();

// Find the @join__field directive matching this graph enum value
for directive in field.directives.0.iter_mut() {
if directive.name != name!("join__field") {
continue;
}
let has_matching_graph = directive.arguments.iter().any(|arg| {
arg.name == name!("graph")
&& matches!(arg.value.as_ref(), Value::Enum(v) if *v == enum_value)
});
if has_matching_graph {
// Add connectedSelection argument
let directive = directive.make_mut();
directive.arguments.push(Node::new(Argument {
name: JOIN_CONNECTED_SELECTION_ARGUMENT_NAME,
value: Node::new(Value::String(selection_str.clone())),
}));
break;
}
}
}

Ok(())
}

/// Build a mapping from service name (e.g. "connectors_Query_user_0") to the
/// join__Graph enum value name (e.g. "CONNECTORS_QUERY_USER_0").
fn build_service_name_to_enum_value(
schema: &Schema,
) -> Result<HashMap<String, Name>, FederationError> {
let mut map = HashMap::default();

let Some(ExtendedType::Enum(join_graph_enum)) = schema.types.get("join__Graph") else {
return Ok(map);
};

for (enum_value_name, enum_value_def) in &join_graph_enum.values {
// Look for @join__graph(name: "service_name") on this enum value
for directive in enum_value_def.directives.get_all(&name!("join__graph")) {
for arg in &directive.arguments {
if arg.name == name!("name")
&& let Value::String(service_name) = arg.value.as_ref()
{
map.insert(service_name.clone(), enum_value_name.clone());
}
}
}
}

Ok(map)
}

/// Extract a FieldSet string from a Shape, recursing into composite fields.
///
/// For a shape representing `{ id, name, friends: [{ id, name }] }`, this
/// produces `"id name friends { id name }"`. This ensures the restricted copy
/// node in the query graph accurately reflects which fields (including nested
/// ones) the connector's HTTP endpoint returns, avoiding unnecessary entity
/// resolution fetches.
fn extract_shape_as_field_set(shape: &shape::Shape) -> Option<String> {
match shape.case() {
ShapeCase::Object { fields, .. } => {
let parts: Vec<String> = fields
.iter()
.filter(|(k, _)| k.as_str() != "__typename")
.map(|(k, v)| match extract_shape_as_field_set(v) {
Some(nested) => format!("{k} {{ {nested} }}"),
None => k.to_string(),
})
.collect();
if parts.is_empty() {
None
} else {
Some(parts.join(" "))
}
}
// Handle arrays: extract from the tail (element type)
ShapeCase::Array { tail, .. } => extract_shape_as_field_set(tail),
// Handle One (union): try each member
ShapeCase::One(shapes) => {
for member in shapes.iter() {
if let Some(field_set) = extract_shape_as_field_set(member) {
return Some(field_set);
}
}
None
}
_ => None,
}
}

/// Validate that entity types in the connector's selection shape include their
/// key fields at every level. Without key fields, the query planner cannot
/// perform entity resolution to fetch additional fields beyond what the
/// connector returns.
///
/// Checks both the top-level shape (the connector's return type) and any nested
/// composite fields that return entity types.
fn validate_entity_keys_in_shape(
schema: &Schema,
shape: &shape::Shape,
type_name: &Name,
coordinate: &str,
path: &mut Vec<String>,
) -> Result<(), FederationError> {
// Unwrap arrays at the top level
let shape = unwrap_array_shape(shape);

let ShapeCase::Object { fields, .. } = shape.case() else {
return Ok(());
};

// Check if *this* type is an entity — the shape must include its key fields
if let Some(ExtendedType::Object(obj)) = schema.types.get(type_name) {
let key_field_names: Vec<&str> = obj
.directives
.get_all(&name!("join__type"))
.filter_map(|d| {
d.argument_by_name("key", schema)
.ok()
.and_then(|v| v.as_str())
})
.collect();

if !key_field_names.is_empty() {
let shape_field_names: HashSet<&str> = fields.keys().map(|k| k.as_str()).collect();

let any_key_satisfied = key_field_names.iter().any(|key_str| {
key_str
.split_whitespace()
.filter(|f| !f.contains('{') && !f.contains('}'))
.all(|f| shape_field_names.contains(f))
});

if !any_key_satisfied {
let path_str = if path.is_empty() {
type_name.to_string()
} else {
path.join(".")
};
return Err(FederationError::internal(format!(
"Connector `{coordinate}` returns `{type_name}` at \
path `{path_str}` but the selection is missing key field(s) \
required by `@key`. The connector's `selection` must include \
the key fields (e.g. `{example_key}`) so the router can resolve \
additional fields via entity resolution.",
example_key = key_field_names.first().unwrap_or(&"id"),
)));
}
}
}

// Recurse into nested composite fields
let type_fields = match schema.types.get(type_name) {
Some(ExtendedType::Object(obj)) => &obj.fields,
Some(ExtendedType::Interface(iface)) => &iface.fields,
_ => return Ok(()),
};

for (field_name, field_shape) in fields {
if field_name.as_str() == "__typename" {
continue;
}
let Some(field_def) = type_fields.get(field_name.as_str()) else {
continue;
};
let return_type_name = field_def.ty.inner_named_type();
let inner_shape = unwrap_array_shape(field_shape);

if matches!(inner_shape.case(), ShapeCase::Object { .. }) {
path.push(field_name.to_string());
validate_entity_keys_in_shape(schema, inner_shape, return_type_name, coordinate, path)?;
path.pop();
}
}

Ok(())
}

/// Unwrap array shapes to get the element type.
fn unwrap_array_shape(shape: &shape::Shape) -> &shape::Shape {
match shape.case() {
ShapeCase::Array { tail, .. } => unwrap_array_shape(tail),
_ => shape,
}
}

/// Split up a subgraph so that each connector directive becomes its own subgraph.
///
/// Subgraphs passed to this function should contain connector directives.
Expand Down
Loading
Loading