Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 44 additions & 50 deletions liveobjects.d.ts

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -536,13 +536,13 @@ class RealtimeChannel extends EventEmitter {
await this.sendAndAwaitAck(msg);
}

async sendState(objectMessages: WireObjectMessage[]): Promise<void> {
async sendState(objectMessages: WireObjectMessage[]): Promise<API.PublishResult> {
const msg = protocolMessageFromValues({
action: actions.OBJECT,
channel: this.name,
state: objectMessages,
});
await this.sendAndAwaitAck(msg);
return this.sendAndAwaitAck(msg);
}

// Access to this method is synchronised by ConnectionManager#processChannelMessage, in order to synchronise access to the state stored in _decodingContext.
Expand Down
34 changes: 19 additions & 15 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { __livetype } from '../../../ably';
import { LiveCounter as PublicLiveCounter } from '../../../liveobjects';
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { ObjectData, ObjectMessage, ObjectOperation, ObjectOperationAction, ObjectsCounterOp } from './objectmessage';
import { RealtimeObject } from './realtimeobject';
import { ObjectsOperationSource, RealtimeObject } from './realtimeobject';

export interface LiveCounterData extends LiveObjectData {
data: number; // RTLC3
Expand Down Expand Up @@ -72,15 +72,15 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
/**
* Send a COUNTER_INC operation to the realtime system to increment a value on this LiveCounter object.
*
* This does not modify the underlying data of this LiveCounter object. Instead, the change will be applied when
* the published COUNTER_INC operation is echoed back to the client and applied to the object following the regular
* operation application procedure.
* The change will be applied locally when the ACK is received from Realtime.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
* @returns A promise which resolves upon receiving the ACK message for the published operation message
* and applying the operation locally.
* @spec RTLC12
*/
async increment(amount: number): Promise<void> {
const msg = LiveCounter.createCounterIncMessage(this._realtimeObject, this.getObjectId(), amount);
return this._realtimeObject.publish([msg]);
return this._realtimeObject.publishAndApply([msg]);
}

/**
Expand All @@ -98,8 +98,9 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>

/**
* @internal
* @spec RTLC7
*/
applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage): void {
applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage, source: ObjectsOperationSource): boolean {
if (op.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Cannot apply object operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`,
Expand All @@ -117,15 +118,19 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
'LiveCounter.applyOperation()',
`skipping ${op.action} op: op serial ${opSerial.toString()} <= site serial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this.getObjectId()}`,
);
return;
return false; // RTLC7b
}

// RTLC7c
if (source === ObjectsOperationSource.channel) {
// should update stored site serial immediately. doesn't matter if we successfully apply the op,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opSerial;
}
// should update stored site serial immediately. doesn't matter if we successfully apply the op,
Comment thread
lawrence-forooghian marked this conversation as resolved.
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opSerial;

if (this.isTombstoned()) {
// this object is tombstoned so the operation cannot be applied
return;
return false; // RTLC7e
}

let update: LiveCounterUpdate | LiveObjectUpdateNoop;
Expand All @@ -137,8 +142,6 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
case ObjectOperationAction.COUNTER_INC:
if (this._client.Utils.isNil(op.counterOp)) {
this._throwNoPayloadError(op);
// leave an explicit return here, so that TS knows that update object is always set after the switch statement.
return;
} else {
update = this._applyCounterInc(op.counterOp, msg);
}
Expand All @@ -157,6 +160,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
}

this.notifyUpdated(update);
return true; // RTLC7d1b, RTLC7d2b, RTLC7d4b
}

/**
Expand Down Expand Up @@ -263,7 +267,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
};
}

private _throwNoPayloadError(op: ObjectOperation<ObjectData>): void {
private _throwNoPayloadError(op: ObjectOperation<ObjectData>): never {
throw new this._client.ErrorInfo(
`No payload found for ${op.action} op for LiveCounter objectId=${this.getObjectId()}`,
92000,
Expand Down
46 changes: 24 additions & 22 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
ObjectsMapOp,
ObjectsMapSemantics,
} from './objectmessage';
import { RealtimeObject } from './realtimeobject';
import { ObjectsOperationSource, RealtimeObject } from './realtimeobject';

export interface ObjectIdObjectData {
/** A reference to another object, used to support composable object structures. */
Expand Down Expand Up @@ -265,38 +265,39 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
/**
* Send a MAP_SET operation to the realtime system to set a key on this LiveMap object to a specified value.
*
* This does not modify the underlying data of this LiveMap object. Instead, the change will be applied when
* the published MAP_SET operation is echoed back to the client and applied to the object following the regular
* operation application procedure.
* The change will be applied locally when the ACK is received from Realtime.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
* @returns A promise which resolves upon receiving the ACK message for the published operation message
* and applying the operation locally.
* @spec RTLM20
*/
async set<TKey extends keyof T & string>(
key: TKey,
value: T[TKey] | LiveCounterValueType | LiveMapValueType,
): Promise<void> {
const msgs = await LiveMap.createMapSetMessage(this._realtimeObject, this.getObjectId(), key, value);
return this._realtimeObject.publish(msgs);
return this._realtimeObject.publishAndApply(msgs);
}

/**
* Send a MAP_REMOVE operation to the realtime system to tombstone a key on this LiveMap object.
*
* This does not modify the underlying data of this LiveMap object. Instead, the change will be applied when
* the published MAP_REMOVE operation is echoed back to the client and applied to the object following the regular
* operation application procedure.
* The change will be applied locally when the ACK is received from Realtime.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
* @returns A promise which resolves upon receiving the ACK message for the published operation message
* and applying the operation locally.
* @spec RTLM21
*/
async remove<TKey extends keyof T & string>(key: TKey): Promise<void> {
const msg = LiveMap.createMapRemoveMessage(this._realtimeObject, this.getObjectId(), key);
return this._realtimeObject.publish([msg]);
return this._realtimeObject.publishAndApply([msg]);
}

/**
* @internal
* @spec RTLM15
*/
applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage): void {
applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage, source: ObjectsOperationSource): boolean {
if (op.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Cannot apply object operation with objectId=${op.objectId}, to this LiveMap with objectId=${this.getObjectId()}`,
Expand All @@ -314,15 +315,19 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
'LiveMap.applyOperation()',
`skipping ${op.action} op: op serial ${opSerial.toString()} <= site serial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this.getObjectId()}`,
);
return;
return false; // RTLM15b
}

