Skip to content

Commit 16c5536

Browse files
adsamcikCopilot
andcommitted
fix(stats-data): resolve disk I/O concerns in DurableMetricDirtyTracker
Address R8 review finding r8-durable-dirty-tracker-io (3/3 reviewer consensus): 1. Constructor blocking: Replace runBlocking in init with async rehydration via persistenceScope.launch(ioDispatcher). Add CompletableDeferred coordination so consumeDirty(PERSISTENCE) awaits rehydration, ensuring workers never see stale-empty sets. The runBlocking moves to consumeDirty where it runs on worker threads (never main). 2. IO dispatcher: ApplicationScope uses Dispatchers.Default — file I/O was running on Default, risking thread starvation. Fix by: - Injecting @iodispatcher into DurableMetricDirtyTracker and DefaultPersistentDirtyState via StatsDataModule - All launches use launch(ioDispatcher) - DefaultPersistentDirtyState wraps all file ops in withContext(ioDispatcher) as defense-in-depth 3. KDoc accuracy: Strengthen crash-safety documentation in DurableMetricDirtyTracker (markDirty is best-effort async, microsecond race window exists, source-watermark fallback catches it) and PersistentDirtyState interface (explicit crash-safety limitations section). Tests added: - Rehydration coordination: worker consumeDirty blocks until async init completes, then returns rehydrated bits - LIVE consumer does NOT block on rehydration - DefaultPersistentDirtyState dispatches file IO on injected dispatcher Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent f1dab6e commit 16c5536

6 files changed

Lines changed: 255 additions & 68 deletions

File tree

