Skip to content

Commit e2ee334

Browse files
authored
[core] Support NodeJS Stream cancellation - v2 (#33385)
### Packages impacted by this PR @azure-rest/core-client, and @azure/core-sse ### Issues associated with this PR #33334 ### Describe the problem that is addressed by this PR The type of NodeJS streams used in our core doesn't support cancellation even though the actual runtime object does support it. For context, this type is being used by NodeJS to type ["old style streams" that existed before NodeJS v0.10](https://github.com/DefinitelyTyped/DefinitelyTyped/blob/3a17ff00600a50db103774cfb818685c5677e815/types/node/stream.d.ts#L449), therefore, it doesn't have the `destroy` method. ### What are the possible designs available to address the problem? If there are more than one possible design, why was the one in this PR chosen? A few options have been already discussed in the issue. . ### Are there test cases added in this PR? _(If not, why?)_ No, the changes are types only ### Provide a list of related PRs _(if any)_ N/A ### Command used to generate this PR:**_(Applicable only to SDK release request PRs)_ ### Checklists - [x] Added impacted package name to the issue description - [ ] Does this PR needs any fixes in the SDK Generator?** _(If so, create an Issue in the [Autorest/typescript](https://github.com/Azure/autorest.typescript) repository and link it here)_ - [x] Added a changelog (if necessary)
1 parent 725dcd4 commit e2ee334

File tree

12 files changed

+105
-29
lines changed

12 files changed

+105
-29
lines changed

sdk/core/core-client-rest/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- `asNodeStream` now returns a `NodeJSReadableStream` which can be canceled by calling the `destroy` method.
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/core/core-client-rest/review/core-client.api.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ export type HttpBrowserStreamResponse = HttpResponse & {
9898

9999
// @public
100100
export type HttpNodeStreamResponse = HttpResponse & {
101-
body?: NodeJS.ReadableStream;
101+
body?: NodeJSReadableStream;
102102
};
103103

104104
// @public
@@ -115,6 +115,11 @@ export interface InnerError {
115115
innererror?: InnerError;
116116
}
117117

118+
// @public
119+
export interface NodeJSReadableStream extends NodeJS.ReadableStream {
120+
destroy(error?: Error): void;
121+
}
122+
118123
// @public
119124
export interface OperationOptions {
120125
abortSignal?: AbortSignalLike;

sdk/core/core-client-rest/src/common.ts

+13-1
Original file line numberDiff line numberDiff line change
@@ -217,14 +217,26 @@ export interface Client {
217217
pathUnchecked: PathUnchecked;
218218
}
219219

220+
/**
221+
* A Node.js Readable stream that also has a `destroy` method.
222+
*/
223+
export interface NodeJSReadableStream extends NodeJS.ReadableStream {
224+
/**
225+
* Destroy the stream. Optionally emit an 'error' event, and emit a
226+
* 'close' event (unless emitClose is set to false). After this call,
227+
* internal resources will be released.
228+
*/
229+
destroy(error?: Error): void;
230+
}
231+
220232
/**
221233
* Http Response which body is a NodeJS stream object
222234
*/
223235
export type HttpNodeStreamResponse = HttpResponse & {
224236
/**
225237
* Streamable body
226238
*/
227-
body?: NodeJS.ReadableStream;
239+
body?: NodeJSReadableStream;
228240
};
229241

230242
/**

sdk/core/core-sse/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- `createSseStream` now supports NodeJS streams as input.
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/core/core-sse/review/core-sse.api.md

+8
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ export function createSseStream(chunkStream: ReadableStream<Uint8Array>): EventM
1212
// @public
1313
export function createSseStream(chunkStream: IncomingMessage): EventMessageStream;
1414

15+
// @public
16+
export function createSseStream(chunkStream: NodeJSReadableStream): EventMessageStream;
17+
1518
// @public
1619
export interface EventMessage {
1720
data: string;
@@ -24,6 +27,11 @@ export interface EventMessage {
2427
export interface EventMessageStream extends ReadableStream<EventMessage>, AsyncDisposable, AsyncIterable<EventMessage> {
2528
}
2629

30+
// @public
31+
export interface NodeJSReadableStream extends NodeJS.ReadableStream {
32+
destroy(error?: Error): void;
33+
}
34+
2735
// (No @packageDocumentation comment for this package)
2836

2937
```

sdk/core/core-sse/src/index.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
// Licensed under the MIT License.
33

44
export { createSseStream } from "./sse.js";
5-
export { EventMessage, EventMessageStream } from "./models.js";
5+
export { EventMessage, EventMessageStream, NodeJSReadableStream } from "./models.js";

sdk/core/core-sse/src/models.ts

+12
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,15 @@ export interface EventMessageStream
2525
AsyncIterable<EventMessage> {}
2626

2727
export type PartialSome<T, K extends keyof T> = Omit<T, K> & Partial<Pick<T, K>>;
28+
29+
/**
30+
* A Node.js Readable stream that also has a `destroy` method.
31+
*/
32+
export interface NodeJSReadableStream extends NodeJS.ReadableStream {
33+
/**
34+
* Destroy the stream. Optionally emit an 'error' event, and emit a
35+
* 'close' event (unless emitClose is set to false). After this call,
36+
* internal resources will be released.
37+
*/
38+
destroy(error?: Error): void;
39+
}

sdk/core/core-sse/src/sse.ts

+16-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@
22
// Licensed under the MIT License.
33

44
import type { IncomingMessage } from "node:http";
5-
import type { EventMessage, EventMessageStream, PartialSome } from "./models.js";
5+
import type {
6+
EventMessage,
7+
EventMessageStream,
8+
NodeJSReadableStream,
9+
PartialSome,
10+
} from "./models.js";
611
import { createStream, ensureAsyncIterable } from "./utils.js";
712

813
enum ControlChars {
@@ -15,17 +20,23 @@ enum ControlChars {
1520
/**
1621
* Processes a response stream into a stream of events.
1722
* @param chunkStream - A stream of Uint8Array chunks
18-
* @returns An async iterable of EventMessage objects
23+
* @returns A stream of EventMessage objects
1924
*/
2025
export function createSseStream(chunkStream: ReadableStream<Uint8Array>): EventMessageStream;
2126
/**
2227
* Processes a response stream into a stream of events.
23-
* @param chunkStream - An async iterable of Uint8Array chunks
24-
* @returns An async iterable of EventMessage objects
28+
* @param chunkStream - A NodeJS HTTP response
29+
* @returns A stream of EventMessage objects
2530
*/
2631
export function createSseStream(chunkStream: IncomingMessage): EventMessageStream;
32+
/**
33+
* Processes a response stream into a stream of events.
34+
* @param chunkStream - A NodeJS Readable stream
35+
* @returns A stream of EventMessage objects
36+
*/
37+
export function createSseStream(chunkStream: NodeJSReadableStream): EventMessageStream;
2738
export function createSseStream(
28-
chunkStream: IncomingMessage | ReadableStream<Uint8Array>,
39+
chunkStream: IncomingMessage | NodeJSReadableStream | ReadableStream<Uint8Array>,
2940
): EventMessageStream {
3041
const { cancel, iterable } = ensureAsyncIterable(chunkStream);
3142
const asyncIter = toMessage(toLine(iterable));

sdk/core/core-sse/src/utils.ts

+8-10
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@
22
// Licensed under the MIT License.
33

44
import type { IncomingMessage } from "node:http";
5-
6-
type Nullable<T, K extends keyof T> = {
7-
[P in K]: T[P] | null;
8-
} & Omit<T, K>;
5+
import type { NodeJSReadableStream } from "./models.js";
96

107
export function createStream<T>(
118
asyncIter: AsyncIterableIterator<T>,
@@ -65,7 +62,7 @@ function iteratorToStream<T>(
6562
}
6663

6764
export function ensureAsyncIterable(
68-
stream: Nullable<IncomingMessage, "socket"> | ReadableStream<Uint8Array>,
65+
stream: IncomingMessage | NodeJSReadableStream | ReadableStream<Uint8Array>,
6966
): {
7067
cancel(): Promise<void>;
7168
iterable: AsyncIterable<Uint8Array>;
@@ -79,11 +76,12 @@ export function ensureAsyncIterable(
7976
} else {
8077
return {
8178
cancel: async () => {
82-
/**
83-
* socket is set to null when the stream has been consumed
84-
* so we need to make sure to guard against nullability.
85-
*/
86-
stream.socket?.end();
79+
// socket could be null if the connection is already closed
80+
if ("socket" in stream && stream.socket) {
81+
stream.socket.end();
82+
} else {
83+
stream.destroy();
84+
}
8785
},
8886
iterable: stream as AsyncIterable<Uint8Array>,
8987
};

sdk/core/core-sse/test/public/node/connection.spec.ts

+6-7
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4-
import { createSseStream } from "../../../src/index.js";
4+
import { createSseStream, NodeJSReadableStream } from "../../../src/index.js";
55
import { type Client, getClient } from "@azure-rest/core-client";
66
import { assert, beforeAll, beforeEach, afterEach, describe, it } from "vitest";
77
import { port } from "../../server/config.mjs";
8-
import type { IncomingMessage } from "http";
98
import { matrix } from "@azure-tools/test-utils-vitest";
109
import { isRestError } from "@azure/core-rest-pipeline";
1110

@@ -18,7 +17,7 @@ async function sendRequest(
1817
client: Client,
1918
path: string,
2019
abortSignal?: AbortSignal,
21-
): Promise<IncomingMessage> {
20+
): Promise<NodeJSReadableStream> {
2221
const res = await client
2322
.pathUnchecked(path)
2423
.get({ accept: contentType, abortSignal })
@@ -33,7 +32,7 @@ async function sendRequest(
3332
if (!receivedContentType.includes(contentType)) {
3433
throw new Error(`Expected a text/event-stream content but received"${receivedContentType}"`);
3534
}
36-
return res.body as IncomingMessage;
35+
return res.body;
3736
}
3837

3938
describe("[Node] Connections", () => {
@@ -71,7 +70,7 @@ describe("[Node] Connections", () => {
7170
});
7271

7372
it("loop until stream ends and then break", async function () {
74-
let stream: IncomingMessage;
73+
let stream: NodeJSReadableStream;
7574
try {
7675
stream = await sendRequest(client, path);
7776
} catch (e) {
@@ -88,7 +87,7 @@ describe("[Node] Connections", () => {
8887
ran = true;
8988
if (sse.data === "[DONE]") {
9089
if (path.includes("no-fin")) {
91-
assert.isNull(stream.socket);
90+
assert.isNull((stream as any).socket);
9291
}
9392
break;
9493
}
@@ -100,7 +99,7 @@ describe("[Node] Connections", () => {
10099
});
101100

102101
it("break early from loop", async function () {
103-
let stream: IncomingMessage;
102+
let stream: NodeJSReadableStream;
104103
try {
105104
// sometimes the server gets into a bad state and doesn't respond so we need to timeout
106105
stream = await sendRequest(client, path, AbortSignal.timeout(25000));

sdk/core/core-sse/test/public/node/util.ts

+1-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,5 @@ export function createStream(
1010
const stream = new PassThrough();
1111
cb((c) => stream.write(c));
1212
stream.end();
13-
// This is a mock test that doesn't need a true IncomingMessage object
14-
return createSseStream(stream as any);
13+
return createSseStream(stream);
1514
}

sdk/core/ts-http-runtime/review/azure-core-comparison.diff

+30-2
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ index 30dac33..66ea99c 100644
489489
if (!cachedHttpClient) {
490490
cachedHttpClient = createDefaultHttpClient();
491491
diff --git a/src/client/common.ts b/src/client/common.ts
492-
index 609878a..da6742a 100644
492+
index 58f6eba..da6742a 100644
493493
--- a/src/client/common.ts
494494
+++ b/src/client/common.ts
495495
@@ -3,19 +3,17 @@
@@ -572,7 +572,35 @@ index 609878a..da6742a 100644
572572
*
573573
* type MyClient = Client & {
574574
* path: Routes;
575-
@@ -333,11 +314,9 @@ export type ClientOptions = PipelineOptions & {
575+
@@ -217,18 +198,6 @@ export interface Client {
576+
pathUnchecked: PathUnchecked;
577+
}
578+
579+
-/**
580+
- * A Node.js Readable stream that also has a `destroy` method.
581+
- */
582+
-export interface NodeJSReadableStream extends NodeJS.ReadableStream {
583+
- /**
584+
- * Destroy the stream. Optionally emit an 'error' event, and emit a
585+
- * 'close' event (unless emitClose is set to false). After this call,
586+
- * internal resources will be released.
587+
- */
588+
- destroy(error?: Error): void;
589+
-}
590+
-
591+
/**
592+
* Http Response which body is a NodeJS stream object
593+
*/
594+
@@ -236,7 +205,7 @@ export type HttpNodeStreamResponse = HttpResponse & {
595+
/**
596+
* Streamable body
597+
*/
598+
- body?: NodeJSReadableStream;
599+
+ body?: NodeJS.ReadableStream;
600+
};
601+
602+
/**
603+
@@ -345,11 +314,9 @@ export type ClientOptions = PipelineOptions & {
576604
*/
577605
apiKeyHeaderName?: string;
578606
};

0 commit comments

Comments
 (0)