Skip to content

Conversation

@enisdenjo
Copy link
Member

@enisdenjo enisdenjo commented Dec 13, 2025

Ref router-165

Implement SSE, Incremental Delivery over HTTP and Apollo's Multipart HTTP specs for subscriptions when communicating with subgraphs or clients with entity resolution capabilities.

What changed?

Streaming execution result

lib/executor/src/execution/plan.rs@QueryPlanExecutionResult

The execution pipeline now returns QueryPlanExecutionResult - an enum that can be either a single response or a stream:

pub enum QueryPlanExecutionResult {
    Single(PlanExecutionOutput),
    Stream(PlanSubscriptionOutput),
}

This separation allows the query planner and executor to operate independently from transport concerns, making future improvements (like connection repair and silent retries) easier to implement.

Owned context for streaming

lib/executor/src/execution/plan.rs@OwnedQueryPlanExecutionContext

Subscriptions require long-lived contexts that outlive request lifetimes. The implementation introduces an owned context that:

  • Clones Arc-wrapped shared data (executors, schema metadata, projection/header plans)
  • Enables entity resolution to happen independently for each subscription event
  • Processes remaining plan nodes after the subscription fetch

Subscription handlers

The router respects the client's Accept header to determine response format:

  • text/event-stream → SSE responses
  • multipart/mixed → Incremental Delivery over HTTP
  • multipart/mixed;subscriptionSpec="1.0" → Apollo multipart HTTP
  • Returns 406 Not Acceptable if subscription is requested over unsupported transport
  • Handles errors by emitting an error event and completing the stream
  • Heartbeats every 10 seconds (except for incremental delivery, doesn't have it)
  • Of course protocols used between router and subgraphs and clients can be different

Same behavior is expected when communicating with subgraphs.

SSE

lib/executor/src/executors/sse.rs

Implements the GraphQL over SSE spec distinct connection mode.

Multipart protocol

lib/executor/src/executors/multipart_subscribe.rs

Implements Apollo's multipart HTTP spec and GraphQL's Incremental Delivery over HTTP RFC.

Entity resolution

lib/executor/src/executors/multipart_subscribe.rs@execute_plan_with_initial_data

When a subscription emits data that references entities from other subgraphs, the router:

  1. Receives subscription event from primary subgraph
  2. Executes remaining plan nodes (flatten, fetch, etc.) to populate missing fields
  3. Projects the complete response
  4. Streams to client

This is handled in the async stream generator in execute_query_plan():

while let Some(response) = response_stream.next().await {
    // Parse subgraph response
    // Execute entity resolution if needed (remaining plan nodes)
    // Project and yield to client
}

Subscription node in query plan

lib/query-planner/src/planner/query_plan.rs@wrap_subscription_fetch_nodes

SubscriptionNode is used and now wraps subscription fetch operations:

pub struct SubscriptionNode {
    // It will practically always be a FetchNode
    pub primary: FetchNode,
}

The query planner detects subscription operations and wraps them appropriately, enabling plans with entity resolution like:

Sequence [
  SubscriptionNode { primary: Fetch(reviews) },
  Flatten(products),
  Fetch(products)
]

Subgraph executor subscribe method

lib/executor/src/executors/common.rs@subscribe

The HTTP executor gains a subscribe() method that:

  • Negotiates content-type (prefers multipart, falls back to SSE)
  • Establishes long-lived connections to subgraphs
  • Returns a BoxStream<HttpExecutionResponse> for downstream processing

Configure to only enable/disable subscriptions

The supported subscription protocols in this PR are inherintly HTTP and do not need a "protocol" configuration option. Hive Router will send an accept header listing all supported protocols for subscriptions over HTTP and the subgraph is free to choose whichever one it supports.

Whether we really want to limit specific protocols is up to discussion but objectively there is no benefit since they're all streamed HTTP.

Hence, you can only:

supergraph:
    source: file
    path: supergraph.graphql
subscriptions:
    enabled: true

P.S. if the subscriptions are disabled, the router will respond with when receiving a subscription request:

HTTP/1.1 415 Unsupported Media Type
Content-Type: application/json

{"errors":[{"message":"Subscriptions are not supported","extensions":{"code":"SUBSCRIPTIONS_NOT_SUPPORT"}}]}

What didn't change?

No HTTP Callback Protocol

Intentionally excluded to keep the PR focused. This would require webhook infrastructure in the router, which adds significant complexity. The SSE and multipart protocols cover the vast majority of use cases.

No WebSockets

At the moment only HTTP protocols. Just to keep the PR smaller, it can be integrating easily due to the separation of modules.

Silent Retries & Upstream Connection Repair

Most GraphQL subgraph implementations are stateless for subscriptions and have no concept of "continuing" from where they left off after a connection loss. Implementing retry logic on the router side would create false expectations - users would assume all events are delivered, but some would be lost when the subgraph creates a fresh subscription.

This is fundamentally why the EDFS (Event-Driven Federated Subscriptions) and callback protocols exist. To avoid misleading behavior and keep the PR focused on the core functionality, connection repair is not implemented.

TODO

  • documentation
  • changesets
  • e2e tests for error handling
  • e2e tests for header propagation
  • use details from the actual client request for entity resolution towards subgraphs of events
  • test subgraph disconnect propagation
  • use apollo's multipart http payload for transport erros
  • test client disconnect propagation
    • hard to test, would need changing the source code because async being state machines allows it just to stop execution in any suspended state
  • usage reporting (bin/router/src/pipeline/mod.rs#305)
  • http callback protocol
  • websockets
  • performance considerations
  • entity resolution failures dont end the stream, should they? a subgraph might recover, usually you dont end the whole stream. they should
  • subscriptions deduplication (same subs same selection sets same headers)
  • reloading the supergraph
  • limits, things like client connection or event queue
  • configure enable/disable subscriptions
  • configure accepted protocols
    • read description

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @enisdenjo, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces comprehensive support for GraphQL subscriptions within the router. It fundamentally changes how the router handles long-lived connections and streaming data, moving from a single-response model to one that can manage continuous data flows. This enables real-time data updates for clients and subgraphs, complete with dynamic protocol negotiation and on-the-fly entity resolution for each event in a subscription stream.

Highlights

  • GraphQL Subscriptions Support: The router now fully supports GraphQL subscriptions, implementing both Server-Sent Events (SSE) and Apollo's Multipart HTTP protocols for communication with subgraphs and clients.
  • Streaming Execution Result: The execution pipeline has been refactored to return a QueryPlanExecutionResult enum, which can be either a single response or a stream, enabling native handling of long-lived subscription connections.
  • Owned Context for Streaming: A new OwnedQueryPlanExecutionContext is introduced to manage long-lived contexts required for subscriptions, allowing entity resolution to occur independently for each event within a stream.
  • Dynamic Subscription Handlers: The router intelligently determines the appropriate subscription protocol (SSE or Multipart) based on the client's Accept header, returning a 406 Not Acceptable if unsupported transports are requested.
  • Entity Resolution for Subscription Events: The router can now perform entity resolution for subscription events, fetching missing fields from other subgraphs after receiving initial data from the primary subgraph, and then streaming the complete response to the client.
  • Subscription Node in Query Plan: A new SubscriptionNode has been added to the query planner, specifically designed to wrap primary fetch operations for subscriptions, enabling complex plans that include entity resolution.
  • Subgraph Executor subscribe() Method: The HTTP executor gains a subscribe() method, allowing the router to negotiate content-type (preferring multipart), establish long-lived connections to subgraphs, and return a BoxStream for downstream processing.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces significant new functionality for GraphQL subscriptions, including support for SSE and multipart protocols, entity resolution for subscription events, and updates to the query planner. The overall architecture is well-thought-out, especially the separation of concerns with QueryPlanExecutionResult and the use of an owned context for long-lived subscriptions.

My review has identified a critical safety issue regarding an unsafe block that could lead to a use-after-free bug. I've also found several areas where manual JSON construction is used, which is brittle and should be replaced with safer serialization. Additionally, there are opportunities to improve performance in the stream handling logic by reducing string allocations, and a potential panic in the benchmark code. Please see the detailed comments for suggestions on how to address these points.

) -> Result<PlanExecutionOutput, PlanExecutionError> {
// Clone initial_data to make it 'static for ExecutionContext
// SAFETY: We're creating a new owned value that will be used within this function
let owned_data: Value<'exec> = unsafe { std::mem::transmute(initial_data.clone()) };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The use of unsafe and mem::transmute here to extend the lifetime of initial_data is dangerous and likely unsound. The initial_data is a Value<'_> that borrows from response_body, which is only valid for the current loop iteration. Because execute_plan_with_initial_data is async, the future it returns can be suspended and resumed after response_body is no longer valid, leading to a use-after-free.

To fix this, you should create a fully owned Value<'static>. You can do this by implementing a helper function that recursively clones the Value data, converting any borrowed Cows to owned ones. This avoids the need for unsafe code.

For example, you could create a function fn to_owned_value<'a>(v: &Value<'a>) -> Value<'static> and use it like let owned_data: Value<'exec> = to_owned_value(&initial_data);.

@github-actions
Copy link

github-actions bot commented Dec 13, 2025

k6-benchmark results

     ✓ response code was 200
     ✓ no graphql errors
     ✓ valid response structure

     █ setup

     checks.........................: 100.00% ✓ 202020      ✗ 0    
     data_received..................: 5.9 GB  196 MB/s
     data_sent......................: 79 MB   2.6 MB/s
     http_req_blocked...............: avg=3.26µs   min=711ns   med=1.91µs  max=5.63ms   p(90)=2.74µs  p(95)=3.14µs  
     http_req_connecting............: avg=373ns    min=0s      med=0s      max=1.62ms   p(90)=0s      p(95)=0s      
     http_req_duration..............: avg=21.78ms  min=2.23ms  med=20.82ms max=125.07ms p(90)=29.71ms p(95)=32.88ms 
       { expected_response:true }...: avg=21.78ms  min=2.23ms  med=20.82ms max=125.07ms p(90)=29.71ms p(95)=32.88ms 
     http_req_failed................: 0.00%   ✓ 0           ✗ 67360
     http_req_receiving.............: avg=137.78µs min=26.32µs med=42.17µs max=70.48ms  p(90)=86.3µs  p(95)=402.26µs
     http_req_sending...............: avg=24.51µs  min=5.54µs  med=11.23µs max=19.33ms  p(90)=16.65µs p(95)=26.77µs 
     http_req_tls_handshaking.......: avg=0s       min=0s      med=0s      max=0s       p(90)=0s      p(95)=0s      
     http_req_waiting...............: avg=21.62ms  min=2.17ms  med=20.69ms max=68.38ms  p(90)=29.45ms p(95)=32.55ms 
     http_reqs......................: 67360   2239.936953/s
     iteration_duration.............: avg=22.27ms  min=7.14ms  med=21.18ms max=208.79ms p(90)=30.13ms p(95)=33.39ms 
     iterations.....................: 67340   2239.271888/s
     vus............................: 50      min=50        max=50 
     vus_max........................: 50      min=50        max=50 

@github-actions
Copy link

github-actions bot commented Dec 13, 2025

🐋 This PR was built and pushed to the following Docker images:

Image Names: ghcr.io/graphql-hive/router

Platforms: linux/amd64,linux/arm64

Image Tags: ghcr.io/graphql-hive/router:pr-620 ghcr.io/graphql-hive/router:sha-c80aa36

Docker metadata
{
"buildx.build.ref": "builder-d43cc119-50af-408e-8735-d33cf0064b10/builder-d43cc119-50af-408e-8735-d33cf0064b100/nb9ci3y07ou5g7xv5l5v1b0di",
"containerimage.descriptor": {
  "mediaType": "application/vnd.oci.image.index.v1+json",
  "digest": "sha256:3956b78c27374b42580546766756a51ee3a6b8709447eab32af28d1150ec2034",
  "size": 1609
},
"containerimage.digest": "sha256:3956b78c27374b42580546766756a51ee3a6b8709447eab32af28d1150ec2034",
"image.name": "ghcr.io/graphql-hive/router:pr-620,ghcr.io/graphql-hive/router:sha-c80aa36"
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant