[AIT-280] Apply operations on ACK#2155
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughImplements apply-on-ACK for LiveObjects: adds ObjectsOperationSource, publishAndApply, ACK-tracking and buffering during OBJECT_SYNC, changes RealtimeChannel to await publish ACKs, updates LiveMap/LiveCounter applyOperation signatures, and updates batch flush to use publishAndApply. Tests and test helpers added for ACK/echo sequencing. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant RTC as RealtimeChannel
participant LO as LiveObjects
participant Server
Client->>RTC: publish(objectMessages)
RTC->>RTC: sendAndAwaitAck(msg)
RTC->>Server: transport send
alt Server ACKs
Server->>RTC: ACK
RTC->>LO: publishAndApply(syntheticMsg, source=local)
LO->>LO: applyOperation(source=local)
LO-->>Client: emit local state/events
end
alt Server Echo arrives
Server->>RTC: echo OBJECT messages
RTC->>LO: _applyObjectMessages(source=channel)
LO->>LO: skip if serial already applied
end
note over LO: If OBJECT_SYNC in progress, ACK-applies are buffered until endSync
Estimated code review effort🎯 4 (Complex) | ⏱️ ~70 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
1361dea to
32c6075
Compare
a4bed5f to
37bd08d
Compare
37bd08d to
1c5e13e
Compare
1c5e13e to
a5d3751
Compare
1f52161 to
a36fb2a
Compare
a36fb2a to
28b6371
Compare
VeskeR
left a comment
There was a problem hiding this comment.
Some initial comments based on our call yesterday and the spec review. Still need to review generic code changes but I wouldn't expect any drastic changes there as conceptually the spec looks good and the implementation seems to be following the spec.
| `buffering ${syntheticMessages.length} message(s) until sync completes; channel=${this._channel.name}`, | ||
| ); | ||
| await new Promise<void>((resolve) => { | ||
| this._bufferedAcks.push({ objectMessages: syntheticMessages, signal: resolve }); // RTO20e1 |
There was a problem hiding this comment.
Discussed this on a call, posting here a summary and required actions:
This call essentially waits for the channel's LiveObjects state to become SYNCED. (Note: as discussed in ably/specification#419 (comment), this will be rewritten to simply await a SYNCED event instead of dealing with _bufferedAcks directly, but the underlying issue remains the same.)
The problem: if during publishAndApply we experience a connection loss in such a way that publish still succeeds (it was able to send or retry the operation and receive an ACK), but as a result we enter the SYNCING state, we will wait indefinitely for the LiveObjects state to become SYNCED again. This may never happen if we are stuck in a disconnection loop or experiencing network disturbances and are never able to receive the full sync sequence. As a result, a mutation operation (map.set, map.remove, counter.increment) will hang indefinitely until the connection fully restores and we are able to complete the sync.
This was not the case prior to this PR. RealtimeObject.publish relied only on ConnectionManager.send, which handled message queuing and had clear cut-off points for invalid states (see
ably-js/src/common/lib/transport/connectionmanager.ts
Lines 1750 to 1782 in 441cec2
ably-js/src/common/lib/transport/connectionmanager.ts
Lines 1261 to 1268 in 441cec2
Context: conceptually, this is not an entirely new pattern. The same behavior exists for presence.get(), which waits for the presence sync event (
ably-js/src/common/lib/client/presencemap.ts
Lines 179 to 192 in 441cec2
However, this problem is exacerbated by the nature of mutation operations compared to presence.get() / object.get(). You would expect .get() calls to happen once at the beginning of the app lifecycle to retrieve the initial state, after which it is automatically synced over time. If there are connection issues at startup, it is much more intuitive for the end user to refresh the page and try again when something doesn't load.
Mutation operations, on the other hand, are smaller, more intentional, and usually user-initiated. Imagine a button that increments a counter and displays a loader while waiting for publishAndApply to complete. If it hangs due to the issue described above, the user will see an indefinite loader on a button, which degrades the user experience drastically - it would force the user to refresh the entire page.
As such, the infinite-await problem is much more pronounced for mutation operations, and we should think about the general approach we want to take in these cases.
We may proposed a general rule: a publicly available async API should never hang indefinitely and must have a clear cut-off point. For presence.get(), object.get(), and publishAndApply, this could be achieved by using Promise.race with a terminal connection or channel state that we consider "bad enough" to justify throwing an error - for example, the same states that cause a queued message to fail in ConnectionManager.
Actions:
decide whether this issue is critical enough to fix for mutation operations in this PR, or open a separate issue to address it for mutation operations and presence.get() / object.get() together. Might be worth opening a DR for this.
There was a problem hiding this comment.
I've added new spec point RTO20e1 (ably/specification@57f4449) to handle this specifically for apply-on-ACK (implementation to come shortly). I'll make a note for us to think about what we should do for presence.get() and object.get().
There was a problem hiding this comment.
implementation in 7de8748 — will squash pre-merge
There was a problem hiding this comment.
👍 will keep this conversation unresolved for visibility.
| // RTO20c | ||
| const siteCode = this._channel.connectionManager.connectionDetails?.siteCode; | ||
| if (!siteCode) { | ||
| throw new this._client.ErrorInfo( |
There was a problem hiding this comment.
Discussed this on a call, posting here a summary and required actions:
publishAndApply may throw after a successful publish - that is, after the server has accepted the operation - but before applying the change locally. In this case, the caller receives an error suggesting the operation failed. However, since the server already accepted the operation, the echoed message will eventually arrive over the channel and the operation will be applied locally through the normal flow (the same behavior we had before apply-on-ACK, where all changes were applied upon receiving the echo). This creates an ambiguous situation: we signal failure, but the operation did succeed server-side, and the local state will eventually reflect it.
There are two places where this can happen:
siteCodeis not present inconnectionDetails- this would be a server error; the server is expected to always providesiteCode.- The
PublishResultcontains a null serial for one of the published operations - most likely a server error as well (we should confirm with the Realtime team that there are no legitimate situations where a serial is not returned).
In practice, both cases are very unlikely since they require the server to behave incorrectly by omitting required fields.
The question is how we expect developers to handle these errors. There are two scenarios:
- End-user-initiated action (e.g. updating a counter by clicking a button): the developer catches the error, surfaces it to the user, and the user decides whether to retry. They may also observe the echoed operation arriving and see that the action actually succeeded, choosing not to retry.
- Automated action (e.g. incrementing a counter on each page visit): if the developer retries on any error, it could result in a double count when the error was thrown after a successful publish. The correct solution here is to support message metadata for LiveObjects operations, enabling the caller to pass a message id for idempotent publishing (see https://ably.com/docs/pub-sub/advanced#idempotency). This was outlined in the https://ably.atlassian.net/wiki/spaces/LOB/pages/4235722804/LODR-042+LiveObjects+Realtime+Client+API+Improvements#Message-metadata but has not been implemented yet. The current
publishAndApplyimplementation escalates the need for message ID support in LiveObjects operations.
Summary:
no changes required in this PR. The scenarios where we throw after a successful publish correspond to genuine server-side errors where something has gone seriously wrong. If customers raise the need for reliable retries of LiveObjects operations in the future, we can address it by implementing message IDs for idempotent publishing.
Actions:
We should confirm with the Realtime team that PublishResult always includes serials for published operations.
There was a problem hiding this comment.
I think the gist here is the following:
- it has always been possible for the LiveObjects mutation methods to fail even though the operation has been accepted by Realtime (e.g. by becoming
SUSPENDEDwhen we haven't yet received the ACK); this does not change that - in such a situation, it would be good to have an idempotency mechanism that allows the user to retry without accidental repeated operations; this work is already being considered and is unrelated to this PR
- if the server does not send the data that the client needs in order to apply-on-ACK then the client (obviously) cannot apply-on-ACK; we need to convince ourselves this can not happen, and we can continue discussing this in [AIT-280] Apply LiveObjects operations on ACK specification#419 (comment)
There was a problem hiding this comment.
- The current
publishAndApplyimplementation escalates the need for message ID support in LiveObjects operations.
I don't really agree with this. For operations that are non-idempotent, there are other much more likely scenarios that will result in the client re-attempting an operation that had in fact succeeded (eg the client fails to see the ack). We already need to support ids.
There was a problem hiding this comment.
On the point being made: I think the publish promise should resolve if the publish succeeded, and not otherwise. There shouldn't be any valid circumstances that, following receipt of the ack, the apply fails. If something happens, out of spec, that causes the apply to fail, then we should be treating this as a protocol error that invalidates the transport, leading to reconnection and resync.
There was a problem hiding this comment.
There shouldn't be any valid circumstances that, following receipt of the ack, the apply fails.
That's interesting — what do you think about this discussion in that case?
There was a problem hiding this comment.
I think the conclusion in that discussion is correct, that all operations will eventually complete, even if that's ultimately triggered by a channel or connection state change. The fact that presence.get() doesn't do that right now is a spec bug I think.
There was a problem hiding this comment.
I think the conclusion in that discussion is correct, that all operations will eventually complete
What do you mean by "eventually"? The point that Andrii was making was that, if the LiveObjects mutation methods don't apply the operation — and thus don't complete — until the objects sync state becomes SYNCED (and if there are no other circumstances in which they complete) then they may never complete.
Thus, in response, I today specified a new behaviour in which if, whilst waiting for the state to become SYNCED, the channel becomes DETACHED , SUSPENDED, or FAILED, then the mutation methods' promises will reject, and the operation will not be applied locally. But this new behaviour contradicts your principle of "There shouldn't be any valid circumstances that, following receipt of the ack, the apply fails."
There was a problem hiding this comment.
Sorry, I meant that we should ensure that all operations eventually complete, following the same approach that you proposed.
But this new behaviour contradicts your principle of "There shouldn't be any valid circumstances that, following receipt of the ack, the apply fails."
Yes, sorry I should have been clearer. I'm saying that if you get an ack, and attempt to apply the operation locally as a result, then that should always succeed. In the sync case you're not attempting to apply it. There's also the conflation case (see ably/specification#419 (comment)) where you might not attempt to apply the operation on ack.
There was a problem hiding this comment.
I've implemented the changes from ably/specification@24baaa2, which handle:
- server misbehaviour (no
siteCode, malformedACK) serial === null
There was a problem hiding this comment.
We updated the implementation to log instead of throw according to the spec.
Will keep this conversation unresolved for visibility.
There was a problem hiding this comment.
🧹 Nitpick comments (3)
src/plugins/liveobjects/realtimeobject.ts (1)
268-337: Consider settling buffered ACK waiters on terminal channel states.
If the channel transitions tofailed/detachedbefore sync completes, bufferedpublishAndApplypromises can remain pending indefinitely. It may be worth clearing/rejecting_bufferedAckson terminal states.test/realtime/liveobjects.test.js (2)
237-297: Return/await ACK release processing for deterministic tests.
Right nowrelease()is fire‑and‑forget; returning the underlying call (and makingreleaseAllasync) enables callers to await ACK processing when needed.♻️ Suggested change
heldAcks.push({ message, release: () => { helper.recordPrivateApi('call.transport.onProtocolMessage'); - originalOnProtocolMessage.call(transport, message); + return originalOnProtocolMessage.call(transport, message); }, }); @@ - releaseAll: () => { + releaseAll: async () => { while (heldAcks.length > 0) { - heldAcks.shift().release(); + await heldAcks.shift().release(); } },
8429-8433: Ensure interceptor cleanup in apply-on-ACK scenarios.
If a scenario assertion throws, the transport hook stays overridden. Restoring in afinallyavoids cross-test leakage.♻️ Suggested change
- // hold echoes so we can verify value comes from ACK, not echo - createEchoInterceptor(helper, client); - - await scenario.action(root); + const interceptor = createEchoInterceptor(helper, client); + try { + await scenario.action(root); + } finally { + interceptor.restore(); + }
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/plugins/liveobjects/realtimeobject.ts (1)
285-293:siteCodevalidation happens after publish — consider checking earlier.If
siteCodeis unavailable,publishhas already succeeded (the server received the operation), but the local apply fails with an exception. The caller sees an error despite the operation being applied server-side. Consider validatingsiteCodebefore callingpublishto fail fast and avoid this inconsistency.♻️ Suggested reorder
async publishAndApply(objectMessages: ObjectMessage[]): Promise<void> { + // RTO20c - validate siteCode before publishing + const siteCode = this._channel.connectionManager.connectionDetails?.siteCode; + if (!siteCode) { + throw new this._client.ErrorInfo( + 'cannot apply operation locally: siteCode not available from connectionDetails', + 40000, + 400, + ); + } + // RTO20b const publishResult = await this.publish(objectMessages); // ... logging ... - // RTO20c - const siteCode = this._channel.connectionManager.connectionDetails?.siteCode; - if (!siteCode) { - throw new this._client.ErrorInfo( - 'cannot apply operation locally: siteCode not available from connectionDetails', - 40000, - 400, - ); - } - // RTO20d const syntheticMessages = objectMessages.map((originalMsg, i) => {
|
this is now up to date with spec commit 57f4449; the only outstanding conversation there is ably/specification#419 (comment) which I'll discuss with Paddy shortly |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
test/realtime/liveobjects.test.js (1)
8429-8433: Restore interceptors and drain held messages to prevent test bleed-through.
createEchoInterceptor(and the ACK variant elsewhere in this suite) overridestransport.onProtocolMessage. If a scenario throws or late messages arrive, the override and queued messages can leak into cleanup or later assertions. Consider always restoring and releasing in afinally, and optionally asserting an echo was actually intercepted so the test can’t pass if the interceptor stops working.♻️ Suggested cleanup pattern
- // hold echoes so we can verify value comes from ACK, not echo - createEchoInterceptor(helper, client); - - await scenario.action(root); + // hold echoes so we can verify value comes from ACK, not echo + const interceptor = createEchoInterceptor(helper, client); + try { + await scenario.action(root); + await interceptor.waitForEcho(); + } finally { + await interceptor.releaseAll(); + interceptor.restore(); + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/realtime/liveobjects.test.js` around lines 8429 - 8433, The test uses createEchoInterceptor (and similar ACK interceptor) which overrides transport.onProtocolMessage and holds messages; wrap the interceptor usage in a try/finally so you always restore transport.onProtocolMessage and drain/release any queued messages in the finally block to avoid leaking into cleanup or other tests; update the test around scenario.action(root) to create the interceptor, run the action in try, then in finally call the interceptor's restore/release methods (or explicitly reset transport.onProtocolMessage and flush the held queue) and add an assertion that an echo/ACK was actually intercepted so the test fails if the interceptor never captured a message.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/plugins/liveobjects/realtimeobject.ts`:
- Around line 293-301: The siteCode check occurs after calling publish, causing
irreversible server-side effects; move the validation of
this._channel.connectionManager.connectionDetails?.siteCode (the siteCode
existence check that currently throws new this._client.ErrorInfo) to run before
calling publish (the publish invocation in this class) so the method rejects
early without sending the operation to the server, or alternatively explicitly
document that publish may succeed server-side while the local call rejects;
update the logic around publish and the siteCode check to ensure publish is only
invoked when siteCode is present.
- Around line 304-310: The guard in the syntheticMessages mapping currently only
checks for serial === null which misses undefined when publishResult.serials is
shorter than objectMessages; update the check in the syntheticMessages callback
(where serial is read from publishResult.serials[i]) to use a loose null check
(serial == null) so it catches both null and undefined before throwing the
this._client.ErrorInfo('cannot apply operation locally: serial is null in
PublishResult', 40000, 400).
---
Nitpick comments:
In `@test/realtime/liveobjects.test.js`:
- Around line 8429-8433: The test uses createEchoInterceptor (and similar ACK
interceptor) which overrides transport.onProtocolMessage and holds messages;
wrap the interceptor usage in a try/finally so you always restore
transport.onProtocolMessage and drain/release any queued messages in the finally
block to avoid leaking into cleanup or other tests; update the test around
scenario.action(root) to create the interceptor, run the action in try, then in
finally call the interceptor's restore/release methods (or explicitly reset
transport.onProtocolMessage and flush the held queue) and add an assertion that
an echo/ACK was actually intercepted so the test fails if the interceptor never
captured a message.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/plugins/liveobjects/realtimeobject.ts`:
- Around line 346-373: The TOCTOU is that publishAndApply() checks this._state
before registering the pending reject handle, so if the channel enters
suspended/failed/detached between the ACK and push into this._pendingOperations
the promise can hang; fix by after adding the rejectHandle and registering the
ObjectsEvent.synced listener (the block that uses
this._eventEmitterInternal.once and pushes to this._pendingOperations),
immediately re-check this._state (or call the same validation logic used by
_failPendingOperations) and if the state is already non-synced-terminal
(suspended/failed/detached) invoke rejectHandle.reject(...) (or resolve if
synced) so the promise does not hang; update publishAndApply(), the onSynced
handler registration, and ensure cleanup logic still runs through cleanup().
---
Duplicate comments:
In `@src/plugins/liveobjects/realtimeobject.ts`:
- Around line 318-330: This comment is a duplicate and confirms the strict check
`serial === null` in RealtimeObject.publishAndApply() is correct because
publishResult.serials length matches objectMessages; no code change
required—leave the null check as-is and resolve/close the duplicate review
comment for the publishAndApply() serial handling.
- Around line 293-314: The checks in RealtimeObject.publishAndApply (the
siteCode validation and publishResult.serials length check) are intentionally
changed to log+return rather than throw; add a concise inline code comment
immediately above these checks (referencing siteCode, publishResult.serials,
this._channel, and publishAndApply) stating this is deliberate because the
publish has succeeded server-side and missing local application will be handled
by the server echo path, so we must not throw; also include a short note
pointing to the relevant commit or decision for future reviewers.
| `buffering ${syntheticMessages.length} message(s) until sync completes; channel=${this._channel.name}`, | ||
| ); | ||
| await new Promise<void>((resolve) => { | ||
| this._bufferedAcks.push({ objectMessages: syntheticMessages, signal: resolve }); // RTO20e1 |
There was a problem hiding this comment.
👍 will keep this conversation unresolved for visibility.
| // RTO20c | ||
| const siteCode = this._channel.connectionManager.connectionDetails?.siteCode; | ||
| if (!siteCode) { | ||
| throw new this._client.ErrorInfo( |
There was a problem hiding this comment.
We updated the implementation to log instead of throw according to the spec.
Will keep this conversation unresolved for visibility.
This informs the compiler that the function throws.
Extract this helper from inside the 'Sync events' describe block so it can be reused by other test sections. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Weirdly enough, the tests for |
How weird, will take a look |
|
Claude reckons this was broken by the |
Based on [1] at 56a0bba. Implementation and tests are Claude-generated from the spec; I've reviewed them and given plenty of feedback, but largely resisted the temptation to tweak things that aren't quite how I'd write them but which are still correct. The only behaviour here that's not in the spec is to also apply-on-ACK for batch operations (the batch API isn't in the spec yet). Summary of decisions re modifications to existing tests (written by Claude): - Removed redundant `waitFor*` calls after SDK operations (`map.set()`, `counter.increment()`, etc.) - with apply-on-ACK, values are available immediately after the operation promise resolves - Kept `waitFor*` calls after REST operations (`objectsHelper.operationRequest()`, `objectsHelper.createAndSetOnMap()`) - these still require waiting for the echo to arrive over Realtime - Added explanatory comment to `applyOperationsScenarios` noting that those tests cover operations received over Realtime (via REST), and pointing to the new "Apply on ACK" section for tests of locally-applied operations [1] ably/specification#419
|
I've reverted that change, with an explanation from Claude — as mentioned there, I don't fully understand the fix (nor do I hugely wish to spend much time trying to do so) but I think it's in "works so good enough" category |
Yeah, that makes sense if underlying "await-for-ack" procedure in publishAndApply requires a couple of microtask ticks to complete. Let's leave it as a |
Note: This PR is based on top of #2167; please review that one first.
Based on ably/specification#419 at
d809334. Implementation and tests are Claude-generated from the spec; I've reviewed them and given plenty of feedback, but largely resisted the temptation to tweak things that aren't quite how I'd write them but which are still correct.The only behaviour here that's not in the spec is to also apply-on-ACK for batch operations (the batch API isn't in the spec yet).
Summary of decisions re modifications to existing tests (written by Claude):
waitFor*calls after SDK operations (map.set(),counter.increment(), etc.) - with apply-on-ACK, values are available immediately after the operation promise resolveswaitFor*calls after REST operations (objectsHelper.operationRequest(),objectsHelper.createAndSetOnMap()) - these still require waiting for the echo to arrive over RealtimeapplyOperationsScenariosnoting that those tests cover operations received over Realtime (via REST), and pointing to the new "Apply on ACK" section for tests of locally-applied operationsDocs PR: ably/docs#3161
Summary by CodeRabbit
New Features
Improvements
Documentation
Tests