Skip to content

Commit 6f168f4

Browse files
committed
Support MAP_CLEAR object operation
See DR [1], realtime implementation [2] and spec [3]. The DR specifies that MAP_CLEAR is currently only emitted by the server for the root object. An additional decision was made that the client should be future-proofed to support MAP_CLEAR on any map object ID, not just root. This implementation follows that decision. Semantics of MAP_CLEAR support: 1. OBJECT_SYNC: The clearTimeserial from the sync state is stored on the LiveMap for use by future operations. Materialised entries from the `ObjectMessage.object.map` arrive pre-tombstoned by the server for entries that predated the clear. Initial entries from the `ObjectMessage.object.createOp` are merged via the existing MAP_SET/MAP_REMOVE semantics, which check against clearTimeserial. 2. MAP_SET: After the usual siteTimeserials check, the operation is additionally discarded if clearTimeserial is set and is lexicographically greater than or equal to the operation's serial, since the set predates (or is concurrent with) the clear. 3. MAP_REMOVE: No changes needed - a remove after a clear is valid regardless of clearTimeserial. 4. MAP_CLEAR: The operation is discarded if the current clearTimeserial is already greater than or equal to the incoming serial (stale clear). Otherwise, clearTimeserial is updated to the operation's serial, and all existing entries whose timeserial is null or less than or equal to the new clearTimeserial are tombstoned. Entries with a strictly later timeserial are preserved. Resolves AIT-458 [1] https://ably.atlassian.net/wiki/x/DABECAE [2] ably/realtime#8074 [3] ably/specification#432
1 parent 4f3d5e4 commit 6f168f4

File tree

7 files changed

+761
-17
lines changed

7 files changed

+761
-17
lines changed

liveobjects.d.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1489,6 +1489,10 @@ declare namespace ObjectOperationActions {
14891489
* Object operation action for deleting an object.
14901490
*/
14911491
type OBJECT_DELETE = 'object.delete';
1492+
/**
1493+
* Object operation action for clearing a map object.
1494+
*/
1495+
type MAP_CLEAR = 'map.clear';
14921496
}
14931497

14941498
/**
@@ -1500,7 +1504,8 @@ export type ObjectOperationAction =
15001504
| ObjectOperationActions.MAP_REMOVE
15011505
| ObjectOperationActions.COUNTER_CREATE
15021506
| ObjectOperationActions.COUNTER_INC
1503-
| ObjectOperationActions.OBJECT_DELETE;
1507+
| ObjectOperationActions.OBJECT_DELETE
1508+
| ObjectOperationActions.MAP_CLEAR;
15041509

15051510
/**
15061511
* The namespace containing the different types of map object semantics.
@@ -1607,6 +1612,10 @@ export interface ObjectOperation {
16071612
* The payload for the operation if the action is {@link ObjectOperationActions.OBJECT_DELETE}.
16081613
*/
16091614
objectDelete?: ObjectDelete;
1615+
/**
1616+
* The payload for the operation if the action is {@link ObjectOperationActions.MAP_CLEAR}.
1617+
*/
1618+
mapClear?: MapClear;
16101619

16111620
/**
16121621
* The payload for the operation if it is a mutation operation on a map object.
@@ -1735,6 +1744,11 @@ export interface CounterInc {
17351744
*/
17361745
export interface ObjectDelete {}
17371746

1747+
/**
1748+
* Describes the payload for a MAP_CLEAR operation.
1749+
*/
1750+
export interface MapClear {}
1751+
17381752
/**
17391753
* Represents a value in an object on a channel.
17401754
*/

src/plugins/liveobjects/livemap.ts

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
5656
implements PublicLiveMap<T>
5757
{
5858
declare readonly [__livetype]: 'LiveMap'; // type-only, unique symbol to satisfy branded interfaces, no JS emitted
59+
private _clearTimeserial?: string; // RTLM25
5960

6061
constructor(
6162
realtimeObject: RealtimeObject,
@@ -358,6 +359,11 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
358359
update = this._applyObjectDelete(msg);
359360
break;
360361

362+
case ObjectOperationAction.MAP_CLEAR:
363+
// RTLM15d8
364+
update = this._applyMapClear(msg);
365+
break;
366+
361367
default:
362368
throw new this._client.ErrorInfo(
363369
`Invalid ${op.action} op for LiveMap objectId=${this.getObjectId()}`,
@@ -366,8 +372,8 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
366372
);
367373
}
368374

369-
this.notifyUpdated(update); // RTLM15d1a, RTLM15d6a, RTLM15d7a, RTLM15d5a
370-
return true; // RTLM15d1b, RTLM15d6b, RTLM15d7b, RTLM15d5b
375+
this.notifyUpdated(update); // RTLM15d1a, RTLM15d6a, RTLM15d7a, RTLM15d5a, RTLM15d8a
376+
return true; // RTLM15d1b, RTLM15d6b, RTLM15d7b, RTLM15d5b, RTLM15d8b
371377
}
372378

