Skip to content

Commit 1be8271

Browse files
authored
Merge pull request #2152 from ably/AIT-38/partial-objects-sync
[AIT-38] Add support for partial sync of objects - protocol v6
2 parents 1f32ea6 + d0bc431 commit 1be8271

File tree

5 files changed

+333
-125
lines changed

5 files changed

+333
-125
lines changed

scripts/moduleReport.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ async function checkLiveObjectsPluginFiles() {
342342
'src/plugins/liveobjects/pathobjectsubscriptionregister.ts',
343343
'src/plugins/liveobjects/realtimeobject.ts',
344344
'src/plugins/liveobjects/rootbatchcontext.ts',
345-
'src/plugins/liveobjects/syncobjectsdatapool.ts',
345+
'src/plugins/liveobjects/syncobjectspool.ts',
346346
]);
347347

348348
return checkBundleFiles(pluginBundleInfo, allowedFiles, 100);

src/plugins/liveobjects/realtimeobject.ts

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { ObjectMessage, ObjectOperationAction } from './objectmessage';
1212
import { ObjectsPool } from './objectspool';
1313
import { DefaultPathObject } from './pathobject';
1414
import { PathObjectSubscriptionRegister } from './pathobjectsubscriptionregister';
15-
import { SyncObjectsDataPool } from './syncobjectsdatapool';
15+
import { SyncObjectsPool } from './syncobjectspool';
1616

