Skip to content

Commit 2924ce0

Browse files
committed
Support MAP_CLEAR object operation
See DR [1], realtime implementation [2] and spec [3] Resolves AIT-458 [1] https://ably.atlassian.net/wiki/x/DABECAE [2] ably/realtime#8074 [3] ably/specification#432
1 parent 1be8271 commit 2924ce0

File tree

7 files changed

+780
-15
lines changed

7 files changed

+780
-15
lines changed

liveobjects.d.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1489,6 +1489,10 @@ declare namespace ObjectOperationActions {
14891489
* Object operation action for deleting an object.
14901490
*/
14911491
type OBJECT_DELETE = 'object.delete';
1492+
/**
1493+
* Object operation action for clearing a map object.
1494+
*/
1495+
type MAP_CLEAR = 'map.clear';
14921496
}
14931497

14941498
/**
@@ -1500,7 +1504,8 @@ export type ObjectOperationAction =
15001504
| ObjectOperationActions.MAP_REMOVE
15011505
| ObjectOperationActions.COUNTER_CREATE
15021506
| ObjectOperationActions.COUNTER_INC
1503-
| ObjectOperationActions.OBJECT_DELETE;
1507+
| ObjectOperationActions.OBJECT_DELETE
1508+
| ObjectOperationActions.MAP_CLEAR;
15041509

15051510
/**
15061511
* The namespace containing the different types of map object semantics.
@@ -1607,6 +1612,10 @@ export interface ObjectOperation {
16071612
* The payload for the operation if the action is {@link ObjectOperationActions.OBJECT_DELETE}.
16081613
*/
16091614
objectDelete?: ObjectDelete;
1615+
/**
1616+
* The payload for the operation if the action is {@link ObjectOperationActions.MAP_CLEAR}.
1617+
*/
1618+
mapClear?: MapClear;
16101619

16111620
/**
16121621
* The payload for the operation if it is a mutation operation on a map object.
@@ -1735,6 +1744,11 @@ export interface CounterInc {
17351744
*/
17361745
export interface ObjectDelete {}
17371746

1747+
/**
1748+
* Describes the payload for a MAP_CLEAR operation.
1749+
*/
1750+
export interface MapClear {}
1751+
17381752
/**
17391753
* Represents a value in an object on a channel.
17401754
*/

