Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/push-preid-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
push:
branches:
# Change this to your branch name where "example-preid" corresponds to the preid you want your changes released on
- feat/example-preid-branch/main
- feat/graphql-data-ka/main

jobs:
e2e:
Expand Down
75 changes: 65 additions & 10 deletions packages/api-graphql/__tests__/AWSAppSyncRealTimeProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { Reachability } from '@aws-amplify/core/internals/utils';
import { ConsoleLogger } from '@aws-amplify/core';
import { MESSAGE_TYPES } from '../src/Providers/constants';
import * as constants from '../src/Providers/constants';
import { log, error } from "console";


import {
delay,
Expand Down Expand Up @@ -147,6 +149,10 @@ describe('AWSAppSyncRealTimeProvider', () => {
Object.defineProperty(constants, 'RECONNECT_DELAY', {
value: 100,
});
// Reduce the keep alive heartbeat to 10ms
Object.defineProperty(constants, 'DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT', {
value: 10,
});
});

afterEach(async () => {
Expand Down Expand Up @@ -765,7 +771,7 @@ describe('AWSAppSyncRealTimeProvider', () => {
// Resolve the message delivery actions
await replaceConstant(
'DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT',
5,
10,
async () => {
await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
Expand All @@ -776,17 +782,17 @@ describe('AWSAppSyncRealTimeProvider', () => {
await fakeWebSocketInterface?.startAckMessage();

await fakeWebSocketInterface?.keepAlive();
},
);

await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);

// Wait until the socket is automatically disconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectionDisrupted,
]);
// Wait until the socket is automatically disconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectionDisrupted,
]);
},
);

expect(fakeWebSocketInterface?.observedConnectionStates).toContain(
CS.ConnectedPendingKeepAlive,
Expand All @@ -798,6 +804,55 @@ describe('AWSAppSyncRealTimeProvider', () => {
);
});

test('subscription observer ka is cleared if data is received', async () => {
const consoleLogger = new ConsoleLogger("");
expect.assertions(1);

const observer = provider.subscribe({
appSyncGraphqlEndpoint: 'ws://localhost:8080',
});

observer.subscribe({ error: () => {} });
// Resolve the message delivery actions
await replaceConstant(
'DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT',
5,
async () => {
await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.handShakeMessage({
connectionTimeoutMs: 100,
});

await fakeWebSocketInterface?.startAckMessage();

await fakeWebSocketInterface?.keepAlive();

await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectedPendingKeepAlive,
]);
},
);

// Send message
await fakeWebSocketInterface?.sendDataMessage({
type: MESSAGE_TYPES.DATA,
payload: { data: {} },
});

await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);

expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([
CS.Disconnected,
CS.Connecting,
CS.Connected,
CS.ConnectedPendingKeepAlive,
CS.Connected,
]);
});

