Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions juniper_graphql_ws/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ All user visible changes to `juniper_graphql_ws` crate will be documented in thi



## master

### Fixed

- Inability to re-subscribe with the same operation `id` after subscription was completed by server. ([#1368])

[#1368]: /../../pull/1368




## [0.5.0] · 2025-09-08
[0.5.0]: /../../tree/juniper_graphql_ws-v0.5.0/juniper_graphql_ws

Expand Down
106 changes: 52 additions & 54 deletions juniper_graphql_ws/src/graphql_transport_ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,68 +158,66 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
} => {
let reactions = match msg {
ClientMessage::Subscribe { id, payload } => {
// Prune stoppers which streams are already completed or canceled.
stoppers.retain(|_, tx| !tx.is_canceled());

if stoppers.contains_key(&id) {
// We already have an operation with this id. We must close the connection.
Output::Close {
code: 4409,
message: format!("Subscriber for {id} already exists"),
}
.into_stream()
} else if config.max_in_flight_operations > 0
&& stoppers.len() >= config.max_in_flight_operations
{
// Too many in-flight operations. Just send back a validation error.
stream::iter(vec![
Output::Message(ServerMessage::Error {
id: id.clone(),
payload: GraphQLError::from(RuleError::new(
"Too many in-flight operations.",
&[],
))
.into(),
}),
Output::Message(ServerMessage::Complete { id }),
])
.boxed()
} else {
// Go ahead and prune canceled stoppers before adding a new one.
stoppers.retain(|_, tx| !tx.is_canceled());

if config.max_in_flight_operations > 0
&& stoppers.len() >= config.max_in_flight_operations
{
// Too many in-flight operations. Just send back a validation error.
stream::iter(vec![
Output::Message(ServerMessage::Error {
id: id.clone(),
payload: GraphQLError::from(RuleError::new(
"Too many in-flight operations.",
&[],
))
.into(),
}),
Output::Message(ServerMessage::Complete { id }),
])
.boxed()
} else {
// Create a channel that we can use to cancel the operation.
let (tx, rx) = oneshot::channel::<()>();
stoppers.insert(id.clone(), tx);

// Create the operation stream. This stream will emit Next and Error
// messages, but will not emit Complete – that part is up to us.
let s = Self::start(
id.clone(),
ExecutionParams {
subscribe_payload: payload,
config: config.clone(),
schema: schema.clone(),
},
)
.into_stream()
.flatten();

// Combine this with our oneshot channel so that the stream ends if the
// oneshot is ever fired.
let s = stream::unfold((rx, s.boxed()), async |(rx, mut s)| {
let next = match future::select(rx, s.next()).await {
Either::Left(_) => None,
Either::Right((r, rx)) => r.map(|r| (r, rx)),
};
next.map(|(r, rx)| (r, (rx, s)))
});

// Once the stream ends, send the Complete message.
let s = s.chain(
Output::Message(ServerMessage::Complete { id }).into_stream(),
);

s.boxed()
}
// Create a channel that we can use to cancel the operation.
let (tx, rx) = oneshot::channel::<()>();
stoppers.insert(id.clone(), tx);

// Create the operation stream. This stream will emit Next and Error
// messages, but will not emit Complete – that part is up to us.
let s = Self::start(
id.clone(),
ExecutionParams {
subscribe_payload: payload,
config: config.clone(),
schema: schema.clone(),
},
)
.into_stream()
.flatten();

// Combine this with our oneshot channel so that the stream ends if the
// oneshot is ever fired.
let s = stream::unfold((rx, s.boxed()), async |(rx, mut s)| {
let next = match future::select(rx, s.next()).await {
Either::Left(_) => None,
Either::Right((r, rx)) => r.map(|r| (r, rx)),
};
next.map(|(r, rx)| (r, (rx, s)))
});

// Once the stream ends, send the Complete message.
let s = s.chain(
Output::Message(ServerMessage::Complete { id }).into_stream(),
);

s.boxed()
}
}
ClientMessage::Complete { id } => {
Expand Down
108 changes: 53 additions & 55 deletions juniper_graphql_ws/src/graphql_ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,66 +141,64 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
} => {
let reactions = match msg {
ClientMessage::Start { id, payload } => {
// Prune stoppers which streams are already completed or canceled.
stoppers.retain(|_, tx| !tx.is_canceled());

if stoppers.contains_key(&id) {
// We already have an operation with this id, so we can't start a new
// one.
stream::empty().boxed()
} else if config.max_in_flight_operations > 0
&& stoppers.len() >= config.max_in_flight_operations
{
// Too many in-flight operations. Just send back a validation error.
stream::iter(vec![
Reaction::ServerMessage(ServerMessage::Error {
id: id.clone(),
payload: GraphQLError::from(RuleError::new(
"Too many in-flight operations.",
&[],
))
.into(),
}),
Reaction::ServerMessage(ServerMessage::Complete { id }),
])
.boxed()
} else {
// Go ahead and prune canceled stoppers before adding a new one.
stoppers.retain(|_, tx| !tx.is_canceled());

if config.max_in_flight_operations > 0
&& stoppers.len() >= config.max_in_flight_operations
{
// Too many in-flight operations. Just send back a validation error.
stream::iter(vec![
Reaction::ServerMessage(ServerMessage::Error {
id: id.clone(),
payload: GraphQLError::from(RuleError::new(
"Too many in-flight operations.",
&[],
))
.into(),
}),
Reaction::ServerMessage(ServerMessage::Complete { id }),
])
.boxed()
} else {
// Create a channel that we can use to cancel the operation.
let (tx, rx) = oneshot::channel::<()>();
stoppers.insert(id.clone(), tx);

// Create the operation stream. This stream will emit Data and Error
// messages, but will not emit Complete – that part is up to us.
let s = Self::start(
id.clone(),
ExecutionParams {
start_payload: payload,
config: config.clone(),
schema: schema.clone(),
},
)
.into_stream()
.flatten();

// Combine this with our oneshot channel so that the stream ends if the
// oneshot is ever fired.
let s = stream::unfold((rx, s.boxed()), async |(rx, mut s)| {
let next = match future::select(rx, s.next()).await {
Either::Left(_) => None,
Either::Right((r, rx)) => r.map(|r| (r, rx)),
};
next.map(|(r, rx)| (r, (rx, s)))
});

// Once the stream ends, send the Complete message.
let s = s.chain(
Reaction::ServerMessage(ServerMessage::Complete { id })
.into_stream(),
);

s.boxed()
}
// Create a channel that we can use to cancel the operation.
let (tx, rx) = oneshot::channel::<()>();
stoppers.insert(id.clone(), tx);

// Create the operation stream. This stream will emit Data and Error
// messages, but will not emit Complete – that part is up to us.
let s = Self::start(
id.clone(),
ExecutionParams {
start_payload: payload,
config: config.clone(),
schema: schema.clone(),
},
)
.into_stream()
.flatten();

// Combine this with our oneshot channel so that the stream ends if the
// oneshot is ever fired.
let s = stream::unfold((rx, s.boxed()), async |(rx, mut s)| {
let next = match future::select(rx, s.next()).await {
Either::Left(_) => None,
Either::Right((r, rx)) => r.map(|r| (r, rx)),
};
next.map(|(r, rx)| (r, (rx, s)))
});

// Once the stream ends, send the Complete message.
let s = s.chain(
Reaction::ServerMessage(ServerMessage::Complete { id })
.into_stream(),
);

s.boxed()
}
}
ClientMessage::Stop { id } => {
Expand Down