Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 40 additions & 26 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import ChannelStateChange from './channelstatechange';
import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo';
import * as API from '../../../../ably';
import ConnectionManager from '../transport/connectionmanager';
import ConnectionStateChange from './connectionstatechange';
import { StandardCallback } from '../../types/utils';
import BaseRealtime from './baserealtime';
import { ChannelOptions } from '../../types/channel';
Expand Down Expand Up @@ -87,7 +86,20 @@ class RealtimeChannel extends EventEmitter {
protocolMessageChannelSerial?: string | null;
decodeFailureRecoveryInProgress: null | boolean;
};
_allChannelChanges: EventEmitter;
/**
* Emits an 'attached' event (with no payload) whenever an ATTACHED protocol
* message is received from the server. Used by setOptions (RTL16a) to know
* when the server has confirmed new channel options.
*/
_attachedReceived: EventEmitter;
/**
* Internal event emitter for channel state changes, not affected by public
* off() calls. Exists to satisfy RTC10: the client library should never
* register internal listeners with the public EventEmitter in such a way
* that a user calling off() would result in the library not working as
* expected.
*/
internalStateChanges: EventEmitter;
params?: Record<string, any>;
modes: API.ChannelMode[] | undefined;
stateTimer?: number | NodeJS.Timeout | null;
Expand Down Expand Up @@ -127,9 +139,8 @@ class RealtimeChannel extends EventEmitter {
protocolMessageChannelSerial: null,
decodeFailureRecoveryInProgress: null,
};
/* Only differences between this and the public event emitter is that this emits an
* update event for all ATTACHEDs, whether resumed or not */
this._allChannelChanges = new EventEmitter(this.logger);
this._attachedReceived = new EventEmitter(this.logger);
this.internalStateChanges = new EventEmitter(this.logger);
Comment thread
lawrence-forooghian marked this conversation as resolved.

