Skip to content

Commit 2f32677

Browse files
Apply LiveObjects operations on ACK
Based on [1] at 56a0bba. Implementation and tests are Claude-generated from the spec; I've reviewed them and given plenty of feedback, but largely resisted the temptation to tweak things that aren't quite how I'd write them but which are still correct. The only behaviour here that's not in the spec is to also apply-on-ACK for batch operations (the batch API isn't in the spec yet). Summary of decisions re modifications to existing tests (written by Claude): - Removed redundant `waitFor*` calls after SDK operations (`map.set()`, `counter.increment()`, etc.) - with apply-on-ACK, values are available immediately after the operation promise resolves - Kept `waitFor*` calls after REST operations (`objectsHelper.operationRequest()`, `objectsHelper.createAndSetOnMap()`) - these still require waiting for the echo to arrive over Realtime - Added explanatory comment to `applyOperationsScenarios` noting that those tests cover operations received over Realtime (via REST), and pointing to the new "Apply on ACK" section for tests of locally-applied operations [1] ably/specification#419
1 parent ee1cbff commit 2f32677

File tree

9 files changed

+926
-188
lines changed

9 files changed

+926
-188
lines changed

liveobjects.d.ts

Lines changed: 44 additions & 50 deletions
Large diffs are not rendered by default.

src/common/lib/client/realtimechannel.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -536,13 +536,13 @@ class RealtimeChannel extends EventEmitter {
536536
await this.sendAndAwaitAck(msg);
537537
}
538538

539-
async sendState(objectMessages: WireObjectMessage[]): Promise<void> {
539+
async sendState(objectMessages: WireObjectMessage[]): Promise<API.PublishResult> {
540540
const msg = protocolMessageFromValues({
541541
action: actions.OBJECT,
542542
channel: this.name,
543543
state: objectMessages,
544544
});
545-
await this.sendAndAwaitAck(msg);
545+
return this.sendAndAwaitAck(msg);
546546
}
547547

548548
// Access to this method is synchronised by ConnectionManager#processChannelMessage, in order to synchronise access to the state stored in _decodingContext.

src/plugins/liveobjects/livecounter.ts

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { __livetype } from '../../../ably';
22
import { LiveCounter as PublicLiveCounter } from '../../../liveobjects';
33
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
44
import { ObjectData, ObjectMessage, ObjectOperation, ObjectOperationAction, ObjectsCounterOp } from './objectmessage';
5-
import { RealtimeObject } from './realtimeobject';
5+
import { ObjectsOperationSource, RealtimeObject } from './realtimeobject';
66

77
export interface LiveCounterData extends LiveObjectData {
88
data: number; // RTLC3
@@ -72,15 +72,15 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
7272
/**
7373
* Send a COUNTER_INC operation to the realtime system to increment a value on this LiveCounter object.
7474
*
75-
* This does not modify the underlying data of this LiveCounter object. Instead, the change will be applied when
76-
* the published COUNTER_INC operation is echoed back to the client and applied to the object following the regular
77-
* operation application procedure.
75+
* The change will be applied locally when the ACK is received from Realtime.
7876
*
79-
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
77+
* @returns A promise which resolves upon receiving the ACK message for the published operation message
78+
* and applying the operation locally.
79+
* @spec RTLC12
8080
*/
8181
async increment(amount: number): Promise<void> {
8282
const msg = LiveCounter.createCounterIncMessage(this._realtimeObject, this.getObjectId(), amount);
83-
return this._realtimeObject.publish([msg]);
83+
return this._realtimeObject.publishAndApply([msg]);
8484
}
8585

8686
/**
@@ -98,8 +98,9 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
9898

9999
/**
100100
* @internal
101+
* @spec RTLC7
101102
*/
102-
applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage): void {
103+
applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage, source: ObjectsOperationSource): boolean {
103104
if (op.objectId !== this.getObjectId()) {
104105
throw new this._client.ErrorInfo(
105106
`Cannot apply object operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`,
@@ -117,15 +118,19 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
117118
'LiveCounter.applyOperation()',
118119
`skipping ${op.action} op: op serial ${opSerial.toString()} <= site serial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this.getObjectId()}`,
119120
);
120-
return;
121+
return false; // RTLC7b
122+
}
123+
124+
// RTLC7c
125+
if (source === ObjectsOperationSource.channel) {
126+
// should update stored site serial immediately. doesn't matter if we successfully apply the op,
127+
// as it's important to mark that the op was processed by the object
128+
this._siteTimeserials[opSiteCode] = opSerial;
121129
}
122-
// should update stored site serial immediately. doesn't matter if we successfully apply the op,
123-
// as it's important to mark that the op was processed by the object
124-
this._siteTimeserials[opSiteCode] = opSerial;
125130

126131
if (this.isTombstoned()) {
127132
// this object is tombstoned so the operation cannot be applied
128-
return;
133+
return false; // RTLC7e
129134
}
130135

131136
let update: LiveCounterUpdate | LiveObjectUpdateNoop;
@@ -155,6 +160,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
155160
}
156161

157162
this.notifyUpdated(update);
163+
return true; // RTLC7d1b, RTLC7d2b, RTLC7d4b
158164
}
159165

160166
/**

src/plugins/liveobjects/livemap.ts

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import {
2222
ObjectsMapOp,
2323
ObjectsMapSemantics,
2424
} from './objectmessage';
25-
import { RealtimeObject } from './realtimeobject';
25+
import { ObjectsOperationSource, RealtimeObject } from './realtimeobject';
2626

2727
export interface ObjectIdObjectData {
2828
/** A reference to another object, used to support composable object structures. */
@@ -265,38 +265,39 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
265265
/**
266266
* Send a MAP_SET operation to the realtime system to set a key on this LiveMap object to a specified value.
267267
*
268-
* This does not modify the underlying data of this LiveMap object. Instead, the change will be applied when
269-
* the published MAP_SET operation is echoed back to the client and applied to the object following the regular
270-
* operation application procedure.
268+
* The change will be applied locally when the ACK is received from Realtime.
271269
*
272-
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
270+
* @returns A promise which resolves upon receiving the ACK message for the published operation message
271+
* and applying the operation locally.
272+
* @spec RTLM20
273273
*/
274274
async set<TKey extends keyof T & string>(
275275
key: TKey,
276276
value: T[TKey] | LiveCounterValueType | LiveMapValueType,
277277
): Promise<void> {
278278
const msgs = await LiveMap.createMapSetMessage(this._realtimeObject, this.getObjectId(), key, value);
279-
return this._realtimeObject.publish(msgs);
279+
return this._realtimeObject.publishAndApply(msgs);
280280
}
281281

282282
/**
283283
* Send a MAP_REMOVE operation to the realtime system to tombstone a key on this LiveMap object.
284284
*
285-
* This does not modify the underlying data of this LiveMap object. Instead, the change will be applied when
286-
* the published MAP_REMOVE operation is echoed back to the client and applied to the object following the regular
287-
* operation application procedure.
285+
* The change will be applied locally when the ACK is received from Realtime.
288286
*
289-
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
287+
* @returns A promise which resolves upon receiving the ACK message for the published operation message
288+
* and applying the operation locally.
289+
* @spec RTLM21
290290
*/
291291
async remove<TKey extends keyof T & string>(key: TKey): Promise<void> {
292292
const msg = LiveMap.createMapRemoveMessage(this._realtimeObject, this.getObjectId(), key);
293-
return this._realtimeObject.publish([msg]);
293+
return this._realtimeObject.publishAndApply([msg]);
294294
}
295295

296296
/**
297297
* @internal
298+
* @spec RTLM15
298299
*/
299-
applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage): void {
300+
applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage, source: ObjectsOperationSource): boolean {
300301
if (op.objectId !== this.getObjectId()) {
301302
throw new this._client.ErrorInfo(
302303
`Cannot apply object operation with objectId=${op.objectId}, to this LiveMap with objectId=${this.getObjectId()}`,
@@ -314,15 +315,19 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
314315
'LiveMap.applyOperation()',
315316
`skipping ${op.action} op: op serial ${opSerial.toString()} <= site serial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this.getObjectId()}`,
316317
);
317-
return;
318+
return false; // RTLM15b
319+
}
320+
321+
// RTLM15c
322+
if (source === ObjectsOperationSource.channel) {
323+
// should update stored site serial immediately. doesn't matter if we successfully apply the op,
324+
// as it's important to mark that the op was processed by the object
325+
this._siteTimeserials[opSiteCode] = opSerial;
318326
}
319-
// should update stored site serial immediately. doesn't matter if we successfully apply the op,
320-
// as it's important to mark that the op was processed by the object
321-
this._siteTimeserials[opSiteCode] = opSerial;
322327

323328
if (this.isTombstoned()) {
324329
// this object is tombstoned so the operation cannot be applied
325-
return;
330+
return false; // RTLM15e
326331
}
327332

328333
let update: LiveMapUpdate<T> | LiveObjectUpdateNoop;
@@ -360,6 +365,7 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
360365
}
361366

