Skip to content

Commit 4eed3b9

Browse files
vicancyCopilot
andauthored
Add invoke support for web pubsub client (Azure#36750)
### Packages impacted by this PR @azure/web-pubsub-client ### Issues associated with this PR ### Describe the problem that is addressed by this PR ### What are the possible designs available to address the problem? If there are more than one possible design, why was the one in this PR chosen? ### Are there test cases added in this PR? _(If not, why?)_ ### Provide a list of related PRs _(if any)_ ### Command used to generate this PR:**_(Applicable only to SDK release request PRs)_ ### Checklists - [ ] Added impacted package name to the issue description - [ ] Does this PR needs any fixes in the SDK Generator?** _(If so, create an Issue in the [Autorest/typescript](https://github.com/Azure/autorest.typescript) repository and link it here)_ - [ ] Added a changelog (if necessary) --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent af673f5 commit 4eed3b9

File tree

11 files changed

+890
-8
lines changed

11 files changed

+890
-8
lines changed

sdk/web-pubsub/web-pubsub-client/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,21 @@ await client.joinGroup(groupName);
9191
await client.sendToGroup(groupName, "hello world", "text");
9292
```
9393

94+
### 5. Invoke upstream events (preview)
95+
96+
```ts snippet:ReadmeSampleInvokeEvent
97+
import { WebPubSubClient } from "@azure/web-pubsub-client";
98+
99+
const client = new WebPubSubClient("<client-access-url>");
100+
await client.start();
101+
102+
const result = await client.invokeEvent("processOrder", { orderId: 1 }, "json");
103+
console.log(`Invocation result: ${JSON.stringify(result.data)}`);
104+
```
105+
106+
`invokeEvent` sends an `invoke` request to the service, awaits the correlated `invokeResponse`, and returns the payload. You can abort the invocation by passing `{ abortSignal }`.
107+
_Streaming and service-initiated invocations are not yet supported._
108+
94109
---
95110

96111
## Examples

sdk/web-pubsub/web-pubsub-client/review/web-pubsub-client-node.api.md

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ export interface AckMessageError {
2020
name: string;
2121
}
2222

23+
// @public
24+
export interface CancelInvocationMessage extends WebPubSubMessageBase {
25+
invocationId: string;
26+
readonly kind: "cancelInvocation";
27+
}
28+
2329
// @public
2430
export interface ConnectedMessage extends WebPubSubMessageBase {
2531
connectionId: string;
@@ -59,7 +65,11 @@ export type DownstreamMessageType =
5965
/**
6066
* Type for ServerDataMessage
6167
*/
62-
| "serverData";
68+
| "serverData"
69+
/**
70+
* Type for InvokeResponseMessage
71+
*/
72+
| "invokeResponse";
6373

6474
// @public
6575
export interface GetClientAccessUrlOptions {
@@ -76,6 +86,58 @@ export interface GroupDataMessage extends WebPubSubMessageBase {
7686
sequenceId?: number;
7787
}
7888

89+
// @public
90+
export class InvocationError extends Error {
91+
constructor(message: string, options: InvocationErrorOptions);
92+
errorDetail?: InvokeResponseError;
93+
invocationId: string;
94+
}
95+
96+
// @public (undocumented)
97+
export interface InvocationErrorOptions {
98+
errorDetail?: InvokeResponseError;
99+
invocationId: string;
100+
}
101+
102+
// @public
103+
export interface InvokeEventOptions {
104+
abortSignal?: AbortSignalLike;
105+
invocationId?: string;
106+
}
107+
108+
// @public
109+
export interface InvokeEventResult {
110+
data?: JSONTypes | ArrayBuffer;
111+
dataType?: WebPubSubDataType;
112+
invocationId: string;
113+
}
114+
115+
// @public
116+
export interface InvokeMessage extends WebPubSubMessageBase {
117+
data?: JSONTypes | ArrayBuffer;
118+
dataType?: WebPubSubDataType;
119+
event?: string;
120+
invocationId: string;
121+
readonly kind: "invoke";
122+
target?: "event";
123+
}
124+
125+
// @public
126+
export interface InvokeResponseError {
127+
message: string;
128+
name: string;
129+
}
130+
131+
// @public
132+
export interface InvokeResponseMessage extends WebPubSubMessageBase {
133+
data?: JSONTypes | ArrayBuffer;
134+
dataType?: WebPubSubDataType;
135+
error?: InvokeResponseError;
136+
invocationId: string;
137+
readonly kind: "invokeResponse";
138+
success?: boolean;
139+
}
140+
79141
// @public
80142
export interface JoinGroupMessage extends WebPubSubMessageBase {
81143
ackId?: number;
@@ -242,12 +304,21 @@ export type UpstreamMessageType =
242304
/**
243305
* Type for PingMessage
244306
*/
245-
| "ping";
307+
| "ping"
308+
/**
309+
* Type for InvokeMessage
310+
*/
311+
| "invoke"
312+
/**
313+
* Type for CancelInvocationMessage
314+
*/
315+
| "cancelInvocation";
246316

247317
// @public
248318
export class WebPubSubClient {
249319
constructor(clientAccessUrl: string, options?: WebPubSubClientOptions);
250320
constructor(credential: WebPubSubClientCredential, options?: WebPubSubClientOptions);
321+
invokeEvent(eventName: string, content: JSONTypes | ArrayBuffer, dataType: WebPubSubDataType, options?: InvokeEventOptions): Promise<InvokeEventResult>;
251322
joinGroup(groupName: string, options?: JoinGroupOptions): Promise<WebPubSubResult>;
252323
leaveGroup(groupName: string, options?: LeaveGroupOptions): Promise<WebPubSubResult>;
253324
off(event: "connected", listener: (e: OnConnectedArgs) => void): void;
@@ -318,7 +389,7 @@ export const WebPubSubJsonProtocol: () => WebPubSubClientProtocol;
318389
export const WebPubSubJsonReliableProtocol: () => WebPubSubClientProtocol;
319390

320391
// @public
321-
export type WebPubSubMessage = GroupDataMessage | ServerDataMessage | JoinGroupMessage | LeaveGroupMessage | ConnectedMessage | DisconnectedMessage | SendToGroupMessage | SendEventMessage | SequenceAckMessage | PingMessage | AckMessage | PongMessage;
392+
export type WebPubSubMessage = GroupDataMessage | ServerDataMessage | JoinGroupMessage | LeaveGroupMessage | ConnectedMessage | DisconnectedMessage | SendToGroupMessage | SendEventMessage | SequenceAckMessage | PingMessage | AckMessage | InvokeMessage | InvokeResponseMessage | CancelInvocationMessage | PongMessage;
322393

323394
// @public
324395
export interface WebPubSubMessageBase {

sdk/web-pubsub/web-pubsub-client/src/errors/index.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4-
import type { AckMessageError } from "../models/messages.js";
4+
import type { AckMessageError, InvokeResponseError } from "../models/messages.js";
55

66
/**
77
* Error when sending message failed
@@ -42,3 +42,35 @@ export interface SendMessageErrorOptions {
4242
*/
4343
errorDetail?: AckMessageError;
4444
}
45+
46+
export interface InvocationErrorOptions {
47+
/**
48+
* The invocation id of the request.
49+
*/
50+
invocationId: string;
51+
/**
52+
* Error details from the service if available.
53+
*/
54+
errorDetail?: InvokeResponseError;
55+
}
56+
57+
/**
58+
* Error thrown when an invocation fails or is cancelled.
59+
*/
60+
export class InvocationError extends Error {
61+
/**
62+
* The invocation id of the request.
63+
*/
64+
public invocationId: string;
65+
/**
66+
* Error details from the service if available.
67+
*/
68+
public errorDetail?: InvokeResponseError;
69+
70+
constructor(message: string, options: InvocationErrorOptions) {
71+
super(message);
72+
this.name = "InvocationError";
73+
this.invocationId = options.invocationId;
74+
this.errorDetail = options.errorDetail;
75+
}
76+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
import type { AbortSignalLike } from "@azure/abort-controller";
5+
import type { InvokeResponseMessage } from "./models/messages.js";
6+
import { InvocationError } from "./errors/index.js";
7+
8+
export interface InvocationWaitOptions {
9+
abortSignal?: AbortSignalLike;
10+
}
11+
12+
export interface InvocationRegistration {
13+
invocationId: string;
14+
wait(options?: InvocationWaitOptions): Promise<InvokeResponseMessage>;
15+
}
16+
17+
/**
18+
* Manages pending invocations awaiting invokeResponse frames.
19+
*/
20+
export class InvocationManager {
21+
private readonly _entries = new Map<string, InvocationEntity>();
22+
private _nextId = 0;
23+
24+
public registerInvocation(invocationId?: string): InvocationRegistration {
25+
const resolvedId = invocationId ?? this._generateInvocationId();
26+
if (this._entries.has(resolvedId)) {
27+
throw new InvocationError("Invocation id is already registered.", {
28+
invocationId: resolvedId,
29+
});
30+
}
31+
32+
const entity = new InvocationEntity(resolvedId);
33+
this._entries.set(resolvedId, entity);
34+
return {
35+
invocationId: resolvedId,
36+
wait: (options?: InvocationWaitOptions) => this._waitForEntry(entity, options),
37+
};
38+
}
39+
40+
public resolveInvocation(message: InvokeResponseMessage): boolean {
41+
const entry = this._entries.get(message.invocationId);
42+
if (!entry) {
43+
return false;
44+
}
45+
this._entries.delete(message.invocationId);
46+
entry.resolve(message);
47+
return true;
48+
}
49+
50+
public rejectInvocation(invocationId: string, reason: unknown): boolean {
51+
const entry = this._entries.get(invocationId);
52+
if (!entry) {
53+
return false;
54+
}
55+
this._entries.delete(invocationId);
56+
entry.reject(reason);
57+
return true;
58+
}
59+
60+
public discard(invocationId: string): void {
61+
this._entries.delete(invocationId);
62+
}
63+
64+
public rejectAll(createReason: (invocationId: string) => unknown): void {
65+
this._entries.forEach((entry, invocationId) => {
66+
if (this._entries.delete(invocationId)) {
67+
entry.reject(createReason(invocationId));
68+
}
69+
});
70+
}
71+
72+
private _waitForEntry(
73+
entry: InvocationEntity,
74+
options?: InvocationWaitOptions,
75+
): Promise<InvokeResponseMessage> {
76+
const waitPromise = entry.promise();
77+
const abortSignal = options?.abortSignal;
78+
79+
if (!abortSignal) {
80+
return waitPromise;
81+
}
82+
83+
if (abortSignal.aborted) {
84+
if (this._entries.delete(entry.invocationId)) {
85+
entry.reject(this._createAbortError(entry.invocationId));
86+
}
87+
return waitPromise;
88+
}
89+
90+
return new Promise<InvokeResponseMessage>((resolve, reject) => {
91+
const onAbort = (): void => {
92+
abortSignal.removeEventListener("abort", onAbort);
93+
if (this._entries.delete(entry.invocationId)) {
94+
entry.reject(this._createAbortError(entry.invocationId));
95+
}
96+
};
97+
98+
abortSignal.addEventListener("abort", onAbort);
99+
100+
waitPromise
101+
.then((result) => {
102+
abortSignal.removeEventListener("abort", onAbort);
103+
return resolve(result);
104+
})
105+
.catch((err) => {
106+
abortSignal.removeEventListener("abort", onAbort);
107+
return reject(err);
108+
});
109+
});
110+
}
111+
112+
private _generateInvocationId(): string {
113+
this._nextId += 1;
114+
return this._nextId.toString();
115+
}
116+
117+
private _createAbortError(invocationId: string): InvocationError {
118+
return new InvocationError("Invocation cancelled by abortSignal.", {
119+
invocationId,
120+
});
121+
}
122+
}
123+
124+
class InvocationEntity {
125+
private readonly _promise: Promise<InvokeResponseMessage>;
126+
private _resolve:
127+
| ((value: InvokeResponseMessage | PromiseLike<InvokeResponseMessage>) => void)
128+
| undefined;
129+
private _reject: ((reason?: unknown) => void) | undefined;
130+
131+
constructor(public readonly invocationId: string) {
132+
this._promise = new Promise<InvokeResponseMessage>((resolve, reject) => {
133+
this._resolve = resolve;
134+
this._reject = reject;
135+
});
136+
}
137+
138+
public promise(): Promise<InvokeResponseMessage> {
139+
return this._promise;
140+
}
141+
142+
public resolve(value: InvokeResponseMessage | PromiseLike<InvokeResponseMessage>): void {
143+
const callback = this._resolve;
144+
if (!callback) {
145+
return;
146+
}
147+
this._resolve = undefined;
148+
this._reject = undefined;
149+
callback(value);
150+
}
151+
152+
public reject(reason?: unknown): void {
153+
const callback = this._reject;
154+
if (!callback) {
155+
return;
156+
}
157+
this._resolve = undefined;
158+
this._reject = undefined;
159+
callback(reason);
160+
}
161+
}

0 commit comments

Comments
 (0)