Skip to content

Commit 8eb140f

Browse files
committed
[AIT-296] fix: clear buffered operations unconditionally on ATTACHED (RTO4d)
- Ensured `BufferedObjectOperations` is always cleared when channel state transitions to ATTACHED, regardless of the `hasObjects` flag. - Updated tests to verify buffer clearing behavior (RTO4d).
1 parent f57632f commit 8eb140f

File tree

3 files changed

+159
-3
lines changed

3 files changed

+159
-3
lines changed

liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,8 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
296296
ChannelState.attached -> {
297297
Log.v(tag, "Objects.onAttached() channel=$channelName, hasObjects=$hasObjects")
298298

299+
objectsManager.clearBufferedObjectOperations() // RTO4d - clear unconditionally on ATTACHED
300+
299301
// RTO4a
300302
val fromInitializedState = this@DefaultRealtimeObjects.state == ObjectsState.Initialized
301303
if (hasObjects || fromInitializedState) {
@@ -310,7 +312,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
310312
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
311313
objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2
312314
objectsManager.clearSyncObjectsDataPool() // RTO4b3
313-
objectsManager.clearBufferedObjectOperations() // RTO4b5
315+
// RTO4b5 removed — buffer already cleared by RTO4d above
314316
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
315317
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
316318
objectsManager.endSync(fromInitializedState) // RTO4b4

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt

Lines changed: 155 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,31 @@ import io.ably.lib.objects.type.livemap.LiveMapEntry
1515
import io.ably.lib.objects.unit.BufferedObjectOperations
1616
import io.ably.lib.objects.unit.ObjectsManager
1717
import io.ably.lib.objects.unit.SyncObjectsDataPool
18+
import io.ably.lib.objects.unit.getMockObjectsAdapter
1819
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
1920
import io.ably.lib.objects.unit.size
2021
import io.ably.lib.realtime.ChannelState
22+
import io.ably.lib.types.AblyException
23+
import io.ably.lib.types.ErrorInfo
2124
import io.ably.lib.types.ProtocolMessage
2225
import io.mockk.verify
2326
import kotlinx.coroutines.test.runTest
27+
import org.junit.After
2428
import org.junit.Test
2529
import kotlin.test.assertEquals
2630
import io.mockk.every
2731

2832
class DefaultRealtimeObjectsTest {
2933

34+
private val testInstances = mutableListOf<DefaultRealtimeObjects>()
35+
36+
@After
37+
fun tearDown() {
38+
val cleanupError = AblyException.fromErrorInfo(ErrorInfo("test cleanup", 500))
39+
testInstances.forEach { it.dispose(cleanupError) }
40+
testInstances.clear()
41+
}
42+
3043
@Test
3144
fun `(RTO4, RTO4a) When channel ATTACHED with HAS_OBJECTS flag true should start sync sequence`() = runTest {
3245
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
@@ -69,7 +82,7 @@ class DefaultRealtimeObjectsTest {
6982
}
7083

7184
assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsDataPool.size) // RTO4b3
72-
assertEquals(0, defaultRealtimeObjects.ObjectsManager.BufferedObjectOperations.size) // RTO4b5
85+
assertEquals(0, defaultRealtimeObjects.ObjectsManager.BufferedObjectOperations.size) // RTO4d
7386
assertEquals(1, defaultRealtimeObjects.objectsPool.size()) // RTO4b1 - Only root remains
7487
assertEquals(rootObject, defaultRealtimeObjects.objectsPool.get(ROOT_OBJECT_ID)) // points to previously created root object
7588
assertEquals(0, rootObject.data.size) // RTO4b2 - root object must be empty
@@ -155,6 +168,147 @@ class DefaultRealtimeObjectsTest {
155168
}
156169
}
157170

171+
@Test
172+
fun `(RTO4d) ATTACHED with hasObjects=true still clears bufferedObjectOperations`() = runTest {
173+
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
174+
val manager = defaultRealtimeObjects.ObjectsManager
175+
176+
// Pre-populate bufferedObjectOperations with a dummy operation
177+
@Suppress("UNCHECKED_CAST")
178+
(manager.BufferedObjectOperations as MutableList<ObjectMessage>).add(
179+
ObjectMessage(
180+
id = "pre-attach-op",
181+
operation = ObjectOperation(
182+
action = ObjectOperationAction.CounterInc,
183+
objectId = "counter:test@1",
184+
counterOp = ObjectsCounterOp(amount = 5.0)
185+
)
186+
)
187+
)
188+
assertEquals(1, manager.BufferedObjectOperations.size)
189+
190+
// ATTACHED with hasObjects=true — RTO4d must clear the buffer before starting sync
191+
defaultRealtimeObjects.handleStateChange(ChannelState.attached, true)
192+
193+
assertWaiter { defaultRealtimeObjects.state == ObjectsState.Syncing }
194+
assertEquals(0, manager.BufferedObjectOperations.size, "RTO4d - buffer must be cleared unconditionally on ATTACHED")
195+
}
196+
197+
@Test
198+
fun `(RTO4d) Pre-ATTACHED buffered operations are discarded, not applied after sync`() = runTest {
199+
val defaultRealtimeObjects = DefaultRealtimeObjects("testChannel", getMockObjectsAdapter())
200+
.also { testInstances.add(it) }
201+
202+
// Set up a counter in the pool
203+
val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
204+
defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
205+
206+
val objectsManager = defaultRealtimeObjects.ObjectsManager
207+
208+
// Pre-populate bufferedObjectOperations with a COUNTER_INC — simulates an op received before ATTACHED
209+
@Suppress("UNCHECKED_CAST")
210+
(objectsManager.BufferedObjectOperations as MutableList<ObjectMessage>).add(
211+
ObjectMessage(
212+
id = "pre-attach-inc",
213+
operation = ObjectOperation(
214+
action = ObjectOperationAction.CounterInc,
215+
objectId = "counter:test@1",
216+
counterOp = ObjectsCounterOp(amount = 5.0)
217+
)
218+
)
219+
)
220+
assertEquals(1, objectsManager.BufferedObjectOperations.size)
221+
222+
// ATTACHED with hasObjects=true: RTO4d clears the buffer, then starts sync
223+
defaultRealtimeObjects.handleStateChange(ChannelState.attached, true)
224+
assertWaiter { defaultRealtimeObjects.state == ObjectsState.Syncing }
225+
assertEquals(0, objectsManager.BufferedObjectOperations.size, "buffer must be cleared by RTO4d")
226+
227+
// Complete sync by calling handleObjectSyncMessages directly (sequentialScope is idle now)
228+
objectsManager.handleObjectSyncMessages(
229+
listOf(
230+
ObjectMessage(
231+
id = "sync-msg-1",
232+
objectState = ObjectState(
233+
objectId = "counter:test@1",
234+
tombstone = false,
235+
siteTimeserials = mapOf("site1" to "serial1"),
236+
counter = ObjectsCounter(count = 0.0)
237+
)
238+
)
239+
),
240+
"sync-id:" // empty cursor — ends sync (RTO5a4)
241+
)
242+
243+
assertEquals(ObjectsState.Synced, defaultRealtimeObjects.state)
244+
245+
// The pre-ATTACHED COUNTER_INC was discarded — counter should remain at 0
246+
assertEquals(0.0, counter.data.get(), "RTO4d - pre-ATTACHED buffered op must be discarded, not applied after sync")
247+
}
248+
249+
@Test
250+
fun `(RTO4d) Buffered operations survive a server-initiated resync (new OBJECT_SYNC without ATTACHED)`() {
251+
val defaultRealtimeObjects = DefaultRealtimeObjects("testChannel", getMockObjectsAdapter())
252+
.also { testInstances.add(it) }
253+
254+
// Set up a counter in the pool
255+
val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
256+
counter.data.set(5.0)
257+
defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
258+
259+
val objectsManager = defaultRealtimeObjects.ObjectsManager
260+
261+
// sync-1 is in progress
262+
objectsManager.startNewSync("sync-1")
263+
assertEquals(ObjectsState.Syncing, defaultRealtimeObjects.state)
264+
265+
// Buffer a COUNTER_INC during sync-1
266+
objectsManager.handleObjectMessages(
267+
listOf(
268+
ObjectMessage(
269+
id = "channel-op-1",
270+
operation = ObjectOperation(
271+
action = ObjectOperationAction.CounterInc,
272+
objectId = "counter:test@1",
273+
counterOp = ObjectsCounterOp(amount = 3.0)
274+
),
275+
serial = "serial-op-1",
276+
siteCode = "site1"
277+
)
278+
)
279+
)
280+
assertEquals(1, objectsManager.BufferedObjectOperations.size, "op buffered during sync-1")
281+
282+
// Server sends a new OBJECT_SYNC with a different sync-id — triggers startNewSync("sync-2") internally
283+
// OLD behaviour (RTO5a2b): startNewSync would have cleared bufferedObjectOperations here
284+
// NEW behaviour (RTO5a2b removed): buffer is preserved
285+
objectsManager.handleObjectSyncMessages(
286+
listOf(
287+
ObjectMessage(
288+
id = "sync2-msg-1",
289+
objectState = ObjectState(
290+
objectId = "counter:test@1",
291+
tombstone = false,
292+
siteTimeserials = mapOf("site1" to "resync-serial"),
293+
counter = ObjectsCounter(count = 5.0)
294+
)
295+
)
296+
),
297+
"sync-2:cursor-1" // has cursor — not ending yet
298+
)
299+
300+
assertEquals(1, objectsManager.BufferedObjectOperations.size,
301+
"startNewSync must NOT clear bufferedObjectOperations (RTO5a2b removed)")
302+
303+
// Complete sync-2 (ending serial, no new messages)
304+
objectsManager.handleObjectSyncMessages(emptyList(), "sync-2:")
305+
306+
assertEquals(ObjectsState.Synced, defaultRealtimeObjects.state)
307+
// sync-2 restored counter to 5.0; buffered COUNTER_INC (+3.0) applied after sync → 8.0
308+
assertEquals(8.0, counter.data.get(),
309+
"buffered COUNTER_INC from before server-initiated resync must be applied after sync completes")
310+
}
311+
158312
@Test
159313
fun `(OM2) Populate objectMessage missing id, timestamp and connectionId from protocolMessage`() = runTest {
160314
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ class ObjectsManagerTest {
483483
// Start a new sync — should clear both appliedOnAckSerials and bufferedAcks (RTO21b)
484484
objectsManager.startNewSync("seq-2")
485485

486-
// RTO21b — both cleared
486+
// RTO21b — appliedOnAckSerials and bufferedAcks cleared
487487
assertTrue(defaultRealtimeObjects.appliedOnAckSerials.isEmpty(),
488488
"appliedOnAckSerials should be cleared on new sync")
489489
assertEquals(0, objectsManager.BufferedAcks.size,

0 commit comments

Comments
 (0)