if (client.options.plugins?.Push) {
this._push = new client.options.plugins.Push.PushChannel(this);
Expand All @@ -155,6 +166,12 @@ class RealtimeChannel extends EventEmitter {
return this._object; // RTL27a
}

// Override of EventEmitter method
emit(event: string, ...args: unknown[]) {
super.emit(event, ...args);
this.internalStateChanges.emit(event, ...args);
}

invalidStateError(): ErrorInfo {
return new ErrorInfo(
'Channel operation failed as channel state is ' + this.state,
Expand Down Expand Up @@ -189,23 +206,21 @@ class RealtimeChannel extends EventEmitter {
* rejecting messages until we have confirmation that the options have changed,
* which would unnecessarily lose message continuity. */
this.attachImpl();
return new Promise((resolve, reject) => {
// Ignore 'attaching' -- could be just due to to a resume & reattach, should not
// call back setOptions until we're definitely attached with the new options (or
// else in a terminal state)
this._allChannelChanges.once(
['attached', 'update', 'detached', 'failed'],
function (this: { event: string }, stateChange: ConnectionStateChange) {
switch (this.event) {
case 'update':
case 'attached':
resolve();
break;
default:
reject(stateChange.reason);
}
},
);
return new Promise<void>((resolve, reject) => {
const cleanup = () => {
this._attachedReceived.off(onAttached);
this.internalStateChanges.off(onFailure);
};
const onAttached = () => {
cleanup();
resolve();
};
const onFailure = (stateChange: ChannelStateChange) => {
cleanup();
reject(stateChange.reason);
};
this._attachedReceived.once('attached', onAttached);
this.internalStateChanges.once(['detached', 'failed'], onFailure);
});
Comment thread
lawrence-forooghian marked this conversation as resolved.
}
}
Expand Down Expand Up @@ -344,7 +359,7 @@ class RealtimeChannel extends EventEmitter {
this.requestState('attaching', attachReason);
}

this.once(function (this: { event: string }, stateChange: ChannelStateChange) {
this.internalStateChanges.once(function (this: { event: string }, stateChange: ChannelStateChange) {
switch (this.event) {
case 'attached':
callback?.(null, stateChange);
Expand Down Expand Up @@ -409,7 +424,7 @@ class RealtimeChannel extends EventEmitter {
// eslint-disable-next-line no-fallthrough
case 'detaching':
return new Promise((resolve, reject) => {
this.once(function (this: { event: string }, stateChange: ChannelStateChange) {
this.internalStateChanges.once(function (this: { event: string }, stateChange: ChannelStateChange) {
switch (this.event) {
case 'detached':
resolve();
Expand Down Expand Up @@ -556,6 +571,7 @@ class RealtimeChannel extends EventEmitter {
const hasPresence = message.hasFlag('HAS_PRESENCE');
const hasBacklog = message.hasFlag('HAS_BACKLOG');
const hasObjects = message.hasFlag('HAS_OBJECTS');
this._attachedReceived.emit('attached');
if (this.state === 'attached') {
if (!resumed) {
// we have lost continuity.
Expand All @@ -569,7 +585,6 @@ class RealtimeChannel extends EventEmitter {
}
}
const change = new ChannelStateChange(this.state, this.state, resumed, hasBacklog, message.error);
this._allChannelChanges.emit('update', change);
if (!resumed || this.channelOptions.updateOnAttached) {
this.emit('update', change);
}
Expand Down Expand Up @@ -848,7 +863,6 @@ class RealtimeChannel extends EventEmitter {
}

this.state = state;
this._allChannelChanges.emit(state, change);
this.emit(state, change);
}

Expand Down
3 changes: 1 addition & 2 deletions test/common/modules/private_api_recorder.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'call.transport.send',
'delete.auth.authOptions.requestHeaders',
'deserialize.recoveryKey',
'listen.channel._allChannelChanges.attached',
'listen.channel._allChannelChanges.update',
'listen.channel._attachedReceived.attached',
'listen.connectionManager.connectiondetails',
'listen.connectionManager.transport.active',
'listen.connectionManager.transport.pending',
Expand Down
53 changes: 48 additions & 5 deletions test/realtime/channel.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -753,8 +753,8 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
},
function (cb) {
var channelUpdated = false;
helper.recordPrivateApi('listen.channel._allChannelChanges.update');
channel._allChannelChanges.on(['update'], function () {
helper.recordPrivateApi('listen.channel._attachedReceived.attached');
channel._attachedReceived.on('attached', function () {
channelUpdated = true;
});

Expand All @@ -778,9 +778,8 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
},
function (cb) {
var channelUpdated = false;
helper.recordPrivateApi('listen.channel._allChannelChanges.update');
helper.recordPrivateApi('listen.channel._allChannelChanges.attached');
channel._allChannelChanges.on(['attached', 'update'], function () {
helper.recordPrivateApi('listen.channel._attachedReceived.attached');
channel._attachedReceived.on('attached', function () {
channelUpdated = true;
});

Expand Down Expand Up @@ -2016,5 +2015,49 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async

await helper.closeAndFinishAsync(realtime);
});

/**
* Regression test: calling channel.off() while attaching should not
* prevent the attach() promise from resolving.
*
* @specpartial RTC10
*/
it('attach resolves even if channel.off() is called while attaching', async function () {
const helper = this.test.helper;
const realtime = helper.AblyRealtime();

await helper.monitorConnectionThenCloseAndFinishAsync(async () => {
const channel = realtime.channels.get('attach_off_regression');

const attachPromise = channel.attach();
expect(channel.state).to.equal('attaching');
channel.off();

await attachPromise;
}, realtime);
});

/**
* Regression test: calling channel.off() while detaching should not
* prevent the detach() promise from resolving.
*
* @specpartial RTC10
*/
it('detach resolves even if channel.off() is called while detaching', async function () {
const helper = this.test.helper;
const realtime = helper.AblyRealtime();

await helper.monitorConnectionThenCloseAndFinishAsync(async () => {
const channel = realtime.channels.get('detach_off_regression');

await channel.attach();

const detachPromise = channel.detach();
expect(channel.state).to.equal('detaching');
channel.off();

await detachPromise;
}, realtime);
});
});
});
Loading