Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,6 @@ export interface ServerDataMessage extends WebPubSubMessageBase {
dataType: WebPubSubDataType;
readonly kind: "serverData";
sequenceId?: number;
stream?: StreamInfo;
}

// @public
Expand Down Expand Up @@ -458,7 +457,7 @@ export interface StreamPublisher {
complete(options?: EndStreamOptions): Promise<void>;
keepalive(options?: SendStreamKeepaliveOptions): Promise<void>;
onError(listener: (error: StreamDataError) => void): () => void;
publish(content: JSONTypes | ArrayBuffer, dataType?: WebPubSubDataType, options?: SendStreamDataOptions): Promise<void>;
publish(content: JSONTypes | ArrayBuffer, dataType: WebPubSubDataType, options?: SendStreamDataOptions): Promise<void>;
readonly streamId: string;
}

Expand Down
99 changes: 68 additions & 31 deletions sdk/web-pubsub/web-pubsub-client/samples-dev/helloworld.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ async function main(): Promise<void> {
})
).url;
};
const client = new WebPubSubClient(
{
getClientAccessUrl: fetchClientAccessUrl,
} as WebPubSubClientCredential,
options,
);
const credential = {
getClientAccessUrl: fetchClientAccessUrl,
} as WebPubSubClientCredential;
const client = new WebPubSubClient(credential, options);
const streamReceiver = new WebPubSubClient(credential, options);

