Skip to content

Commit fc3f3da

Browse files
committed
Various small speed and safety fixes
1 parent 9e3b6d7 commit fc3f3da

File tree

5 files changed

+60
-28
lines changed

5 files changed

+60
-28
lines changed

music_assistant/controllers/player_queues.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,11 @@ async def play_index(
805805
if isinstance(index, str):
806806
index = self.index_by_id(queue_id, index)
807807
queue.current_index = index
808+
# update current item and elapsed time and signal update
809+
# this way the UI knows immediately that a new item is loading
810+
queue.current_item = self.get_item(queue_id, index)
811+
queue.elapsed_time = seek_position
812+
self.signal_update(queue_id)
808813
queue.index_in_buffer = index
809814
queue.flow_mode_stream_log = []
810815
prefer_flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE)
@@ -823,7 +828,9 @@ async def play_index(
823828
# send play_media request to player
824829
# NOTE that we debounce this a bit to account for someone hitting the next button
825830
# like a madman. This will prevent the player from being overloaded with requests.
826-
async def _play_index(index: int) -> None:
831+
async def _play_index(index: int, debounce: bool) -> None:
832+
if debounce:
833+
await asyncio.sleep(0.25)
827834
for attempt in range(5):
828835
try:
829836
queue_item = self.get_item(queue_id, index)
@@ -856,6 +863,8 @@ async def _play_index(index: int) -> None:
856863
MediaType.RADIO,
857864
MediaType.PLUGIN_SOURCE,
858865
)
866+
if debounce:
867+
await asyncio.sleep(0.25)
859868
queue.flow_mode = flow_mode
860869
await self.mass.players.play_media(
861870
player_id=queue_id,
@@ -866,13 +875,15 @@ async def _play_index(index: int) -> None:
866875

867876
# we set a flag to notify the update logic that we're transitioning to a new track
868877
self._transitioning_players.add(queue_id)
869-
self.mass.call_later(
870-
1 if debounce else 0,
878+
task = self.mass.create_task(
871879
_play_index,
872880
index,
881+
debounce,
873882
task_id=f"play_media_{queue_id}",
883+
abort_existing=True,
874884
)
875885
self.signal_update(queue_id)
886+
await task
876887

877888
@api_command("player_queues/transfer")
878889
async def transfer_queue(

music_assistant/controllers/webserver.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -303,11 +303,11 @@ async def handle_client(self) -> web.WebSocketResponse:
303303
self._writer_task = self.mass.create_task(self._writer())
304304

305305
# send server(version) info when client connects
306-
self._send_message(self.mass.get_server_info())
306+
await self._send_message(self.mass.get_server_info())
307307

308308
# forward all events to clients
309309
def handle_event(event: MassEvent) -> None:
310-
self._send_message(event)
310+
self._send_message_sync(event)
311311

312312
unsub_callback = self.mass.subscribe(handle_event)
313313

@@ -331,7 +331,7 @@ def handle_event(event: MassEvent) -> None:
331331
disconnect_warn = f"Received invalid JSON: {msg.data}"
332332
break
333333

334-
self._handle_command(command_msg)
334+
await self._handle_command(command_msg)
335335

336336
except asyncio.CancelledError:
337337
self._logger.debug("Connection closed by client")
@@ -360,15 +360,15 @@ def handle_event(event: MassEvent) -> None:
360360

361361
return wsock
362362

363-
def _handle_command(self, msg: CommandMessage) -> None:
363+
async def _handle_command(self, msg: CommandMessage) -> None:
364364
"""Handle an incoming command from the client."""
365365
self._logger.debug("Handling command %s", msg.command)
366366

367367
# work out handler for the given path/command
368368
handler = self.mass.command_handlers.get(msg.command)
369369

370370
if handler is None:
371-
self._send_message(
371+
await self._send_message(
372372
ErrorResultMessage(
373373
msg.message_id,
374374
InvalidCommand.error_code,
@@ -392,20 +392,20 @@ async def _run_handler(self, handler: APICommandHandler, msg: CommandMessage) ->
392392
async for item in iterator:
393393
result.append(item)
394394
if len(result) >= 500:
395-
self._send_message(
395+
await self._send_message(
396396
SuccessResultMessage(msg.message_id, result, partial=True)
397397
)
398398
result = []
399399
elif asyncio.iscoroutine(result):
400400
result = await result
401-
self._send_message(SuccessResultMessage(msg.message_id, result))
401+
await self._send_message(SuccessResultMessage(msg.message_id, result))
402402
except Exception as err:
403403
if self._logger.isEnabledFor(logging.DEBUG):
404404
self._logger.exception("Error handling message: %s", msg)
405405
else:
406406
self._logger.error("Error handling message: %s: %s", msg.command, str(err))
407407
err_msg = str(err) or err.__class__.__name__
408-
self._send_message(
408+
await self._send_message(
409409
ErrorResultMessage(msg.message_id, getattr(err, "error_code", 999), err_msg)
410410
)
411411

@@ -424,13 +424,30 @@ async def _writer(self) -> None:
424424
self._logger.log(VERBOSE_LOG_LEVEL, "Writing: %s", message)
425425
await self.wsock.send_str(message)
426426

427-
def _send_message(self, message: MessageType) -> None:
428-
"""Send a message to the client.
427+
async def _send_message(self, message: MessageType) -> None:
428+
"""Send a message to the client (for large response messages).
429429
430+
Runs JSON serialization in executor to avoid blocking for large messages.
430431
Closes connection if the client is not reading the messages.
431432
432433
Async friendly.
433434
"""
435+
# Run JSON serialization in executor to avoid blocking for large messages
436+
loop = asyncio.get_running_loop()
437+
_message = await loop.run_in_executor(None, message.to_json)
438+
439+
try:
440+
self._to_write.put_nowait(_message)
441+
except asyncio.QueueFull:
442+
self._logger.error("Client exceeded max pending messages: %s", MAX_PENDING_MSG)
443+
444+
self._cancel()
445+
446+
def _send_message_sync(self, message: MessageType) -> None:
447+
"""Send a message from a sync context (for small messages like events).
448+
449+
Serializes inline without executor overhead since events are typically small.
450+
"""
434451
_message = message.to_json()
435452

436453
try:

music_assistant/helpers/audio_buffer.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,9 +253,10 @@ async def clear(self) -> None:
253253
self._data_available.notify_all()
254254
self._space_available.notify_all()
255255

256-
# Run garbage collection in executor to reclaim memory from large buffers
256+
# Run garbage collection in background to reclaim memory from large buffers
257+
# Don't await it to avoid blocking during task cancellation
257258
loop = asyncio.get_running_loop()
258-
await loop.run_in_executor(None, gc.collect)
259+
loop.run_in_executor(None, gc.collect)
259260

260261
async def set_eof(self) -> None:
261262
"""Signal that no more data will be added to the buffer."""

music_assistant/helpers/buffered_generator.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from functools import wraps
99
from typing import Final, ParamSpec
1010

11+
from music_assistant.helpers.util import empty_queue
12+
1113
# Type variables for the buffered decorator
1214
_P = ParamSpec("_P")
1315

@@ -39,6 +41,7 @@ async def buffered(
3941
buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size)
4042
producer_error: Exception | None = None
4143
threshold_reached = asyncio.Event()
44+
cancelled = asyncio.Event()
4245

4346
if buffer_size <= 1:
4447
# No buffering needed, yield directly
@@ -51,24 +54,22 @@ async def producer() -> None:
5154
nonlocal producer_error
5255
try:
5356
async for chunk in generator:
57+
if cancelled.is_set():
58+
# Consumer has stopped, exit cleanly
59+
break
5460
await buffer.put(chunk)
5561
if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
5662
threshold_reached.set()
57-
except asyncio.CancelledError:
58-
# Task was cancelled, clean up the generator
59-
with contextlib.suppress(RuntimeError, asyncio.CancelledError):
60-
await generator.aclose()
61-
raise
6263
except Exception as err:
6364
producer_error = err
64-
# Consumer probably stopped consuming, close the original generator
65-
with contextlib.suppress(RuntimeError, asyncio.CancelledError):
66-
await generator.aclose()
6765
finally:
6866
threshold_reached.set()
67+
# Clean up the generator
68+
with contextlib.suppress(RuntimeError, asyncio.CancelledError):
69+
await generator.aclose()
6970
# Signal end of stream by putting None
7071
with contextlib.suppress(asyncio.QueueFull):
71-
await buffer.put(None)
72+
buffer.put_nowait(None)
7273

7374
# Start the producer task
7475
loop = asyncio.get_running_loop()
@@ -100,9 +101,11 @@ async def producer() -> None:
100101
yield data
101102

102103
finally:
103-
# Ensure the producer task is cleaned up
104-
if not producer_task.done():
105-
producer_task.cancel()
104+
# Signal the producer to stop
105+
cancelled.set()
106+
# Drain the queue to unblock the producer if it's waiting on put()
107+
empty_queue(buffer)
108+
# Wait for the producer to finish cleanly
106109
with contextlib.suppress(asyncio.CancelledError, RuntimeError):
107110
await producer_task
108111

music_assistant/providers/airplay/raop.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> Non
129129
# cancel the current audio source task
130130
assert self._audio_source_task # for type checker
131131
self._audio_source_task.cancel()
132-
with suppress(asyncio.CancelledError):
132+
with suppress(asyncio.CancelledError, RuntimeError):
133133
await self._audio_source_task
134134
# set new audio source and restart the stream
135135
self._audio_source = audio_source

0 commit comments

Comments
 (0)