-
Notifications
You must be signed in to change notification settings - Fork 215
fix(core): properly propagate stream cancellation on disconnect (#1349) #1354
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
2ac57d3
8b9ba82
52c3fd0
3b0cf54
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "@workflow/core": patch | ||
| --- | ||
|
|
||
| fix(core): properly propagate stream cancellation on disconnect |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -109,6 +109,45 @@ | |||||||||||||||||||||
| expect(streamClosed).toBe(true); | ||||||||||||||||||||||
| }); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| it('should handle abort signal propagating back up flushablePipe when reader disconnects early', async () => { | ||||||||||||||||||||||
| const chunks: string[] = []; | ||||||||||||||||||||||
| let sinkAborted = false; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Create a sink that aborts (representing a dropped connection) | ||||||||||||||||||||||
| const mockSink = new WritableStream<string>({ | ||||||||||||||||||||||
| write(chunk) { | ||||||||||||||||||||||
| chunks.push(chunk); | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| abort(reason) { | ||||||||||||||||||||||
| sinkAborted = true; | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| }); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| const { readable, writable } = new TransformStream<string, string>(); | ||||||||||||||||||||||
| const state = createFlushableState(); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Start piping in background | ||||||||||||||||||||||
| const pipePromise = flushablePipe(readable, mockSink, state); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| pollWritableLock(writable, state); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| const userWriter = writable.getWriter(); | ||||||||||||||||||||||
| await userWriter.write('valid chunk'); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Simulate a stream error / drop on the readable side (which aborts the pipe) | ||||||||||||||||||||||
| const error = new Error('Client disconnected'); | ||||||||||||||||||||||
| readable.cancel(error); | ||||||||||||||||||||||
|
Check failure on line 139 in packages/core/src/flushable-stream.test.ts
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
||||||||||||||||||||||
| // Write should fail because the underlying pipe broke | ||||||||||||||||||||||
| await expect(userWriter.write('another')).rejects.toThrow(); | ||||||||||||||||||||||
|
Check failure on line 142 in packages/core/src/flushable-stream.test.ts
|
||||||||||||||||||||||
|
||||||||||||||||||||||
| // Write should fail because the underlying pipe broke | |
| await expect(userWriter.write('another')).rejects.toThrow(); | |
| // Wait for the pipe to process the error | |
| await pipePromise; | |
| // State promise should reject with the disconnection error | |
| await expect(state.promise).rejects.toThrow('Client disconnected'); | |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // State promise should reject with the cancellation error | |
| await expect(state.promise).rejects.toThrow('Client disconnected'); | |
| // Ensure the sink received the abort signal | |
| expect(sinkAborted).toBe(true); | |
| // The first chunk should have been written before the error | |
| expect(chunks).toContain('valid chunk'); | |
| // Ensure the stream ended | |
| expect(state.streamEnded).toBe(true); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -219,7 +219,12 @@ export async function flushablePipe( | |
|
|
||
| // Read from source - don't count as pending op since we're just waiting for data | ||
| // The important ops are writes to the sink (server) | ||
| const readResult = await reader.read(); | ||
| const readResult = await Promise.race([ | ||
| reader.read(), | ||
| writer.closed.then(() => { | ||
| throw new Error('Writable stream closed prematurely'); | ||
| }), | ||
| ]); | ||
|
|
||
| // Check if stream has ended (e.g., due to error in another path) before processing | ||
| if (state.streamEnded) { | ||
|
|
@@ -257,8 +262,14 @@ export async function flushablePipe( | |
| // while other callers may depend on this rejection. Some known callers | ||
| // explicitly ignore this rejection (`.catch(() => {})`) and rely solely | ||
| // on `state.reject(err)` for error handling. | ||
|
|
||
| // Attempt to cancel the upstream reader so the source knows it should stop generating data. | ||
| reader.cancel(err).catch(() => {}); | ||
|
||
| throw err; | ||
| } finally { | ||
| // If we're exiting normally but the stream was externally ended before completion, | ||
| // we should cancel the reader to notify the source. | ||
| reader.cancel().catch(() => {}); | ||
| reader.releaseLock(); | ||
| writer.releaseLock(); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.