373379
/**
@@ -440,6 +446,7 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
440446
} else {
441447
// otherwise override data for this object with data from the object state
442448
this._createOperationIsMerged = false; // RTLM6b
449+
this._clearTimeserial = objectState.map?.clearTimeserial; // RTLM6i
443450
this._dataRef = this._liveMapDataFromMapEntries(objectState.map?.entries ?? {}); // RTLM6c
444451
// RTLM6d
445452
if (!this._client.Utils.isNil(objectState.createOp)) {
@@ -737,9 +744,20 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
737744
): LiveMapUpdate<T> | LiveObjectUpdateNoop {
738745
const { ErrorInfo, Utils } = this._client;
739746

747+
// RTLM7h - Check operation's serial against clearTimeserial first
748+
if (this._clearTimeserial && (!opSerial || this._clearTimeserial >= opSerial)) {
749+
this._client.Logger.logAction(
750+
this._client.logger,
751+
this._client.Logger.LOG_MICRO,
752+
'LiveMap._applyMapSet()',
753+
`skipping update for key="${op.key}": op serial ${opSerial} < clear serial ${this._clearTimeserial}; objectId=${this.getObjectId()}`,
754+
);
755+
return { noop: true };
756+
}
757+
740758
const existingEntry = this._dataRef.data.get(op.key);
741759
// RTLM7a
742-
if (existingEntry && !this._canApplyMapOperation(existingEntry.timeserial, opSerial)) {
760+
if (existingEntry && !this._canApplyMapEntryOperation(existingEntry.timeserial, opSerial)) {
743761
// RTLM7a1 - the operation's serial <= the entry's serial, ignore the operation.
744762
this._client.Logger.logAction(
745763
this._client.logger,
@@ -823,9 +841,11 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
823841
opTimestamp: number | undefined,
824842
msg: ObjectMessage,
825843
): LiveMapUpdate<T> | LiveObjectUpdateNoop {
844+
// No need to check against clearTimeserial - a remove after a clear is valid
845+
826846
const existingEntry = this._dataRef.data.get(op.key);
827847
// RTLM8a
828-
if (existingEntry && !this._canApplyMapOperation(existingEntry.timeserial, opSerial)) {
848+
if (existingEntry && !this._canApplyMapEntryOperation(existingEntry.timeserial, opSerial)) {
829849
// RTLM8a1 - the operation's serial <= the entry's serial, ignore the operation.
830850
this._client.Logger.logAction(
831851
this._client.logger,
@@ -881,12 +901,87 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
881901
return update;
882902
}
883903

904+
/** @spec RTLM24 */
905+
private _applyMapClear(objectMessage: ObjectMessage): LiveMapUpdate<T> | LiveObjectUpdateNoop {
906+
const opSerial = objectMessage.serial!;
907+
908+
if (this._clearTimeserial == null || opSerial > this._clearTimeserial) {
909+
// RTLM24d
910+
this._client.Logger.logAction(
911+
this._client.logger,
912+
this._client.Logger.LOG_MICRO,
913+
'LiveMap._applyMapClear()',
914+
`updating clearTimeserial; previous=${this._clearTimeserial}, new=${opSerial}; objectId=${this.getObjectId()}`,
915+
);
916+
this._clearTimeserial = opSerial;
917+
} else {
918+
// RTLM24c
919+
this._client.Logger.logAction(
920+
this._client.logger,
921+
this._client.Logger.LOG_MICRO,
922+
'LiveMap._applyMapClear()',
923+
`skipping MAP_CLEAR: op serial ${opSerial} <= current clear serial ${this._clearTimeserial}; objectId=${this.getObjectId()}`,
924+
);
925+
return { noop: true };
926+
}
927+
928+
const update: LiveMapUpdate<T> = {
929+
update: {},
930+
objectMessage,
931+
_type: 'LiveMapUpdate',
932+
};
933+
934+
// RTLM24e
935+
for (const [key, entry] of this._dataRef.data.entries()) {
936+
const entrySerial = entry.timeserial;
937+
// RTLM24e1
938+
if (entrySerial == null || this._clearTimeserial >= entrySerial) {
939+
this._client.Logger.logAction(
940+
this._client.logger,
941+
this._client.Logger.LOG_MICRO,
942+
'LiveMap._applyMapClear()',
943+
`clearing entry; key="${key}", entry serial=${entrySerial}, clear serial=${this._clearTimeserial}, objectId=${this.getObjectId()}`,
944+
);
945+
946+
// Handle parent reference removal for object references
947+
if (entry.data && 'objectId' in entry.data) {
948+
// Remove parent reference from the object that was being referenced
949+
const referencedObject = this._realtimeObject.getPool().get(entry.data.objectId);
950+
if (referencedObject) {
951+
referencedObject.removeParentReference(this, key);
952+
}
953+
}
954+
955+
entry.tombstone = true; // RTLM24e1c
956+
entry.tombstonedAt = this._calculateTombstonedAt(
957+
objectMessage.serialTimestamp,
958+
'LiveMap._applyMapClear()',
959+
`objectId=${this.getObjectId()}`,
960+
); // RTLM24e1d
961+
entry.timeserial = this._clearTimeserial; // RTLM24e1b
962+
entry.data = undefined; // RTLM24e1a
963+
964+
const typedKey: keyof T & string = key;
965+
update.update[typedKey] = 'removed'; // RTLM24e1e
966+
} else {
967+
this._client.Logger.logAction(
968+
this._client.logger,
969+
this._client.Logger.LOG_MICRO,
970+
'LiveMap._applyMapClear()',
971+
`skipping clearing entry; key="${key}", entry serial=${entrySerial}, clear serial=${this._clearTimeserial}, objectId=${this.getObjectId()}`,
972+
);
973+
}
974+
}
975+
976+
return update; // RTLM24f
977+
}
978+
884979
/**
885980
* Returns true if the serials of the given operation and entry indicate that
886981
* the operation should be applied to the entry, following the CRDT semantics of this LiveMap.
887982
* @spec RTLM9
888983
*/
889-
private _canApplyMapOperation(mapEntrySerial: string | undefined, opSerial: string | undefined): boolean {
984+
private _canApplyMapEntryOperation(mapEntrySerial: string | undefined, opSerial: string | undefined): boolean {
890985
// for LWW CRDT semantics (the only supported LiveMap semantic) an operation
891986
// should only be applied if its serial is strictly greater ("after") than an entry's serial.
892987

src/plugins/liveobjects/objectmessage.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const operationActions: ObjectsApi.ObjectOperationAction[] = [
1111
'counter.create',
1212
'counter.inc',
1313
'object.delete',
14+
'map.clear',
1415
];
1516

1617
const mapSemantics: ObjectsApi.ObjectsMapSemantics[] = ['lww'];
@@ -25,6 +26,7 @@ export enum ObjectOperationAction {
2526
COUNTER_CREATE = 3,
2627
COUNTER_INC = 4,
2728
OBJECT_DELETE = 5,
29+
MAP_CLEAR = 6,
2830
}
2931

3032
/** @spec OMP2 */
@@ -119,6 +121,8 @@ export interface ObjectsMap<TData> {
119121
semantics?: ObjectsMapSemantics; // OMP3a
120122
/** The map entries, indexed by key. */
121123
entries?: Record<string, ObjectsMapEntry<TData>>; // OMP3b
124+
/** The {@link ObjectMessage.serial} value of the last `MAP_CLEAR` operation applied to the map. If no `MAP_CLEAR` has been applied, this field is omitted */
125+
clearTimeserial?: string; // OMP3c
122126
}
123127

124128
/**
@@ -227,6 +231,14 @@ export interface CounterCreateWithObjectId {
227231
_derivedFrom?: CounterCreate;
228232
}
229233

234+
/**
235+
* A MapClear describes the payload for a MAP_CLEAR operation
236+
* @spec MCL1
237+
*/
238+
export interface MapClear {
239+
// MCL2 - Empty message, MAP_CLEAR requires no operation-specific data
240+
}
241+
230242
/**
231243
* An ObjectOperation describes an operation to be applied to an object on a channel.
232244
* @spec OOP1
@@ -273,6 +285,10 @@ export interface ObjectOperation<TData> {
273285
* Contains the nonce and JSON-encoded initial value from {@link CounterCreate} for object ID verification.
274286
*/
275287
counterCreateWithObjectId?: CounterCreateWithObjectId; // OOP3q
288+
/**
289+
* The payload for MAP_CLEAR operation.
290+
*/
291+
mapClear?: MapClear; // OOP3r
276292
}
277293

278294
/**
@@ -487,7 +503,7 @@ function toUserFacingMapEntry(entry: ObjectsMapEntry<ObjectData>): ObjectsApi.Ob
487503
}
488504

489505
function toUserFacingObjectOperation(operation: ObjectOperation<ObjectData>): ObjectsApi.ObjectOperation {
490-
const { mapSet: internalMapSet, mapRemove, counterInc, objectDelete } = operation;
506+
const { mapSet: internalMapSet, mapRemove, counterInc, objectDelete, mapClear } = operation;
491507

492508
// resolve *Create from direct property or from *CreateWithObjectId._derivedFrom
493509
const internalMapCreate = operation.mapCreate ?? operation.mapCreateWithObjectId?._derivedFrom;
@@ -537,6 +553,7 @@ function toUserFacingObjectOperation(operation: ObjectOperation<ObjectData>): Ob
537553
counterCreate,
538554
counterInc,
539555
objectDelete,
556+
mapClear,
540557
// deprecated fields
541558
mapOp,
542559
counterOp,

src/plugins/liveobjects/realtimeobject.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,8 @@ export class RealtimeObject {
522522
case ObjectOperationAction.MAP_SET:
523523
case ObjectOperationAction.MAP_REMOVE:
524524
case ObjectOperationAction.COUNTER_INC:
525-
case ObjectOperationAction.OBJECT_DELETE: {
525+
case ObjectOperationAction.OBJECT_DELETE:
526+
case ObjectOperationAction.MAP_CLEAR: {
526527
// we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
527528
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
528529
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,

test/common/modules/liveobjects_helper.js

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ define(['ably', 'shared_helper', 'liveobjects'], function (Ably, Helper, LiveObj
1313
COUNTER_CREATE: 3,
1414
COUNTER_INC: 4,
1515
OBJECT_DELETE: 5,
16+
MAP_CLEAR: 6,
1617
};
1718
const ACTION_STRINGS = {
1819
MAP_CREATE: 'MAP_CREATE',
@@ -21,6 +22,7 @@ define(['ably', 'shared_helper', 'liveobjects'], function (Ably, Helper, LiveObj
2122
COUNTER_CREATE: 'COUNTER_CREATE',
2223
COUNTER_INC: 'COUNTER_INC',
2324
OBJECT_DELETE: 'OBJECT_DELETE',
25+
MAP_CLEAR: 'MAP_CLEAR',
2426
};
2527

2628
class LiveObjectsHelper {
@@ -187,8 +189,21 @@ define(['ably', 'shared_helper', 'liveobjects'], function (Ably, Helper, LiveObj
187189
return op;
188190
}
189191

192+
mapClearOp(opts) {
193+
const { objectId } = opts ?? {};
194+
const op = {
195+
operation: {
196+
action: ACTIONS.MAP_CLEAR,
197+
objectId,
198+
mapClear: {},
199+
},
200+
};
201+
202+
return op;
203+
}
204+
190205
mapObject(opts) {
191-
const { objectId, siteTimeserials, initialEntries, materialisedEntries, tombstone } = opts;
206+
const { objectId, siteTimeserials, initialEntries, materialisedEntries, tombstone, clearTimeserial } = opts;
192207
const obj = {
193208
object: {
194209
objectId,
@@ -197,6 +212,7 @@ define(['ably', 'shared_helper', 'liveobjects'], function (Ably, Helper, LiveObj
197212
map: {
198213
semantics: 0,
199214
entries: materialisedEntries,
215+
...(clearTimeserial != null ? { clearTimeserial } : {}),
200216
},
201217
},
202218
};

test/common/modules/private_api_recorder.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
4242
'call.channel.processMessage',
4343
'call.channel.requestState',
4444
'call.channel.sendPresence',
45+
'call.channel.sendState',
4546
'call.channel.sync',
4647
'call.connectionManager.activeProtocol.getTransport',
4748
'call.connectionManager.clearConnection',
@@ -84,6 +85,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
8485
'read.Defaults.version',
8586
'read.EventEmitter.events',
8687
'read.LiveMap._dataRef.data',
88+
'read.LiveObject._parentReferences',
8789
'read.LiveObject._subscriptions',
8890
'read.PathObjectSubscriptionRegister._subscriptions',
8991
'read.Platform.Config.push',

0 commit comments

Comments
 (0)