stats-api/src/commonMain/kotlin/com/adsamcik/tracker/stats/api/metric/PersistentDirtyState.kt

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,19 @@ package com.adsamcik.tracker.stats.api.metric
2020
* - [remove] is called when the PERSISTENCE consumer drains its in-memory
2121
* set. The marks removed are no longer needed for crash recovery.
2222
*
23-
* Implementations are NOT required to be transactional w.r.t. crashes — a
24-
* crash between in-memory CAS and disk write at worst loses a few recent
25-
* marks (the next worker run will pick them up via the source-watermark
26-
* fallback in `AchievementWorker`). The goal is "at-most-one missed
27-
* window per crash", not strict durability.
23+
* # Crash-safety limitations
24+
*
25+
* Implementations are NOT required to be transactional w.r.t. crashes.
26+
* [add] and [remove] are called asynchronously after the in-memory state
27+
* has already changed. If the OS hard-kills the process between the
28+
* in-memory mutation and the enqueued disk write, the disk mutation is
29+
* lost. This is a deliberate design trade-off — blocking the hot path for
30+
* fsync is too expensive for a best-effort recovery mechanism.
31+
*
32+
* The safety net is the source-watermark fallback in `AchievementWorker`:
33+
* if a mark is lost, the worker re-evaluates from the database on the
34+
* next run. The durable tracker converts "lost forever" into "lost for at
35+
* most one worker window per crash".
2836
*/
2937
interface PersistentDirtyState {
3038
/**

stats-data/src/main/java/com/adsamcik/tracker/stats/data/di/StatsDataModule.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.adsamcik.tracker.stats.data.di
22

33
import android.content.Context
44
import com.adsamcik.tracker.shared.base.di.ApplicationScope
5+
import com.adsamcik.tracker.shared.base.di.IoDispatcher
56
import com.adsamcik.tracker.stats.api.metric.MetricDirtyTracker
67
import com.adsamcik.tracker.stats.api.metric.PersistentDirtyState
78
import com.adsamcik.tracker.stats.api.repository.AchievementMetricsProvider
@@ -42,6 +43,7 @@ import dagger.Provides
4243
import dagger.hilt.InstallIn
4344
import dagger.hilt.android.qualifiers.ApplicationContext
4445
import dagger.hilt.components.SingletonComponent
46+
import kotlinx.coroutines.CoroutineDispatcher
4547
import kotlinx.coroutines.CoroutineScope
4648
import javax.inject.Singleton
4749

@@ -147,7 +149,8 @@ abstract class StatsDataModule {
147149
@Singleton
148150
fun providePersistentDirtyState(
149151
@ApplicationContext context: Context,
150-
): PersistentDirtyState = DefaultPersistentDirtyState(context.filesDir)
152+
@IoDispatcher ioDispatcher: CoroutineDispatcher,
153+
): PersistentDirtyState = DefaultPersistentDirtyState(context.filesDir, ioDispatcher)
151154
}
152155
}
153156

@@ -175,9 +178,11 @@ internal object DurableMetricDirtyTrackerModule {
175178
fun provideMetricDirtyTracker(
176179
persistentState: PersistentDirtyState,
177180
@ApplicationScope appScope: CoroutineScope,
181+
@IoDispatcher ioDispatcher: CoroutineDispatcher,
178182
): MetricDirtyTracker = DurableMetricDirtyTracker(
179183
delegate = DefaultMetricDirtyTracker(),
180184
persistentState = persistentState,
181185
persistenceScope = appScope,
186+
ioDispatcher = ioDispatcher,
182187
)
183188
}

stats-data/src/main/java/com/adsamcik/tracker/stats/data/metric/DefaultPersistentDirtyState.kt

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package com.adsamcik.tracker.stats.data.metric
22

33
import com.adsamcik.tracker.stats.api.metric.PersistentDirtyState
4+
import kotlinx.coroutines.CoroutineDispatcher
5+
import kotlinx.coroutines.Dispatchers
46
import kotlinx.coroutines.sync.Mutex
57
import kotlinx.coroutines.sync.withLock
8+
import kotlinx.coroutines.withContext
69
import java.io.File
710
import java.io.IOException
811

@@ -25,51 +28,72 @@ import java.io.IOException
2528
* Thread safety: every read/write acquires [mutex], so concurrent markDirty
2629
* calls from different coroutines serialize on the file but never on the
2730
* in-memory CAS path (which is what's hot).
31+
*
32+
* # Dispatcher
33+
*
34+
* All file I/O is wrapped in `withContext([ioDispatcher])` as defense-in-depth.
35+
* Even if the caller is already on IO, this guarantees correct dispatcher usage
36+
* regardless of how the class is invoked.
37+
*
38+
* # Crash safety caveat
39+
*
40+
* The atomic-rename write protects against corruption from a crash *during* a
41+
* write. It does NOT provide fsync-level durability — if the OS hard-kills the
42+
* process between the caller's in-memory state change and the enqueued [add]
43+
* or [remove] call, that mutation is lost. This is by design; see
44+
* [DurableMetricDirtyTracker] class KDoc for the full crash-safety discussion.
2845
*/
2946
internal class DefaultPersistentDirtyState(
3047
private val filesDir: File,
48+
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
3149
) : PersistentDirtyState {
3250

3351
private val file: File = File(filesDir, FILE_NAME)
3452
private val tempFile: File = File(filesDir, "$FILE_NAME.tmp")
3553
private val mutex = Mutex()
3654

37-
override suspend fun load(): Set<String> = mutex.withLock {
38-
if (!file.exists()) return@withLock emptySet()
39-
try {
40-
file.useLines { lines ->
41-
lines.map { it.trim() }
42-
.filter { it.isNotEmpty() }
43-
.toCollection(LinkedHashSet())
55+
override suspend fun load(): Set<String> = withContext(ioDispatcher) {
56+
mutex.withLock {
57+
if (!file.exists()) return@withLock emptySet()
58+
try {
59+
file.useLines { lines ->
60+
lines.map { it.trim() }
61+
.filter { it.isNotEmpty() }
62+
.toCollection(LinkedHashSet())
63+
}
64+
} catch (e: IOException) {
65+
// Corrupt or unreadable — return empty (the source-watermark
66+
// fallback in AchievementWorker will catch any missed bits on
67+
// the next worker run).
68+
emptySet()
4469
}
45-
} catch (e: IOException) {
46-
// Corrupt or unreadable — return empty (the source-watermark
47-
// fallback in AchievementWorker will catch any missed bits on
48-
// the next worker run).
49-
emptySet()
5070
}
5171
}
5272

5373
override suspend fun add(tables: Set<String>) {
5474
if (tables.isEmpty()) return
55-
mutex.withLock {
56-
val current = readUnlocked()
57-
val merged = current + tables.map { it.trim() }.filter { it.isNotEmpty() }
58-
if (merged.size == current.size) return@withLock
59-
writeAtomicallyUnlocked(merged)
75+
withContext(ioDispatcher) {
76+
mutex.withLock {
77+
val current = readUnlocked()
78+
val merged = current + tables.map { it.trim() }.filter { it.isNotEmpty() }
79+
if (merged.size == current.size) return@withLock
80+
writeAtomicallyUnlocked(merged)
81+
}
6082
}
6183
}
6284

6385
override suspend fun remove(tables: Set<String>) {
6486
if (tables.isEmpty()) return
65-
mutex.withLock {
66-
val current = readUnlocked()
67-
val survivors = current - tables
68-
if (survivors.size == current.size) return@withLock
69-
if (survivors.isEmpty()) {
70-
file.delete()
71-
} else {
72-
writeAtomicallyUnlocked(survivors)
87+
withContext(ioDispatcher) {
88+
mutex.withLock {
89+
val current = readUnlocked()
90+
val survivors = current - tables
91+
if (survivors.size == current.size) return@withLock
92+
if (survivors.isEmpty()) {
93+
file.delete()
94+
} else {
95+
writeAtomicallyUnlocked(survivors)
96+
}
7397
}
7498
}
7599
}

stats-data/src/main/java/com/adsamcik/tracker/stats/data/metric/DurableMetricDirtyTracker.kt

Lines changed: 64 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ package com.adsamcik.tracker.stats.data.metric
33
import com.adsamcik.tracker.stats.api.metric.MetricDirtyTracker
44
import com.adsamcik.tracker.stats.api.metric.MetricDirtyTracker.Consumer
55
import com.adsamcik.tracker.stats.api.metric.PersistentDirtyState
6+
import kotlinx.coroutines.CompletableDeferred
7+
import kotlinx.coroutines.CoroutineDispatcher
68
import kotlinx.coroutines.CoroutineScope
9+
import kotlinx.coroutines.Dispatchers
710
import kotlinx.coroutines.launch
811
import kotlinx.coroutines.runBlocking
912

@@ -24,76 +27,101 @@ import kotlinx.coroutines.runBlocking
2427
* achievement_progress stays stale.
2528
*
2629
* This decorator closes that gap by also writing PERSISTENCE marks to disk on
27-
* every [markDirty]. On app startup [load] is called once (typically from
28-
* `Application.onCreate`) to backfill the in-memory PERSISTENCE consumer view
29-
* with anything left over from the previous process.
30+
* every [markDirty]. On app startup the persisted state is asynchronously
31+
* loaded (on [ioDispatcher]) and back-filled into the in-memory delegate so
32+
* both LIVE and PERSISTENCE consumer views see the rehydrated bits.
3033
*
3134
* # Performance contract
3235
*
3336
* - In-memory CAS (fast path) is untouched — every call still goes through
3437
* the wrapped delegate first.
35-
* - Disk writes are dispatched on [persistenceScope] (typically the app
36-
* scope on IO dispatcher) so they NEVER block the markDirty caller.
38+
* - Disk writes are dispatched on [persistenceScope] with [ioDispatcher]
39+
* so they NEVER block the markDirty caller and never starve Default
40+
* dispatcher threads with file I/O.
3741
* - LIVE consumer marks are intentionally NOT persisted — they exist only
3842
* for the current session and there's no recovery story.
3943
*
40-
* # Crash safety
44+
* # Crash safety — important caveats
45+
*
46+
* **[markDirty] is best-effort async**: the in-memory bit is set immediately,
47+
* but the disk write is enqueued via `persistenceScope.launch`. If the OS
48+
* hard-kills the process in the microsecond window between the in-memory
49+
* set and the disk commit, that mark is lost. This is a deliberate trade-off:
50+
* blocking the hot path for fsync is too expensive. The recovery path is
51+
* `AchievementWorker`'s source-watermark fallback, which re-evaluates from
52+
* the database if dirty bits are missing.
53+
*
54+
* In practice, [DurableMetricDirtyTracker] converts "dirty bits lost forever
55+
* on process death" into "dirty bits lost for at most one worker window per
56+
* crash". It **reduces** but does **not eliminate** dirty-loss risk.
4157
*
42-
* - markDirty + disk write race: in-memory bit set; disk write enqueued but
43-
* may not have run when process dies. The next markDirty on the same
44-
* table re-enqueues. Worst case: a single SourceTable's bit is lost for
45-
* one worker window, AchievementWorker's source-watermark fallback (its
46-
* own follow-up todo) catches it on the next run.
4758
* - consumeDirty + disk remove race: in-memory drained; disk still has bits
4859
* that get loaded on next startup, causing one redundant evaluation. Safe.
60+
*
61+
* # Rehydration coordination
62+
*
63+
* The async init completes [rehydrationComplete] once the persisted state has
64+
* been loaded. [consumeDirty] for the [Consumer.PERSISTENCE] consumer awaits
65+
* this deferred (via [runBlocking]) so a worker that runs before rehydration
66+
* finishes will block briefly on the worker thread (never the main thread)
67+
* rather than seeing an empty set and short-circuiting. After the first
68+
* await the deferred returns instantly.
4969
*/
5070
class DurableMetricDirtyTracker(
5171
private val delegate: MetricDirtyTracker,
5272
private val persistentState: PersistentDirtyState,
5373
private val persistenceScope: CoroutineScope,
74+
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
5475
) : MetricDirtyTracker {
5576

77+
/**
78+
* Signals that the async rehydration from [persistentState] has completed.
79+
* [consumeDirty] for [Consumer.PERSISTENCE] awaits this so workers never
80+
* see a stale-empty set during the first few milliseconds after init.
81+
*/
82+
internal val rehydrationComplete = CompletableDeferred<Unit>()
83+
5684
init {
57-
// Synchronously rehydrate PERSISTENCE marks from disk so any worker
58-
// that calls consumeDirty on the singleton's first use sees what the
59-
// previous process left behind. runBlocking is safe here because:
60-
// - This is singleton init (off the main thread under Hilt).
61-
// - load() is a small disk read (one file, < 1 KB typically).
62-
// - The cost is bounded and one-time per process.
63-
val persisted = runBlocking { persistentState.load() }
64-
if (persisted.isNotEmpty()) {
65-
// Mark into the delegate so BOTH LIVE and PERSISTENCE consumer
66-
// views see the rehydrated state (parity with what the original
67-
// markDirty calls did before the process died). The LIVE
68-
// consumer's first flush will see the bits, evaluate live state,
69-
// and proceed normally; the PERSISTENCE consumer's first
70-
// consumeDirty will return them so the worker re-evaluates.
71-
delegate.markDirty(persisted)
85+
// Asynchronously rehydrate PERSISTENCE marks from disk. Launched on
86+
// ioDispatcher so the Hilt singleton init never blocks the calling
87+
// thread (which may be main). Workers that call consumeDirty before
88+
// this completes will block on rehydrationComplete in consumeDirty,
89+
// which is safe because workers always run on background threads.
90+
persistenceScope.launch(ioDispatcher) {
91+
try {
92+
val persisted = persistentState.load()
93+
if (persisted.isNotEmpty()) {
94+
delegate.markDirty(persisted)
95+
}
96+
} finally {
97+
rehydrationComplete.complete(Unit)
98+
}
7299
}
73100
}
74101

75102
override fun markDirty(table: String) {
76103
delegate.markDirty(table)
77-
// Persist only the PERSISTENCE-bound subset. The delegate fans out to
78-
// all consumers in-memory, but on disk we only care about marks the
79-
// worker needs to recover after a crash.
80-
persistenceScope.launch { persistentState.add(setOf(table)) }
104+
persistenceScope.launch(ioDispatcher) { persistentState.add(setOf(table)) }
81105
}
82106

83107
override fun markDirty(tables: Set<String>) {
84108
if (tables.isEmpty()) return
85109
delegate.markDirty(tables)
86-
persistenceScope.launch { persistentState.add(tables) }
110+
persistenceScope.launch(ioDispatcher) { persistentState.add(tables) }
87111
}
88112

89113
override fun consumeDirty(consumer: Consumer): Set<String> {
114+
if (consumer == Consumer.PERSISTENCE) {
115+
// Block until rehydration finishes so the worker never sees an
116+
// empty set because the init coroutine hasn't loaded yet. This
117+
// runBlocking is acceptable because:
118+
// - PERSISTENCE consumers are WorkManager workers on background threads.
119+
// - After the first call the deferred is already complete (instant).
120+
runBlocking { rehydrationComplete.await() }
121+
}
90122
val consumed = delegate.consumeDirty(consumer)
91123
if (consumer == Consumer.PERSISTENCE && consumed.isNotEmpty()) {
92-
// PERSISTENCE drained — remove these from durable storage so a
93-
// subsequent crash doesn't re-process them. The LIVE consumer's
94-
// drains do NOT touch disk; their bits remain durable for the
95-
// worker until it consumes them itself.
96-
persistenceScope.launch { persistentState.remove(consumed) }
124+
persistenceScope.launch(ioDispatcher) { persistentState.remove(consumed) }
97125
}
98126
return consumed
99127
}

stats-data/src/test/java/com/adsamcik/tracker/stats/data/metric/DefaultPersistentDirtyStateTest.kt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,17 @@ package com.adsamcik.tracker.stats.data.metric
33
import com.adsamcik.tracker.stats.api.metric.PersistentDirtyState
44
import io.kotest.matchers.collections.shouldBeEmpty
55
import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder
6+
import io.kotest.matchers.ints.shouldBeGreaterThan
7+
import kotlinx.coroutines.CoroutineDispatcher
8+
import kotlinx.coroutines.Dispatchers
69
import kotlinx.coroutines.test.runTest
710
import org.junit.jupiter.api.AfterEach
811
import org.junit.jupiter.api.BeforeEach
912
import org.junit.jupiter.api.DisplayName
1013
import org.junit.jupiter.api.Test
1114
import java.io.File
15+
import java.util.concurrent.atomic.AtomicInteger
16+
import kotlin.coroutines.CoroutineContext
1217

1318
@DisplayName("DefaultPersistentDirtyState")
1419
class DefaultPersistentDirtyStateTest {
@@ -113,4 +118,23 @@ class DefaultPersistentDirtyStateTest {
113118
// parse OR empty. Critical point: no throw.
114119
(loaded.isEmpty() || loaded.isNotEmpty()) // tautology — just asserts the line ran
115120
}
121+
122+
@Test
123+
fun `file IO dispatches on the injected ioDispatcher`() = runTest {
124+
val dispatchCount = AtomicInteger(0)
125+
val trackingDispatcher = object : CoroutineDispatcher() {
126+
override fun dispatch(context: CoroutineContext, block: Runnable) {
127+
dispatchCount.incrementAndGet()
128+
block.run()
129+
}
130+
}
131+
132+
val ioState = DefaultPersistentDirtyState(tempDir, trackingDispatcher)
133+
ioState.add(setOf("test_table"))
134+
dispatchCount.get() shouldBeGreaterThan 0
135+
136+
val countAfterAdd = dispatchCount.get()
137+
ioState.remove(setOf("test_table"))
138+
dispatchCount.get() shouldBeGreaterThan countAfterAdd
139+
}
116140
}

0 commit comments

Comments
 (0)