-
Notifications
You must be signed in to change notification settings - Fork 58
Expand file tree
/
Copy pathlivecounter.ts
More file actions
306 lines (266 loc) · 10.7 KB
/
livecounter.ts
File metadata and controls
306 lines (266 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
import { __livetype } from '../../../ably';
import { LiveCounter as PublicLiveCounter } from '../../../liveobjects';
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { ObjectData, ObjectMessage, ObjectOperation, ObjectOperationAction, ObjectsCounterOp } from './objectmessage';
import { ObjectsOperationSource, RealtimeObject } from './realtimeobject';
export interface LiveCounterData extends LiveObjectData {
data: number; // RTLC3
}
export interface LiveCounterUpdate extends LiveObjectUpdate {
update: { amount: number };
_type: 'LiveCounterUpdate';
}
/** @spec RTLC1, RTLC2 */
export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate> implements PublicLiveCounter {
declare readonly [__livetype]: 'LiveCounter'; // type-only, unique symbol to satisfy branded interfaces, no JS emitted
/**
* Returns a {@link LiveCounter} instance with a 0 value.
*
* @internal
* @spec RTLC4
*/
static zeroValue(realtimeObject: RealtimeObject, objectId: string): LiveCounter {
return new LiveCounter(realtimeObject, objectId);
}
/**
* Returns a {@link LiveCounter} instance based on the provided object state.
* The provided object state must hold a valid counter object data.
*
* @internal
*/
static fromObjectState(realtimeObject: RealtimeObject, objectMessage: ObjectMessage): LiveCounter {
const obj = new LiveCounter(realtimeObject, objectMessage.object!.objectId);
obj.overrideWithObjectState(objectMessage);
return obj;
}
/**
* @internal
*/
static createCounterIncMessage(realtimeObject: RealtimeObject, objectId: string, amount: number): ObjectMessage {
const client = realtimeObject.getClient();
if (typeof amount !== 'number' || !Number.isFinite(amount)) {
throw new client.ErrorInfo('Counter value increment should be a valid number', 40003, 400);
}
const msg = ObjectMessage.fromValues(
{
operation: {
action: ObjectOperationAction.COUNTER_INC,
objectId,
counterOp: { amount },
} as ObjectOperation<ObjectData>,
},
client.Utils,
client.MessageEncoding,
);
return msg;
}
/** @spec RTLC5 */
value(): number {
return this._dataRef.data; // RTLC5c
}
/**
* Send a COUNTER_INC operation to the realtime system to increment a value on this LiveCounter object.
*
* The change will be applied locally when the ACK is received from Realtime.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message
* and applying the operation locally.
* @spec RTLC12
*/
async increment(amount: number): Promise<void> {
const msg = LiveCounter.createCounterIncMessage(this._realtimeObject, this.getObjectId(), amount);
return this._realtimeObject.publishAndApply([msg]);
}
/**
* An alias for calling {@link LiveCounter.increment | LiveCounter.increment(-amount)}
*/
async decrement(amount: number): Promise<void> {
// do an explicit type safety check here before negating the amount value,
// so we don't unintentionally change the type sent by a user
if (typeof amount !== 'number' || !Number.isFinite(amount)) {
throw new this._client.ErrorInfo('Counter value decrement should be a valid number', 40003, 400);
}
return this.increment(-amount);
}
/**
* @internal
* @spec RTLC7
*/
applyOperation(op: ObjectOperation<ObjectData>, msg: ObjectMessage, source: ObjectsOperationSource): boolean {
if (op.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Cannot apply object operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`,
92000,
500,
);
}
const opSerial = msg.serial!;
const opSiteCode = msg.siteCode!;
if (!this._canApplyOperation(opSerial, opSiteCode)) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveCounter.applyOperation()',
`skipping ${op.action} op: op serial ${opSerial.toString()} <= site serial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this.getObjectId()}`,
);
return false; // RTLC7b
}
// RTLC7c
if (source === ObjectsOperationSource.channel) {
// should update stored site serial immediately. doesn't matter if we successfully apply the op,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opSerial;
}
if (this.isTombstoned()) {
// this object is tombstoned so the operation cannot be applied
return false; // RTLC7e
}
let update: LiveCounterUpdate | LiveObjectUpdateNoop;
switch (op.action) {
case ObjectOperationAction.COUNTER_CREATE:
update = this._applyCounterCreate(op, msg);
break;
case ObjectOperationAction.COUNTER_INC:
if (this._client.Utils.isNil(op.counterOp)) {
this._throwNoPayloadError(op);
} else {
update = this._applyCounterInc(op.counterOp, msg);
}
break;
case ObjectOperationAction.OBJECT_DELETE:
update = this._applyObjectDelete(msg);
break;
default:
throw new this._client.ErrorInfo(
`Invalid ${op.action} op for LiveCounter objectId=${this.getObjectId()}`,
92000,
500,
);
}
this.notifyUpdated(update);
return true; // RTLC7d1b, RTLC7d2b, RTLC7d4b
}
/**
* @internal
* @spec RTLC6
*/
overrideWithObjectState(objectMessage: ObjectMessage): LiveCounterUpdate | LiveObjectUpdateNoop {
const objectState = objectMessage.object;
if (objectState == null) {
throw new this._client.ErrorInfo(`Missing object state; LiveCounter objectId=${this.getObjectId()}`, 92000, 500);
}
if (objectState.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Invalid object state: object state objectId=${objectState.objectId}; LiveCounter objectId=${this.getObjectId()}`,
92000,
500,
);
}
if (!this._client.Utils.isNil(objectState.createOp)) {
// it is expected that create operation can be missing in the object state, so only validate it when it exists
if (objectState.createOp.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Invalid object state: object state createOp objectId=${objectState.createOp?.objectId}; LiveCounter objectId=${this.getObjectId()}`,
92000,
500,
);
}
if (objectState.createOp.action !== ObjectOperationAction.COUNTER_CREATE) {
throw new this._client.ErrorInfo(
`Invalid object state: object state createOp action=${objectState.createOp?.action}; LiveCounter objectId=${this.getObjectId()}`,
92000,
500,
);
}
}
// object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation.
// should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object.
this._siteTimeserials = objectState.siteTimeserials ?? {}; // RTLC6a
if (this.isTombstoned()) {
// this object is tombstoned. this is a terminal state which can't be overridden. skip the rest of object state message processing
return { noop: true };
}
const previousDataRef = this._dataRef;
let update: LiveCounterUpdate;
if (objectState.tombstone) {
// tombstone this object and ignore the data from the object state message
update = this.tombstone(objectMessage);
} else {
// otherwise override data for this object with data from the object state
this._createOperationIsMerged = false; // RTLC6b
this._dataRef = { data: objectState.counter?.count ?? 0 }; // RTLC6c
// RTLC6d
if (!this._client.Utils.isNil(objectState.createOp)) {
this._mergeInitialDataFromCreateOperation(objectState.createOp, objectMessage);
}
// update will contain the diff between previous value and new value from object state
update = this._updateFromDataDiff(previousDataRef, this._dataRef);
update.objectMessage = objectMessage;
}
return update;
}
/**
* @internal
*/
onGCInterval(): void {
// nothing to GC for a counter object
return;
}
/** @spec RTLC4 */
protected _getZeroValueData(): LiveCounterData {
return { data: 0 };
}
protected _updateFromDataDiff(prevDataRef: LiveCounterData, newDataRef: LiveCounterData): LiveCounterUpdate {
const counterDiff = newDataRef.data - prevDataRef.data;
return { update: { amount: counterDiff }, _type: 'LiveCounterUpdate' };
}
protected _mergeInitialDataFromCreateOperation(
objectOperation: ObjectOperation<ObjectData>,
msg: ObjectMessage,
): LiveCounterUpdate {
// if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case.
// note that it is intentional to SUM the incoming count from the create op.
// if we got here, it means that current counter instance is missing the initial value in its data reference,
// which we're going to add now.
this._dataRef.data += objectOperation.counter?.count ?? 0; // RTLC6d1
this._createOperationIsMerged = true; // RTLC6d2
return {
update: { amount: objectOperation.counter?.count ?? 0 },
objectMessage: msg,
_type: 'LiveCounterUpdate',
};
}
private _throwNoPayloadError(op: ObjectOperation<ObjectData>): never {
throw new this._client.ErrorInfo(
`No payload found for ${op.action} op for LiveCounter objectId=${this.getObjectId()}`,
92000,
500,
);
}
private _applyCounterCreate(
op: ObjectOperation<ObjectData>,
msg: ObjectMessage,
): LiveCounterUpdate | LiveObjectUpdateNoop {
if (this._createOperationIsMerged) {
// There can't be two different create operation for the same object id, because the object id
// fully encodes that operation. This means we can safely ignore any new incoming create operations
// if we already merged it once.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveCounter._applyCounterCreate()',
`skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=${this.getObjectId()}`,
);
return { noop: true };
}
return this._mergeInitialDataFromCreateOperation(op, msg);
}
private _applyCounterInc(op: ObjectsCounterOp, msg: ObjectMessage): LiveCounterUpdate {
this._dataRef.data += op.amount;
return {
update: { amount: op.amount },
objectMessage: msg,
_type: 'LiveCounterUpdate',
};
}
}