Skip to content

Commit 1052acf

Browse files
committed
Support for partial objects sync
See spec ably/specification#413 and a DR [1] Resolves AIT-38 [1] https://ably.atlassian.net/wiki/x/AQBxCQE
1 parent 20b6f10 commit 1052acf

File tree

3 files changed

+155
-57
lines changed

3 files changed

+155
-57
lines changed

src/plugins/liveobjects/realtimeobject.ts

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,11 @@ export class RealtimeObject {
163163
}
164164

165165
// RTO5a3 - continue current sync sequence
166-
this._syncObjectsPool.applyObjectSyncMessages(objectMessages); // RTO5b
166+
this._syncObjectsPool.applyObjectSyncMessages(objectMessages); // RTO5f
167167

168168
// RTO5a4 - if this is the last (or only) message in a sequence of sync updates, end the sync
169169
if (!syncCursor) {
170-
this._endSync();
170+
this._endSync(); // RTO5c
171171
}
172172
}
173173

@@ -443,34 +443,34 @@ export class RealtimeObject {
443443
}[] = [];
444444

445445
// RTO5c1
446-
for (const [objectId, entry] of this._syncObjectsPool.entries()) {
446+
for (const [objectId, objectMessage] of this._syncObjectsPool.entries()) {
447447
receivedObjectIds.add(objectId);
448448
const existingObject = this._objectsPool.get(objectId);
449449

450450
// RTO5c1a
451451
if (existingObject) {
452-
const update = existingObject.overrideWithObjectState(entry.objectMessage); // RTO5c1a1
452+
const update = existingObject.overrideWithObjectState(objectMessage); // RTO5c1a1
453453
// store updates to call subscription callbacks for all of them once the sync sequence is completed.
454454
// this will ensure that clients get notified about the changes only once everything has been applied.
455455
existingObjectUpdates.push({ object: existingObject, update });
456456
continue;
457457
}
458458

459-
// RTO5c1b,
459+
// RTO5c1b
460460
let newObject: LiveObject;
461-
// assign to a variable so TS doesn't complain about 'never' type in the default case
462-
const objectType = entry.objectType;
463-
switch (objectType) {
464-
case 'LiveCounter':
465-
newObject = LiveCounter.fromObjectState(this, entry.objectMessage); // RTO5c1b1a
466-
break;
467-
468-
case 'LiveMap':
469-
newObject = LiveMap.fromObjectState(this, entry.objectMessage); // RTO5c1b1b
470-
break;
471-
472-
default:
473-
throw new this._client.ErrorInfo(`Unknown LiveObject type: ${objectType}`, 50000, 500); // RTO5c1b1c
461+
if (objectMessage.object?.counter) {
462+
newObject = LiveCounter.fromObjectState(this, objectMessage); // RTO5c1b1a
463+
} else if (objectMessage.object?.map) {
464+
newObject = LiveMap.fromObjectState(this, objectMessage); // RTO5c1b1b
465+
} else {
466+
// RTO5c1b1c
467+
this._client.Logger.logAction(
468+
this._client.logger,
469+
this._client.Logger.LOG_MAJOR,
470+
'RealtimeObject._applySync()',
471+
`received unsupported object state message during OBJECT_SYNC, expected 'counter' or 'map' to be present, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
472+
);
473+
continue;
474474
}
475475

476476
this._objectsPool.set(objectId, newObject); // RTO5c1b1

src/plugins/liveobjects/syncobjectspool.ts

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,19 @@ import type RealtimeChannel from 'common/lib/client/realtimechannel';
33
import { ObjectMessage } from './objectmessage';
44
import { RealtimeObject } from './realtimeobject';
55

6-
export interface LiveObjectDataEntry {
7-
objectMessage: ObjectMessage;
8-
objectType: 'LiveMap' | 'LiveCounter';
9-
}
10-
11-
export interface LiveCounterDataEntry extends LiveObjectDataEntry {
12-
objectType: 'LiveCounter';
13-
}
14-
15-
export interface LiveMapDataEntry extends LiveObjectDataEntry {
16-
objectType: 'LiveMap';
17-
}
18-
19-
export type AnyDataEntry = LiveCounterDataEntry | LiveMapDataEntry;
20-
216
/**
227
* @internal
23-
* @spec RTO5b
248
*/
259
export class SyncObjectsPool {
2610
private _client: BaseClient;
2711
private _channel: RealtimeChannel;
28-
private _pool: Map<string, AnyDataEntry>;
12+
/** Used to accumulate object state during a sync sequence, keyed by object ID */
13+
private _pool: Map<string, ObjectMessage>;
2914

3015
constructor(private _realtimeObject: RealtimeObject) {
3116
this._client = this._realtimeObject.getClient();
3217
this._channel = this._realtimeObject.getChannel();
33-
this._pool = new Map<string, AnyDataEntry>();
18+
this._pool = new Map<string, ObjectMessage>();
3419
}
3520

3621
entries() {
@@ -49,50 +34,83 @@ export class SyncObjectsPool {
4934
this._pool.clear();
5035
}
5136

37+
/** @spec RTO5f */
5238
applyObjectSyncMessages(objectMessages: ObjectMessage[]): void {
5339
for (const objectMessage of objectMessages) {
5440
if (!objectMessage.object) {
5541
this._client.Logger.logAction(
5642
this._client.logger,
5743
this._client.Logger.LOG_MAJOR,
5844
'SyncObjectsPool.applyObjectSyncMessages()',
59-
`object message is received during OBJECT_SYNC without 'object' field, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
45+
`received OBJECT_SYNC message without 'object' field, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
6046
);
6147
continue;
6248
}
6349

6450
const objectState = objectMessage.object;
51+
const objectId = objectState.objectId;
52+
const existingEntry = this._pool.get(objectId);
6553

54+
if (!existingEntry) {
55+
// RTO5f1 - no entry with this objectId exists yet, store it
56+
this._pool.set(objectId, objectMessage);
57+
continue;
58+
}
59+
60+
// RTO5f2 - an object is split across multiple sync messages, merge the new state with the existing entry in the pool based on the object type
6661
if (objectState.counter) {
67-
this._pool.set(objectState.objectId, this._createLiveCounterDataEntry(objectMessage));
68-
} else if (objectState.map) {
69-
this._pool.set(objectState.objectId, this._createLiveMapDataEntry(objectMessage));
70-
} else {
62+
// RTO5f2b - counter objects have a bounded size and should never be split
63+
// across multiple sync messages. Skip the unexpected partial state.
7164
this._client.Logger.logAction(
7265
this._client.logger,
73-
this._client.Logger.LOG_MAJOR,
66+
this._client.Logger.LOG_ERROR,
7467
'SyncObjectsPool.applyObjectSyncMessages()',
75-
`received unsupported object state message during OBJECT_SYNC, expected 'counter' or 'map' to be present, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
68+
`received partial OBJECT_SYNC state for a counter object, skipping message; object id: ${objectId}, message id: ${objectMessage.id}, channel: ${this._channel.name}`,
7669
);
70+
continue;
7771
}
78-
}
79-
}
8072

81-
private _createLiveCounterDataEntry(objectMessage: ObjectMessage): LiveCounterDataEntry {
82-
const newEntry: LiveCounterDataEntry = {
83-
objectMessage,
84-
objectType: 'LiveCounter',
85-
};
73+
if (objectState.map) {
74+
// RTO5f2a
75+
this._mergeMapSyncState(existingEntry, objectMessage);
76+
continue;
77+
}
8678

87-
return newEntry;
79+
// RTO5f2c
80+
this._client.Logger.logAction(
81+
this._client.logger,
82+
this._client.Logger.LOG_MAJOR,
83+
'SyncObjectsPool.applyObjectSyncMessages()',
84+
`received OBJECT_SYNC message with unsupported object type, expected 'counter' or 'map' to be present, skipping message; message id: ${objectMessage.id}, channel: ${this._channel.name}`,
85+
);
86+
}
8887
}
8988