1717
export enum ObjectsEvent {
1818
syncing = 'syncing',
@@ -51,7 +51,7 @@ export class RealtimeObject {
5151
// related to RTC10, should have a separate EventEmitter for users of the library
5252
private _eventEmitterPublic: EventEmitter;
5353
private _objectsPool: ObjectsPool; // RTO3
54-
private _syncObjectsDataPool: SyncObjectsDataPool;
54+
private _syncObjectsPool: SyncObjectsPool;
5555
private _currentSyncId: string | undefined;
5656
private _currentSyncCursor: string | undefined;
5757
private _bufferedObjectOperations: ObjectMessage[];
@@ -68,7 +68,7 @@ export class RealtimeObject {
6868
this._eventEmitterInternal = new this._client.EventEmitter(this._client.logger);
6969
this._eventEmitterPublic = new this._client.EventEmitter(this._client.logger);
7070
this._objectsPool = new ObjectsPool(this);
71-
this._syncObjectsDataPool = new SyncObjectsDataPool(this);
71+
this._syncObjectsPool = new SyncObjectsPool(this);
7272
this._bufferedObjectOperations = [];
7373
this._appliedOnAckSerials = new Set(); // RTO7b1
7474
this._pathObjectSubscriptionRegister = new PathObjectSubscriptionRegister(this);
@@ -163,11 +163,11 @@ export class RealtimeObject {
163163
}
164164

165165
// RTO5a3 - continue current sync sequence
166-
this._syncObjectsDataPool.applyObjectSyncMessages(objectMessages); // RTO5b
166+
this._syncObjectsPool.applyObjectSyncMessages(objectMessages); // RTO5f
167167

168168
// RTO5a4 - if this is the last (or only) message in a sequence of sync updates, end the sync
169169
if (!syncCursor) {
170-
this._endSync();
170+
this._endSync(); // RTO5c
171171
}
172172
}
173173

@@ -208,7 +208,7 @@ export class RealtimeObject {
208208
// if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel.
209209
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
210210
this._objectsPool.resetToInitialPool(true); // RTO4b1, RTO4b2
211-
this._syncObjectsDataPool.clear(); // RTO4b3
211+
this._syncObjectsPool.clear(); // RTO4b3
212212
this._endSync(); // RTO4b4
213213
}
214214
}
@@ -226,7 +226,7 @@ export class RealtimeObject {
226226
case 'failed':
227227
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
228228
this._objectsPool.clearObjectsData(false);
229-
this._syncObjectsDataPool.clear();
229+
this._syncObjectsPool.clear();
230230
break;
231231
}
232232
}
@@ -388,7 +388,7 @@ export class RealtimeObject {
388388
private _startNewSync(syncId?: string, syncCursor?: string): void {
389389
// need to discard all buffered object operation messages on new sync start
390390
this._bufferedObjectOperations = [];
391-
this._syncObjectsDataPool.clear();
391+
this._syncObjectsPool.clear();
392392
this._currentSyncId = syncId;
393393
this._currentSyncCursor = syncCursor;
394394
this._stateChange(ObjectsState.syncing);
@@ -402,7 +402,7 @@ export class RealtimeObject {
402402
this._applyObjectMessages(this._bufferedObjectOperations, ObjectsOperationSource.channel); // RTO5c6
403403

404404
this._bufferedObjectOperations = [];
405-
this._syncObjectsDataPool.clear(); // RTO5c4
405+
this._syncObjectsPool.clear(); // RTO5c4
406406
this._currentSyncId = undefined; // RTO5c3
407407
this._currentSyncCursor = undefined; // RTO5c3
408408

@@ -432,7 +432,7 @@ export class RealtimeObject {
432432
}
433433

434434
private _applySync(): void {
435-
if (this._syncObjectsDataPool.isEmpty()) {
435+
if (this._syncObjectsPool.isEmpty()) {
436436
return;
437437
}
438438

@@ -443,34 +443,34 @@ export class RealtimeObject {
443443
}[] = [];
444444

445445
// RTO5c1
446-
for (const [objectId, entry] of this._syncObjectsDataPool.entries()) {
446+
for (const [objectId, objectMessage] of this._syncObjectsPool.entries()) {
447447
receivedObjectIds.add(objectId);
448448
const existingObject = this._objectsPool.get(objectId);
449449

450450
// RTO5c1a
451451
if (existingObject) {
452-
const update = existingObject.overrideWithObjectState(entry.objectMessage); // RTO5c1a1
452+
const update = existingObject.overrideWithObjectState(objectMessage); // RTO5c1a1
453453
// store updates to call subscription callbacks for all of them once the sync sequence is completed.
454454
// this will ensure that clients get notified about the changes only once everything has been applied.
455455
existingObjectUpdates.push({ object: existingObject, update });
456456
continue;
457457
}
458458

459-
// RTO5c1b,
459+
// RTO5c1b
460460
let newObject: LiveObject;
461-
// assign to a variable so TS doesn't complain about 'never' type in the default case
462-
const objectType = entry.objectType;
463-
switch (objectType) {
464-
case 'LiveCounter':
465-
newObject = LiveCounter.fromObjectState(this, entry.objectMessage); // RTO5c1b1a
466-
break;
467-
468-
case 'LiveMap':
469-
newObject = LiveMap.fromObjectState(this, entry.objectMessage); // RTO5c1b1b
470-
break;
471-
472-
default:
473-
throw new this._client.ErrorInfo(`Unknown LiveObject type: ${objectType}`, 50000, 500); // RTO5c1b1c
461+
if (objectMessage.object?.counter) {
462+
newObject = LiveCounter.fromObjectState(this, objectMessage); // RTO5c1b1a
463+
} else if (objectMessage.object?.map) {
464+
newObject = LiveMap.fromObjectState(this, objectMessage); // RTO5c1b1b
465+
} else {
466+
// RTO5c1b1c
467+
this._client.Logger.logAction(
468+
this._client.logger,
469+
this._client.Logger.LOG_MAJOR,
470+
'RealtimeObject._applySync()',
471+
`received unsupported object state message during OBJECT_SYNC, expected 'counter' or 'map' to be present, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
472+
);
473+
continue;
474474
}
475475

476476
this._objectsPool.set(objectId, newObject); // RTO5c1b1

src/plugins/liveobjects/syncobjectsdatapool.ts

Lines changed: 0 additions & 98 deletions
This file was deleted.
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import type BaseClient from 'common/lib/client/baseclient';
2+
import type RealtimeChannel from 'common/lib/client/realtimechannel';
3+
import { ObjectMessage } from './objectmessage';
4+
import { RealtimeObject } from './realtimeobject';
5+
6+
/**
7+
* @internal
8+
*/
9+
export class SyncObjectsPool {
10+
private _client: BaseClient;
11+
private _channel: RealtimeChannel;
12+
/** Used to accumulate object state during a sync sequence, keyed by object ID */
13+
private _pool: Map<string, ObjectMessage>;
14+
15+
constructor(private _realtimeObject: RealtimeObject) {
16+
this._client = this._realtimeObject.getClient();
17+
this._channel = this._realtimeObject.getChannel();
18+
this._pool = new Map<string, ObjectMessage>();
19+
}
20+
21+
entries() {
22+
return this._pool.entries();
23+
}
24+
25+
size(): number {
26+
return this._pool.size;
27+
}
28+
29+
isEmpty(): boolean {
30+
return this._pool.size === 0;
31+
}
32+
33+
clear(): void {
34+
this._pool.clear();
35+
}
36+
37+
/** @spec RTO5f */
38+
applyObjectSyncMessages(objectMessages: ObjectMessage[]): void {
39+
for (const objectMessage of objectMessages) {
40+
if (!objectMessage.object) {
41+
this._client.Logger.logAction(
42+
this._client.logger,
43+
this._client.Logger.LOG_MAJOR,
44+
'SyncObjectsPool.applyObjectSyncMessages()',
45+
`received OBJECT_SYNC message without 'object' field, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
46+
);
47+
continue;
48+
}
49+
50+
const objectState = objectMessage.object;
51+
52+
if (!objectState.counter && !objectState.map) {
53+
// RTO5f3
54+
this._client.Logger.logAction(
55+
this._client.logger,
56+
this._client.Logger.LOG_MAJOR,
57+
'SyncObjectsPool.applyObjectSyncMessages()',
58+
`received OBJECT_SYNC message with unsupported object type, expected 'counter' or 'map' to be present, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
59+
);
60+
continue;
61+
}
62+
63+
const objectId = objectState.objectId;
64+
const existingEntry = this._pool.get(objectId);
65+
66+
if (!existingEntry) {
67+
// RTO5f1 - no entry with this objectId exists yet, store it
68+
this._pool.set(objectId, objectMessage);
69+
continue;
70+
}
71+
72+
// RTO5f2 - an object is split across multiple sync messages, merge the new state with the existing entry in the pool based on the object type
73+
if (objectState.counter) {
74+
// RTO5f2b - counter objects have a bounded size and should never be split
75+
// across multiple sync messages. Skip the unexpected partial state.
76+
this._client.Logger.logAction(
77+
this._client.logger,
78+
this._client.Logger.LOG_ERROR,
79+
'SyncObjectsPool.applyObjectSyncMessages()',
80+
`received partial OBJECT_SYNC state for a counter object, skipping message; object id: ${objectId}, message id: ${objectMessage.id}, channel: ${this._channel.name}`,
81+
);
82+
continue;
83+
}
84+
85+
if (objectState.map) {
86+
// RTO5f2a
87+
this._mergeMapSyncState(existingEntry, objectMessage);
88+
continue;
89+
}
90+
}
91+
}
92+
93+
/**
94+
* Merges map entries from a partial sync message into an existing entry in the pool.
95+
* @spec RTO5f2a
96+
*/
97+
private _mergeMapSyncState(existingEntry: ObjectMessage, newObjectMessage: ObjectMessage): void {
98+
const existingObjectState = existingEntry.object!;
99+
const newObjectState = newObjectMessage.object!;
100+
101+
if (newObjectState.tombstone) {
102+
// RTO5f2a1 - a tombstone flag on any partial message takes precedence over previously accumulated entries
103+
this._pool.set(existingObjectState.objectId, newObjectMessage);
104+
return;
105+
}
106+
107+
// Other fields on the ObjectState envelope (such as siteTimeserials) and the map envelope
108+
// (such as semantics) are identical across all partial messages for the same object,
109+
// so only the entries need to be merged.
110+
if (!existingObjectState.map!.entries) {
111+
existingObjectState.map!.entries = {};
112+
}
113+
114+
// RTO5f2a2 - during partial sync, no two messages contain the same map key,
115+
// so entries can be merged directly without conflict checking.
116+
Object.assign(existingObjectState.map!.entries, newObjectState.map!.entries);
117+
}
118+
}

0 commit comments

Comments
 (0)