client.on("connected", (e) => {
console.log(`Connection ${e.connectionId} is connected.`);
Expand All @@ -60,26 +59,44 @@ async function main(): Promise<void> {
console.log(`Received message from ${e.message.group} ${formatPayload(e.message.data)}`);
});

const groupStreamFactory = (stream: OnGroupStreamArgs): GroupStreamHandler => ({
onMessage: (args) => {
console.log(
`[stream:${stream.group}/${stream.streamId}] seq=${args.stream.streamSequenceId} ${formatPayload(args.data)}`,
);
},
onComplete: () => {
console.log(`[stream:${stream.group}/${stream.streamId}] completed`);
},
onError: (args) => {
console.log(
`[stream:${stream.group}/${stream.streamId}] failed: ${args.error?.name}${args.error?.message ? ` - ${args.error.message}` : ""}`,
);
},
streamReceiver.on("connected", (e) => {
console.log(`Stream receiver connection ${e.connectionId} is connected.`);
});
client.onGroupStream(groupStreamFactory);

streamReceiver.on("disconnected", (e) => {
console.log(`Stream receiver disconnected: ${e.message}`);
});

const groupStreamFactory = (stream: OnGroupStreamArgs): GroupStreamHandler => {
const receivedParts: string[] = [];

return {
onMessage: (args) => {
receivedParts.push(formatStreamPart(args.data));

console.log(
`[stream:${stream.group}/${stream.streamId}] seq=${args.stream.streamSequenceId} ${formatPayload(args.data)}`,
);
},
onComplete: () => {
console.log(
`[stream:${stream.group}/${stream.streamId}] completed with ${receivedParts.length} part(s): ${receivedParts.join("")}`,
);
},
onError: (args) => {
console.log(
`[stream:${stream.group}/${stream.streamId}] failed: ${args.error?.name}${args.error?.message ? ` - ${args.error.message}` : ""}`,
);
},
};
};
streamReceiver.onGroupStream(groupStreamFactory);

await client.start();
await streamReceiver.start();

await client.joinGroup(groupName);
await streamReceiver.joinGroup(groupName);
await client.sendToGroup(groupName, "hello world", "text", {
fireAndForget: true,
});
Expand All @@ -96,18 +113,31 @@ async function main(): Promise<void> {
fireAndForget: true,
});

const stream = await client.streamToGroup(groupName, { noEcho: false });
stream.onError((error) => {
console.log(
`[publisher:${stream.streamId}] failed: ${error.name}${error.message ? ` - ${error.message}` : ""}`,
);
const firstStream = await client.streamToGroup(groupName, {
noEcho: true,
streamId: "sample-stream-1",
});
await stream.publish("stream part 1", "text");
await stream.publish({ part: 2, text: "stream part 2" }, "json");
await stream.complete();
const secondStream = await client.streamToGroup(groupName, {
noEcho: true,
streamId: "sample-stream-2",
});
for (const publisher of [firstStream, secondStream]) {
publisher.onError((error) => {
console.log(
`[publisher:${publisher.streamId}] failed: ${error.name}${error.message ? ` - ${error.message}` : ""}`,
);
});
}
await firstStream.publish("first stream part 1; ", "text");
await secondStream.publish("second stream part 1; ", "text");
await firstStream.publish("first stream part 2", "text");
await secondStream.publish("second stream part 2", "text");
await secondStream.complete();
await firstStream.complete();

await delay(200);
client.offGroupStream(groupStreamFactory);
await delay(1000);
streamReceiver.offGroupStream(groupStreamFactory);
streamReceiver.stop();
client.stop();
console.log("Client stopped");
}
Expand All @@ -127,3 +157,10 @@ function formatPayload(payload: unknown): string {
}
return JSON.stringify(payload);
}

function formatStreamPart(payload: unknown): string {
if (typeof payload === "string") {
return payload;
}
return formatPayload(payload);
}
22 changes: 2 additions & 20 deletions sdk/web-pubsub/web-pubsub-client/src/asyncSeqQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,14 @@

import type { AbortSignalLike } from "@azure/abort-controller";
import { AbortError } from "@azure/abort-controller";
import type { Deferred } from "./utils/deferred.js";
import { createDeferred } from "./utils/deferred.js";

export interface SeqItem<T> {
sequenceId: number;
value: T;
}

interface Deferred<T> {
promise: Promise<T>;
resolve(value: T): void;
reject(reason?: unknown): void;
}

interface DequeueWaiter<T> {
deferred: Deferred<SeqItem<T>>;
abortSignal?: AbortSignalLike;
Expand Down Expand Up @@ -232,17 +228,3 @@ export class AsyncSeqQueue<T> {
}
}
}

function createDeferred<T>(): Deferred<T> {
let resolvePromise!: (value: T) => void;
let rejectPromise!: (reason?: unknown) => void;
const promise = new Promise<T>((resolve, reject) => {
resolvePromise = resolve;
rejectPromise = reject;
});
return {
promise,
resolve: resolvePromise,
reject: rejectPromise,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ export class InboundStreamSession {
private readonly _ttlInMs: number;
private readonly _handleFromStart: boolean;

// Active streams keyed by `${group}|${streamId}`.
// Active streams keyed by an encoded [group, streamId] tuple.
private readonly _activeStreams: Map<string, ActiveStream>;
private readonly _activeTimeouts: Map<string, ReturnType<typeof setTimeout>>;
// Tracks streamIds skipped by handleFromStart=true, keyed by `${group}|${streamId}`.
// Tracks streamIds skipped by handleFromStart=true, keyed by an encoded [group, streamId] tuple.
private readonly _ignored: Set<string>;

constructor(getFactories: GetGroupStreamFactoriesFn, options?: GroupStreamOptions) {
Expand Down Expand Up @@ -182,7 +182,7 @@ export class InboundStreamSession {
}

private _buildKey(groupName: string, streamId: string): string {
return `${groupName}|${streamId}`;
return JSON.stringify([groupName, streamId]);
}

private _resetActiveTimeout(active: ActiveStream): void {
Expand Down
4 changes: 2 additions & 2 deletions sdk/web-pubsub/web-pubsub-client/src/models/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

import type { AbortSignalLike } from "@azure/abort-controller";
import type { WebPubSubClientProtocol } from "../protocols/index.js";
import type { JSONTypes } from "../webPubSubClient.js";
import type {
DisconnectedMessage,
GroupDataMessage,
JSONTypes,
ServerDataMessage,
StreamDataError,
StreamInfo,
Expand Down Expand Up @@ -250,7 +250,7 @@ export interface StreamPublisher {
*/
publish(
content: JSONTypes | ArrayBuffer,
dataType?: WebPubSubDataType,
dataType: WebPubSubDataType,
options?: SendStreamDataOptions,
): Promise<void>;
/**
Expand Down
9 changes: 4 additions & 5 deletions sdk/web-pubsub/web-pubsub-client/src/models/messages.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

import type { JSONTypes } from "../webPubSubClient.js";
/**
* Types which can be serialized and sent as JSON.
*/
export type JSONTypes = string | number | boolean | object;

/**
* The web pubsub message
Expand Down Expand Up @@ -286,10 +289,6 @@ export interface ServerDataMessage extends WebPubSubMessageBase {
* The sequence id of the data. Only available in reliable protocols
*/
sequenceId?: number;
/**
* Streaming metadata when the payload belongs to a stream.
*/
stream?: StreamInfo;
}

/**
Expand Down
27 changes: 3 additions & 24 deletions sdk/web-pubsub/web-pubsub-client/src/outboundStreamSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,11 @@ import type {
SendStreamKeepaliveOptions,
StreamDataError,
} from "./models/index.js";
import type { WebPubSubDataType } from "./models/messages.js";
import type { JSONTypes, WebPubSubDataType } from "./models/messages.js";
import { AsyncSeqQueue } from "./asyncSeqQueue.js";
import { abortablePromise } from "./utils/abortablePromise.js";

type JSONTypes = string | number | boolean | object;

interface Deferred<T> {
promise: Promise<T>;
resolve(value: T): void;
reject(reason?: unknown): void;
}
import type { Deferred } from "./utils/deferred.js";
import { createDeferred } from "./utils/deferred.js";

type OutboundStreamAction = OutboundStreamDataAction | OutboundStreamEndAction;

Expand Down Expand Up @@ -356,18 +350,3 @@ export class OutboundStreamSession {
};
}
}

function createDeferred<T>(): Deferred<T> {
let resolvePromise!: (value: T) => void;
let rejectPromise!: (reason?: unknown) => void;
const promise = new Promise<T>((resolve, reject) => {
resolvePromise = resolve;
rejectPromise = reject;
});

return {
promise,
resolve: resolvePromise,
reject: rejectPromise,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import type {
AckMessage,
CancelInvocationMessage,
JSONTypes,
StreamAckMessage,
StreamClosedMessage,
StreamInfo,
Expand All @@ -19,7 +20,6 @@ import type {
WebPubSubDataType,
WebPubSubMessage,
} from "../models/messages.js";
import type { JSONTypes } from "../webPubSubClient.js";
import { stringToUint8Array, uint8ArrayToString } from "@azure/core-util";

export function parseMessages(input: string): WebPubSubMessage | null {
Expand All @@ -46,8 +46,8 @@ export function parseMessages(input: string): WebPubSubMessage | null {
return null;
}
} else if (typedMessage.type === "message") {
const stream = parseStreamInfo(parsedMessage.stream);
if (typedMessage.from === "group") {
const stream = parseStreamInfo(parsedMessage.stream);
const data = parsePayload(parsedMessage.data, parsedMessage.dataType as WebPubSubDataType);
if (data === null) {
return null;
Expand All @@ -58,10 +58,10 @@ export function parseMessages(input: string): WebPubSubMessage | null {
if (data === null) {
return null;
}
const { stream: _stream, ...serverMessage } = parsedMessage;
returnMessage = {
...parsedMessage,
...serverMessage,
data,
stream,
kind: "serverData",
} as ServerDataMessage;
} else {
Expand Down
23 changes: 23 additions & 0 deletions sdk/web-pubsub/web-pubsub-client/src/utils/deferred.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

export interface Deferred<T> {
promise: Promise<T>;
resolve(value: T): void;
reject(reason?: unknown): void;
}

export function createDeferred<T>(): Deferred<T> {
let resolvePromise!: (value: T) => void;
let rejectPromise!: (reason?: unknown) => void;
const promise = new Promise<T>((resolve, reject) => {
resolvePromise = resolve;
rejectPromise = reject;
});

return {
promise,
resolve: resolvePromise,
reject: rejectPromise,
};
}
Loading
Loading