90-
private _createLiveMapDataEntry(objectMessage: ObjectMessage): LiveMapDataEntry {
91-
const newEntry: LiveMapDataEntry = {
92-
objectMessage,
93-
objectType: 'LiveMap',
94-
};
89+
/**
90+
* Merges map entries from a partial sync message into an existing entry in the pool.
91+
* @spec RTO5f2a
92+
*/
93+
private _mergeMapSyncState(existingEntry: ObjectMessage, newObjectMessage: ObjectMessage): void {
94+
const existingObjectState = existingEntry.object!;
95+
const newObjectState = newObjectMessage.object!;
96+
97+
if (newObjectState.tombstone) {
98+
// RTO5f2a1 - a tombstone flag on any partial message takes precedence over previously accumulated entries
99+
this._pool.set(existingObjectState.objectId, newObjectMessage);
100+
return;
101+
}
95102

96-
return newEntry;
103+
// Other fields on the ObjectState envelope (such as siteTimeserials) and the map envelope
104+
// (such as semantics) are identical across all partial messages for the same object,
105+
// so only the entries need to be merged.
106+
if (!existingObjectState.map!.entries) {
107+
existingObjectState.map!.entries = {};
108+
}
109+
110+
if (newObjectState.map?.entries) {
111+
// RTO5f2a2 - during partial sync, no two messages contain the same map key,
112+
// so entries can be merged directly without conflict checking.
113+
Object.assign(existingObjectState.map!.entries, newObjectState.map.entries);
114+
}
97115
}
98116
}

