Description
Summary
When a workflow stream is returned directly from a route handler via run.readable or run.getReadable(), repeated client disconnects / refreshes do not appear to cancel the underlying workflow stream reader.
In local dev this eventually causes leaked EventEmitter listeners and MaxListenersExceededWarning warnings for chunk:* / close:* stream events.
This reproduces both when the stream is handed to createUIMessageStreamResponse(...) and when it is returned directly with new Response(run.getReadable(), ...).
Versions
- workflow: 4.1.0-beta.60
- @workflow/ai: 4.0.1-beta.54
- ai: 6.0.97
- @ai-sdk/react: 3.0.99
- next: 16.1.6
Minimal shape of the route:

```ts
import { createUIMessageStreamResponse } from 'ai';
import { start } from 'workflow/api';

export async function POST() {
  const run = await start(myWorkflow, [/* args */]);

  return createUIMessageStreamResponse({
    stream: run.readable,
    headers: {
      'x-workflow-run-id': run.runId,
    },
  });
}
```

Resume route:
```ts
export async function GET(
  request: Request,
  { params }: { params: Promise<{ id: string }> },
) {
  const { id } = await params;
  const { searchParams } = new URL(request.url);
  const startIndex = Number.parseInt(searchParams.get('startIndex') ?? '0', 10);

  const run = getRun(id);

  return createUIMessageStreamResponse({
    stream: run.getReadable({ startIndex }),
  });
}
```

Repro
- Start a workflow that streams for a while.
- Connect to it from the browser.
- Refresh repeatedly during the active stream, around 10 times is enough.
- Keep resuming the same run.
Actual behavior
Eventually local dev logs show:
```
MaxListenersExceededWarning: Possible EventEmitter memory leak detected.
11 chunk:strm_..._user listeners added to [EventEmitter].
11 close:strm_..._user listeners added to [EventEmitter].
```

The app may still function, but old readers appear to remain subscribed.
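For context, this is the classic shape of a listener leak: each connection registers per-stream `chunk:*` / `close:*` handlers on a shared emitter, and if the matching cleanup never runs, Node warns once the count passes the default limit of 10. A minimal sketch (the `bus` and event names here are stand-ins for the streamer's internals, not the actual workflow API):

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical stand-in for the streamer's shared event bus.
const bus = new EventEmitter();

// Subscribes per-stream listeners and returns the cleanup that *should*
// run when the reader is cancelled -- but never does for abandoned readers.
function subscribe(streamId: string): () => void {
  const onChunk = () => {};
  const onClose = () => {};
  bus.on(`chunk:${streamId}`, onChunk);
  bus.on(`close:${streamId}`, onClose);
  return () => {
    bus.off(`chunk:${streamId}`, onChunk);
    bus.off(`close:${streamId}`, onClose);
  };
}

// Each browser refresh resubscribes without the previous cleanup running.
for (let i = 0; i < 11; i++) subscribe('strm_example_user');

// 11 listeners on one event name: past Node's default limit of 10,
// which is exactly when MaxListenersExceededWarning fires.
console.log(bus.listenerCount('chunk:strm_example_user')); // 11
```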
Expected behavior
When the HTTP response is aborted because the client refreshes / disconnects, the abandoned workflow stream reader should be canceled and its listeners/subscriptions should be removed.
run.readable / run.getReadable() should be safe to hand directly to a streaming HTTP response without leaking listeners.
Likely root cause
- `Run.readable` just delegates to `getReadable()`.
- `getReadable()` goes through the external reviver pipeline rather than returning the world stream directly.
- That pipeline creates a `WorkflowServerReadableStream`, but `WorkflowServerReadableStream` does not implement `cancel()`.
- The reviver path then manually pumps that source stream through `flushablePipe(...)`.
- When the outer HTTP response / consumer is aborted, cancellation does not seem to propagate all the way back to the source stream reader.
- In the local/postgres streamers, listener cleanup happens from the readable stream's own `cancel()`, so abandoned readers stay subscribed.
So the missing piece looks like cancel propagation through the built-in run.getReadable() pipeline, not just the route integration.
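The `cancel()` gap can be seen with plain Web Streams, independent of workflow itself: if an underlying source omits the `cancel()` hook, consumer-side cancellation resolves successfully but no cleanup ever runs at the source. A minimal illustration (plain `ReadableStream`, no workflow APIs):

```typescript
// Source WITH a cancel() hook: consumer cancellation reaches it.
let cleanupRanWithCancel = false;
const withCancel = new ReadableStream({
  cancel() { cleanupRanWithCancel = true; },
});
await withCancel.cancel('client gone');

// Source WITHOUT cancel(): mirrors WorkflowServerReadableStream per the
// report. The real streamer would subscribe chunk:/close: listeners in
// start(), and nothing ever unsubscribes them.
let cleanupRanWithoutCancel = false;
const withoutCancel = new ReadableStream({
  start() { /* listeners registered here stay alive forever */ },
});
await withoutCancel.cancel('client gone'); // resolves, but no hook to call

console.log(cleanupRanWithCancel, cleanupRanWithoutCancel); // true false
```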
Workaround
I was able to avoid the leak by bypassing run.readable / run.getReadable() and instead:
- reading the raw workflow stream directly
- wrapping it in a `ReadableStream` whose `cancel()` calls `reader.cancel()`
- deserializing it back into the expected chunk type
That strongly suggests the missing piece is cancel propagation in the built-in workflow readable.
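The wrapper shape I used looks roughly like this (a sketch, not workflow's API: `rawReader` is a hypothetical `ReadableStreamDefaultReader` obtained from the underlying world stream, and the deserialization step is omitted):

```typescript
// Wraps a raw reader so that cancelling the outer stream propagates to the
// source -- the step the built-in pipeline appears to be missing.
function wrapWithCancel<T>(
  rawReader: ReadableStreamDefaultReader<T>,
): ReadableStream<T> {
  return new ReadableStream<T>({
    async pull(controller) {
      const { done, value } = await rawReader.read();
      if (done) controller.close();
      else controller.enqueue(value);
    },
    async cancel(reason) {
      // Propagate cancellation to the source so the streamer's own
      // cancel()-based listener cleanup actually runs.
      await rawReader.cancel(reason);
    },
  });
}

// Usage sketch: cancelling the wrapper reaches the source's cancel() hook.
let sourceCancelled = false;
const source = new ReadableStream<number>({
  start(c) { c.enqueue(1); },
  cancel() { sourceCancelled = true; },
});
const wrapped = wrapWithCancel(source.getReader());
await wrapped.cancel('client disconnected');
console.log(sourceCancelled); // true
```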
Suggested fix
Propagate cancellation from the stream returned by run.readable / run.getReadable() all the way down to the underlying world stream reader.
That likely requires both:
- implementing `cancel(reason)` on `WorkflowServerReadableStream` so it delegates to its inner reader, and
- updating the `flushablePipe(...)` / reviver bridge so aborting the outer consumer cancels the source stream instead of only releasing locks.
Note
I also see ResponseAborted in Next dev logs during refresh, but that looks like expected framework disconnect noise. The real issue here is the accumulating MaxListenersExceededWarning.