Skip to content

Commit 5cb4c3c

Browse files
committed
Fix buffered generator: must yield 1 second chunks
1 parent 6e9430a commit 5cb4c3c

File tree

2 files changed

+124
-53
lines changed

2 files changed

+124
-53
lines changed

music_assistant/controllers/streams.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
get_stream_details,
6767
resample_pcm_audio,
6868
)
69-
from music_assistant.helpers.buffered_generator import use_buffer
69+
from music_assistant.helpers.buffered_generator import use_audio_buffer
7070
from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
7171
from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
7272
from music_assistant.helpers.smart_fades import (
@@ -319,7 +319,8 @@ async def setup(self, config: CoreConfig) -> None:
319319
)
320320
# Start periodic garbage collection task
321321
# This ensures memory from audio buffers and streams is cleaned up regularly
322-
self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
322+
# DISABLED FOR TESTING - may cause event loop blocking
323+
# self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
323324

324325
async def close(self) -> None:
325326
"""Cleanup on exit."""
@@ -827,14 +828,18 @@ def get_announcement_url(
827828
# like https hosts and it also offers the pre-announce 'bell'
828829
return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
829830

830-
@use_buffer(30, 1)
831+
@use_audio_buffer(buffer_size=30, min_buffer_before_yield=4)
831832
async def get_queue_flow_stream(
832833
self,
833834
queue: PlayerQueue,
834835
start_queue_item: QueueItem,
835836
pcm_format: AudioFormat,
836837
) -> AsyncGenerator[bytes, None]:
837-
"""Get a flow stream of all tracks in the queue as raw PCM audio."""
838+
"""
839+
Get a flow stream of all tracks in the queue as raw PCM audio.
840+
841+
yields chunks of exactly 1 second of audio in the given pcm_format.
842+
"""
838843
# ruff: noqa: PLR0915
839844
assert pcm_format.content_type.is_pcm()
840845
queue_track = None
@@ -917,6 +922,7 @@ async def get_queue_flow_stream(
917922
buffer = b""
918923
# handle incoming audio chunks
919924
first_chunk_received = False
925+
buffer_filled = False
920926
async for chunk in self.get_queue_item_stream(
921927
queue_track,
922928
pcm_format=pcm_format,
@@ -941,8 +947,14 @@ async def get_queue_flow_stream(
941947
del chunk
942948
if len(buffer) < req_buffer_size:
943949
# buffer is not full enough, move on
950+
# yield control to event loop to prevent blocking pipe writes
951+
# use 10ms delay to ensure I/O operations can complete
952+
await asyncio.sleep(0.01)
944953
continue
945954

955+
if not buffer_filled and last_fadeout_part:
956+
buffer_filled = True
957+
946958
#### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
947959
if last_fadeout_part and last_streamdetails:
948960
# perform crossfade
@@ -967,7 +979,7 @@ async def get_queue_flow_stream(
967979
last_play_log_entry.seconds_streamed += (
968980
crossfade_part_len / 2 / pcm_sample_size
969981
)
970-
# send crossfade_part (as one big chunk)
982+
# yield crossfade_part - buffered_generator will rechunk to 1-second
971983
yield crossfade_part
972984
del crossfade_part
973985
# also write the leftover bytes from the crossfade action
@@ -1263,7 +1275,7 @@ async def get_queue_item_stream(
12631275
self.mass.create_task(music_prov.on_streamed(streamdetails))
12641276
# Periodic GC task will handle memory cleanup every 15 minutes
12651277

1266-
@use_buffer(30, 1)
1278+
@use_audio_buffer(buffer_size=30, min_buffer_before_yield=4)
12671279
async def get_queue_item_stream_with_smartfade(
12681280
self,
12691281
queue_item: QueueItem,

music_assistant/helpers/buffered_generator.py

Lines changed: 106 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
"""Helper for adding buffering to async generators."""
1+
"""Helper for adding buffering to async audio generators."""
22

33
from __future__ import annotations
44

@@ -8,7 +8,9 @@
88
from functools import wraps
99
from typing import Any, Final, ParamSpec
1010

11-
from music_assistant.helpers.util import close_async_generator, empty_queue
11+
from music_assistant_models.streamdetails import AudioFormat
12+
13+
from music_assistant.helpers.util import close_async_generator
1214

1315
# Type variables for the buffered decorator
1416
_P = ParamSpec("_P")
@@ -21,31 +23,42 @@
2123
_ACTIVE_PRODUCER_TASKS: set[asyncio.Task[Any]] = set()
2224

2325

24-
async def buffered(
26+
async def buffered_audio(
2527
generator: AsyncGenerator[bytes, None],
28+
pcm_format: AudioFormat,
2629
buffer_size: int = DEFAULT_BUFFER_SIZE,
2730
min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
2831
) -> AsyncGenerator[bytes, None]:
2932
"""
30-
Add buffering to an async generator that yields bytes.
33+
Add buffering to an async audio generator that yields PCM audio bytes.
34+
35+
This function uses a shared buffer with asyncio.Condition to decouple the producer
36+
(reading from the stream) from the consumer (yielding to the client).
3137
32-
This function uses an asyncio.Queue to decouple the producer (reading from the stream)
33-
from the consumer (yielding to the client). The producer runs in a separate task and
34-
fills the buffer, while the consumer yields from the buffer.
38+
Ensures chunks yielded to the consumer are exactly 1 second of audio
39+
(critical for sync timing calculations).
3540
3641
Args:
3742
generator: The async generator to buffer
38-
buffer_size: Maximum number of chunks to buffer (default: 30)
43+
pcm_format: AudioFormat - defines chunk size for 1-second audio chunks
44+
buffer_size: Maximum number of 1-second chunks to buffer (default: 30)
3945
min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
4046
4147
Example:
42-
async for chunk in buffered(my_generator(), buffer_size=100):
48+
async for chunk in buffered_audio(my_generator(), pcm_format, buffer_size=100):
49+
# Each chunk is exactly 1 second of audio
4350
process(chunk)
4451
"""
45-
buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size)
52+
# Shared state between producer and consumer
53+
data_buffer = bytearray() # Shared buffer for audio data
54+
condition = asyncio.Condition() # Synchronization primitive
4655
producer_error: Exception | None = None
47-
threshold_reached = asyncio.Event()
48-
cancelled = asyncio.Event()
56+
producer_done = False
57+
cancelled = False
58+
59+
# Calculate chunk size and buffer limits
60+
chunk_size = pcm_format.pcm_sample_size # Size of 1 second of audio
61+
max_buffer_bytes = buffer_size * chunk_size
4962

5063
if buffer_size <= 1:
5164
# No buffering needed, yield directly
@@ -55,33 +68,43 @@ async def buffered(
5568

5669
async def producer() -> None:
5770
"""Read from the original generator and fill the buffer."""
58-
nonlocal producer_error
71+
nonlocal producer_error, producer_done, cancelled
5972
generator_consumed = False
6073
try:
6174
async for chunk in generator:
6275
generator_consumed = True
63-
if cancelled.is_set():
64-
# Consumer has stopped, exit cleanly
76+
if cancelled:
6577
break
66-
await buffer.put(chunk)
67-
if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
68-
threshold_reached.set()
69-
# Yield to event loop every chunk to prevent blocking
70-
await asyncio.sleep(0)
78+
79+
# Wait if buffer is too full
80+
async with condition:
81+
while len(data_buffer) >= max_buffer_bytes and not cancelled:
82+
await condition.wait()
83+
84+
if cancelled:
85+
break
86+
87+
# Append to shared buffer
88+
data_buffer.extend(chunk)
89+
# Notify consumer that data is available
90+
condition.notify()
91+
92+
# Yield to event loop to prevent blocking
93+
# Use 10ms delay to ensure I/O operations (pipe writes) can complete
94+
await asyncio.sleep(0.01)
95+
7196
except Exception as err:
7297
producer_error = err
7398
if isinstance(err, asyncio.CancelledError):
7499
raise
75100
finally:
76-
threshold_reached.set()
77101
# Clean up the generator if needed
78102
if not generator_consumed:
79103
await close_async_generator(generator)
80-
# Signal end of stream by putting None
81-
# We must wait for space in the queue if needed, otherwise the consumer may
82-
# hang waiting for data that will never come
83-
if not cancelled.is_set():
84-
await buffer.put(None)
104+
# Signal end of stream
105+
async with condition:
106+
producer_done = True
107+
condition.notify()
85108

86109
# Start the producer task
87110
loop = asyncio.get_running_loop()
@@ -94,51 +117,80 @@ async def producer() -> None:
94117
# Remove from set when done
95118
producer_task.add_done_callback(_ACTIVE_PRODUCER_TASKS.discard)
96119

120+
# Calculate minimum buffer level before yielding
121+
min_buffer_bytes = min_buffer_before_yield * chunk_size
122+
97123
try:
98124
# Wait for initial buffer to fill
99-
await threshold_reached.wait()
125+
async with condition:
126+
while len(data_buffer) < min_buffer_bytes and not producer_done:
127+
await condition.wait()
100128

101-
# Consume from buffer and yield
129+
# Consume from buffer and yield 1-second audio chunks
102130
while True:
103-
data = await buffer.get()
104-
if data is None:
105-
# End of stream
106-
if producer_error:
107-
raise producer_error
108-
break
109-
yield data
131+
async with condition:
132+
# Wait for enough data or end of stream
133+
while len(data_buffer) < chunk_size and not producer_done:
134+
await condition.wait()
135+
136+
# Check if we're done
137+
if len(data_buffer) < chunk_size and producer_done:
138+
# Yield any remaining partial chunk
139+
if data_buffer:
140+
chunk = bytes(data_buffer)
141+
data_buffer.clear()
142+
condition.notify()
143+
yield chunk
144+
if producer_error:
145+
raise producer_error
146+
break
147+
148+
# Extract exactly 1 second of audio
149+
chunk = bytes(data_buffer[:chunk_size])
150+
del data_buffer[:chunk_size]
151+
152+
# Notify producer that space is available
153+
condition.notify()
154+
155+
# Yield outside the lock to avoid holding it during I/O
156+
yield chunk
110157

111158
finally:
112159
# Signal the producer to stop
113-
cancelled.set()
114-
# Drain the queue to unblock the producer if it's waiting on put()
115-
empty_queue(buffer)
160+
async with condition:
161+
cancelled = True
162+
condition.notify()
116163
# Wait for the producer to finish cleanly with a timeout to prevent blocking
117164
with contextlib.suppress(asyncio.CancelledError, RuntimeError, asyncio.TimeoutError):
118165
await asyncio.wait_for(asyncio.shield(producer_task), timeout=1.0)
119166

120167

121-
def use_buffer(
168+
def use_audio_buffer(
169+
pcm_format_arg: str = "pcm_format",
122170
buffer_size: int = DEFAULT_BUFFER_SIZE,
123171
min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
124172
) -> Callable[
125173
[Callable[_P, AsyncGenerator[bytes, None]]],
126174
Callable[_P, AsyncGenerator[bytes, None]],
127175
]:
128176
"""
129-
Add buffering to async generator functions that yield bytes (decorator).
177+
Add buffering to async audio generator functions that yield PCM audio bytes (decorator).
130178
131-
This decorator uses an asyncio.Queue to decouple the producer (reading from the stream)
132-
from the consumer (yielding to the client). The producer runs in a separate task and
133-
fills the buffer, while the consumer yields from the buffer.
179+
This decorator uses a shared buffer with asyncio.Condition to decouple the producer
180+
(reading from the stream) from the consumer (yielding to the client).
181+
182+
Ensures chunks yielded are exactly 1 second of audio (critical for sync timing).
134183
135184
Args:
136-
buffer_size: Maximum number of chunks to buffer (default: 30)
185+
pcm_format_arg: Name of the argument containing AudioFormat (default: "pcm_format")
186+
buffer_size: Maximum number of 1-second chunks to buffer (default: 30)
137187
min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
138188
139189
Example:
140-
@use_buffer(buffer_size=100)
141-
async def my_stream() -> AsyncGenerator[bytes, None]:
190+
@use_audio_buffer(pcm_format_arg="pcm_format", buffer_size=100)
191+
async def my_stream(pcm_format: AudioFormat) -> AsyncGenerator[bytes, None]:
192+
# Generator can yield variable-sized chunks
193+
# Decorator ensures output is exactly 1-second chunks
142194
...
143195
"""
144196

@@ -147,8 +199,15 @@ def decorator(
147199
) -> Callable[_P, AsyncGenerator[bytes, None]]:
148200
@wraps(func)
149201
async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> AsyncGenerator[bytes, None]:
150-
async for chunk in buffered(
202+
# Extract pcm_format from function arguments
203+
pcm_format = kwargs.get(pcm_format_arg)
204+
if pcm_format is None:
205+
msg = f"Audio buffer decorator requires '{pcm_format_arg}' argument"
206+
raise ValueError(msg)
207+
208+
async for chunk in buffered_audio(
151209
func(*args, **kwargs),
210+
pcm_format=pcm_format,
152211
buffer_size=buffer_size,
153212
min_buffer_before_yield=min_buffer_before_yield,
154213
):

0 commit comments

Comments
 (0)