test/realtime/liveobjects.test.js

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -961,6 +961,86 @@ define(['ably', 'shared_helper', 'chai', 'liveobjects', 'liveobjects_helper'], f
961961
},
962962
},
963963

964+
{
965+
description: 'partial OBJECT_SYNC merges map entries across multiple messages for the same objectId',
966+
action: async (ctx) => {
967+
const { channel, objectsHelper, entryPathObject } = ctx;
968+
969+
const mapId = objectsHelper.fakeMapObjectId();
970+
971+
// assign map object to root
972+
await objectsHelper.processObjectStateMessageOnChannel({
973+
channel,
974+
syncSerial: 'serial:cursor1',
975+
state: [
976+
objectsHelper.mapObject({
977+
objectId: 'root',
978+
siteTimeserials: { aaa: lexicoTimeserial('aaa', 0, 0) },
979+
initialEntries: {
980+
map: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { objectId: mapId } },
981+
},
982+
}),
983+
],
984+
});
985+
986+
// send partial sync messages for the same map object, each with different materialised entries.
987+
// initialEntries are identical across all partial messages for the same object - a server guarantee.
988+
const partialMessages = [
989+
{
990+
syncSerial: 'serial:cursor2',
991+
materialisedEntries: {
992+
key1: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { number: 1 } },
993+
key2: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { string: 'two' } },
994+
},
995+
},
996+
{
997+
syncSerial: 'serial:cursor3',
998+
materialisedEntries: {
999+
key3: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { number: 3 } },
1000+
key4: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { boolean: true } },
1001+
},
1002+
},
1003+
{
1004+
syncSerial: 'serial:', // end sync sequence
1005+
materialisedEntries: {
1006+
key5: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { string: 'five' } },
1007+
},
1008+
},
1009+
];
1010+
1011+
for (const partial of partialMessages) {
1012+
await objectsHelper.processObjectStateMessageOnChannel({
1013+
channel,
1014+
syncSerial: partial.syncSerial,
1015+
state: [
1016+
objectsHelper.mapObject({
1017+
objectId: mapId,
1018+
siteTimeserials: { aaa: lexicoTimeserial('aaa', 0, 0) },
1019+
initialEntries: {
1020+
initialKey: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { string: 'initial' } },
1021+
},
1022+
materialisedEntries: partial.materialisedEntries,
1023+
}),
1024+
],
1025+
});
1026+
}
1027+
1028+
const map = entryPathObject.get('map');
1029+
1030+
expect(map.get('initialKey').value()).to.equal(
1031+
'initial',
1032+
'Check keys from the create operation are present',
1033+
);
1034+
1035+
// check that materialised entries from all partial messages were merged
1036+
expect(map.get('key1').value()).to.equal(1, 'Check key1 from first partial sync');
1037+
expect(map.get('key2').value()).to.equal('two', 'Check key2 from first partial sync');
1038+
expect(map.get('key3').value()).to.equal(3, 'Check key3 from second partial sync');
1039+
expect(map.get('key4').value()).to.equal(true, 'Check key4 from second partial sync');
1040+
expect(map.get('key5').value()).to.equal('five', 'Check key5 from third partial sync');
1041+
},
1042+
},
1043+
9641044
{
9651045
allTransportsAndProtocols: true,
9661046
description: 'LiveCounter is initialized with initial value from OBJECT_SYNC sequence',

0 commit comments

Comments
 (0)