Skip to content

Commit c1bf6b8

Browse files
committed
Harden idleness tracking and cooldown against races
Three fixes from code review: - EventDispatcher: serialize trackedJobs add/remove with _isIdle writes via a synchronized block. Without this, a stale completion callback could overwrite the _isIdle=false set by a concurrent trackJob, leaving the orchestrator thinking the dispatcher was idle while a job was still tracked. - MonitorOrchestrator: replace the StateFlow-conflation-vulnerable withTimeoutOrNull/filter pattern with a deterministic workGeneration counter snapshot before delay(15s) compared after. A fast dispatch finishing inside the grace window now reliably restarts the cooldown. - VolumeTool: defensive guard for degenerate stream bounds (min > max) before coerceIn, which would throw IllegalArgumentException on an empty range.
1 parent bdbcaf8 commit c1bf6b8

6 files changed

Lines changed: 183 additions & 19 deletions

File tree

app/src/main/java/eu/darken/bluemusic/monitor/core/audio/VolumeTool.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ class VolumeTool @Inject constructor(
152152

153153
val max = getMaxVolume(streamId)
154154
val min = getMinVolume(streamId)
155+
if (min > max) {
156+
log(TAG, WARN) { "Invalid stream bounds: min=$min > max=$max for $streamId; aborting changeVolume." }
157+
return false
158+
}
155159
val clampedTarget = targetLevel.coerceIn(min, max)
156160
if (clampedTarget != targetLevel) {
157161
log(TAG, WARN) { "Target level $targetLevel clamped to $clampedTarget (min=$min, max=$max)." }

app/src/main/java/eu/darken/bluemusic/monitor/core/service/EventDispatcher.kt

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import kotlinx.coroutines.sync.Mutex
3434
import kotlinx.coroutines.sync.withLock
3535
import java.util.concurrent.ConcurrentHashMap
3636
import java.util.concurrent.atomic.AtomicBoolean
37+
import java.util.concurrent.atomic.AtomicLong
3738
import javax.inject.Inject
3839
import javax.inject.Singleton
3940

@@ -68,16 +69,32 @@ class EventDispatcher @Inject constructor(
6869
private val _isIdle = MutableStateFlow(true)
6970
val isIdle: StateFlow<Boolean> = _isIdle.asStateFlow()
7071

72+
// Monotonic counter incremented on every job tracked. Allows the orchestrator's
73+
// cooldown to detect "work happened during grace" deterministically, without
74+
// depending on StateFlow conflation behaviour for short-lived work.
75+
private val workGeneration = AtomicLong(0)
76+
fun currentWorkGeneration(): Long = workGeneration.get()
77+
78+
// Lock that serializes the (set mutation + _isIdle write) pair so completion
79+
// callbacks for old jobs can't overwrite an `_isIdle = false` set by a fresh
80+
// trackJob — see EventDispatcherTest.`isIdle stays consistent under concurrent track and complete`.
81+
private val trackingLock = Any()
82+
7183
suspend fun awaitIdle() {
7284
isIdle.filter { it }.first()
7385
}
7486

7587
private fun trackJob(job: Job) {
76-
trackedJobs.add(job)
77-
_isIdle.value = false
88+
workGeneration.incrementAndGet()
89+
synchronized(trackingLock) {
90+
trackedJobs.add(job)
91+
_isIdle.value = false
92+
}
7893
job.invokeOnCompletion {
79-
trackedJobs.remove(job)
80-
if (trackedJobs.isEmpty()) _isIdle.value = true
94+
synchronized(trackingLock) {
95+
trackedJobs.remove(job)
96+
if (trackedJobs.isEmpty()) _isIdle.value = true
97+
}
8198
}
8299
}
83100

app/src/main/java/eu/darken/bluemusic/monitor/core/service/MonitorOrchestrator.kt

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,10 @@ import kotlinx.coroutines.delay
2222
import kotlinx.coroutines.flow.catch
2323
import kotlinx.coroutines.flow.distinctUntilChanged
2424
import kotlinx.coroutines.flow.emptyFlow
25-
import kotlinx.coroutines.flow.filter
26-
import kotlinx.coroutines.flow.first
2725
import kotlinx.coroutines.flow.flatMapLatest
2826
import kotlinx.coroutines.flow.flow
2927
import kotlinx.coroutines.flow.launchIn
3028
import kotlinx.coroutines.flow.onEach
31-
import kotlinx.coroutines.withTimeoutOrNull
3229
import java.time.Duration
3330
import javax.inject.Inject
3431
import javax.inject.Singleton
@@ -121,16 +118,15 @@ class MonitorOrchestrator @Inject constructor(
121118
log(TAG) { "Active devices present; waiting for dispatcher idle then 15s grace before stopping." }
122119
while (true) {
123120
eventDispatcher.awaitIdle()
124-
val wentBusy = withTimeoutOrNull(Duration.ofSeconds(15).toMillis()) {
125-
eventDispatcher.isIdle.filter { !it }.first()
126-
true
121+
val genAtStart = eventDispatcher.currentWorkGeneration()
122+
delay(Duration.ofSeconds(15).toMillis())
123+
if (eventDispatcher.currentWorkGeneration() != genAtStart) {
124+
log(TAG) { "Dispatcher had new work during grace; restarting cooldown." }
125+
continue
127126
}
128-
if (wentBusy == null) {
129-
log(TAG) { "Dispatcher idle for 15s; stopping." }
130-
monitorJob.cancel()
131-
return@flow
132-
}
133-
log(TAG) { "Dispatcher went busy during grace; restarting cooldown." }
127+
log(TAG) { "Dispatcher idle for 15s; stopping." }
128+
monitorJob.cancel()
129+
return@flow
134130
}
135131
}
136132

app/src/test/java/eu/darken/bluemusic/monitor/core/audio/VolumeToolTest.kt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,31 @@ class VolumeToolTest : BaseTest() {
142142
volumeTool.hasRecentTarget(AudioStream.Id.STREAM_MUSIC, 7) shouldBe true
143143
}
144144

145+
@Test
146+
fun `min greater than max returns false without writing or recording target`() = runTest {
147+
// Defensive guard: if the platform reports degenerate stream bounds (min > max),
148+
// VolumeTool aborts before calling coerceIn (which would throw IllegalArgumentException).
149+
every { audioManager.getStreamMaxVolume(any()) } returns -1
150+
// getMinVolume returns 0 in unit tests (Build.VERSION.SDK_INT==0 path), so 0 > -1.
151+
audioLevels[AudioStream.Id.STREAM_MUSIC] = 0
152+
val writes = mutableListOf<Int>()
153+
every { audioManager.setStreamVolume(AudioStream.Id.STREAM_MUSIC.id, any(), any()) } answers {
154+
val level = secondArg<Int>()
155+
audioLevels[AudioStream.Id.STREAM_MUSIC] = level
156+
writes += level
157+
}
158+
159+
val result = volumeTool.changeVolume(
160+
streamId = AudioStream.Id.STREAM_MUSIC,
161+
targetLevel = 5,
162+
delay = Duration.ofMillis(1),
163+
)
164+
165+
result shouldBe false
166+
writes shouldBe emptyList()
167+
volumeTool.hasRecentTarget(AudioStream.Id.STREAM_MUSIC, 5) shouldBe false
168+
}
169+
145170
@Test
146171
fun `target level below min is clamped to min`() = runTest {
147172
// In unit tests Build.VERSION.SDK_INT is 0 (no Robolectric), so getMinVolume returns 0.

app/src/test/java/eu/darken/bluemusic/monitor/core/service/EventDispatcherTest.kt

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -969,4 +969,74 @@ class EventDispatcherTest : BaseTest() {
969969
// Release the deferred so the cancelled coroutine can finish unwinding cleanly.
970970
release.complete(Unit)
971971
}
972+
973+
@Test
974+
fun `currentWorkGeneration starts at 0 and increments on dispatch`() = runTest {
975+
val buds = managedDevice(budsAddress, connected = true)
976+
devicesFlow.value = listOf(buds)
977+
978+
val release = CompletableDeferred<Unit>()
979+
coEvery { module1.handle(any()) } coAnswers { release.await() }
980+
981+
val dispatcher = createDispatcher()
982+
dispatcher.currentWorkGeneration() shouldBe 0L
983+
984+
dispatcher.dispatch(event(budsAddress, CONNECTED))
985+
advanceUntilIdle()
986+
987+
// At least one trackJob ran (the cancellable launch).
988+
(dispatcher.currentWorkGeneration() > 0L) shouldBe true
989+
990+
release.complete(Unit)
991+
}
992+
993+
@Test
994+
fun `currentWorkGeneration keeps increasing across multiple dispatches`() = runTest {
995+
val buds = managedDevice(budsAddress, connected = true)
996+
devicesFlow.value = listOf(buds)
997+
998+
val firstRelease = CompletableDeferred<Unit>()
999+
val secondRelease = CompletableDeferred<Unit>()
1000+
var callCount = 0
1001+
coEvery { module1.handle(any()) } coAnswers {
1002+
val n = ++callCount
1003+
if (n == 1) firstRelease.await() else secondRelease.await()
1004+
}
1005+
1006+
val dispatcher = createDispatcher()
1007+
1008+
dispatcher.dispatch(event(budsAddress, CONNECTED))
1009+
advanceUntilIdle()
1010+
val genAfterFirst = dispatcher.currentWorkGeneration()
1011+
(genAfterFirst > 0L) shouldBe true
1012+
1013+
dispatcher.dispatch(event(budsAddress, DISCONNECTED))
1014+
advanceUntilIdle()
1015+
val genAfterSecond = dispatcher.currentWorkGeneration()
1016+
(genAfterSecond > genAfterFirst) shouldBe true
1017+
1018+
firstRelease.complete(Unit)
1019+
secondRelease.complete(Unit)
1020+
}
1021+
1022+
@Test
1023+
fun `currentWorkGeneration increments even when a fast dispatch finishes before isIdle is observed`() = runTest {
1024+
// Verifies that the orchestrator's "did anything happen during grace?" check
1025+
// remains accurate even for blink-and-you-miss-it dispatches whose isIdle=false
1026+
// gets conflated by StateFlow before any collector observes it.
1027+
val buds = managedDevice(budsAddress, connected = true)
1028+
devicesFlow.value = listOf(buds)
1029+
1030+
// Module returns immediately — total in-flight time is microseconds.
1031+
coEvery { module1.handle(any()) } returns Unit
1032+
1033+
val dispatcher = createDispatcher()
1034+
val genBefore = dispatcher.currentWorkGeneration()
1035+
1036+
dispatcher.dispatch(event(budsAddress, CONNECTED))
1037+
advanceUntilIdle()
1038+
1039+
dispatcher.isIdle.value shouldBe true
1040+
(dispatcher.currentWorkGeneration() > genBefore) shouldBe true
1041+
}
9721042
}

app/src/test/java/eu/darken/bluemusic/monitor/core/service/MonitorOrchestratorTest.kt

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.BeforeEach
2525
import org.junit.jupiter.api.Test
2626
import testhelpers.BaseTest
2727
import java.time.Duration
28+
import java.util.concurrent.atomic.AtomicLong
2829

2930
class MonitorOrchestratorTest : BaseTest() {
3031

@@ -41,6 +42,7 @@ class MonitorOrchestratorTest : BaseTest() {
4142
private lateinit var devicesFlow: MutableStateFlow<List<ManagedDevice>>
4243
private lateinit var stateFlow: MutableStateFlow<BluetoothRepo.State>
4344
private lateinit var idleFlow: MutableStateFlow<Boolean>
45+
private lateinit var workGen: AtomicLong
4446

4547
@BeforeEach
4648
fun setup() {
@@ -58,9 +60,11 @@ class MonitorOrchestratorTest : BaseTest() {
5860
ringerModeObserver = mockk { every { ringerMode } returns MutableSharedFlow() }
5961
bluetoothEventQueue = mockk { every { events } returns MutableSharedFlow() }
6062
idleFlow = MutableStateFlow(true)
63+
workGen = AtomicLong(0)
6164
eventDispatcher = mockk(relaxed = true) {
6265
every { isIdle } returns idleFlow
6366
coEvery { awaitIdle() } coAnswers { idleFlow.filter { it }.first() }
67+
every { currentWorkGeneration() } answers { workGen.get() }
6468
}
6569
ringerModeTransitionHandler = mockk(relaxed = true)
6670
ownerRegistry = mockk(relaxed = true)
@@ -224,16 +228,25 @@ class MonitorOrchestratorTest : BaseTest() {
224228
monitorReturned = true
225229
}
226230

227-
// Idle from start; advance 10s into grace window
231+
// Idle from start; advance 10s into the first grace window.
228232
advanceTimeBy(10_000)
229233
monitorReturned shouldBe false
230234

231-
// Mid-grace, dispatcher goes busy → resets cooldown
235+
// Mid-grace: dispatcher gets new work — bump generation AND flip idle to false
236+
// to simulate what trackJob() does in production. The cooldown checks generation
237+
// (not isIdle) at the end of the 15s delay, so this *will* be detected.
238+
workGen.incrementAndGet()
232239
idleFlow.value = false
240+
241+
// Finish out the original 15s delay — orchestrator sees generation changed, restarts.
242+
advanceTimeBy(5_001)
243+
monitorReturned shouldBe false
244+
245+
// Restart loop calls awaitIdle() which suspends because dispatcher is busy.
233246
advanceTimeBy(20_000)
234247
monitorReturned shouldBe false
235248

236-
// Dispatcher becomes idle again
249+
// Dispatcher becomes idle again — awaitIdle() returns, new 15s grace begins.
237250
idleFlow.value = true
238251
advanceTimeBy(14_000)
239252
monitorReturned shouldBe false
@@ -245,6 +258,45 @@ class MonitorOrchestratorTest : BaseTest() {
245258
job.cancel()
246259
}
247260

261+
@Test
262+
fun `work bump alone during grace restarts cooldown even without idle flip`() = runTest {
263+
// Edge case: a fast dispatch that completes within the 15s grace window —
264+
// its trackJob bumps the generation but isIdle may already be true again
265+
// by the time the cooldown's delay completes. Generation-based check still detects it.
266+
val device = managedDevice(
267+
"AA:BB:CC:DD:EE:FF",
268+
active = true,
269+
requiresMonitor = false,
270+
)
271+
devicesFlow.value = listOf(device)
272+
273+
val orchestrator = createOrchestrator()
274+
var monitorReturned = false
275+
276+
val job = launch {
277+
orchestrator.monitor(this@runTest) {}
278+
monitorReturned = true
279+
}
280+
281+
// Mid-grace, simulate a brief dispatch that bumps generation but leaves idle=true.
282+
advanceTimeBy(10_000)
283+
workGen.incrementAndGet()
284+
285+
// Finish out the original 15s delay — generation differs → restart.
286+
advanceTimeBy(5_001)
287+
monitorReturned shouldBe false
288+
289+
// Second grace window with no further work.
290+
advanceTimeBy(14_000)
291+
monitorReturned shouldBe false
292+
293+
advanceTimeBy(2_000)
294+
advanceUntilIdle()
295+
monitorReturned shouldBe true
296+
297+
job.cancel()
298+
}
299+
248300
@Test
249301
fun `no devices connected - stops after 15s`() = runTest {
250302
devicesFlow.value = emptyList()

0 commit comments

Comments
 (0)