Skip to content

Commit cfa74f0

Browse files
committed
Fix cache regression in reflectIfNewer and cancellation hang in loadIfNewer
1 parent 1934d39 commit cfa74f0

2 files changed

Lines changed: 67 additions & 7 deletions

File tree

util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/FlowStoreCache.kt

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import com.github.benmanes.caffeine.cache.AsyncCache
66
import com.github.benmanes.caffeine.cache.Caffeine
77
import java.util.concurrent.CompletableFuture
88
import java.util.concurrent.ConcurrentHashMap
9+
import kotlinx.coroutines.CancellationException
910
import kotlinx.coroutines.CoroutineScope
1011
import kotlinx.coroutines.future.await
1112
import kotlinx.coroutines.launch
@@ -34,7 +35,7 @@ class FlowStoreCache<K : Any, V : Any>(
3435
inFlight.putIfAbsent(key, mine)?.let {
3536
return it.await()
3637
}
37-
loaderScope.launch {
38+
val job = loaderScope.launch {
3839
try {
3940
val loaded = load(key)
4041
mine.complete(
@@ -46,6 +47,13 @@ class FlowStoreCache<K : Any, V : Any>(
4647
inFlight.remove(key, mine)
4748
}
4849
}
50+
job.invokeOnCompletion { throwable ->
51+
if (throwable != null) {
52+
mine.completeExceptionally(throwable)
53+
} else if (!mine.isDone) {
54+
mine.completeExceptionally(CancellationException("Job completed without completing future"))
55+
}
56+
}
4957
return mine.await()
5058
}
5159

@@ -63,14 +71,21 @@ class FlowStoreCache<K : Any, V : Any>(
6371
}
6472

6573
/**
66-
* Applies [event] to a resident entry only, gated on version; absent keys await the next read.
74+
* Applies [event] to a resident or in-flight loading entry only, gated on version; other absent
75+
* keys await the next read.
6776
*/
6877
fun reflectIfNewer(event: VersionedMapEvent<K, V>) {
69-
asyncCache().asMap().computeIfPresent(event.key) { _, old ->
70-
val current = old.getNow(null)
71-
if (current != null && event.version > current.version)
72-
CompletableFuture.completedFuture(event.toEntry())
73-
else old
78+
asyncCache().asMap().compute(event.key) { _, old ->
79+
val current = old?.getNow(null)
80+
if (current != null) {
81+
if (event.version > current.version) CompletableFuture.completedFuture(event.toEntry()) else old
82+
} else if (old != null) {
83+
old
84+
} else if (inFlight.containsKey(event.key)) {
85+
CompletableFuture.completedFuture(event.toEntry())
86+
} else {
87+
null
88+
}
7489
}
7590
}
7691
}

util/src/test/kotlin/com/caplin/integration/datasourcex/util/store/FlowStoreCacheTest.kt

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package com.caplin.integration.datasourcex.util.store
22

3+
import com.caplin.integration.datasourcex.util.flow.VersionedMapEvent
34
import com.github.benmanes.caffeine.cache.Caffeine
5+
import io.kotest.assertions.throwables.shouldThrow
46
import io.kotest.core.spec.style.FunSpec
57
import io.kotest.matchers.nulls.shouldBeNull
68
import io.kotest.matchers.shouldBe
79
import java.util.concurrent.atomic.AtomicInteger
10+
import kotlinx.coroutines.CancellationException
811
import kotlinx.coroutines.CompletableDeferred
912
import kotlinx.coroutines.CoroutineScope
1013
import kotlinx.coroutines.Dispatchers
@@ -53,4 +56,46 @@ class FlowStoreCacheTest :
5356
cache.putIfNewer("k", Live("v5b", 5L)) shouldBe Live("v5", 5L) // equal rejected
5457
cache.putIfNewer("k", Live("v9", 9L)) shouldBe Live("v9", 9L)
5558
}
59+
60+
test("reflectIfNewer updates the cache during an in-flight load, preventing regression").config(
61+
coroutineTestScope = false
62+
) {
63+
val cache = Caffeine.newBuilder().buildFlowStoreCache<String, String>(scope)
64+
val loadGate = CompletableDeferred<Unit>()
65+
val resultGate = CompletableDeferred<Unit>()
66+
67+
val loadJob = async {
68+
cache.loadIfNewer("k") {
69+
loadGate.complete(Unit)
70+
resultGate.await()
71+
Versioned("v5", 5L)
72+
}
73+
}
74+
75+
// Wait until the load is executing and in-flight
76+
loadGate.await()
77+
78+
// Call reflectIfNewer with a newer version (Version 6) while load is in-flight
79+
cache.reflectIfNewer(VersionedMapEvent.Upsert("k", "v6", 6L))
80+
81+
// Complete the load (which yields Version 5)
82+
resultGate.complete(Unit)
83+
loadJob.await()
84+
85+
// Verify that the cache keeps the newer Version 6 and did not regress to Version 5
86+
cache.getIfPresent("k") shouldBe Live("v6", 6L)
87+
}
88+
89+
test("loadIfNewer throws CancellationException if the scope is cancelled").config(
90+
coroutineTestScope = false
91+
) {
92+
val deadScope = CoroutineScope(Dispatchers.Default)
93+
deadScope.cancel()
94+
95+
val cache = Caffeine.newBuilder().buildFlowStoreCache<String, String>(deadScope)
96+
97+
shouldThrow<CancellationException> {
98+
cache.loadIfNewer("k") { Versioned("v", 1L) }
99+
}
100+
}
56101
})

0 commit comments

Comments
 (0)