362367
this.notifyUpdated(update);
368+
return true; // RTLM15d1b, RTLM15d2b, RTLM15d3b, RTLM15d5b
363369
}
364370

365371
/**

src/plugins/liveobjects/liveobject.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { ROOT_OBJECT_ID } from './constants';
55
import { InstanceEvent } from './instance';
66
import { ObjectData, ObjectMessage, ObjectOperation } from './objectmessage';
77
import { PathEvent } from './pathobjectsubscriptionregister';
8-
import { RealtimeObject } from './realtimeobject';
8+
import { ObjectsOperationSource, RealtimeObject } from './realtimeobject';
99

1010
export enum LiveObjectSubscriptionEvent {
1111
updated = 'updated',
@@ -326,9 +326,11 @@ export abstract class LiveObject<
326326
/**
327327
* Apply object operation message on this LiveObject.
328328
*
329+
* @returns `true` if the operation was applied successfully, `false` if it was skipped.
330+
* @spec RTLC7g, RTLM15g
329331
* @internal
330332
*/
331-
abstract applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage): void;
333+
abstract applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage, source: ObjectsOperationSource): boolean;
332334
/**
333335
* Overrides internal data for this LiveObject with object state from the given object message.
334336
* Provided object state should hold a valid data for current LiveObject, e.g. counter data for LiveCounter, map data for LiveMap.

0 commit comments

Comments
 (0)