Commit 37bb799

fix(audio): global colored logs, arecord exit detection, capture health metrics

- Add _ColorFormatter in cli/main.py for global ANSI-colored log output (DEBUG=grey, WARNING=yellow, ERROR=red, CRITICAL=bold red)
- Fix arecord unexpected exit going undetected: detect proc.poll() != None after inner read loop and trigger retry instead of silently ending capture
- Add CaptureHealth dataclass with drop/recovery counts and outage durations
- Emit capture_dropped/recovered/gave_up timeline events from voice pipeline
- Persist capture_health in session JSON for quantitative USB analysis
- Retry aplay up to 3 times on start failure with 0.3s delay
- Speaker restart cooldown (3s) to avoid log spam during device outage
- Add todo doc for ASR stream stale after mute/unmute investigation

Made-with: Cursor
1 parent 9cc3e78 commit 37bb799

6 files changed

Lines changed: 312 additions & 39 deletions

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
# ASR stream goes stale after mute/unmute or long silence

After muting and then unmuting the mic (or after a prolonged period in which no speech reaches Riva), the ASR stream silently stops producing results even though PCM audio is still flowing.

## Observed behavior

- Session `c87be1b2` (2026-03-14): 9 turns completed successfully.
- Turn 8 triggered a degenerate LLM reasoning loop (10,101 chars, **91.89 s** wall-clock).
- During that wait, the user muted and later unmuted the mic.
- After turn 9 completed, no further `asr_final` events appeared for ~1.5 min, even though the green **user_amplitude** waveform was still visible on the timeline (PCM capture was healthy).
- The terminal showed no ASR errors; the stream ended normally at session close with `Stream task timeout, cancelling`.

## Why amplitude shows but ASR does not

In `_feed_pcm_to_pipeline`, amplitude is always computed and sent to the client (lines 1005-1024) regardless of `mic_muted`. The ASR send is gated:

```python
if not mic_muted:
    await asr.send_audio(pcm_bytes)
```

So the timeline waveform looks alive, but if the Riva gRPC stream has internally timed out (or VAD state has gone stale after 90+ seconds of silence/mute), newly sent audio produces no results.

## Probable root cause (needs confirmation)

Riva Streaming ASR may enforce internal session limits. Candidates:
- **gRPC keepalive / idle timeout**: if no audio is sent for an extended period, the server may silently close the stream.
- **VAD state**: after a long silence gap, the VAD model may reset or require a fresh trigger to start detecting speech again.
- **Maximum session duration**: Riva may cap single-stream duration; after that, the stream yields no more results even though it stays open.

The exact Riva behavior here is unconfirmed: the stream appeared open (no error logged) but stopped producing finals.

## What is already in place

- `mic_muted` gates `asr.send_audio()` in the classic pipeline (line 1003).
- On mute, 0.5 s of silence is injected (`b"\x00" * int(16000 * 2 * 0.5)`) to flush any pending VAD partial (lines 1041-1044).
- On unmute, `mic_muted = False` resumes sending PCM to ASR.
- No stream-health monitoring or automatic restart exists today.

## Proposed solutions (pick one or combine)

### Option A: Keep-alive noise during mute

While `mic_muted` is True, instead of sending nothing, send **very-low-amplitude white noise** (e.g., ±10 out of ±32768) at the normal cadence. The intent is to keep the gRPC stream active and the VAD model warm without triggering false speech detection.

Pros: Simplest change; no stream lifecycle management. \
Cons: Assumes the Riva stream itself is still healthy; does not help if the stream has a hard session-duration cap.
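
As a rough illustration of Option A, the sketch below generates a chunk of low-amplitude noise in the format implied by the existing silence flush (16 kHz, 16-bit mono, i.e. `16000 * 2` bytes per second). The helper name `keepalive_noise`, its `peak` and `rng` parameters, and the little-endian sample order are assumptions made for this example; none of them exist in the pipeline today.

```python
import random
import struct
from typing import Optional

SAMPLE_RATE_HZ = 16000  # taken from the existing flush: int(16000 * 2 * 0.5) bytes = 0.5 s


def keepalive_noise(duration_s: float, peak: int = 10,
                    rng: Optional[random.Random] = None) -> bytes:
    """Return duration_s of uniform noise in [-peak, peak] as 16-bit LE mono PCM.

    Hypothetical helper; the byte order is an assumption about the pipeline format.
    """
    rng = rng or random.Random()
    n_samples = int(SAMPLE_RATE_HZ * duration_s)
    samples = [rng.randint(-peak, peak) for _ in range(n_samples)]
    return struct.pack(f"<{n_samples}h", *samples)


chunk = keepalive_noise(0.5)
print(len(chunk))  # 16000 bytes, the same size as the 0.5 s silence flush
```

In the muted branch, a chunk matching the duration of the dropped `pcm_bytes` would be sent in its place. Whether a peak of ±10 stays below Riva's VAD threshold still needs to be verified.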

### Option B: Restart ASR stream after stale timeout

Monitor elapsed time since the last `asr_final`. If no final arrives within a configurable window (e.g., 60 s while unmuted), tear down the current `RivaASRBackend` stream and create a fresh one.

1. Track `_last_asr_final_time` in the turn executor; update it on every `asr_final`.
2. In `server_capture_consumer` (or a watchdog task), check `time.time() - _last_asr_final_time > ASR_STALE_TIMEOUT`.
3. If stale and `not mic_muted`: call `asr.stop()`, then `asr.start()` to open a fresh streaming session.
4. Log `[asr] Stream restarted after stale timeout` at WARNING level.

Pros: Covers all suspected root causes (idle timeout, VAD reset, session-duration cap). \
Cons: Slightly more complex; brief gap in ASR coverage during restart (estimated ~200 ms).
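
The four steps above could be sketched roughly as follows. `AsrStaleWatchdog` and `restart_if_stale` are hypothetical names, and the sketch assumes `asr.stop()` and `asr.start()` are coroutines, which has not been checked against `RivaASRBackend`. It uses `time.monotonic` rather than `time.time()` so that wall-clock adjustments cannot trigger a restart; that is a substitution, not what the steps literally say.

```python
import time
from typing import Callable

ASR_STALE_TIMEOUT = 60.0  # seconds; the example window from the text above


class AsrStaleWatchdog:
    """Tracks time since the last ASR activity (hypothetical helper)."""

    def __init__(self, timeout_s: float = ASR_STALE_TIMEOUT,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._timeout_s = timeout_s
        self._clock = clock
        self._last_activity = clock()

    def mark_activity(self) -> None:
        """Call on every asr_final, on unmute, and after a restart."""
        self._last_activity = self._clock()

    def is_stale(self, mic_muted: bool) -> bool:
        """True when unmuted and no activity was marked within the timeout."""
        if mic_muted:
            return False
        return self._clock() - self._last_activity > self._timeout_s


async def restart_if_stale(asr, watchdog: AsrStaleWatchdog,
                           mic_muted: bool, logger) -> bool:
    """Restart the stream when stale; assumes asr.stop()/asr.start() are awaitable."""
    if not watchdog.is_stale(mic_muted):
        return False
    await asr.stop()
    await asr.start()
    watchdog.mark_activity()
    logger.warning("[asr] Stream restarted after stale timeout")
    return True
```

Calling `mark_activity()` on unmute keeps the muted interval from counting toward the window. A user who is unmuted but simply silent would still cause periodic restarts with this logic, which may be acceptable given the short restart gap.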

### Option C: Proactive stream rotation

After every turn (or every N turns), close and re-open the ASR stream. This preempts any session-duration limit and keeps the stream fresh.

Pros: Eliminates stale state entirely. \
Cons: Adds latency at turn boundaries; may lose a partial if speech is ongoing during rotation.

## Recommendation

**Option A + B combined**: send keep-alive noise during mute (A) to prevent idle timeout, and add a stale-timeout watchdog (B) as a safety net for unexpected stream failures. Option C is heavier and only needed if Riva has a hard session cap that A+B cannot address.

## Diagnosis checklist (before implementing)

- [ ] Confirm Riva Streaming ASR session limits: check `riva_asr` service config for `max_duration_seconds`, keepalive settings, or gRPC deadline.
- [ ] Add a log line in `RivaASRBackend` when the gRPC response iterator ends (to distinguish "server closed stream" from "no results but stream open").
- [ ] Reproduce by muting for 60+ s mid-session and verifying ASR stops producing results on unmute.
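
For the second checklist item, one low-touch way to add that log line is to wrap the response iterator in a generator that reports how it finished. This is only a sketch: `log_stream_end` is a hypothetical helper, and it assumes `RivaASRBackend` consumes a synchronous iterator of responses, which has not been confirmed.

```python
import logging
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def log_stream_end(responses: Iterable[T], name: str = "asr") -> Iterator[T]:
    """Yield responses unchanged and log how the underlying iterator finished."""
    count = 0
    try:
        for response in responses:
            count += 1
            yield response
    except Exception:
        # The iterator raised (e.g. an RPC error): log with traceback, then re-raise.
        logger.exception("[%s] response iterator failed after %d responses", name, count)
        raise
    else:
        # Normal exhaustion: the stream was closed without an error.
        logger.warning("[%s] response iterator ended after %d responses", name, count)
```

If neither message appears while ASR is silent, the stream is still open and simply producing no results, which points toward VAD state rather than a server-side close.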

## Effort

**Small–Medium**: Option A is ~30 min (noise generator in `_feed_pcm_to_pipeline`). Option B is ~1–2 hours (watchdog task + stream restart plumbing + tests).

src/multi_modal_ai_studio/cli/main.py

Lines changed: 24 additions & 5 deletions
@@ -14,6 +14,26 @@
 from pathlib import Path


+class _ColorFormatter(logging.Formatter):
+    """Logging formatter that adds ANSI colors to the level name."""
+
+    _COLORS = {
+        logging.DEBUG: "\033[90m",  # grey
+        logging.INFO: "",  # default
+        logging.WARNING: "\033[93m",  # yellow
+        logging.ERROR: "\033[91m",  # red
+        logging.CRITICAL: "\033[91;1m",  # bold red
+    }
+    _RESET = "\033[0m"
+
+    def format(self, record: logging.LogRecord) -> str:
+        color = self._COLORS.get(record.levelno, "")
+        msg = super().format(record)
+        if color:
+            return f"{color}{msg}{self._RESET}"
+        return msg
+
+
 def main():
     """Main entry point for CLI."""
     from multi_modal_ai_studio import __version__
@@ -154,11 +174,10 @@ def main():

     args = parser.parse_args()

-    # Configure logging
-    logging.basicConfig(
-        level=getattr(logging, args.log_level),
-        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
-    )
+    # Configure logging with colored output
+    _handler = logging.StreamHandler()
+    _handler.setFormatter(_ColorFormatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
+    logging.basicConfig(level=getattr(logging, args.log_level), handlers=[_handler])

     logger = logging.getLogger(__name__)
     logger.info("Multi-modal AI Studio starting...")
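
To make the effect of the formatter in this file concrete, the snippet below reproduces it (reduced to two levels) and formats two hand-built records. The `make_record` helper and the shortened format string are assumptions for the demo only. It shows that the color wraps the entire formatted line rather than just the level name, and that INFO stays uncolored.

```python
import logging


class _ColorFormatter(logging.Formatter):
    """Reduced copy of the formatter from the diff (WARNING and ERROR only)."""

    _COLORS = {logging.WARNING: "\033[93m", logging.ERROR: "\033[91m"}
    _RESET = "\033[0m"

    def format(self, record: logging.LogRecord) -> str:
        color = self._COLORS.get(record.levelno, "")
        msg = super().format(record)
        return f"{color}{msg}{self._RESET}" if color else msg


def make_record(level: int, text: str) -> logging.LogRecord:
    """Build a bare LogRecord for the demo (hypothetical helper)."""
    return logging.LogRecord("demo", level, "demo.py", 0, text, None, None)


fmt = _ColorFormatter("[%(levelname)s] %(name)s: %(message)s")
print(repr(fmt.format(make_record(logging.WARNING, "device dropped"))))
# '\x1b[93m[WARNING] demo: device dropped\x1b[0m'
print(repr(fmt.format(make_record(logging.INFO, "starting"))))
# '[INFO] demo: starting'
```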

src/multi_modal_ai_studio/core/session.py

Lines changed: 1 addition & 0 deletions
@@ -307,6 +307,7 @@ def to_dict(self) -> Dict[str, Any]:
             "audio_amplitude_history": getattr(self, "audio_amplitude_history", None) or [],
             "ttl_bands": getattr(self, "ttl_bands", None) or [],
             "app_version": getattr(self, "app_version", None) or __version__,
+            "capture_health": getattr(self, "capture_health", None),
         }

     def save(self, path: Path) -> None:

src/multi_modal_ai_studio/devices/capture.py

Lines changed: 119 additions & 14 deletions
@@ -11,7 +11,9 @@
 import queue
 import subprocess
 import threading
-from typing import Optional
+import time
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional

 logger = logging.getLogger(__name__)

@@ -26,20 +28,68 @@
 RETRY_BACKOFF_BASE = 0.5  # seconds; doubles each attempt up to a cap
 RETRY_BACKOFF_MAX = 5.0

+# ANSI escape codes for colored terminal output
+_RED = "\033[91m"
+_GREEN = "\033[92m"
+_YELLOW = "\033[93m"
+_RESET = "\033[0m"
+
+# Sentinel dict placed in the queue to signal capture health events.
+# Pipeline code should check `isinstance(item, dict)` before treating as PCM bytes.
+CAPTURE_EVENT_TYPE = "__capture_event__"
+
+
+def _make_capture_event(event: str, **kwargs: Any) -> Dict[str, Any]:
+    """Create a capture health event dict for the queue."""
+    d: Dict[str, Any] = {"__type__": CAPTURE_EVENT_TYPE, "event": event, "ts": time.time()}
+    d.update(kwargs)
+    return d
+
+
+def is_capture_event(item: Any) -> bool:
+    """Return True if item is a capture health event (not PCM bytes)."""
+    return isinstance(item, dict) and item.get("__type__") == CAPTURE_EVENT_TYPE
+
+
+@dataclass
+class CaptureHealth:
+    """Accumulated capture health metrics (thread-safe reads after capture ends)."""
+    device: str = ""
+    total_drops: int = 0
+    total_recoveries: int = 0
+    outages: List[Dict[str, float]] = field(default_factory=list)
+    gave_up: bool = False
+
+    def to_dict(self) -> Dict[str, Any]:
+        total_downtime = sum(o.get("duration_s", 0) for o in self.outages)
+        return {
+            "device": self.device,
+            "total_drops": self.total_drops,
+            "total_recoveries": self.total_recoveries,
+            "total_downtime_s": round(total_downtime, 3),
+            "outages": self.outages,
+            "gave_up": self.gave_up,
+        }
+

 def _capture_alsa(
     device: str,
     out_queue: "queue.Queue[Optional[bytes]]",
     stop_event: threading.Event,
     proc_holder: Optional[list] = None,
+    health: Optional[CaptureHealth] = None,
 ) -> None:
     """Capture from ALSA device via arecord; put PCM chunks in out_queue. Runs in thread.
     Uses plughw when device is hw:X,Y so ALSA can do sample-rate conversion (many USB mics only support 48kHz).
     If proc_holder is a list, the subprocess is stored as proc_holder[0] so the caller can terminate it to release the device quickly.

     Auto-restarts arecord up to MAX_CAPTURE_RETRIES times when the device
     disappears transiently (e.g. USB bus contention with a camera).
+    Sends capture health events through out_queue so the pipeline can track outages.
     """
+    if health is not None:
+        health.device = device
+
     dev = (device or "default").strip()
     if dev.startswith("hw:") and not dev.startswith("plughw:"):
         dev = "plug" + dev
@@ -48,6 +98,7 @@ def _capture_alsa(

     retries = 0
     ever_produced_chunk = False
+    drop_time: Optional[float] = None

     while not stop_event.is_set():
         logger.info("ALSA capture starting: %s (device=%s)", " ".join(cmd), device)
@@ -70,6 +121,8 @@ def _capture_alsa(
             logger.warning("Failed to start arecord for %s: %s", device, e)
             if retries >= MAX_CAPTURE_RETRIES:
                 logger.error("ALSA capture giving up after %d retries for %s", retries, device)
+                if health is not None:
+                    health.gave_up = True
                 out_queue.put(None)
                 return
             retries += 1
@@ -87,24 +140,49 @@ def _capture_alsa(
                     try:
                         err = proc.stderr.read().decode("utf-8", errors="replace").strip() if proc.stderr else ""
                         if err:
-                            logger.warning("ALSA capture read empty (device %s). arecord stderr: %s", device, err)
+                            logger.error("%sALSA capture read empty (device %s). arecord stderr: %s%s", _RED, device, err, _RESET)
                         else:
-                            logger.warning("ALSA capture read returned empty (device %s); check device/sample rate", device)
+                            logger.error("%sALSA capture read returned empty (device %s); check device/sample rate%s", _RED, device, _RESET)
                     except Exception:
-                        logger.warning("ALSA capture read returned empty (device %s)", device)
+                        logger.error("%sALSA capture read returned empty (device %s)%s", _RED, device, _RESET)
                     died_unexpectedly = True
                     break
                 if first_chunk_this_run:
                     first_chunk_this_run = False
                     if not ever_produced_chunk:
                         logger.info("ALSA first PCM chunk received from %s (%d bytes); pipeline will get amplitude", device, len(chunk))
                     else:
-                        logger.info("ALSA capture resumed from %s (%d bytes) after retry", device, len(chunk))
+                        recovery_dur = time.time() - drop_time if drop_time else 0
+                        logger.warning(
+                            "%s[capture_health] RECOVERED device %s after %.2fs outage (retry %d)%s",
+                            _GREEN, device, recovery_dur, retries, _RESET,
+                        )
+                        if health is not None:
+                            health.total_recoveries += 1
+                            if health.outages:
+                                health.outages[-1]["duration_s"] = round(recovery_dur, 3)
+                        out_queue.put(_make_capture_event(
+                            "recovered", device=device, outage_s=round(recovery_dur, 3), retry=retries,
+                        ))
+                        drop_time = None
                     retries = 0
                     ever_produced_chunk = True
                 out_queue.put(chunk)
+
+            # arecord exited on its own (proc.poll() != None) while we didn't ask it to stop
+            if not died_unexpectedly and not stop_event.is_set() and proc.poll() is not None:
+                rc = proc.returncode
+                try:
+                    err = proc.stderr.read().decode("utf-8", errors="replace").strip() if proc.stderr else ""
+                except Exception:
+                    err = ""
+                logger.error(
+                    "%sarecord exited unexpectedly for %s (rc=%s): %s%s",
+                    _RED, device, rc, err or "(no stderr)", _RESET,
+                )
+                died_unexpectedly = True
         except Exception as e:
-            logger.warning("ALSA capture read error for %s: %s", device, e)
+            logger.error("%sALSA capture read error for %s: %s%s", _RED, device, e, _RESET)
             died_unexpectedly = True
         finally:
             try:
@@ -122,28 +200,50 @@ def _capture_alsa(
             break

         if died_unexpectedly and retries < MAX_CAPTURE_RETRIES:
+            if drop_time is None:
+                drop_time = time.time()
             retries += 1
+            if health is not None:
+                health.total_drops += 1
+                health.outages.append({"drop_ts": round(drop_time, 3), "retry": retries, "duration_s": 0})
             delay = min(RETRY_BACKOFF_BASE * (2 ** (retries - 1)), RETRY_BACKOFF_MAX)
-            logger.warning(
-                "ALSA capture died unexpectedly for %s; retry %d/%d in %.1fs",
-                device, retries, MAX_CAPTURE_RETRIES, delay,
+            logger.error(
+                "%s[capture_health] DROPPED device %s; retry %d/%d in %.1fs%s",
+                _RED, device, retries, MAX_CAPTURE_RETRIES, delay, _RESET,
             )
+            out_queue.put(_make_capture_event(
+                "dropped", device=device, retry=retries, max_retries=MAX_CAPTURE_RETRIES,
+            ))
             stop_event.wait(delay)
             continue

         if died_unexpectedly:
-            logger.error("ALSA capture giving up after %d retries for %s", retries, device)
+            logger.error("%s[capture_health] GAVE UP on device %s after %d retries%s", _RED, device, retries, _RESET)
+            if health is not None:
+                health.gave_up = True
+                if health.outages:
+                    health.outages[-1]["duration_s"] = round(time.time() - drop_time, 3) if drop_time else 0
+            out_queue.put(_make_capture_event("gave_up", device=device, retries=retries))
         elif first_chunk_this_run and not ever_produced_chunk:
             try:
                 err = proc.stderr.read().decode("utf-8", errors="replace").strip() if proc.stderr else ""
                 if err:
-                    logger.warning("ALSA capture ended with no chunks (device %s). arecord stderr: %s", device, err)
+                    logger.error("%sALSA capture ended with no chunks (device %s). arecord stderr: %s%s", _RED, device, err, _RESET)
                 else:
-                    logger.warning("ALSA capture ended without sending any chunks (device %s); check arecord -D %s", device, dev)
+                    logger.error("%sALSA capture ended without sending any chunks (device %s); check arecord -D %s%s", _RED, device, dev, _RESET)
             except Exception:
-                logger.warning("ALSA capture ended without sending any chunks (device %s)", device)
+                logger.error("%sALSA capture ended without sending any chunks (device %s)%s", _RED, device, _RESET)
         break

+    # Log summary if any drops occurred
+    if health is not None and health.total_drops > 0:
+        summary = health.to_dict()
+        logger.warning(
+            "%s[capture_health] SESSION SUMMARY for %s: drops=%d recoveries=%d downtime=%.2fs gave_up=%s%s",
+            _YELLOW, device, summary["total_drops"], summary["total_recoveries"],
+            summary["total_downtime_s"], summary["gave_up"], _RESET,
+        )
+
     out_queue.put(None)


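
The retry delay in the hunk above follows a capped exponential backoff. As a worked example, the snippet below evaluates the same expression for the first six retries. The two constants are copied from this file; `backoff_delay` is a hypothetical wrapper, and six is an arbitrary count because the value of `MAX_CAPTURE_RETRIES` is not shown in the diff.

```python
RETRY_BACKOFF_BASE = 0.5  # seconds (from capture.py)
RETRY_BACKOFF_MAX = 5.0   # seconds (from capture.py)


def backoff_delay(retries: int) -> float:
    """Same expression as in _capture_alsa: double per retry, capped at the max."""
    return min(RETRY_BACKOFF_BASE * (2 ** (retries - 1)), RETRY_BACKOFF_MAX)


print([backoff_delay(r) for r in range(1, 7)])
# [0.5, 1.0, 2.0, 4.0, 5.0, 5.0]
```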

@@ -213,6 +313,7 @@ def start_server_mic_capture(
     out_queue: "queue.Queue[Optional[bytes]]",
     stop_event: threading.Event,
     proc_holder: Optional[list] = None,
+    health_out: Optional[list] = None,
 ) -> Optional[threading.Thread]:
     """
     Start a thread that captures from the given server audio input device and puts
@@ -224,20 +325,24 @@ def start_server_mic_capture(
         out_queue: queue to put chunks into; None is sentinel when capture ends
         stop_event: when set, capture thread should exit
         proc_holder: optional list; for ALSA, the arecord subprocess is appended so the caller can terminate it to release the device quickly
+        health_out: optional list; if provided, CaptureHealth is appended as health_out[0] for retrieval after thread exits

     Returns:
         The started thread, or None if capture could not be started.
     """
     if not device:
         return None
+    health = CaptureHealth(device=device or "")
     if source == "alsa":
         target = _capture_alsa
-        args = (device, out_queue, stop_event, proc_holder)
+        args = (device, out_queue, stop_event, proc_holder, health)
     elif source == "usb":
         target = _capture_pyaudio
         args = (device, out_queue, stop_event)
     else:
         return None
+    if health_out is not None:
+        health_out.append(health)
     t = threading.Thread(target=target, args=args, name="server-mic-capture", daemon=True)
     t.start()
     logger.info("Server mic capture started: %s device %s", source, device)
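
Because `out_queue` now carries health-event dicts alongside PCM bytes, any consumer has to separate the two before treating an item as audio. A minimal sketch of such a consumer follows. `is_capture_event` and the sentinel value are copied from the diff, while `drain_capture_queue` is a hypothetical function and not the pipeline's actual consumer.

```python
import queue
from typing import Any, Dict, List, Tuple

CAPTURE_EVENT_TYPE = "__capture_event__"  # same sentinel value as in capture.py


def is_capture_event(item: Any) -> bool:
    """Same check as in the diff: a dict tagged with the sentinel type."""
    return isinstance(item, dict) and item.get("__type__") == CAPTURE_EVENT_TYPE


def drain_capture_queue(q: "queue.Queue[Any]") -> Tuple[List[bytes], List[Dict[str, Any]]]:
    """Read until the None sentinel, splitting PCM chunks from health events."""
    pcm_chunks: List[bytes] = []
    events: List[Dict[str, Any]] = []
    while True:
        item = q.get()
        if item is None:  # capture thread finished
            break
        if is_capture_event(item):
            events.append(item)  # e.g. forward as a timeline event
        else:
            pcm_chunks.append(item)
    return pcm_chunks, events
```

Note that the `out_queue` annotation in the diff still reads `queue.Queue[Optional[bytes]]` even though dicts now pass through it, so a type checker will not flag a consumer that skips this check.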
