@@ -9,7 +9,6 @@ import ChannelStateChange from './channelstatechange';
99import ErrorInfo , { PartialErrorInfo } from '../types/errorinfo' ;
1010import * as API from '../../../../ably' ;
1111import ConnectionManager from '../transport/connectionmanager' ;
12- import ConnectionStateChange from './connectionstatechange' ;
1312import { StandardCallback } from '../../types/utils' ;
1413import BaseRealtime from './baserealtime' ;
1514import { ChannelOptions } from '../../types/channel' ;
@@ -87,7 +86,20 @@ class RealtimeChannel extends EventEmitter {
8786 protocolMessageChannelSerial ?: string | null ;
8887 decodeFailureRecoveryInProgress : null | boolean ;
8988 } ;
90- _allChannelChanges : EventEmitter ;
89+ /**
90+ * Emits an 'attached' event (with no payload) whenever an ATTACHED protocol
91+ * message is received from the server. Used by setOptions (RTL16a) to know
92+ * when the server has confirmed new channel options.
93+ */
94+ _attachedReceived : EventEmitter ;
95+ /**
96+ * Internal event emitter for channel state changes, not affected by public
97+ * off() calls. Exists to satisfy RTC10: the client library should never
98+ * register internal listeners with the public EventEmitter in such a way
99+ * that a user calling off() would result in the library not working as
100+ * expected.
101+ */
102+ internalStateChanges : EventEmitter ;
91103 params ?: Record < string , any > ;
92104 modes : API . ChannelMode [ ] | undefined ;
93105 stateTimer ?: number | NodeJS . Timeout | null ;
@@ -127,9 +139,8 @@ class RealtimeChannel extends EventEmitter {
127139 protocolMessageChannelSerial : null ,
128140 decodeFailureRecoveryInProgress : null ,
129141 } ;
130- /* Only differences between this and the public event emitter is that this emits an
131- * update event for all ATTACHEDs, whether resumed or not */
132- this . _allChannelChanges = new EventEmitter ( this . logger ) ;
142+ this . _attachedReceived = new EventEmitter ( this . logger ) ;
143+ this . internalStateChanges = new EventEmitter ( this . logger ) ;
133144
134145 if ( client . options . plugins ?. Push ) {
135146 this . _push = new client . options . plugins . Push . PushChannel ( this ) ;
@@ -155,6 +166,12 @@ class RealtimeChannel extends EventEmitter {
155166 return this . _object ; // RTL27a
156167 }
157168
169+ // Override of EventEmitter method
170+ emit ( event : string , ...args : unknown [ ] ) {
171+ super . emit ( event , ...args ) ;
172+ this . internalStateChanges . emit ( event , ...args ) ;
173+ }
174+
158175 invalidStateError ( ) : ErrorInfo {
159176 return new ErrorInfo (
160177 'Channel operation failed as channel state is ' + this . state ,
@@ -189,23 +206,21 @@ class RealtimeChannel extends EventEmitter {
189206 * rejecting messages until we have confirmation that the options have changed,
190207 * which would unnecessarily lose message continuity. */
191208 this . attachImpl ( ) ;
192- return new Promise ( ( resolve , reject ) => {
193- // Ignore 'attaching' -- could be just due to to a resume & reattach, should not
194- // call back setOptions until we're definitely attached with the new options (or
195- // else in a terminal state)
196- this . _allChannelChanges . once (
197- [ 'attached' , 'update' , 'detached' , 'failed' ] ,
198- function ( this : { event : string } , stateChange : ConnectionStateChange ) {
199- switch ( this . event ) {
200- case 'update' :
201- case 'attached' :
202- resolve ( ) ;
203- break ;
204- default :
205- reject ( stateChange . reason ) ;
206- }
207- } ,
208- ) ;
209+ return new Promise < void > ( ( resolve , reject ) => {
210+ const cleanup = ( ) => {
211+ this . _attachedReceived . off ( onAttached ) ;
212+ this . internalStateChanges . off ( onFailure ) ;
213+ } ;
214+ const onAttached = ( ) => {
215+ cleanup ( ) ;
216+ resolve ( ) ;
217+ } ;
218+ const onFailure = ( stateChange : ChannelStateChange ) => {
219+ cleanup ( ) ;
220+ reject ( stateChange . reason ) ;
221+ } ;
222+ this . _attachedReceived . once ( 'attached' , onAttached ) ;
223+ this . internalStateChanges . once ( [ 'detached' , 'failed' ] , onFailure ) ;
209224 } ) ;
210225 }
211226 }
@@ -344,7 +359,7 @@ class RealtimeChannel extends EventEmitter {
344359 this . requestState ( 'attaching' , attachReason ) ;
345360 }
346361
347- this . once ( function ( this : { event : string } , stateChange : ChannelStateChange ) {
362+ this . internalStateChanges . once ( function ( this : { event : string } , stateChange : ChannelStateChange ) {
348363 switch ( this . event ) {
349364 case 'attached' :
350365 callback ?.( null , stateChange ) ;
@@ -409,7 +424,7 @@ class RealtimeChannel extends EventEmitter {
409424 // eslint-disable-next-line no-fallthrough
410425 case 'detaching' :
411426 return new Promise ( ( resolve , reject ) => {
412- this . once ( function ( this : { event : string } , stateChange : ChannelStateChange ) {
427+ this . internalStateChanges . once ( function ( this : { event : string } , stateChange : ChannelStateChange ) {
413428 switch ( this . event ) {
414429 case 'detached' :
415430 resolve ( ) ;
@@ -556,6 +571,7 @@ class RealtimeChannel extends EventEmitter {
556571 const hasPresence = message . hasFlag ( 'HAS_PRESENCE' ) ;
557572 const hasBacklog = message . hasFlag ( 'HAS_BACKLOG' ) ;
558573 const hasObjects = message . hasFlag ( 'HAS_OBJECTS' ) ;
574+ this . _attachedReceived . emit ( 'attached' ) ;
559575 if ( this . state === 'attached' ) {
560576 if ( ! resumed ) {
561577 // we have lost continuity.
@@ -569,7 +585,6 @@ class RealtimeChannel extends EventEmitter {
569585 }
570586 }
571587 const change = new ChannelStateChange ( this . state , this . state , resumed , hasBacklog , message . error ) ;
572- this . _allChannelChanges . emit ( 'update' , change ) ;
573588 if ( ! resumed || this . channelOptions . updateOnAttached ) {
574589 this . emit ( 'update' , change ) ;
575590 }
@@ -848,7 +863,6 @@ class RealtimeChannel extends EventEmitter {
848863 }
849864
850865 this . state = state ;
851- this . _allChannelChanges . emit ( state , change ) ;
852866 this . emit ( state , change ) ;
853867 }
854868
0 commit comments