Skip to content

Commit 825a1d7

Browse files
authored
Merge pull request #112 from kchinnasamy/fix/voice-dictation-decoder-deadlock
fix(voice): break decoder deadlock when recognizer fires before audio ends
2 parents 4c1701a + 0b5beb9 commit 825a1d7

1 file changed

Lines changed: 27 additions & 28 deletions

File tree

voice/data/src/main/kotlin/com/matejdro/micropebble/voice/TranscriptionProviderImpl.kt

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import kotlinx.coroutines.channels.Channel
2424
import kotlinx.coroutines.coroutineScope
2525
import kotlinx.coroutines.flow.Flow
2626
import kotlinx.coroutines.launch
27-
import kotlinx.coroutines.selects.select
2827
import kotlinx.coroutines.withContext
2928
import logcat.logcat
3029
import si.inova.kotlinova.core.reporting.ErrorReporter
@@ -120,21 +119,24 @@ class TranscriptionProviderImpl(
120119
speechRecognizer
121120
}
122121

123-
// Decode and pipe watch audio to the SpeechRecognizer, breaking three possible
124-
// deadlocks:
125-
// 1. Recognizer fires onResults early and stops reading the pipe: the pipe
126-
// fills and the next write() blocks forever.
127-
// 2. Recognizer never fires onResults (waiting for EOF) while audio source
128-
// ends naturally via StopTransfer: same pipe-fills-then-blocks symptom,
129-
// but we never see the recognizer signal completion.
130-
// 3. Recognizer is slow consuming the pipe while the watch streams faster
131-
// than it can process: pipe fills mid-stream, write blocks before either
132-
// audio end or recognition complete signals fire.
122+
// Decode and pipe watch audio to the SpeechRecognizer. Two coroutines plus
123+
// the main writer body handle the four termination paths:
133124
//
134-
// Decode runs on its own coroutine and feeds an unbounded channel — it always
135-
// drains the source flow regardless of how fast the writer can keep up.
136-
// A watcher coroutine closes the pipe (interrupting any blocked write with
137-
// IOException) when either the recognizer completes or the audio source ends.
125+
// 1. Audio source ends (watch StopTransfer): decoder's finally closes
126+
// the channel and the pipe — writer drains remaining items and exits;
127+
// EOF on the pipe signals the recognizer to fire onResults.
128+
// 2. Recognizer fires onResults / onError early: finishedWaiter closes
129+
// the pipe (interrupts any blocked write with IOException) and
130+
// cancels the decoder; writer exits via closed channel or IOException.
131+
// 3. Recognizer is a slow consumer and the pipe fills mid-stream:
132+
// the decoder keeps draining the source into the unbounded channel,
133+
// so StopTransfer can still arrive — eventually path 1 or 2 fires.
134+
// 4. Recognizer fires early AND the watch sends no more audio and no
135+
// StopTransfer: path 2's decoder.cancel() forces the decoder's
136+
// finally, which closes the channel and frees the writer's for-loop.
137+
//
138+
// writeStream.close() is idempotent under runCatching, so both paths can
139+
// race to close the pipe without coordination.
138140
@Suppress("BlockingMethodInNonBlockingContext") // We already are on IO
139141
private suspend fun pipeAudioWithEarlyTermination(
140142
speexInfo: VoiceEncoderInfo.Speex,
@@ -145,16 +147,6 @@ class TranscriptionProviderImpl(
145147
): Unit = coroutineScope {
146148
val targetBufferSize = Short.SIZE_BYTES * speexInfo.frameSize
147149
val decodedFrames = Channel<ByteArray>(Channel.UNLIMITED)
148-
val audioEnded = CompletableDeferred<Unit>()
149-
150-
val closeWatcher = launch {
151-
select<Unit> {
152-
finishedReceiver.onAwait { }
153-
audioEnded.onAwait { }
154-
}
155-
logcat { "Closing audio pipe to deliver EOF to recognizer" }
156-
runCatching { writeStream.close() }
157-
}
158150

159151
val decoder = launch {
160152
try {
@@ -172,21 +164,28 @@ class TranscriptionProviderImpl(
172164
}
173165
} finally {
174166
decodedFrames.close()
175-
audioEnded.complete(Unit)
167+
this@TranscriptionProviderImpl.logcat { "Audio source ended, closing pipe to deliver EOF" }
168+
runCatching { writeStream.close() }
176169
}
177170
}
178171

172+
val finishedWaiter = launch {
173+
finishedReceiver.await()
174+
this@TranscriptionProviderImpl.logcat { "Recognizer finished, closing pipe and stopping decoder" }
175+
runCatching { writeStream.close() }
176+
decoder.cancel()
177+
}
178+
179179
try {
180180
for (decoded in decodedFrames) {
181-
this@TranscriptionProviderImpl.logcat { "Wrote audio stream packet" }
182181
if (finishedReceiver.isCompleted) continue
183182
writeStream.write(decoded, 0, targetBufferSize)
184183
}
185184
} catch (e: IOException) {
186185
logcat { "Pipe closed during write: ${e.message ?: "no message"}" }
187186
} finally {
188187
decoder.cancel()
189-
closeWatcher.cancel()
188+
finishedWaiter.cancel()
190189
}
191190
}
192191

0 commit comments

Comments
 (0)