Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions examples/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ function getFormValues() {
secretAccessKey: $('#secretAccessKey').val(),
sessionToken: $('#sessionToken').val() || null,
enableDQPmetrics: $('#enableDQPmetrics').is(':checked'),
allowIceRestart: $('#allowIceRestart').is(':checked'),
allowSignalingReconnect: $('#allowSignalingReconnect').is(':checked')


};
}

Expand Down Expand Up @@ -410,6 +414,8 @@ const fields = [
{ field: 'forceSTUN', type: 'radio', name: 'natTraversal' },
{ field: 'forceTURN', type: 'radio', name: 'natTraversal' },
{ field: 'natTraversalDisabled', type: 'radio', name: 'natTraversal' },
{ field: 'allowIceRestart', type: 'checkbox' },
{ field: 'allowSignalingReconnect', type: 'checkbox' }
];
fields.forEach(({ field, type, name }) => {
const id = '#' + field;
Expand Down
21 changes: 21 additions & 0 deletions examples/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,27 @@ <h4>Amazon KVS WebRTC DQP</h4>
"><sup>&#9432;</sup></span>
</div>
</div>
<h4>Amazon KVS failure recovery</h4>
<div class="form-group">
<div class="form-check form-check-inline">
<input class="form-check-input" type="checkbox" id="allowIceRestart" value="allowIceRestart">
<label for="allowIceRestart" class="form-check-label">Enable ICE restart <small>(Viewer only)</small></label>
<span data-delay="{ &quot;hide&quot;: 1500 }" data-position="auto" tabindex="0" class="text-info ml-1" data-toggle="tooltip" data-html="true" title="
<p>Retries sending offer if ICE transitions to disconnected/failed state. Retry happens a maximum of 3 times if answer is not received within message TTL duration</p>
<a href=&quot;https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_SingleMasterConfiguration.html#KinesisVideo-Type-SingleMasterConfiguration-MessageTtlSeconds&quot;>Message TTL information</a>
"><sup>&#9432;</sup></span>
</div>
</div>
<div class="form-group">
<div class="form-check form-check-inline">
<input class="form-check-input" type="checkbox" id="allowSignalingReconnect" value="allowSignalingReconnect">
<label for="allowSignalingReconnect" class="form-check-label">Allow signaling reconnection in the event of disconnection <small>(Viewer only)</small></label>
<span data-delay="{ &quot;hide&quot;: 1500 }" data-position="auto" tabindex="0" class="text-info ml-1" data-toggle="tooltip" data-html="true" title="
<p>Sends offer again post reconnect in situations of network interruption. Retry happens a maximum of 3 times if answer is not received within message TTL duration</p>
<a href=&quot;https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_SingleMasterConfiguration.html#KinesisVideo-Type-SingleMasterConfiguration-MessageTtlSeconds&quot;>Message TTL information</a>
"><sup>&#9432;</sup></span>
</div>
</div>
<hr>
<div>
<button id="master-button" type="submit" class="btn btn-primary">Start Master</button>
Expand Down
118 changes: 116 additions & 2 deletions examples/viewer.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,58 @@ let videoBitRateArray = [];
let audioRateArray = [];
let timeArray = [];

async function initiateIceRestart(viewer, clientId, retryCount = 0) {
let maxRetryCount = 3;
let timeoutId = 0;
viewer.receivedAnswer = false;
if(!viewer.reconnecting) {
try {
console.log('[VIEWER] Encountered negotiation needed event for client ID', clientId);
await viewer.peerConnection.setLocalDescription(
await viewer.peerConnection.createOffer({
offerToReceiveAudio: true,
offerToReceiveVideo: true,
iceRestart: true // Important because this generates the new uFrag/pwd for local ice candidates
}),
);
viewer.signalingClient.sendSdpOffer(viewer.peerConnection.localDescription);
timeoutId = setTimeout(() => {
console.log("Received answer? ", viewer.receivedAnswer, 'Count received: ', retryCount);
if (!viewer.receivedAnswer) {
if(retryCount < maxRetryCount) {
console.log('[VIEWER] No answer received for client ID', clientId, '. Retrying offer send...');
initiateIceRestart(viewer, clientId, retryCount + 1);
} else {
console.log('[VIEWER] Max retry attempts reached...connection failed');
// Reset if there is a disconnect / failure again
viewer.alreadyRestarted = false;
}
} else {
console.log('[VIEWER] Successfully exchanged offer and answer for client ID', clientId);
retryCount = 0;
// Reset if there is a disconnect / failure again
viewer.alreadyRestarted = false;
}
}, (viewer.messageTtlSeconds + 5) * 1000); // messageTtlRange is 5 seconds to 120 seconds: https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_SingleMasterConfiguration.html#KinesisVideo-Type-SingleMasterConfiguration-MessageTtlSeconds
console.log('[VIEWER] SDP offer sent for client ID', clientId, 'as part of restart', 'retry count[', retryCount, ']');
} catch (error) {
console.error('[VIEWER] Error initiating ICE restart:', error);
}
} else {
clearTimeout(timeoutId);
console.log('[VIEWER] Reconnection signal received. Not sending offer here');
}
}

async function startViewer(localView, remoteView, formValues, onStatsReport, onRemoteDataMessage) {
try {
console.log('[VIEWER] Client id is:', formValues.clientId);

const initialTimeout = 60 * 1000; // 60 seconds
let attempts = 0;
const maxAttempts = 5;
viewer.localView = localView;
viewer.remoteView = remoteView;
viewer.alreadyRestarted = false;

if (formValues.enableDQPmetrics) {
viewerButtonPressed = new Date();
Expand Down Expand Up @@ -116,14 +162,17 @@ async function startViewer(localView, remoteView, formValues, onStatsReport, onR
endpoint: formValues.endpoint,
correctClockSkew: true,
});
viewer.allowIceRestart = formValues.allowIceRestart;
viewer.allowSignalingReconnect = formValues.allowSignalingReconnect;

// Get signaling channel ARN
const describeSignalingChannelResponse = await kinesisVideoClient
.describeSignalingChannel({
ChannelName: formValues.channelName,
})
.promise();
const channelARN = describeSignalingChannelResponse.ChannelInfo.ChannelARN;
const channelARN = describeSignalingChannelResponse.ChannelInfo.ChannelARN
viewer.messageTtlSeconds = describeSignalingChannelResponse.ChannelInfo.SingleMasterConfiguration.MessageTtlSeconds
console.log('[VIEWER] Channel ARN:', channelARN);

if (formValues.ingestMedia) {
Expand Down Expand Up @@ -244,6 +293,37 @@ async function startViewer(localView, remoteView, formValues, onStatsReport, onR
viewer.peerConnectionStatsInterval = setInterval(() => viewer.peerConnection.getStats().then(stats => calcStats(stats, formValues.clientId)), 1000);
}

viewer.signalingClient.on('reconnect', async () => {
console.log('[VIEWER] Reconnecting to the signaling server, network change encountered....')
viewer.reconnecting = true;
viewer.receivedAnswer = false;
let maxRetryCount = 3;
let retryCount = 0;
await viewer.peerConnection.setLocalDescription(
await viewer.peerConnection.createOffer({
offerToReceiveAudio: true,
offerToReceiveVideo: true,
}),
);
console.log('[VIEWER] Setting up peer connection again');
viewer.signalingClient.sendSdpOffer(viewer.peerConnection.localDescription);
timeoutId = setTimeout(() => {
console.log("Received answer in reconnect? ", viewer.receivedAnswer, 'Count received: ', retryCount);
if (!viewer.receivedAnswer) {
if(retryCount < maxRetryCount) {
console.log('[VIEWER] Did not receive answer within time during reconnect for', formValues.clientId, '. Retrying offer send...');
viewer.signalingClient.sendSdpOffer(viewer.peerConnection.localDescription);
retryCount++;
} else {
console.log('[VIEWER] Max retry attempts reached trying to reconnect...connection failed');
}
} else {
console.log('[VIEWER] Successfully reconnected with SDP exchange for', formValues.clientId);
retryCount = 0;
}
}, (viewer.messageTtlSeconds + 5) * 1000); // messageTtlRange is 5 seconds to 120 seconds
});

viewer.signalingClient.on('open', async () => {
console.log('[VIEWER] Connected to signaling service');

Expand All @@ -252,6 +332,12 @@ async function startViewer(localView, remoteView, formValues, onStatsReport, onR
// Otherwise, the browser will throw an error saying that either video or audio has to be enabled.
if (formValues.sendVideo || formValues.sendAudio) {
try {
if (viewer.localStream) {
viewer.localStream.getTracks().forEach(track => {
track.stop();
});
viewer.localStream = null;
}
viewer.localStream = await navigator.mediaDevices.getUserMedia(constraints);
viewer.localStream.getTracks().forEach(track => viewer.peerConnection.addTrack(track, viewer.localStream));
localView.srcObject = viewer.localStream;
Expand Down Expand Up @@ -281,6 +367,7 @@ async function startViewer(localView, remoteView, formValues, onStatsReport, onR

viewer.signalingClient.on('sdpAnswer', async answer => {
// Add the SDP answer to the peer connection
viewer.receivedAnswer = true;
console.log('[VIEWER] Received SDP answer');
console.debug('SDP answer:', answer);
await viewer.peerConnection.setRemoteDescription(answer);
Expand All @@ -293,6 +380,17 @@ async function startViewer(localView, remoteView, formValues, onStatsReport, onR
viewer.peerConnection.addIceCandidate(candidate);
});

viewer.signalingClient.on('closewithretry', () => {
console.log('[VIEWER] Disconnected from signaling channel');
if(viewer.allowSignalingReconnect) {
console.log('[VIEWER] Attempting to reconnect');
viewer.signalingClient.reconnect();
// Need to reset since we want to trigger reconnect only once. The internal logic
// takes care of retries
viewer.allowSignalingReconnect = false;
}
});

viewer.signalingClient.on('close', () => {
console.log('[VIEWER] Disconnected from signaling channel');
});
Expand Down Expand Up @@ -344,6 +442,22 @@ async function startViewer(localView, remoteView, formValues, onStatsReport, onR
}
});

viewer.peerConnection.addEventListener('iceconnectionstatechange', async event => {
console.log('[VIEWER] ICE connection state for client ID', formValues.clientId, ':', viewer.peerConnection.iceConnectionState);
if(viewer.peerConnection.iceConnectionState == 'disconnected' || viewer.peerConnection.iceConnectionState == 'failed') {
if(!viewer.alreadyRestarted) {
if(viewer.allowIceRestart) {
viewer.alreadyRestarted = true;
console.log('[VIEWER] Config allows ICE restart for client ID', formValues.clientId);
initiateIceRestart(viewer, formValues.clientId);
viewer.peerConnection.restartIce();
} else {
console.log('[VIEWER] ICE restart based recovery not enabled for client ID', formValues.clientId);
}
}
}
});

console.log('[VIEWER] Starting viewer connection');
viewer.signalingClient.open();
} catch (e) {
Expand Down
10 changes: 3 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 67 additions & 8 deletions src/SignalingClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ interface WebSocketMessage {
*/
export class SignalingClient extends EventEmitter {
private static DEFAULT_CLIENT_ID = 'MASTER';

private reconnectDelay = 1 * 1000;
private reconnection: boolean;
private websocket: WebSocket = null;
private readyState = ReadyState.CLOSED;
private readonly requestSigner: RequestSigner;
Expand Down Expand Up @@ -86,7 +87,7 @@ export class SignalingClient extends EventEmitter {
validateValueNonNil(config.region, 'region');
validateValueNonNil(config.channelEndpoint, 'channelEndpoint');

this.config = { ...config }; // Copy config to new object for immutability.
this.config = {...config}; // Copy config to new object for immutability.

if (config.requestSigner) {
this.requestSigner = config.requestSigner;
Expand All @@ -112,7 +113,6 @@ export class SignalingClient extends EventEmitter {
throw new Error('Client is already open, opening, or closing');
}
this.readyState = ReadyState.CONNECTING;

// The process of opening the connection is asynchronous via promises, but the interaction model is to handle asynchronous actions via events.
// Therefore, we just kick off the asynchronous process and then return and let it fire events.
this.asyncOpen()
Expand Down Expand Up @@ -150,6 +150,7 @@ export class SignalingClient extends EventEmitter {
* connection has been closed.
*/
public close(): void {
console.log("Closing the websocket");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the logging configurable? Currently there is no logs so adding logs might be something customers want to disable

if (this.websocket !== null) {
this.readyState = ReadyState.CLOSING;
this.websocket.close();
Expand All @@ -158,6 +159,36 @@ export class SignalingClient extends EventEmitter {
}
}

public reconnect(maxAttempt = 3): void {
this.reconnection = true;
if (this.readyState === ReadyState.CLOSED) {
this.attemptReconnect(maxAttempt);
} else {
console.log('Invalid state to invoke reconnect from');
}
}

private attemptReconnect(maxAttempts: number, attempt = 0): void {
if (this.readyState === ReadyState.CLOSED) {
this.open();
// If reconnect fails
setTimeout(() => {
if (this.readyState !== ReadyState.OPEN) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic needed to check if close was already called.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setTimeout returns a timer id, you can use clearTimeout(id) in close to stop the pending countdown as well

if (attempt < maxAttempts) {
console.log("Failed reconnect attempt ", attempt);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

                    console.log("Failed reconnect attempt ", attempt);
Suggested change
console.log("Failed reconnect attempt ", attempt);
console.log("Failed reconnect attempt", attempt);

Space not needed, this will result in double space

this.attemptReconnect(maxAttempts, attempt + 1);
} else {
console.error("Max reconnect attempts reached, bailing out");
}
} else {
console.log("Reconnected to WS");
}
}, this.reconnectDelay * (attempt + 1)); // increasing delay
} else {
console.log('Invalid state to invoke reconnect from');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also log that invalid state - this.readyState

}
}

/**
* Sends the given SDP offer to the signaling service.
*
Expand All @@ -166,6 +197,7 @@ export class SignalingClient extends EventEmitter {
* @param {string} [recipientClientId] - ID of the client to send the message to. Required for 'MASTER' role. Should not be present for 'VIEWER' role.
*/
public sendSdpOffer(sdpOffer: RTCSessionDescription, recipientClientId?: string): void {
console.log("Sending SDP offer");
this.sendMessage(MessageType.SDP_OFFER, sdpOffer.toJSON(), recipientClientId);
}

Expand Down Expand Up @@ -229,7 +261,13 @@ export class SignalingClient extends EventEmitter {
*/
private onOpen(): void {
this.readyState = ReadyState.OPEN;
this.emit('open');
if(this.reconnection) {
console.log("Successfully reconnected to a new WS");
this.emit('reconnect')
} else {
console.log("Successfully opened a WS");
this.emit('open');
}
}

/**
Expand All @@ -246,7 +284,7 @@ export class SignalingClient extends EventEmitter {
// TODO: Consider how to make it easier for users to be aware of dropped messages.
return;
}
const { messageType, senderClientId } = parsedEventData;
const {messageType, senderClientId} = parsedEventData;
switch (messageType) {
case MessageType.SDP_OFFER:
this.emit('sdpOffer', parsedMessagePayload, senderClientId);
Expand Down Expand Up @@ -321,15 +359,36 @@ export class SignalingClient extends EventEmitter {
* 'error' event handler. Forwards the error onto listeners.
*/
private onError(error: Error | Event): void {
console.log("On error invoked");
this.emit('error', error);
}

/**
* 'close' event handler. Forwards the error onto listeners and cleans up the connection.
*/
private onClose(): void {
private onClose(event?: CloseEvent): void {
this.readyState = ReadyState.CLOSED;
this.cleanupWebSocket();
this.emit('close');
console.log("On close invoked");
switch (event.code) {
// Going away
case 1001:
// Abnormal closure
case 1006:
// Service Restart
case 1012:
// Try Again Later
case 1013:
// TLS Handshake
case 1015:
console.log("Allowing reconnect if option enabled on test page for code", event.code);
this.cleanupWebSocket();
this.emit('closewithretry');
break;
default:
console.log("No reconnect will be attempted. Just exiting with code", event.code);
this.cleanupWebSocket();
this.emit('close');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also emit the event as data? Then someone can take this data and take action

Suggested change
this.emit('close');
this.emit('close', event);

break;
}
}
}