test('subscription connection disruption triggers automatic reconnection', async () => {
expect.assertions(1);

Expand Down
85 changes: 58 additions & 27 deletions packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
CONNECTION_INIT_TIMEOUT,
CONNECTION_STATE_CHANGE,
DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT,
DEFAULT_KEEP_ALIVE_TIMEOUT,
DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT,
MAX_DELAY_MS,
MESSAGE_TYPES,
NON_RETRYABLE_CODES,
Expand Down Expand Up @@ -83,9 +83,8 @@ export abstract class AWSWebSocketProvider {

protected awsRealTimeSocket?: WebSocket;
private socketStatus: SOCKET_STATUS = SOCKET_STATUS.CLOSED;
private keepAliveTimeoutId?: ReturnType<typeof setTimeout>;
private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
private keepAliveAlertTimeoutId?: ReturnType<typeof setTimeout>;
private kaTimestamp?: number;
private keepAliveHeartbeatIntervalId?: ReturnType<typeof setTimeout>;
private promiseArray: { res(): void; rej(reason?: any): void }[] = [];
private connectionState: ConnectionState | undefined;
private readonly connectionStateMonitor = new ConnectionStateMonitor();
Expand Down Expand Up @@ -119,6 +118,7 @@ export abstract class AWSWebSocketProvider {
return new Promise<void>((resolve, reject) => {
if (this.awsRealTimeSocket) {
this.awsRealTimeSocket.onclose = (_: CloseEvent) => {
this._closeSocket();
this.subscriptionObserverMap = new Map();
this.awsRealTimeSocket = undefined;
resolve();
Expand Down Expand Up @@ -171,7 +171,7 @@ export abstract class AWSWebSocketProvider {
this.logger.debug(
`${CONTROL_MSG.REALTIME_SUBSCRIPTION_INIT_ERROR}: ${err}`,
);
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this._closeSocket();
})
.finally(() => {
subscriptionStartInProgress = false;
Expand Down Expand Up @@ -435,7 +435,7 @@ export abstract class AWSWebSocketProvider {
this.logger.debug({ err });
const message = String(err.message ?? '');
// Resolving to give the state observer time to propogate the update
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this._closeSocket();

// Capture the error only when the network didn't cause disruption
if (
Expand Down Expand Up @@ -544,20 +544,15 @@ export abstract class AWSWebSocketProvider {
setTimeout(this._closeSocketIfRequired.bind(this), 1000);
} else {
this.logger.debug('closing WebSocket...');
if (this.keepAliveTimeoutId) {
clearTimeout(this.keepAliveTimeoutId);
}
if (this.keepAliveAlertTimeoutId) {
clearTimeout(this.keepAliveAlertTimeoutId);
}

const tempSocket = this.awsRealTimeSocket;
// Cleaning callbacks to avoid race condition, socket still exists
tempSocket.onclose = null;
tempSocket.onerror = null;
tempSocket.close(1000);
this.awsRealTimeSocket = undefined;
this.socketStatus = SOCKET_STATUS.CLOSED;
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this._closeSocket();
}
}

Expand All @@ -577,13 +572,43 @@ export abstract class AWSWebSocketProvider {
errorType: string;
};

private maintainKeepAlive() {
this.kaTimestamp = Date.now();
}

private keepAliveHeartbeat(connectionTimeoutMs: number) {
const currentTime = Date.now();

// Check for missed KA message
if (
this.kaTimestamp &&
currentTime - this.kaTimestamp > DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT
) {
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED);
} else {
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE);
}

// Recognize we are disconnected if we haven't seen messages in the keep alive timeout period
if (
this.kaTimestamp &&
currentTime - this.kaTimestamp > connectionTimeoutMs
) {
this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT);
}
}

private _handleIncomingSubscriptionMessage(message: MessageEvent) {
if (typeof message.data !== 'string') {
return;
}

const [isData, data] = this._handleSubscriptionData(message);
if (isData) return;
if (isData) {
this.maintainKeepAlive();

return;
}

const { type, id, payload } = data;

Expand Down Expand Up @@ -632,16 +657,7 @@ export abstract class AWSWebSocketProvider {
}

if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) {
if (this.keepAliveTimeoutId) clearTimeout(this.keepAliveTimeoutId);
if (this.keepAliveAlertTimeoutId)
clearTimeout(this.keepAliveAlertTimeoutId);
this.keepAliveTimeoutId = setTimeout(() => {
this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT);
}, this.keepAliveTimeout);
this.keepAliveAlertTimeoutId = setTimeout(() => {
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED);
}, DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT);
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE);
this.maintainKeepAlive();

return;
}
Expand Down Expand Up @@ -686,13 +702,21 @@ export abstract class AWSWebSocketProvider {
this.logger.debug(`Disconnect error: ${msg}`);

if (this.awsRealTimeSocket) {
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this._closeSocket();
this.awsRealTimeSocket.close();
}

this.socketStatus = SOCKET_STATUS.CLOSED;
}

private _closeSocket() {
if (this.keepAliveHeartbeatIntervalId) {
clearInterval(this.keepAliveHeartbeatIntervalId);
this.keepAliveHeartbeatIntervalId = undefined;
}
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
}

private _timeoutStartSubscriptionAck(subscriptionId: string) {
const subscriptionObserver =
this.subscriptionObserverMap.get(subscriptionId);
Expand All @@ -708,7 +732,7 @@ export abstract class AWSWebSocketProvider {
subscriptionState: SUBSCRIPTION_STATUS.FAILED,
});

this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this._closeSocket();
this.logger.debug(
'timeoutStartSubscription',
JSON.stringify({ query, variables }),
Expand Down Expand Up @@ -820,6 +844,7 @@ export abstract class AWSWebSocketProvider {
this.logger.debug(`WebSocket connection error`);
};
newSocket.onclose = () => {
this._closeSocket();
reject(new Error('Connection handshake error'));
};
newSocket.onopen = () => {
Expand Down Expand Up @@ -849,6 +874,7 @@ export abstract class AWSWebSocketProvider {

this.awsRealTimeSocket.onclose = event => {
this.logger.debug(`WebSocket closed ${event.reason}`);
this._closeSocket();
reject(new Error(JSON.stringify(event)));
};

Expand Down Expand Up @@ -912,7 +938,11 @@ export abstract class AWSWebSocketProvider {
return;
}

this.keepAliveTimeout = connectionTimeoutMs;
// Setup a keep alive heartbeat for this connection
this.keepAliveHeartbeatIntervalId = setInterval(() => {
this.keepAliveHeartbeat(connectionTimeoutMs);
}, DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT);

this.awsRealTimeSocket.onmessage =
this._handleIncomingSubscriptionMessage.bind(this);

Expand All @@ -923,6 +953,7 @@ export abstract class AWSWebSocketProvider {

this.awsRealTimeSocket.onclose = event => {
this.logger.debug(`WebSocket closed ${event.reason}`);
this._closeSocket();
this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED);
};
}
Expand Down
5 changes: 5 additions & 0 deletions packages/api-graphql/src/Providers/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ export const START_ACK_TIMEOUT = 15000;
*/
export const DEFAULT_KEEP_ALIVE_TIMEOUT = 5 * 60 * 1000;

/**
* Default Time in milleseconds between monitoring checks of keep alive status
*/
export const DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT = 5 * 1000;

/**
* Default Time in milleseconds to alert for missed GQL_CONNECTION_KEEP_ALIVE message
*/
Expand Down
Loading