// RTLM15c
if (source === ObjectsOperationSource.channel) {
// should update stored site serial immediately. doesn't matter if we successfully apply the op,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opSerial;
}
// should update stored site serial immediately. doesn't matter if we successfully apply the op,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opSerial;

if (this.isTombstoned()) {
// this object is tombstoned so the operation cannot be applied
return;
return false; // RTLM15e
}

let update: LiveMapUpdate<T> | LiveObjectUpdateNoop;
Expand All @@ -334,8 +339,6 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
case ObjectOperationAction.MAP_SET:
if (this._client.Utils.isNil(op.mapOp)) {
this._throwNoPayloadError(op);
// leave an explicit return here, so that TS knows that update object is always set after the switch statement.
return;
} else {
update = this._applyMapSet(op.mapOp, opSerial, msg);
}
Expand All @@ -344,8 +347,6 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
case ObjectOperationAction.MAP_REMOVE:
if (this._client.Utils.isNil(op.mapOp)) {
this._throwNoPayloadError(op);
// leave an explicit return here, so that TS knows that update object is always set after the switch statement.
return;
} else {
update = this._applyMapRemove(op.mapOp, opSerial, msg.serialTimestamp, msg);
}
Expand All @@ -364,6 +365,7 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
}

this.notifyUpdated(update);
return true; // RTLM15d1b, RTLM15d2b, RTLM15d3b, RTLM15d5b
}

/**
Expand Down Expand Up @@ -683,7 +685,7 @@ export class LiveMap<T extends Record<string, Value> = Record<string, Value>>
return aggregatedUpdate;
}

private _throwNoPayloadError(op: ObjectOperation<ObjectData>): void {
private _throwNoPayloadError(op: ObjectOperation<ObjectData>): never {
throw new this._client.ErrorInfo(
`No payload found for ${op.action} op for LiveMap objectId=${this.getObjectId()}`,
92000,
Expand Down
6 changes: 4 additions & 2 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { ROOT_OBJECT_ID } from './constants';
import { InstanceEvent } from './instance';
import { ObjectData, ObjectMessage, ObjectOperation } from './objectmessage';
import { PathEvent } from './pathobjectsubscriptionregister';
import { RealtimeObject } from './realtimeobject';
import { ObjectsOperationSource, RealtimeObject } from './realtimeobject';

export enum LiveObjectSubscriptionEvent {
updated = 'updated',
Expand Down Expand Up @@ -326,9 +326,11 @@ export abstract class LiveObject<
/**
* Apply object operation message on this LiveObject.
*
* @returns `true` if the operation was applied successfully, `false` if it was skipped.
* @spec RTLC7g, RTLM15g
* @internal
*/
abstract applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage): void;
abstract applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage, source: ObjectsOperationSource): boolean;
/**
* Overrides internal data for this LiveObject with object state from the given object message.
* Provided object state should hold a valid data for current LiveObject, e.g. counter data for LiveCounter, map data for LiveMap.
Expand Down
Loading
Loading