src/plugins/liveobjects/livemap.ts

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
5656
implements PublicLiveMap<T>
5757
{
5858
declare readonly [__livetype]: 'LiveMap'; // type-only, unique symbol to satisfy branded interfaces, no JS emitted
59+
private _clearTimeserial?: string;
5960

6061
constructor(
6162
realtimeObject: RealtimeObject,
@@ -358,6 +359,10 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
358359
update = this._applyObjectDelete(msg);
359360
break;
360361

362+
case ObjectOperationAction.MAP_CLEAR:
363+
update = this._applyMapClear(msg);
364+
break;
365+
361366
default:
362367
throw new this._client.ErrorInfo(
363368
`Invalid ${op.action} op for LiveMap objectId=${this.getObjectId()}`,
@@ -440,6 +445,7 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
440445
} else {
441446
// otherwise override data for this object with data from the object state
442447
this._createOperationIsMerged = false; // RTLM6b
448+
this._clearTimeserial = objectState.map?.clearTimeserial;
443449
this._dataRef = this._liveMapDataFromMapEntries(objectState.map?.entries ?? {}); // RTLM6c
444450
// RTLM6d
445451
if (!this._client.Utils.isNil(objectState.createOp)) {
@@ -737,9 +743,20 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
737743
): LiveMapUpdate<T> | LiveObjectUpdateNoop {
738744
const { ErrorInfo, Utils } = this._client;
739745

746+
// Check operation's serial against clearTimeserial first
747+
if (this._clearTimeserial && (!opSerial || this._clearTimeserial >= opSerial)) {
748+
this._client.Logger.logAction(
749+
this._client.logger,
750+
this._client.Logger.LOG_MICRO,
751+
'LiveMap._applyMapSet()',
752+
`skipping update for key="${op.key}": op serial ${opSerial} < clear serial ${this._clearTimeserial}; objectId=${this.getObjectId()}`,
753+
);
754+
return { noop: true };
755+
}
756+
740757
const existingEntry = this._dataRef.data.get(op.key);
741758
// RTLM7a
742-
if (existingEntry && !this._canApplyMapOperation(existingEntry.timeserial, opSerial)) {
759+
if (existingEntry && !this._canApplyMapEntryOperation(existingEntry.timeserial, opSerial)) {
743760
// RTLM7a1 - the operation's serial <= the entry's serial, ignore the operation.
744761
this._client.Logger.logAction(
745762
this._client.logger,
@@ -823,9 +840,11 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
823840
opTimestamp: number | undefined,
824841
msg: ObjectMessage,
825842
): LiveMapUpdate<T> | LiveObjectUpdateNoop {
843+
// No need to check against clearTimeserial - a remove after a clear is valid
844+
826845
const existingEntry = this._dataRef.data.get(op.key);
827846
// RTLM8a
828-
if (existingEntry && !this._canApplyMapOperation(existingEntry.timeserial, opSerial)) {
847+
if (existingEntry && !this._canApplyMapEntryOperation(existingEntry.timeserial, opSerial)) {
829848
// RTLM8a1 - the operation's serial <= the entry's serial, ignore the operation.
830849
this._client.Logger.logAction(
831850
this._client.logger,
@@ -886,12 +905,91 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
886905
return update;
887906
}
888907

908+
private _applyMapClear(objectMessage: ObjectMessage): LiveMapUpdate<T> | LiveObjectUpdateNoop {
909+
const opSerial = objectMessage.serial!;
910+
911+
if (this._clearTimeserial == null || opSerial > this._clearTimeserial) {
912+
this._client.Logger.logAction(
913+
this._client.logger,
914+
this._client.Logger.LOG_MICRO,
915+
'LiveMap._applyMapClear()',
916+
`updating clearTimeserial; previous=${this._clearTimeserial}, new=${opSerial}; objectId=${this.getObjectId()}`,
917+
);
918+
this._clearTimeserial = opSerial;
919+
} else {
920+
this._client.Logger.logAction(
921+
this._client.logger,
922+
this._client.Logger.LOG_MICRO,
923+
'LiveMap._applyMapClear()',
924+
`skipping MAP_CLEAR: op serial ${opSerial} <= current clear serial ${this._clearTimeserial}; objectId=${this.getObjectId()}`,
925+
);
926+
return { noop: true };
927+
}
928+
929+
let tombstonedAt: number;
930+
if (objectMessage.serialTimestamp != null) {
931+
tombstonedAt = objectMessage.serialTimestamp;
932+
} else {
933+
this._client.Logger.logAction(
934+
this._client.logger,
935+
this._client.Logger.LOG_MINOR,
936+
'LiveMap._applyMapClear()',
937+
`map has been cleared but no "serialTimestamp" found in the message, using local clock instead; objectId=${this.getObjectId()}`,
938+
);
939+
tombstonedAt = Date.now(); // best-effort estimate since no timestamp provided by the server
940+
}
941+
942+
const update: LiveMapUpdate<T> = {
943+
update: {},
944+
objectMessage,
945+
_type: 'LiveMapUpdate',
946+
};
947+
948+
for (const [key, entry] of this._dataRef.data.entries()) {
949+
const entrySerial = entry.timeserial;
950+
if (entrySerial == null || this._clearTimeserial >= entrySerial) {
951+
this._client.Logger.logAction(
952+
this._client.logger,
953+
this._client.Logger.LOG_MICRO,
954+
'LiveMap._applyMapClear()',
955+
`clearing entry; key="${key}", entry serial=${entrySerial}, clear serial=${this._clearTimeserial}, objectId=${this.getObjectId()}`,
956+
);
957+
958+
// Handle parent reference removal for object references
959+
if (entry.data && 'objectId' in entry.data) {
960+
// Remove parent reference from the object that was being referenced
961+
const referencedObject = this._realtimeObject.getPool().get(entry.data.objectId);
962+
if (referencedObject) {
963+
referencedObject.removeParentReference(this, key);
964+
}
965+
}
966+
967+
entry.tombstone = true;
968+
entry.tombstonedAt = tombstonedAt;
969+
entry.timeserial = this._clearTimeserial;
970+
entry.data = undefined;
971+
972+
const typedKey: keyof T & string = key;
973+
update.update[typedKey] = 'removed';
974+
} else {
975+
this._client.Logger.logAction(
976+
this._client.logger,
977+
this._client.Logger.LOG_MICRO,
978+
'LiveMap._applyMapClear()',
979+
`skipping clearing entry; key="${key}", entry serial=${entrySerial}, clear serial=${this._clearTimeserial}, objectId=${this.getObjectId()}`,
980+
);
981+
}
982+
}
983+
984+
return update;
985+
}
986+
889987
/**
890988
* Returns true if the serials of the given operation and entry indicate that
891989
* the operation should be applied to the entry, following the CRDT semantics of this LiveMap.
892990
* @spec RTLM9
893991
*/
894-
private _canApplyMapOperation(mapEntrySerial: string | undefined, opSerial: string | undefined): boolean {
992+
private _canApplyMapEntryOperation(mapEntrySerial: string | undefined, opSerial: string | undefined): boolean {
895993
// for LWW CRDT semantics (the only supported LiveMap semantic) an operation
896994
// should only be applied if its serial is strictly greater ("after") than an entry's serial.
897995

src/plugins/liveobjects/objectmessage.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const operationActions: ObjectsApi.ObjectOperationAction[] = [
1111
'counter.create',
1212
'counter.inc',
1313
'object.delete',
14+
'map.clear',
1415
];
1516

1617
const mapSemantics: ObjectsApi.ObjectsMapSemantics[] = ['lww'];
@@ -25,6 +26,7 @@ export enum ObjectOperationAction {
2526
COUNTER_CREATE = 3,
2627
COUNTER_INC = 4,
2728
OBJECT_DELETE = 5,
29+
MAP_CLEAR = 6,
2830
}
2931

3032
/** @spec OMP2 */
@@ -119,6 +121,8 @@ export interface ObjectsMap<TData> {
119121
semantics?: ObjectsMapSemantics; // OMP3a
120122
/** The map entries, indexed by key. */
121123
entries?: Record<string, ObjectsMapEntry<TData>>; // OMP3b
124+
/** The {@link ObjectMessage.serial} value of the last `MAP_CLEAR` operation applied to the map. If no `MAP_CLEAR` has been applied, this field is omitted */
125+
clearTimeserial?: string; // OMP3c
122126
}
123127

124128
/**
@@ -227,6 +231,13 @@ export interface CounterCreateWithObjectId {
227231
_derivedFrom?: CounterCreate;
228232
}
229233

234+
/**
235+
* A MapClear describes the payload for a MAP_CLEAR operation
236+
*/
237+
export interface MapClear {
238+
// Empty message, MAP_CLEAR requires no operation-specific data
239+
}
240+
230241
/**
231242
* An ObjectOperation describes an operation to be applied to an object on a channel.
232243
* @spec OOP1
@@ -273,6 +284,10 @@ export interface ObjectOperation<TData> {
273284
* Contains the nonce and JSON-encoded initial value from {@link CounterCreate} for object ID verification.
274285
*/
275286
counterCreateWithObjectId?: CounterCreateWithObjectId; // OOP3q
287+
/**
288+
* The payload for MAP_CLEAR operation.
289+
*/
290+
mapClear?: MapClear;
276291
}
277292

278293
/**
@@ -487,7 +502,7 @@ function toUserFacingMapEntry(entry: ObjectsMapEntry<ObjectData>): ObjectsApi.Ob
487502
}
488503

489504
function toUserFacingObjectOperation(operation: ObjectOperation<ObjectData>): ObjectsApi.ObjectOperation {
490-
const { mapSet: internalMapSet, mapRemove, counterInc, objectDelete } = operation;
505+
const { mapSet: internalMapSet, mapRemove, counterInc, objectDelete, mapClear } = operation;
491506

492507
// resolve *Create from direct property or from *CreateWithObjectId._derivedFrom
493508
const internalMapCreate = operation.mapCreate ?? operation.mapCreateWithObjectId?._derivedFrom;
@@ -537,6 +552,7 @@ function toUserFacingObjectOperation(operation: ObjectOperation<ObjectData>): Ob
537552
counterCreate,
538553
counterInc,
539554
objectDelete,
555+
mapClear,
540556
// deprecated fields
541557
mapOp,
542558
counterOp,

src/plugins/liveobjects/realtimeobject.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,8 @@ export class RealtimeObject {
522522
case ObjectOperationAction.MAP_SET:
523523
case ObjectOperationAction.MAP_REMOVE:
524524
case ObjectOperationAction.COUNTER_INC:
525-
case ObjectOperationAction.OBJECT_DELETE: {
525+
case ObjectOperationAction.OBJECT_DELETE:
526+
case ObjectOperationAction.MAP_CLEAR: {
526527
// we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
527528
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
528529
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,

test/common/modules/liveobjects_helper.js

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ define(['ably', 'shared_helper', 'liveobjects'], function (Ably, Helper, LiveObj
1313
COUNTER_CREATE: 3,
1414
COUNTER_INC: 4,
1515
OBJECT_DELETE: 5,
16+
MAP_CLEAR: 6,
1617
};
1718
const ACTION_STRINGS = {
1819
MAP_CREATE: 'MAP_CREATE',
@@ -21,6 +22,7 @@ define(['ably', 'shared_helper', 'liveobjects'], function (Ably, Helper, LiveObj
2122
COUNTER_CREATE: 'COUNTER_CREATE',
2223
COUNTER_INC: 'COUNTER_INC',
2324
OBJECT_DELETE: 'OBJECT_DELETE',
25+
MAP_CLEAR: 'MAP_CLEAR',
2426
};
2527

2628
class LiveObjectsHelper {
@@ -187,8 +189,21 @@ define(['ably', 'shared_helper', 'liveobjects'], function (Ably, Helper, LiveObj
187189
return op;
188190
}
189191

192+
mapClearOp(opts) {
193+
const { objectId } = opts ?? {};
194+
const op = {
195+
operation: {
196+
action: ACTIONS.MAP_CLEAR,
197+
objectId,
198+
mapClear: {},
199+
},
200+
};
201+
202+
return op;
203+
}
204+
190205
mapObject(opts) {
191-
const { objectId, siteTimeserials, initialEntries, materialisedEntries, tombstone } = opts;
206+
const { objectId, siteTimeserials, initialEntries, materialisedEntries, tombstone, clearTimeserial } = opts;
192207
const obj = {
193208
object: {
194209
objectId,
@@ -197,6 +212,7 @@ define(['ably', 'shared_helper', 'liveobjects'], function (Ably, Helper, LiveObj
197212
map: {
198213
semantics: 0,
199214
entries: materialisedEntries,
215+
...(clearTimeserial != null ? { clearTimeserial } : {}),
200216
},
201217
},
202218
};

test/common/modules/private_api_recorder.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
4242
'call.channel.processMessage',
4343
'call.channel.requestState',
4444
'call.channel.sendPresence',
45+
'call.channel.sendState',
4546
'call.channel.sync',
4647
'call.connectionManager.activeProtocol.getTransport',
4748
'call.connectionManager.clearConnection',
@@ -84,6 +85,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
8485
'read.Defaults.version',
8586
'read.EventEmitter.events',
8687
'read.LiveMap._dataRef.data',
88+
'read.LiveObject._parentReferences',
8789
'read.LiveObject._subscriptions',
8890
'read.PathObjectSubscriptionRegister._subscriptions',
8991
'read.Platform.Config.push',

0 commit comments

Comments
 (0)