diff --git a/.github/workflows/push-preid-release.yml b/.github/workflows/push-preid-release.yml index 9837290ed14..ce92ead0eb6 100644 --- a/.github/workflows/push-preid-release.yml +++ b/.github/workflows/push-preid-release.yml @@ -9,7 +9,7 @@ on: push: branches: # Change this to your branch name where "example-preid" corresponds to the preid you want your changes released on - - feat/example-preid-branch/main + - feat/graphql-data-ka/main jobs: e2e: diff --git a/packages/api-graphql/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/api-graphql/__tests__/AWSAppSyncRealTimeProvider.test.ts index 041b9624898..3e2968cd1de 100644 --- a/packages/api-graphql/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/api-graphql/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -3,6 +3,8 @@ import { Reachability } from '@aws-amplify/core/internals/utils'; import { ConsoleLogger } from '@aws-amplify/core'; import { MESSAGE_TYPES } from '../src/Providers/constants'; import * as constants from '../src/Providers/constants'; +import { log, error } from "console"; + import { delay, @@ -147,6 +149,10 @@ describe('AWSAppSyncRealTimeProvider', () => { Object.defineProperty(constants, 'RECONNECT_DELAY', { value: 100, }); + // Reduce the keep alive heartbeat to 10ms + Object.defineProperty(constants, 'DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT', { + value: 10, + }); }); afterEach(async () => { @@ -765,7 +771,7 @@ describe('AWSAppSyncRealTimeProvider', () => { // Resolve the message delivery actions await replaceConstant( 'DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT', - 5, + 10, async () => { await fakeWebSocketInterface?.readyForUse; await fakeWebSocketInterface?.triggerOpen(); @@ -776,17 +782,17 @@ describe('AWSAppSyncRealTimeProvider', () => { await fakeWebSocketInterface?.startAckMessage(); await fakeWebSocketInterface?.keepAlive(); - }, - ); - await fakeWebSocketInterface?.waitUntilConnectionStateIn([ - CS.Connected, - ]); + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connected, + ]); - // Wait until the socket is automatically disconnected - await fakeWebSocketInterface?.waitUntilConnectionStateIn([ - CS.ConnectionDisrupted, - ]); + // Wait until the socket is automatically disconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectionDisrupted, + ]); + }, + ); expect(fakeWebSocketInterface?.observedConnectionStates).toContain( CS.ConnectedPendingKeepAlive, @@ -798,6 +804,55 @@ describe('AWSAppSyncRealTimeProvider', () => { ); }); + test('subscription observer ka is cleared if data is received', async () => { + const consoleLogger = new ConsoleLogger(""); + expect.assertions(1); + + const observer = provider.subscribe({ + appSyncGraphqlEndpoint: 'ws://localhost:8080', + }); + + observer.subscribe({ error: () => {} }); + // Resolve the message delivery actions + await replaceConstant( + 'DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT', + 5, + async () => { + await fakeWebSocketInterface?.readyForUse; + await fakeWebSocketInterface?.triggerOpen(); + await fakeWebSocketInterface?.handShakeMessage({ + connectionTimeoutMs: 100, + }); + + await fakeWebSocketInterface?.startAckMessage(); + + await fakeWebSocketInterface?.keepAlive(); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectedPendingKeepAlive, + ]); + }, + ); + + // Send message + await fakeWebSocketInterface?.sendDataMessage({ + type: MESSAGE_TYPES.DATA, + payload: { data: {} }, + }); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connected, + ]); + + expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([ + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingKeepAlive, + CS.Connected, + ]); + }); + test('subscription connection disruption triggers automatic reconnection', async () => { expect.assertions(1); diff --git a/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts b/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts index 5d710e478bc..241367736a6 100644 --- a/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts +++ b/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts @@ -23,7 +23,7 @@ import { CONNECTION_INIT_TIMEOUT, CONNECTION_STATE_CHANGE, DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT, - DEFAULT_KEEP_ALIVE_TIMEOUT, + DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT, MAX_DELAY_MS, MESSAGE_TYPES, NON_RETRYABLE_CODES, @@ -83,9 +83,8 @@ export abstract class AWSWebSocketProvider { protected awsRealTimeSocket?: WebSocket; private socketStatus: SOCKET_STATUS = SOCKET_STATUS.CLOSED; - private keepAliveTimeoutId?: ReturnType; - private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT; - private keepAliveAlertTimeoutId?: ReturnType; + private kaTimestamp?: number; + private keepAliveHeartbeatIntervalId?: ReturnType; private promiseArray: { res(): void; rej(reason?: any): void }[] = []; private connectionState: ConnectionState | undefined; private readonly connectionStateMonitor = new ConnectionStateMonitor(); @@ -119,6 +118,7 @@ export abstract class AWSWebSocketProvider { return new Promise((resolve, reject) => { if (this.awsRealTimeSocket) { this.awsRealTimeSocket.onclose = (_: CloseEvent) => { + this._closeSocket(); this.subscriptionObserverMap = new Map(); this.awsRealTimeSocket = undefined; resolve(); @@ -171,7 +171,7 @@ export abstract class AWSWebSocketProvider { this.logger.debug( `${CONTROL_MSG.REALTIME_SUBSCRIPTION_INIT_ERROR}: ${err}`, ); - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + this._closeSocket(); }) .finally(() => { subscriptionStartInProgress = false; @@ -435,7 +435,7 @@ export abstract class AWSWebSocketProvider { this.logger.debug({ err }); const message = String(err.message ?? ''); // Resolving to give the state observer time to propogate the update - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + this._closeSocket(); // Capture the error only when the network didn't cause disruption if ( @@ -544,12 +544,7 @@ export abstract class AWSWebSocketProvider { setTimeout(this._closeSocketIfRequired.bind(this), 1000); } else { this.logger.debug('closing WebSocket...'); - if (this.keepAliveTimeoutId) { - clearTimeout(this.keepAliveTimeoutId); - } - if (this.keepAliveAlertTimeoutId) { - clearTimeout(this.keepAliveAlertTimeoutId); - } + const tempSocket = this.awsRealTimeSocket; // Cleaning callbacks to avoid race condition, socket still exists tempSocket.onclose = null; @@ -557,7 +552,7 @@ export abstract class AWSWebSocketProvider { tempSocket.close(1000); this.awsRealTimeSocket = undefined; this.socketStatus = SOCKET_STATUS.CLOSED; - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + this._closeSocket(); } } @@ -577,13 +572,43 @@ export abstract class AWSWebSocketProvider { errorType: string; }; + private maintainKeepAlive() { + this.kaTimestamp = Date.now(); + } + + private keepAliveHeartbeat(connectionTimeoutMs: number) { + const currentTime = Date.now(); + + // Check for missed KA message + if ( + this.kaTimestamp && + currentTime - this.kaTimestamp > DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT + ) { + this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); + } else { + this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + } + + // Recognize we are disconnected if we haven't seen messages in the keep alive timeout period + if ( + this.kaTimestamp && + currentTime - this.kaTimestamp > connectionTimeoutMs + ) { + this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT); + } + } + private _handleIncomingSubscriptionMessage(message: MessageEvent) { if (typeof message.data !== 'string') { return; } const [isData, data] = this._handleSubscriptionData(message); - if (isData) return; + if (isData) { + this.maintainKeepAlive(); + + return; + } const { type, id, payload } = data; @@ -632,16 +657,7 @@ export abstract class AWSWebSocketProvider { } if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) { - if (this.keepAliveTimeoutId) clearTimeout(this.keepAliveTimeoutId); - if (this.keepAliveAlertTimeoutId) - clearTimeout(this.keepAliveAlertTimeoutId); - this.keepAliveTimeoutId = setTimeout(() => { - this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT); - }, this.keepAliveTimeout); - this.keepAliveAlertTimeoutId = setTimeout(() => { - this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); - }, DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT); - this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + this.maintainKeepAlive(); return; } @@ -686,13 +702,21 @@ export abstract class AWSWebSocketProvider { this.logger.debug(`Disconnect error: ${msg}`); if (this.awsRealTimeSocket) { - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + this._closeSocket(); this.awsRealTimeSocket.close(); } this.socketStatus = SOCKET_STATUS.CLOSED; } + private _closeSocket() { + if (this.keepAliveHeartbeatIntervalId) { + clearInterval(this.keepAliveHeartbeatIntervalId); + this.keepAliveHeartbeatIntervalId = undefined; + } + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + } + private _timeoutStartSubscriptionAck(subscriptionId: string) { const subscriptionObserver = this.subscriptionObserverMap.get(subscriptionId); @@ -708,7 +732,7 @@ export abstract class AWSWebSocketProvider { subscriptionState: SUBSCRIPTION_STATUS.FAILED, }); - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + this._closeSocket(); this.logger.debug( 'timeoutStartSubscription', JSON.stringify({ query, variables }), @@ -820,6 +844,7 @@ export abstract class AWSWebSocketProvider { this.logger.debug(`WebSocket connection error`); }; newSocket.onclose = () => { + this._closeSocket(); reject(new Error('Connection handshake error')); }; newSocket.onopen = () => { @@ -849,6 +874,7 @@ export abstract class AWSWebSocketProvider { this.awsRealTimeSocket.onclose = event => { this.logger.debug(`WebSocket closed ${event.reason}`); + this._closeSocket(); reject(new Error(JSON.stringify(event))); }; @@ -912,7 +938,11 @@ export abstract class AWSWebSocketProvider { return; } - this.keepAliveTimeout = connectionTimeoutMs; + // Setup a keep alive heartbeat for this connection + this.keepAliveHeartbeatIntervalId = setInterval(() => { + this.keepAliveHeartbeat(connectionTimeoutMs); + }, DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT); + this.awsRealTimeSocket.onmessage = this._handleIncomingSubscriptionMessage.bind(this); @@ -923,6 +953,7 @@ export abstract class AWSWebSocketProvider { this.awsRealTimeSocket.onclose = event => { this.logger.debug(`WebSocket closed ${event.reason}`); + this._closeSocket(); this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED); }; } diff --git a/packages/api-graphql/src/Providers/constants.ts b/packages/api-graphql/src/Providers/constants.ts index 5e82f672081..5aef1a60130 100644 --- a/packages/api-graphql/src/Providers/constants.ts +++ b/packages/api-graphql/src/Providers/constants.ts @@ -128,6 +128,11 @@ export const START_ACK_TIMEOUT = 15000; */ export const DEFAULT_KEEP_ALIVE_TIMEOUT = 5 * 60 * 1000; +/** + * Default Time in milleseconds between monitoring checks of keep alive status + */ +export const DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT = 5 * 1000; + /** * Default Time in milleseconds to alert for missed GQL_CONNECTION_KEEP_ALIVE message */