diff --git a/src/common/lib/transport/messagequeue.ts b/src/common/lib/transport/messagequeue.ts index a335a585b..f5d775e0a 100644 --- a/src/common/lib/transport/messagequeue.ts +++ b/src/common/lib/transport/messagequeue.ts @@ -39,34 +39,49 @@ class MessageQueue extends EventEmitter { this.messages.unshift.apply(this.messages, messages); } - completeMessages(serial: number, count: number, err?: ErrorInfo | null): void { + /** + * For all messages targeted by the selector, calls their callback and removes them from the queue. + * + * @param selector - Describes which messages to target. 'all' means all messages in the queue (regardless of whether they have had a `msgSerial` assigned); `serial` / `count` targets a range of messages described by an `ACK` or `NACK` received from Ably (this assumes that all the messages in the queue have had a `msgSerial` assigned). + */ + completeMessages(selector: 'all' | { serial: number; count: number }, err?: ErrorInfo | null): void { Logger.logAction( this.logger, Logger.LOG_MICRO, 'MessageQueue.completeMessages()', - 'serial = ' + serial + '; count = ' + count, + selector == 'all' ? '(all)' : 'serial = ' + selector.serial + '; count = ' + selector.count, ); err = err || null; const messages = this.messages; if (messages.length === 0) { throw new Error('MessageQueue.completeMessages(): completeMessages called on any empty MessageQueue'); } - const first = messages[0]; - if (first) { - const startSerial = first.message.msgSerial as number; - const endSerial = serial + count; /* the serial of the first message that is *not* the subject of this call */ - if (endSerial > startSerial) { - const completeMessages = messages.splice(0, endSerial - startSerial); - for (const message of completeMessages) { - (message.callback as Function)(err); + + let completeMessages: PendingMessage[] = []; + + if (selector === 'all') { + completeMessages = messages.splice(0); + } else { + const first = messages[0]; + if (first) { + const startSerial = first.message.msgSerial as number; + const endSerial = + selector.serial + selector.count; /* the serial of the first message that is *not* the subject of this call */ + if (endSerial > startSerial) { + completeMessages = messages.splice(0, endSerial - startSerial); } } - if (messages.length == 0) this.emit('idle'); } + + for (const message of completeMessages) { + (message.callback as Function)(err); + } + + if (messages.length == 0) this.emit('idle'); } completeAllMessages(err: ErrorInfo): void { - this.completeMessages(0, Number.MAX_SAFE_INTEGER || Number.MAX_VALUE, err); + this.completeMessages('all', err); } resetSendAttempted(): void { diff --git a/src/common/lib/transport/protocol.ts b/src/common/lib/transport/protocol.ts index e3f1590e1..a9eb3eb7d 100644 --- a/src/common/lib/transport/protocol.ts +++ b/src/common/lib/transport/protocol.ts @@ -45,7 +45,7 @@ class Protocol extends EventEmitter { onAck(serial: number, count: number): void { Logger.logAction(this.logger, Logger.LOG_MICRO, 'Protocol.onAck()', 'serial = ' + serial + '; count = ' + count); - this.messageQueue.completeMessages(serial, count); + this.messageQueue.completeMessages({ serial, count }); } onNack(serial: number, count: number, err: ErrorInfo): void { @@ -58,7 +58,7 @@ class Protocol extends EventEmitter { if (!err) { err = new ErrorInfo('Unable to send message; channel not responding', 50001, 500); } - this.messageQueue.completeMessages(serial, count, err); + this.messageQueue.completeMessages({ serial, count }, err); } onceIdle(listener: ErrCallback): void { diff --git a/test/common/modules/private_api_recorder.js b/test/common/modules/private_api_recorder.js index 338fe3ffb..fc01599b7 100644 --- a/test/common/modules/private_api_recorder.js +++ b/test/common/modules/private_api_recorder.js @@ -103,6 +103,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths) 'read.connectionManager.connectionId', 'read.connectionManager.connectionStateTtl', 'read.connectionManager.domains', + 'read.connectionManager.queuedMessages', 'read.connectionManager.msgSerial', 'read.connectionManager.options', 'read.connectionManager.options.timeouts.httpMaxRetryDuration', diff --git a/test/realtime/failure.test.js b/test/realtime/failure.test.js index 789c1d347..84dc8a646 100644 --- a/test/realtime/failure.test.js +++ b/test/realtime/failure.test.js @@ -479,16 +479,16 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async }); /* RTN7c - * Publish a message, then before it receives an ack, disconnect the + * Publish a message whilst CONNECTED, then before it receives an ack, disconnect the * transport, and let the connection go into some terminal failure state. * Check that the publish callback is called with an error. */ - function nack_on_connection_failure(failureFn, expectedRealtimeState, expectedNackCode) { + function nack_of_sent_message_on_connection_failure(failureFn, expectedRealtimeState, expectedNackCode) { return function (done) { /* Use one transport because stubbing out transport#onProtocolMesage */ - var helper = this.test.helper.withParameterisedTestTitle('nack_on_connection_failure'), + var helper = this.test.helper.withParameterisedTestTitle('nack_of_sent_message_on_connection_failure'), realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }), - channel = realtime.channels.get('nack_on_connection_failure'); + channel = realtime.channels.get('nack_of_sent_message_on_connection_failure'); async.series( [ @@ -539,10 +539,10 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async }; } - /** @specpartial RTN7c - test for SUSPENDED */ + /** @specpartial RTN7c - test for SUSPENDED in the case where the message has been sent on the transport */ it( - 'nack_on_connection_suspended', - nack_on_connection_failure( + 'nack_of_sent_message_on_connection_suspended', + nack_of_sent_message_on_connection_failure( function (realtime, helper) { helper.becomeSuspended(realtime); }, @@ -551,10 +551,10 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async ), ); - /** @specpartial RTN7c - test for FAILED */ + /** @specpartial RTN7c - test for FAILED in the case where the message has been sent on the transport */ it( - 'nack_on_connection_failed', - nack_on_connection_failure( + 'nack_of_sent_message_on_connection_failed', + nack_of_sent_message_on_connection_failure( function (realtime, helper) { helper.recordPrivateApi('read.connectionManager.activeProtocol.transport'); helper.recordPrivateApi('call.transport.onProtocolMessage'); @@ -568,10 +568,113 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async ), ); - /** @specpartial RTN7c - test for CLOSED */ + /** @specpartial RTN7c - test for CLOSED in the case where the message has been sent on the transport */ it( - 'nack_on_connection_closed', - nack_on_connection_failure( + 'nack_of_sent_message_on_connection_closed', + nack_of_sent_message_on_connection_failure( + function (realtime) { + realtime.close(); + }, + 'closed', + 80017, + ), + ); + + /* RTN7c + * Publish a message whilst not CONNECTED, then whilst the message is still + * queued let the connection go into some terminal failure state. Check + * that the publish callback is called with an error. + */ + function nack_of_queued_message_on_connection_failure(failureFn, expectedRealtimeState, expectedNackCode) { + return function (done) { + /* Use one transport because stubbing out transport#onProtocolMesage */ + var helper = this.test.helper.withParameterisedTestTitle('nack_of_queued_message_on_connection_failure'), + realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }), + channel = realtime.channels.get('nack_of_queued_message_on_connection_failure'); + + // Drop inbound CONNECTED so that we don't become CONNECTED + helper.recordPrivateApi('listen.connectionManager.transport.pending'); + realtime.connection.connectionManager.on('transport.pending', function (transport) { + var originalOnProtocolMessage = transport.onProtocolMessage; + helper.recordPrivateApi('replace.transport.onProtocolMessage'); + transport.onProtocolMessage = function (message) { + if (message.action !== 4) { + helper.recordPrivateApi('call.transport.onProtocolMessage'); + originalOnProtocolMessage.call(this, message); + } + }; + }); + + async.series( + [ + function (cb) { + Helper.whenPromiseSettles(channel.publish('foo', 'bar'), function (err) { + try { + expect(err, 'Publish failed as expected').to.be.ok; + expect(realtime.connection.state).to.equal( + expectedRealtimeState, + 'check realtime state is ' + expectedRealtimeState, + ); + expect(err.code).to.equal(expectedNackCode, 'Check error code was ' + expectedNackCode); + cb(); + } catch (err) { + cb(err); + } + }); + // We wait for the `publish()`-ed message to appear in the message queue before enacting the connection state change (queueing is preceded by asynchronous encoding). AFAIK there isn't an event-driven internal API for this so we'll just poll. + (async () => { + helper.recordPrivateApi('call.Platform.nextTick'); + helper.recordPrivateApi('read.connectionManager.queuedMessages'); + while (true) { + await new Promise((res) => Ably.Realtime.Platform.Config.nextTick(res)); + if (realtime.connection.connectionManager.queuedMessages.count() > 0) { + failureFn(realtime, helper.withParameterisedTestTitle(null)); + break; + } + } + })(); + }, + ], + function (err) { + helper.closeAndFinish(done, realtime, err); + }, + ); + }; + } + + /** @specpartial RTN7c - test for SUSPENDED in the case where the message has not yet been sent on the transport */ + it( + 'nack_of_queued_message_on_connection_suspended', + nack_of_queued_message_on_connection_failure( + function (realtime, helper) { + helper.recordPrivateApi('call.connectionManager.notifyState'); + realtime.connection.connectionManager.notifyState({ state: 'suspended' }); + }, + 'suspended', + 80002, + ), + ); + + /** @specpartial RTN7c - test for FAILED in the case where the message has not yet been sent on the transport */ + it( + 'nack_of_queued_message_on_connection_failed', + nack_of_queued_message_on_connection_failure( + function (realtime, helper) { + helper.recordPrivateApi('call.connectionManager.notifyState'); + realtime.connection.connectionManager.notifyState({ + state: 'failed', + error: { statusCode: 401, code: 40100, message: 'connection failed because reasons' }, + }); + }, + 'failed', + 40100, + ), + ); + + /** @specpartial RTN7c - test for CLOSED in the case where the message has not yet been sent on the transport */ + it( + 'nack_of_queued_message_on_connection_closed', + nack_of_queued_message_on_connection_failure( function (realtime) { realtime.close(); },