Skip to content

Commit 7569b10

Browse files
authored
fix(cloudflare): Use TransformStream to keep track of streams (#20452)
closes #20409 closes [JS-2233](https://linear.app/getsentry/issue/JS-2233/sentrycloudflare-root-httpserver-span-lost-on-streaming-responses) ## Problem The issue is that we had a `waitUntil?.(streamMonitor);` that waited until the stream was done. `streamMonitor` could [potentially live forever](https://developers.cloudflare.com/workflows/reference/limits/#wall-time-limits-by-invocation-type), while `waitUntil` has a limit of [30 seconds](https://developers.cloudflare.com/workers/runtime-apis/context/#waituntil) before it gets cancelled. With the current approach, streams can only stay open for 30 seconds - then the `waitUntil` would cancel the request. This can only be reproduced when deployed, which is why we didn't notice it in our test cases. ## Solution By removing the `waitUntil` there is no hard limit, but we still have to wait until the stream is over in order to end our spans and flush accordingly. This can be achieved with [TransformStream](https://developers.cloudflare.com/workers/runtime-apis/streams/transformstream/), where we simply use the stream body from the client and pipe it through our transformer. With `flush` and `cancel` we know exactly when the stream ends or is cancelled - which is the only thing we need. There is, by the way, no reason to add integration or E2E tests, as `miniflare` doesn't have this limitation and the issue couldn't be reproduced there, so the tests would always succeed. The unit tests roughly simulate that by checking whether `waitUntil` is called or not. I also found that the client wasn't being disposed and would leak memory - this PR also fixes that (see screenshots).
## Some evidence repro: https://github.com/JPeer264/sentry-repros/tree/issue-20409 cloudflare logs before: <img width="878" height="580" alt="image" src="https://github.com/user-attachments/assets/82636c8f-5471-40d6-a058-9d60928f3359" /> cloudflare logs after: <img width="877" height="708" alt="image" src="https://github.com/user-attachments/assets/7cb8f96f-b1c5-40e5-a838-47eafe474223" /> Sentry trace: https://sentry-sdks.sentry.io/explore/traces/trace/29a307b1272e48dbb3a87c270c487e5a/ <img width="892" height="82" alt="Screenshot 2026-04-22 at 15 03 32" src="https://github.com/user-attachments/assets/c95097ee-8c58-428f-ad93-3dbc0b656ac7" />
1 parent a4c9686 commit 7569b10

3 files changed

Lines changed: 203 additions & 43 deletions

File tree

packages/cloudflare/src/request.ts

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -129,40 +129,36 @@ export function wrapRequestHandler(
129129
const classification = classifyResponseStreaming(res);
130130

131131
if (classification.isStreaming && res.body) {
132-
// Streaming response detected - monitor consumption to keep span alive
133132
try {
134-
const [clientStream, monitorStream] = res.body.tee();
135-
136-
// Monitor stream consumption and end span when complete
137-
const streamMonitor = (async () => {
138-
const reader = monitorStream.getReader();
139-
140-
try {
141-
let done = false;
142-
while (!done) {
143-
const result = await reader.read();
144-
done = result.done;
145-
}
146-
} catch {
147-
// Stream error or cancellation - will end span in finally
148-
} finally {
149-
reader.releaseLock();
150-
span.end();
151-
waitUntil?.(flushAndDispose(client));
152-
}
153-
})();
154-
155-
// Keep worker alive until stream monitoring completes (otherwise span won't end)
156-
waitUntil?.(streamMonitor);
157-
158-
// Return response with client stream
159-
return new Response(clientStream, {
133+
let ended = false;
134+
135+
const endSpanOnce = (): void => {
136+
if (ended) return;
137+
138+
ended = true;
139+
span.end();
140+
waitUntil?.(flushAndDispose(client));
141+
};
142+
143+
const transform = new TransformStream({
144+
flush() {
145+
// Source stream completed normally.
146+
endSpanOnce();
147+
},
148+
cancel() {
149+
// Client disconnected (or downstream cancelled). The `cancel`
150+
// is being called while the response is still considered
151+
// active, so this is a safe place to end the span.
152+
endSpanOnce();
153+
},
154+
});
155+
156+
return new Response(res.body.pipeThrough(transform), {
160157
status: res.status,
161158
statusText: res.statusText,
162159
headers: res.headers,
163160
});
164-
} catch (_e) {
165-
// tee() failed (e.g stream already locked) - fall back to non-streaming handling
161+
} catch {
166162
span.end();
167163
waitUntil?.(flushAndDispose(client));
168164
return res;

packages/cloudflare/test/instrumentations/worker/instrumentFetch.test.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,11 @@ describe('instrumentFetch', () => {
163163
const wrappedHandler = withSentry(vi.fn(), handler);
164164
const waits: Promise<unknown>[] = [];
165165
const waitUntil = vi.fn(promise => waits.push(promise));
166-
await wrappedHandler.fetch?.(new Request('https://example.com'), MOCK_ENV_WITHOUT_DSN, {
167-
waitUntil,
168-
} as unknown as ExecutionContext);
166+
await wrappedHandler
167+
.fetch?.(new Request('https://example.com'), MOCK_ENV_WITHOUT_DSN, {
168+
waitUntil,
169+
} as unknown as ExecutionContext)
170+
.then(response => response.text());
169171
expect(flush).not.toBeCalled();
170172
expect(waitUntil).toBeCalled();
171173
vi.advanceTimersToNextTimer().runAllTimers();

packages/cloudflare/test/request.test.ts

Lines changed: 173 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ const MOCK_OPTIONS: CloudflareOptions = {
1414
dsn: 'https://public@dsn.ingest.sentry.io/1337',
1515
};
1616

17+
const NODE_MAJOR_VERSION = parseInt(process.versions.node.split('.')[0]!);
18+
1719
function addDelayedWaitUntil(context: ExecutionContext) {
1820
context.waitUntil(new Promise<void>(resolve => setTimeout(() => resolve())));
1921
}
@@ -44,7 +46,7 @@ describe('withSentry', () => {
4446
await wrapRequestHandler(
4547
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context },
4648
() => new Response('test'),
47-
);
49+
).then(response => response.text());
4850

4951
expect(waitUntilSpy).toHaveBeenCalledTimes(1);
5052
expect(waitUntilSpy).toHaveBeenLastCalledWith(expect.any(Promise));
@@ -111,11 +113,8 @@ describe('withSentry', () => {
111113

112114
await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => {
113115
addDelayedWaitUntil(context);
114-
const response = new Response('test');
115-
// Add Content-Length to skip probing
116-
response.headers.set('content-length', '4');
117-
return response;
118-
});
116+
return new Response('test');
117+
}).then(response => response.text());
119118
expect(waitUntil).toBeCalled();
120119
vi.advanceTimersToNextTimer().runAllTimers();
121120
await Promise.all(waits);
@@ -336,7 +335,7 @@ describe('withSentry', () => {
336335
SentryCore.captureMessage('sentry-trace');
337336
return new Response('test');
338337
},
339-
);
338+
).then(response => response.text());
340339

341340
// Wait for async span end and transaction capture
342341
await new Promise(resolve => setTimeout(resolve, 50));
@@ -389,10 +388,8 @@ describe('flushAndDispose', () => {
389388
const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
390389

391390
await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => {
392-
const response = new Response('test');
393-
response.headers.set('content-length', '4');
394-
return response;
395-
});
391+
return new Response('test');
392+
}).then(response => response.text());
396393

397394
// Wait for all waitUntil promises to resolve
398395
await Promise.all(waits);
@@ -518,6 +515,171 @@ describe('flushAndDispose', () => {
518515
disposeSpy.mockRestore();
519516
});
520517

518+
// Regression tests for https://github.com/getsentry/sentry-javascript/issues/20409
519+
//
520+
// Pre-fix: streaming responses were observed via `body.tee()` + a long-running
521+
// `waitUntil(streamMonitor)`. Cloudflare caps `waitUntil` at ~30s after the
522+
// handler returns, so any stream taking longer than 30s to fully emit had the
523+
// monitor cancelled before `span.end()` / `flushAndDispose()` ran — silently
524+
// dropping the root `http.server` span.
525+
//
526+
// Post-fix: the body is piped through a passthrough `TransformStream`; the
527+
// `flush` (normal completion) and `cancel` (client disconnect) callbacks fire
528+
// while the response stream is still active (no waitUntil cap), so they can
529+
// safely end the span and register `flushAndDispose` via a fresh `waitUntil`
530+
// window. The contract guaranteed below: `waitUntil` is NOT called with any
531+
// long-running stream-observation promise — only with `flushAndDispose`, and
532+
// only after the response stream has finished (either by completion or cancel).
533+
describe('regression #20409: streaming responses do not park stream observation in waitUntil', () => {
534+
test('waitUntil is not called until streaming response is fully delivered', async () => {
535+
const waits: Promise<unknown>[] = [];
536+
const waitUntil = vi.fn((promise: Promise<unknown>) => waits.push(promise));
537+
const context = { waitUntil } as unknown as ExecutionContext;
538+
539+
const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
540+
const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose');
541+
542+
// Stream emits chunk1, then waits indefinitely until we open the gate
543+
// before emitting chunk2 + closing. Models a long-running upstream
544+
// (e.g. SSE / LLM streaming) whose body takes longer than the
545+
// handler-return time to fully drain.
546+
let releaseLastChunk!: () => void;
547+
const lastChunkGate = new Promise<void>(resolve => {
548+
releaseLastChunk = resolve;
549+
});
550+
551+
const stream = new ReadableStream({
552+
async start(controller) {
553+
controller.enqueue(new TextEncoder().encode('chunk1'));
554+
await lastChunkGate;
555+
controller.enqueue(new TextEncoder().encode('chunk2'));
556+
controller.close();
557+
},
558+
});
559+
560+
const result = await wrapRequestHandler(
561+
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context },
562+
() => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }),
563+
);
564+
565+
// Handler has returned, but the source stream has NOT closed yet.
566+
// The pre-fix code would have already enqueued a long-running
567+
// `waitUntil(streamMonitor)` task at this point. The post-fix code
568+
// must not call waitUntil at all here.
569+
expect(waitUntil).not.toHaveBeenCalled();
570+
571+
// Drain the response — Cloudflare would do this when forwarding to the client.
572+
const reader = result.body!.getReader();
573+
await reader.read(); // chunk1
574+
// Source still hasn't closed — still no waitUntil.
575+
expect(waitUntil).not.toHaveBeenCalled();
576+
577+
releaseLastChunk();
578+
await reader.read(); // chunk2
579+
await reader.read(); // done
580+
reader.releaseLock();
581+
582+
// Stream completed → TransformStream `flush` fired → span ended →
583+
// `flushAndDispose(client)` queued via waitUntil exactly once.
584+
await Promise.all(waits);
585+
expect(waitUntil).toHaveBeenCalledTimes(1);
586+
expect(waitUntil).toHaveBeenLastCalledWith(expect.any(Promise));
587+
expect(flushSpy).toHaveBeenCalled();
588+
expect(disposeSpy).toHaveBeenCalled();
589+
590+
flushSpy.mockRestore();
591+
disposeSpy.mockRestore();
592+
});
593+
594+
// Node 18's TransformStream does not invoke the transformer's `cancel` hook
595+
// when the downstream consumer cancels (WHATWG spec addition landed in Node 20).
596+
// Cloudflare Workers run modern V8 where this works, so we only skip the
597+
// test under Node 18.
598+
test.skipIf(NODE_MAJOR_VERSION < 20)(
599+
'waitUntil is called once and dispose runs when client cancels mid-stream',
600+
async () => {
601+
const waits: Promise<unknown>[] = [];
602+
const waitUntil = vi.fn((promise: Promise<unknown>) => waits.push(promise));
603+
const context = { waitUntil } as unknown as ExecutionContext;
604+
605+
const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
606+
const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose');
607+
608+
// Stream emits one chunk and then never closes — models an upstream
609+
// that keeps emitting indefinitely. We then cancel the response from
610+
// the consumer side to model a client disconnect.
611+
let sourceCancelled = false;
612+
const stream = new ReadableStream({
613+
start(controller) {
614+
controller.enqueue(new TextEncoder().encode('chunk1'));
615+
// intentionally don't close
616+
},
617+
cancel() {
618+
sourceCancelled = true;
619+
},
620+
});
621+
622+
const result = await wrapRequestHandler(
623+
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context },
624+
() => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }),
625+
);
626+
627+
// Handler returned, source still open — no waitUntil yet.
628+
expect(waitUntil).not.toHaveBeenCalled();
629+
630+
const reader = result.body!.getReader();
631+
await reader.read(); // chunk1
632+
await reader.cancel('client disconnected'); // simulates client disconnect
633+
reader.releaseLock();
634+
635+
// TransformStream `cancel` fired → span ended → flushAndDispose queued.
636+
await Promise.all(waits);
637+
expect(waitUntil).toHaveBeenCalledTimes(1);
638+
expect(waitUntil).toHaveBeenLastCalledWith(expect.any(Promise));
639+
expect(flushSpy).toHaveBeenCalled();
640+
expect(disposeSpy).toHaveBeenCalled();
641+
// pipeThrough should also propagate the cancel upstream to the source.
642+
expect(sourceCancelled).toBe(true);
643+
644+
flushSpy.mockRestore();
645+
disposeSpy.mockRestore();
646+
},
647+
);
648+
649+
test('waitUntil is called exactly once even if the response is consumed multiple times', async () => {
650+
// Sanity: no matter how the response is drained, the TransformStream's
651+
// flush callback must only end the span (and queue flushAndDispose) once.
652+
const waits: Promise<unknown>[] = [];
653+
const waitUntil = vi.fn((promise: Promise<unknown>) => waits.push(promise));
654+
const context = { waitUntil } as unknown as ExecutionContext;
655+
656+
const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
657+
const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose');
658+
659+
const stream = new ReadableStream({
660+
start(controller) {
661+
controller.enqueue(new TextEncoder().encode('a'));
662+
controller.enqueue(new TextEncoder().encode('b'));
663+
controller.close();
664+
},
665+
});
666+
667+
const result = await wrapRequestHandler(
668+
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context },
669+
() => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }),
670+
);
671+
672+
const text = await result.text();
673+
expect(text).toBe('ab');
674+
675+
await Promise.all(waits);
676+
expect(waitUntil).toHaveBeenCalledTimes(1);
677+
678+
flushSpy.mockRestore();
679+
disposeSpy.mockRestore();
680+
});
681+
});
682+
521683
test('dispose is NOT called for protocol upgrade responses (status 101)', async () => {
522684
const context = createMockExecutionContext();
523685
const waits: Promise<unknown>[] = [];

0 commit comments

Comments
 (0)