44
55import asyncio
66import time
7- from collections .abc import Callable
7+ from collections .abc import AsyncGenerator , Callable
88from io import BytesIO
99from typing import TYPE_CHECKING , cast
1010
4949 INTERNAL_PCM_FORMAT ,
5050)
5151from music_assistant .helpers .audio import get_player_filter_params
52- from music_assistant .helpers .ffmpeg import get_ffmpeg_stream
5352from music_assistant .models .player import Player , PlayerMedia
5453
54+ from .timed_client_stream import TimedClientStream
55+
5556if TYPE_CHECKING :
5657 from aioresonate .server .client import ResonateClient
5758 from music_assistant_models .event import MassEvent
5859
5960 from .provider import ResonateProvider
6061
6162
63+ class MusicAssistantMediaStream (MediaStream ):
64+ """MediaStream implementation for Music Assistant with per-player DSP support."""
65+
66+ player_instance : ResonatePlayer
67+ internal_format : AudioFormat
68+ output_format : AudioFormat
69+
70+ def __init__ (
71+ self ,
72+ * ,
73+ main_channel_source : AsyncGenerator [bytes , None ],
74+ main_channel_format : ResonateAudioFormat ,
75+ player_instance : ResonatePlayer ,
76+ internal_format : AudioFormat ,
77+ output_format : AudioFormat ,
78+ ) -> None :
79+ """
80+ Initialise the media stream with audio source and format for main_channel().
81+
82+ Args:
83+ main_channel_source: Audio source generator for the main channel.
84+ main_channel_format: Audio format for the main channel (includes codec).
85+ player_instance: The ResonatePlayer instance for accessing mass and streams.
86+ internal_format: Internal processing format (float32 for headroom).
87+ output_format: Output PCM format (16-bit for player output).
88+ """
89+ super ().__init__ (
90+ main_channel_source = main_channel_source ,
91+ main_channel_format = main_channel_format ,
92+ )
93+ self .player_instance = player_instance
94+ self .internal_format = internal_format
95+ self .output_format = output_format
96+
97+ async def player_channel (
98+ self ,
99+ player_id : str ,
100+ preferred_format : ResonateAudioFormat | None = None ,
101+ position_us : int = 0 ,
102+ ) -> tuple [AsyncGenerator [bytes , None ], ResonateAudioFormat , int ] | None :
103+ """
104+ Get a player-specific audio stream with per-player DSP.
105+
106+ Args:
107+ player_id: Identifier for the player requesting the stream.
108+ preferred_format: The player's preferred native format for the stream.
109+ The implementation may return a different format; the library
110+ will handle any necessary conversion.
111+ position_us: Position in microseconds relative to the main_stream start.
112+ Used for late-joining players to sync with the main stream.
113+
114+ Returns:
115+ A tuple of (audio generator, audio format, actual position in microseconds)
116+ or None if unavailable. If None, the main_stream is used as fallback.
117+ """
118+ mass = self .player_instance .mass
119+ multi_client_stream = self .player_instance .timed_client_stream
120+ assert multi_client_stream is not None
121+
122+ dsp = mass .config .get_player_dsp_config (player_id )
123+ if not dsp .enabled :
124+ # DSP is disabled for this player, use main_stream
125+ return None
126+
127+ # Get per-player DSP filter parameters
128+ # Convert from internal format to output format
129+ filter_params = get_player_filter_params (
130+ mass , player_id , self .internal_format , self .output_format
131+ )
132+
133+ # Get the stream with position (in seconds)
134+ stream_gen , actual_position = await multi_client_stream .get_stream (
135+ output_format = self .output_format ,
136+ filter_params = filter_params ,
137+ )
138+
139+ # Convert position from seconds to microseconds for aioresonate API
140+ actual_position_us = int (actual_position * 1_000_000 )
141+
142+ # Return actual position in microseconds relative to main_stream start
143+ self .player_instance .logger .debug (
144+ "Providing channel stream for player %s at position %d us" ,
145+ player_id ,
146+ actual_position_us ,
147+ )
148+ return (
149+ stream_gen ,
150+ ResonateAudioFormat (
151+ sample_rate = self .output_format .sample_rate ,
152+ bit_depth = self .output_format .bit_depth ,
153+ channels = self .output_format .channels ,
154+ codec = self ._main_channel_format .codec ,
155+ ),
156+ actual_position_us ,
157+ )
158+
159+
62160class ResonatePlayer (Player ):
63161 """A resonate audio player in Music Assistant."""
64162
@@ -67,6 +165,7 @@ class ResonatePlayer(Player):
67165 unsub_group_event_cb : Callable [[], None ]
68166 last_sent_artwork_url : str | None = None
69167 _playback_task : asyncio .Task [None ] | None = None
168+ timed_client_stream : TimedClientStream | None = None
70169
71170 def __init__ (self , provider : ResonateProvider , player_id : str ) -> None :
72171 """Initialize the Player."""
@@ -84,6 +183,7 @@ def __init__(self, provider: ResonateProvider, player_id: str) -> None:
84183 self ._attr_type = PlayerType .PLAYER
85184 self ._attr_supported_features = {
86185 PlayerFeature .SET_MEMBERS ,
186+ PlayerFeature .MULTI_DEVICE_DSP ,
87187 }
88188 self ._attr_can_group_with = {provider .lookup_key }
89189 self ._attr_power_control = PLAYER_CONTROL_NONE
@@ -110,6 +210,15 @@ async def event_cb(self, event: ClientEvent) -> None:
110210 case ClientGroupChangedEvent (new_group = new_group ):
111211 self .unsub_group_event_cb ()
112212 self .unsub_group_event_cb = new_group .add_event_listener (self .group_event_cb )
213+ # Sync playback state from the new group
214+ match new_group .state :
215+ case PlaybackStateType .PLAYING :
216+ self ._attr_playback_state = PlaybackState .PLAYING
217+ case PlaybackStateType .PAUSED :
218+ self ._attr_playback_state = PlaybackState .PAUSED
219+ case PlaybackStateType .STOPPED :
220+ self ._attr_playback_state = PlaybackState .IDLE
221+ self .update_state ()
113222
114223 async def group_event_cb (self , event : GroupEvent ) -> None :
115224 """Event callback registered to the resonate group this player belongs to."""
@@ -155,10 +264,16 @@ async def group_event_cb(self, event: GroupEvent) -> None:
155264 self ._attr_elapsed_time = 0
156265 self ._attr_elapsed_time_last_updated = time .time ()
157266 self .update_state ()
158- case GroupMemberAddedEvent (client_id = _):
159- pass
160- case GroupMemberRemovedEvent (client_id = _):
161- pass
267+ case GroupMemberAddedEvent (client_id = client_id ):
268+ self .logger .debug ("Group member added: %s" , client_id )
269+ if client_id not in self ._attr_group_members :
270+ self ._attr_group_members .append (client_id )
271+ self .update_state ()
272+ case GroupMemberRemovedEvent (client_id = client_id ):
273+ self .logger .debug ("Group member removed: %s" , client_id )
274+ if client_id in self ._attr_group_members :
275+ self ._attr_group_members .remove (client_id )
276+ self .update_state ()
162277 case GroupDeletedEvent ():
163278 pass
164279
@@ -224,25 +339,35 @@ async def _run_playback(self, media: PlayerMedia) -> None:
224339 # Convert string codec to AudioCodec enum
225340 audio_codec = AudioCodec (output_codec )
226341
227- # Apply DSP and other audio filters
228- audio_source = get_ffmpeg_stream (
229- audio_input = self .mass .streams .get_stream (media , flow_pcm_format ),
230- input_format = flow_pcm_format ,
231- output_format = pcm_format ,
232- filter_params = get_player_filter_params (
233- self .mass , self .player_id , flow_pcm_format , pcm_format
234- ),
342+ # Get clean audio source in flow format (high quality internal format)
343+ # Format conversion and per-player DSP will be applied via player_channel
344+ audio_source = self .mass .streams .get_stream (media , flow_pcm_format )
345+
346+ # Create TimedClientStream to wrap the clean audio source
347+ # This distributes the audio to multiple subscribers without DSP
348+ self .timed_client_stream = TimedClientStream (
349+ audio_source = audio_source ,
350+ audio_format = flow_pcm_format ,
235351 )
236352
237- # Create MediaStream wrapping the audio source generator
238- media_stream = MediaStream (
239- source = audio_source ,
240- audio_format = ResonateAudioFormat (
353+ # Setup the main channel subscription
354+ # aioresonate only really supports 16-bit for now TODO: upgrade later to 32-bit
355+ main_channel_gen , main_position = await self .timed_client_stream .get_stream (
356+ output_format = pcm_format ,
357+ filter_params = None , # TODO: this should probably still include the safety limiter
358+ )
359+ assert main_position == 0.0 # first subscriber, should be zero
360+ media_stream = MusicAssistantMediaStream (
361+ main_channel_source = main_channel_gen ,
362+ main_channel_format = ResonateAudioFormat (
241363 sample_rate = pcm_format .sample_rate ,
242364 bit_depth = pcm_format .bit_depth ,
243365 channels = pcm_format .channels ,
244366 codec = audio_codec ,
245367 ),
368+ player_instance = self ,
369+ internal_format = flow_pcm_format ,
370+ output_format = pcm_format ,
246371 )
247372
248373 stop_time = await self .api .group .play_media (media_stream )
@@ -253,6 +378,8 @@ async def _run_playback(self, media: PlayerMedia) -> None:
253378 except Exception :
254379 self .logger .exception ("Error during playback for player %s" , self .display_name )
255380 raise
381+ finally :
382+ self .timed_client_stream = None
256383
257384 async def set_members (
258385 self ,
@@ -268,18 +395,12 @@ async def set_members(
268395 player = cast ("ResonatePlayer" , player ) # For type checking
269396 await self .api .group .remove_client (player .api )
270397 player .api .disconnect_behaviour = DisconnectBehaviour .STOP
271- self ._attr_group_members .remove (player_id )
272398 for player_id in player_ids_to_add or []:
273399 player = self .mass .players .get (player_id , True )
274400 player = cast ("ResonatePlayer" , player ) # For type checking
275401 player .api .disconnect_behaviour = DisconnectBehaviour .UNGROUP
276402 await self .api .group .add_client (player .api )
277- self ._attr_group_members .append (player_id )
278- self .update_state ()
279-
280- def _update_media_art (self , image_data : bytes ) -> None :
281- image = Image .open (BytesIO (image_data ))
282- self .api .group .set_media_art (image )
403+ # self.group_members will be updated by the group event callback
283404
284405 async def _on_queue_update (self , event : MassEvent ) -> None :
285406 """Extract and send current media metadata to resonate players on queue updates."""
@@ -328,7 +449,8 @@ async def _on_queue_update(self, event: MassEvent) -> None:
328449 current_item .media_item
329450 )
330451 if image_data is not None :
331- await asyncio .to_thread (self ._update_media_art , image_data )
452+ image = await asyncio .to_thread (Image .open , BytesIO (image_data ))
453+ await self .api .group .set_media_art (image )
332454 # TODO: null media art if not set?
333455
334456 track_duration = current_item .duration
0 commit comments