Skip to content

Commit 83e7d8f

Browse files
committed
Protocol v6 and support for partial objects sync
See spec ably/specification#413 and a DR [1] Resolves AIT-38 [1] https://ably.atlassian.net/wiki/x/AQBxCQE
1 parent cb88273 commit 83e7d8f

File tree

3 files changed

+263
-57
lines changed

3 files changed

+263
-57
lines changed

src/plugins/liveobjects/realtimeobject.ts

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,11 @@ export class RealtimeObject {
154154
}
155155

156156
// RTO5a3 - continue current sync sequence
157-
this._syncObjectsPool.applyObjectSyncMessages(objectMessages); // RTO5b
157+
this._syncObjectsPool.applyObjectSyncMessages(objectMessages); // RTO5f
158158

159159
// RTO5a4 - if this is the last (or only) message in a sequence of sync updates, end the sync
160160
if (!syncCursor) {
161-
this._endSync();
161+
this._endSync(); // RTO5c
162162
}
163163
}
164164

@@ -312,34 +312,34 @@ export class RealtimeObject {
312312
}[] = [];
313313

314314
// RTO5c1
315-
for (const [objectId, entry] of this._syncObjectsPool.entries()) {
315+
for (const [objectId, objectMessage] of this._syncObjectsPool.entries()) {
316316
receivedObjectIds.add(objectId);
317317
const existingObject = this._objectsPool.get(objectId);
318318

319319
// RTO5c1a
320320
if (existingObject) {
321-
const update = existingObject.overrideWithObjectState(entry.objectMessage); // RTO5c1a1
321+
const update = existingObject.overrideWithObjectState(objectMessage); // RTO5c1a1
322322
// store updates to call subscription callbacks for all of them once the sync sequence is completed.
323323
// this will ensure that clients get notified about the changes only once everything has been applied.
324324
existingObjectUpdates.push({ object: existingObject, update });
325325
continue;
326326
}
327327

328-
// RTO5c1b,
328+
// RTO5c1b
329329
let newObject: LiveObject;
330-
// assign to a variable so TS doesn't complain about 'never' type in the default case
331-
const objectType = entry.objectType;
332-
switch (objectType) {
333-
case 'LiveCounter':
334-
newObject = LiveCounter.fromObjectState(this, entry.objectMessage); // RTO5c1b1a
335-
break;
336-
337-
case 'LiveMap':
338-
newObject = LiveMap.fromObjectState(this, entry.objectMessage); // RTO5c1b1b
339-
break;
340-
341-
default:
342-
throw new this._client.ErrorInfo(`Unknown LiveObject type: ${objectType}`, 50000, 500); // RTO5c1b1c
330+
if (objectMessage.object?.counter) {
331+
newObject = LiveCounter.fromObjectState(this, objectMessage); // RTO5c1b1a
332+
} else if (objectMessage.object?.map) {
333+
newObject = LiveMap.fromObjectState(this, objectMessage); // RTO5c1b1b
334+
} else {
335+
// RTO5c1b1c
336+
this._client.Logger.logAction(
337+
this._client.logger,
338+
this._client.Logger.LOG_MAJOR,
339+
'RealtimeObject._applySync()',
340+
`received unsupported object state message during OBJECT_SYNC, expected 'counter' or 'map' to be present, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
341+
);
342+
continue;
343343
}
344344

345345
this._objectsPool.set(objectId, newObject); // RTO5c1b1

src/plugins/liveobjects/syncobjectspool.ts

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,19 @@ import type RealtimeChannel from 'common/lib/client/realtimechannel';
33
import { ObjectMessage } from './objectmessage';
44
import { RealtimeObject } from './realtimeobject';
55

6-
export interface LiveObjectDataEntry {
7-
objectMessage: ObjectMessage;
8-
objectType: 'LiveMap' | 'LiveCounter';
9-
}
10-
11-
export interface LiveCounterDataEntry extends LiveObjectDataEntry {
12-
objectType: 'LiveCounter';
13-
}
14-
15-
export interface LiveMapDataEntry extends LiveObjectDataEntry {
16-
objectType: 'LiveMap';
17-
}
18-
19-
export type AnyDataEntry = LiveCounterDataEntry | LiveMapDataEntry;
20-
216
/**
227
* @internal
23-
* @spec RTO5b
248
*/
259
export class SyncObjectsPool {
2610
private _client: BaseClient;
2711
private _channel: RealtimeChannel;
28-
private _pool: Map<string, AnyDataEntry>;
12+
/** Used to accumulate object state during a sync sequence, keyed by object ID */
13+
private _pool: Map<string, ObjectMessage>;
2914

3015
constructor(private _realtimeObject: RealtimeObject) {
3116
this._client = this._realtimeObject.getClient();
3217
this._channel = this._realtimeObject.getChannel();
33-
this._pool = new Map<string, AnyDataEntry>();
18+
this._pool = new Map<string, ObjectMessage>();
3419
}
3520

3621
entries() {
@@ -49,50 +34,83 @@ export class SyncObjectsPool {
4934
this._pool.clear();
5035
}
5136

37+
/** @spec RTO5f */
5238
applyObjectSyncMessages(objectMessages: ObjectMessage[]): void {
5339
for (const objectMessage of objectMessages) {
5440
if (!objectMessage.object) {
5541
this._client.Logger.logAction(
5642
this._client.logger,
5743
this._client.Logger.LOG_MAJOR,
5844
'SyncObjectsPool.applyObjectSyncMessages()',
59-
`object message is received during OBJECT_SYNC without 'object' field, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
45+
`received OBJECT_SYNC message without 'object' field, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
6046
);
6147
continue;
6248
}
6349

6450
const objectState = objectMessage.object;
51+
const objectId = objectState.objectId;
52+
const existingEntry = this._pool.get(objectId);
6553

54+
if (!existingEntry) {
55+
// RTO5f1 - no entry with this objectId exists yet, store it
56+
this._pool.set(objectId, objectMessage);
57+
continue;
58+
}
59+
60+
// RTO5f2 - an object is split across multiple sync messages, merge the new state with the existing entry in the pool based on the object type
6661
if (objectState.counter) {
67-
this._pool.set(objectState.objectId, this._createLiveCounterDataEntry(objectMessage));
68-
} else if (objectState.map) {
69-
this._pool.set(objectState.objectId, this._createLiveMapDataEntry(objectMessage));
70-
} else {
62+
// RTO5f2b - counter objects have a bounded size and should never be split
63+
// across multiple sync messages. Skip the unexpected partial state.
7164
this._client.Logger.logAction(
7265
this._client.logger,
73-
this._client.Logger.LOG_MAJOR,
66+
this._client.Logger.LOG_ERROR,
7467
'SyncObjectsPool.applyObjectSyncMessages()',
75-
`received unsupported object state message during OBJECT_SYNC, expected 'counter' or 'map' to be present, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
68+
`received partial OBJECT_SYNC state for a counter object, skipping message; object id: ${objectId}, message id: ${objectMessage.id}, channel: ${this._channel.name}`,
7669
);
70+
continue;
7771
}
78-
}
79-
}
8072

81-
private _createLiveCounterDataEntry(objectMessage: ObjectMessage): LiveCounterDataEntry {
82-
const newEntry: LiveCounterDataEntry = {
83-
objectMessage,
84-
objectType: 'LiveCounter',
85-
};
73+
if (objectState.map) {
74+
// RTO5f2a
75+
this._mergeMapSyncState(existingEntry, objectMessage);
76+
continue;
77+
}
8678

87-
return newEntry;
79+
// RTO5f2c
80+
this._client.Logger.logAction(
81+
this._client.logger,
82+
this._client.Logger.LOG_MAJOR,
83+
'SyncObjectsPool.applyObjectSyncMessages()',
84+
`received OBJECT_SYNC message with unsupported object type, expected 'counter' or 'map' to be present, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
85+
);
86+
}
8887
}
8988

90-
private _createLiveMapDataEntry(objectMessage: ObjectMessage): LiveMapDataEntry {
91-
const newEntry: LiveMapDataEntry = {
92-
objectMessage,
93-
objectType: 'LiveMap',
94-
};
89+
/**
90+
* Merges map entries from a partial sync message into an existing entry in the pool.
91+
* @spec RTO5f2a
92+
*/
93+
private _mergeMapSyncState(existingEntry: ObjectMessage, newObjectMessage: ObjectMessage): void {
94+
const existingObjectState = existingEntry.object!;
95+
const newObjectState = newObjectMessage.object!;
96+
97+
if (newObjectState.tombstone) {
98+
// RTO5f2a1 - a tombstone flag on any partial message takes precedence over previously accumulated entries
99+
this._pool.set(existingObjectState.objectId, newObjectMessage);
100+
return;
101+
}
95102

96-
return newEntry;
103+
// Other fields on the ObjectState envelope (such as siteTimeserials) and the map envelope
104+
// (such as semantics) are identical across all partial messages for the same object,
105+
// so only the entries need to be merged.
106+
if (!existingObjectState.map!.entries) {
107+
existingObjectState.map!.entries = {};
108+
}
109+
110+
if (newObjectState.map?.entries) {
111+
// RTO5f2a2 - during partial sync, no two messages contain the same map key,
112+
// so entries can be merged directly without conflict checking.
113+
Object.assign(existingObjectState.map!.entries, newObjectState.map.entries);
114+
}
97115
}
98116
}

0 commit comments

Comments
 (0)