Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions src/common/lib/transport/messagequeue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,34 +39,49 @@ class MessageQueue extends EventEmitter {
this.messages.unshift.apply(this.messages, messages);
}

completeMessages(serial: number, count: number, err?: ErrorInfo | null): void {
/**
* For all messages targeted by the selector, calls their callback and removes them from the queue.
*
* @param selector - Describes which messages to target. 'all' means all messages in the queue (regardless of whether they have had a `msgSerial` assigned); `serial` / `count` targets a range of messages described by an `ACK` or `NACK` received from Ably (this assumes that all the messages in the queue have had a `msgSerial` assigned).
*/
completeMessages(selector: 'all' | { serial: number; count: number }, err?: ErrorInfo | null): void {
Logger.logAction(
this.logger,
Logger.LOG_MICRO,
'MessageQueue.completeMessages()',
'serial = ' + serial + '; count = ' + count,
selector == 'all' ? '(all)' : 'serial = ' + selector.serial + '; count = ' + selector.count,
);
err = err || null;
const messages = this.messages;
if (messages.length === 0) {
throw new Error('MessageQueue.completeMessages(): completeMessages called on any empty MessageQueue');
}
const first = messages[0];
if (first) {
const startSerial = first.message.msgSerial as number;
const endSerial = serial + count; /* the serial of the first message that is *not* the subject of this call */
if (endSerial > startSerial) {
const completeMessages = messages.splice(0, endSerial - startSerial);
for (const message of completeMessages) {
(message.callback as Function)(err);

let completeMessages: PendingMessage[] = [];

if (selector === 'all') {
completeMessages = messages.splice(0);
} else {
const first = messages[0];
if (first) {
const startSerial = first.message.msgSerial as number;
const endSerial =
selector.serial + selector.count; /* the serial of the first message that is *not* the subject of this call */
if (endSerial > startSerial) {
completeMessages = messages.splice(0, endSerial - startSerial);
}
}
if (messages.length == 0) this.emit('idle');
}

for (const message of completeMessages) {
(message.callback as Function)(err);
}
Comment thread
lawrence-forooghian marked this conversation as resolved.

if (messages.length == 0) this.emit('idle');
}

completeAllMessages(err: ErrorInfo): void {
this.completeMessages(0, Number.MAX_SAFE_INTEGER || Number.MAX_VALUE, err);
this.completeMessages('all', err);
}

resetSendAttempted(): void {
Expand Down
4 changes: 2 additions & 2 deletions src/common/lib/transport/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Protocol extends EventEmitter {

onAck(serial: number, count: number): void {
Logger.logAction(this.logger, Logger.LOG_MICRO, 'Protocol.onAck()', 'serial = ' + serial + '; count = ' + count);
this.messageQueue.completeMessages(serial, count);
this.messageQueue.completeMessages({ serial, count });
}

onNack(serial: number, count: number, err: ErrorInfo): void {
Expand All @@ -58,7 +58,7 @@ class Protocol extends EventEmitter {
if (!err) {
err = new ErrorInfo('Unable to send message; channel not responding', 50001, 500);
}
this.messageQueue.completeMessages(serial, count, err);
this.messageQueue.completeMessages({ serial, count }, err);
}

onceIdle(listener: ErrCallback): void {
Expand Down
1 change: 1 addition & 0 deletions test/common/modules/private_api_recorder.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'read.connectionManager.connectionId',
'read.connectionManager.connectionStateTtl',
'read.connectionManager.domains',
'read.connectionManager.queuedMessages',
'read.connectionManager.msgSerial',
'read.connectionManager.options',
'read.connectionManager.options.timeouts.httpMaxRetryDuration',
Expand Down
129 changes: 116 additions & 13 deletions test/realtime/failure.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -479,16 +479,16 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
});

/* RTN7c
* Publish a message, then before it receives an ack, disconnect the
* Publish a message whilst CONNECTED, then before it receives an ack, disconnect the
* transport, and let the connection go into some terminal failure state.
* Check that the publish callback is called with an error.
*/
function nack_on_connection_failure(failureFn, expectedRealtimeState, expectedNackCode) {
function nack_of_sent_message_on_connection_failure(failureFn, expectedRealtimeState, expectedNackCode) {
return function (done) {
/* Use one transport because stubbing out transport#onProtocolMesage */
var helper = this.test.helper.withParameterisedTestTitle('nack_on_connection_failure'),
var helper = this.test.helper.withParameterisedTestTitle('nack_of_sent_message_on_connection_failure'),
realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }),
channel = realtime.channels.get('nack_on_connection_failure');
channel = realtime.channels.get('nack_of_sent_message_on_connection_failure');

async.series(
[
Expand Down Expand Up @@ -539,10 +539,10 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
};
}

/** @specpartial RTN7c - test for SUSPENDED */
/** @specpartial RTN7c - test for SUSPENDED in the case where the message has been sent on the transport */
it(
'nack_on_connection_suspended',
nack_on_connection_failure(
'nack_of_sent_message_on_connection_suspended',
nack_of_sent_message_on_connection_failure(
function (realtime, helper) {
helper.becomeSuspended(realtime);
},
Expand All @@ -551,10 +551,10 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
),
);

/** @specpartial RTN7c - test for FAILED */
/** @specpartial RTN7c - test for FAILED in the case where the message has been sent on the transport */
it(
'nack_on_connection_failed',
nack_on_connection_failure(
'nack_of_sent_message_on_connection_failed',
nack_of_sent_message_on_connection_failure(
function (realtime, helper) {
helper.recordPrivateApi('read.connectionManager.activeProtocol.transport');
helper.recordPrivateApi('call.transport.onProtocolMessage');
Expand All @@ -568,10 +568,113 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
),
);

/** @specpartial RTN7c - test for CLOSED */
/** @specpartial RTN7c - test for CLOSED in the case where the message has been sent on the transport */
it(
'nack_on_connection_closed',
nack_on_connection_failure(
'nack_of_sent_message_on_connection_closed',
nack_of_sent_message_on_connection_failure(
function (realtime) {
realtime.close();
},
'closed',
80017,
),
);

/* RTN7c
* Publish a message whilst not CONNECTED, then whilst the message is still
* queued let the connection go into some terminal failure state. Check
* that the publish callback is called with an error.
*/
function nack_of_queued_message_on_connection_failure(failureFn, expectedRealtimeState, expectedNackCode) {
return function (done) {
/* Use one transport because stubbing out transport#onProtocolMesage */
var helper = this.test.helper.withParameterisedTestTitle('nack_of_queued_message_on_connection_failure'),
realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }),
channel = realtime.channels.get('nack_of_queued_message_on_connection_failure');

// Drop inbound CONNECTED so that we don't become CONNECTED
helper.recordPrivateApi('listen.connectionManager.transport.pending');
realtime.connection.connectionManager.on('transport.pending', function (transport) {
var originalOnProtocolMessage = transport.onProtocolMessage;
helper.recordPrivateApi('replace.transport.onProtocolMessage');
transport.onProtocolMessage = function (message) {
if (message.action !== 4) {
helper.recordPrivateApi('call.transport.onProtocolMessage');
originalOnProtocolMessage.call(this, message);
}
};
});

async.series(
[
function (cb) {
Helper.whenPromiseSettles(channel.publish('foo', 'bar'), function (err) {
try {
expect(err, 'Publish failed as expected').to.be.ok;
expect(realtime.connection.state).to.equal(
expectedRealtimeState,
'check realtime state is ' + expectedRealtimeState,
);
expect(err.code).to.equal(expectedNackCode, 'Check error code was ' + expectedNackCode);
cb();
} catch (err) {
cb(err);
}
});
// We wait for the `publish()`-ed message to appear in the message queue before enacting the connection state change (queueing is preceded by asynchronous encoding). AFAIK there isn't an event-driven internal API for this so we'll just poll.
(async () => {
helper.recordPrivateApi('call.Platform.nextTick');
helper.recordPrivateApi('read.connectionManager.queuedMessages');
while (true) {
await new Promise((res) => Ably.Realtime.Platform.Config.nextTick(res));
if (realtime.connection.connectionManager.queuedMessages.count() > 0) {
failureFn(realtime, helper.withParameterisedTestTitle(null));
break;
}
}
})();
},
],
function (err) {
helper.closeAndFinish(done, realtime, err);
},
);
};
}

/** @specpartial RTN7c - test for SUSPENDED in the case where the message has not yet been sent on the transport */
it(
'nack_of_queued_message_on_connection_suspended',
nack_of_queued_message_on_connection_failure(
function (realtime, helper) {
helper.recordPrivateApi('call.connectionManager.notifyState');
realtime.connection.connectionManager.notifyState({ state: 'suspended' });
},
'suspended',
80002,
),
);

/** @specpartial RTN7c - test for FAILED in the case where the message has not yet been sent on the transport */
it(
'nack_of_queued_message_on_connection_failed',
nack_of_queued_message_on_connection_failure(
function (realtime, helper) {
helper.recordPrivateApi('call.connectionManager.notifyState');
realtime.connection.connectionManager.notifyState({
state: 'failed',
error: { statusCode: 401, code: 40100, message: 'connection failed because reasons' },
});
},
'failed',
40100,
),
);

/** @specpartial RTN7c - test for CLOSED in the case where the message has not yet been sent on the transport */
it(
'nack_of_queued_message_on_connection_closed',
nack_of_queued_message_on_connection_failure(
function (realtime) {
realtime.close();
},
Expand Down
Loading