Skip to content

Commit 9b2224d

Browse files
VeskeRclaude
andcommitted
Fix clearing buffered object operations during sync sequence
Buffered object operations must be cleared on every ATTACHED message, not at the start of a new OBJECT_SYNC sequence. If HAS_OBJECTS is set on ATTACHED, the server will deliver a sync sequence following the attachment, guaranteeing that the objects in that sequence include at least all operations up to the attach point. If HAS_OBJECTS is not set, the client performs an implicit sync sequence by transitioning to SYNCING and immediately clearing local state. Read more in LODR-058 [1] and the specification change [2]. Resolves AIT-285 [1] https://ably.atlassian.net/wiki/x/NQAEHgE [2] ably/specification#416 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d36ba3f commit 9b2224d

File tree

3 files changed

+211
-53
lines changed

3 files changed

+211
-53
lines changed

src/common/lib/client/realtimechannel.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -579,11 +579,15 @@ class RealtimeChannel extends EventEmitter {
579579
if (this._presence) {
580580
this._presence.onAttached(hasPresence);
581581
}
582-
// the Objects tree needs to be re-synced
583-
if (this._object) {
584-
this._object.onAttached(hasObjects);
585-
}
586582
}
583+
584+
// Must always resync the Objects tree after an ATTACHED.
585+
// Whether there are objects on the channel and whether an OBJECT_SYNC sequence will follow
586+
// is determined by the HAS_OBJECTS flag.
587+
if (this._object) {
588+
this._object.onAttached(hasObjects);
589+
}
590+
587591
const change = new ChannelStateChange(this.state, this.state, resumed, hasBacklog, message.error);
588592
if (!resumed || this.channelOptions.updateOnAttached) {
589593
this.emit('update', change);

src/plugins/liveobjects/realtimeobject.ts

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,13 @@ export class RealtimeObject {
153153
/**
154154
* @internal
155155
* @spec RTO5
156+
*
157+
* Note on server-initiated resync: if the realtime server needs to force a resync, it is expected
158+
* to send an ATTACHED message before the new OBJECT_SYNC sequence. However, if an OBJECT_SYNC
159+
* is received after a previously completed sync without a preceding ATTACHED,
160+
* we handle it on a best-effort basis: enter the SYNCING state and start buffering OBJECT messages
161+
* from that point. Since the buffer is cleared at the end of each completed sync sequence, receiving
162+
* an OBJECT_SYNC while in the SYNCED state means we start with an empty buffer.
156163
*/
157164
handleObjectSyncMessages(objectMessages: ObjectMessage[], syncChannelSerial: string | null | undefined): void {
158165
const { syncId, syncCursor } = this._parseSyncChannelSerial(syncChannelSerial); // RTO5a
@@ -200,13 +207,19 @@ export class RealtimeObject {
200207
`channel=${this._channel.name}, hasObjects=${hasObjects}`,
201208
);
202209

203-
// RTO4a
210+
// Regardless of whether HAS_OBJECTS is set, the client must drop any previously buffered object operations
211+
// and start a new sync sequence. If HAS_OBJECTS is set, the realtime server will deliver a sync sequence
212+
// following the ATTACHED, guaranteeing that the objects in that sequence include at least all operations
213+
// up to the point of attachment.
214+
// RTO4d
215+
this._bufferedObjectOperations = [];
216+
// RTO4c
204217
this._startNewSync();
205218

206219
// RTO4b
207220
if (!hasObjects) {
208-
// if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel.
209-
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
221+
// If no HAS_OBJECTS flag was received on attach, end the sync sequence immediately and treat it as no objects on the channel.
222+
// Reset the objects pool to its initial state and emit update events so subscribers to the root object are notified of changes.
210223
this._objectsPool.resetToInitialPool(true); // RTO4b1, RTO4b2
211224
this._syncObjectsDataPool.clear(); // RTO4b3
212225
this._endSync(); // RTO4b4
@@ -386,8 +399,6 @@ export class RealtimeObject {
386399
}
387400

388401
private _startNewSync(syncId?: string, syncCursor?: string): void {
389-
// need to discard all buffered object operation messages on new sync start
390-
this._bufferedObjectOperations = [];
391402
this._syncObjectsDataPool.clear();
392403
this._currentSyncId = syncId;
393404
this._currentSyncCursor = syncCursor;
@@ -397,11 +408,11 @@ export class RealtimeObject {
397408
/** @spec RTO5c */
398409
private _endSync(): void {
399410
this._applySync();
400-
// should apply buffered object operations after we applied the sync.
401-
// can use regular object messages application logic
411+
// Apply buffered object operations after the sync has been applied.
412+
// Uses the regular object message application logic.
402413
this._applyObjectMessages(this._bufferedObjectOperations, ObjectsOperationSource.channel); // RTO5c6
403414

404-
this._bufferedObjectOperations = [];
415+
this._bufferedObjectOperations = []; // RTO5c5
405416
this._syncObjectsDataPool.clear(); // RTO5c4
406417
this._currentSyncId = undefined; // RTO5c3
407418
this._currentSyncCursor = undefined; // RTO5c3

0 commit comments

Comments
 (0)