-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
2704 lines (2320 loc) · 106 KB
/
main.py
File metadata and controls
2704 lines (2320 loc) · 106 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# SPDX-License-Identifier: AGPL-3.0-only
"""Main FastAPI application for FXRoute."""
import json
import logging
import shutil
import time
import asyncio
import random
import zipfile
from contextlib import asynccontextmanager
from pathlib import Path
from typing import List, Optional
import uvicorn
from fastapi import FastAPI, Request, HTTPException, WebSocket, WebSocketDisconnect, UploadFile, File, Form
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from config import get_settings
# Directory that contains this module; static assets live in its "static" subfolder.
BASE_DIR = Path(__file__).resolve().parent
STATIC_DIR = BASE_DIR / "static"
# Cooldown to prevent rapid mpv IPC flooding (ms)
PLAY_COMMAND_COOLDOWN_MS = 400
# Pause (ms) between stopping playback and loading the next file when the hard
# handoff reason is "manual local track switch".
LOCAL_TRACK_SWITCH_SETTLE_MS = 180
# Pause (ms) between stopping playback and loading the next file for every
# other hard handoff reason.
SOURCE_HANDOFF_SETTLE_MS = 180
# Delay (ms) before the mpv sample rate is compared with the active sink rate
# for a radio track.
RADIO_SAMPLERATE_RENEGOTIATE_DELAY_MS = 1200
# Delay (ms) between loading the temporary bounce preset and reloading the
# previously active EasyEffects preset.
RADIO_SAMPLERATE_PRESET_BOUNCE_DELAY_MS = 350
# Track last play command time to debounce rapid requests
# (a time.monotonic() reading, in seconds).
_last_play_command_time = 0.0
def _can_send_play_command():
    """Report whether a play/pause/seek command may be sent to mpv right now.

    Returns ``False`` while less than ``PLAY_COMMAND_COOLDOWN_MS`` milliseconds
    have elapsed since the last accepted command. Otherwise the current
    monotonic time becomes the new reference point and ``True`` is returned.
    """
    global _last_play_command_time
    current = time.monotonic()
    elapsed_seconds = current - _last_play_command_time
    cooldown_seconds = PLAY_COMMAND_COOLDOWN_MS / 1000
    if elapsed_seconds >= cooldown_seconds:
        _last_play_command_time = current
        return True
    return False
from models import (
DeleteTracksRequest,
PlaylistSaveRequest,
PlayRequest,
StationUpsertRequest,
)
from player import get_player, MPVNotInstalledError, MPVError
from stations import add_station, delete_station, get_stations, update_station
from playlists import delete_playlist, get_playlists, save_playlist
from library import LibraryScanner
from downloader import Downloader
from easyeffects import EasyEffectsManager
from peak_monitor import EasyEffectsPeakMonitor
from samplerate import (
SOURCE_MODE_APP_PLAYBACK,
SOURCE_MODE_BLUETOOTH_INPUT,
SOURCE_MODE_EXTERNAL_INPUT,
apply_persisted_audio_output_selection,
disconnect_connected_bluetooth_audio_sources,
get_audio_output_overview,
get_audio_source_overview,
get_bluetooth_audio_overview,
get_samplerate_status,
set_audio_output_selection,
set_audio_source_selection,
set_bluetooth_receiver_enabled,
)
from spotify import (
playerctl_available,
spotify_installed,
get_status as spotify_get_status,
play as spotify_play,
pause as spotify_pause,
toggle as spotify_toggle,
next_track as spotify_next,
previous as spotify_previous,
shuffle_toggle as spotify_shuffle_toggle,
loop_cycle as spotify_loop_cycle,
seek_to as spotify_seek_to,
set_volume as spotify_set_volume,
)
from system_volume import SystemVolumeError, get_output_volume, set_output_volume
logger = logging.getLogger(__name__)
# File suffixes (lower-case, with leading dot) treated as audio when handling
# uploaded or extracted files.
UPLOAD_AUDIO_EXTENSIONS = {".mp3", ".flac", ".ogg", ".oga", ".opus", ".m4a", ".aac", ".wav", ".wma", ".webm", ".weba"}
# ZIP members containing one of these path components are skipped.
ZIP_IGNORED_PARTS = {"__MACOSX"}
# ZIP members whose lower-cased file name is listed here are skipped.
ZIP_IGNORED_FILENAMES = {".ds_store", "thumbs.db"}
# Global instances (initialized on startup)
settings = None
player_instance = None
library_scanner = None
downloader = None
easyeffects_manager = None
peak_monitor = None
# True while the peak monitor has been started for some playback context.
peak_monitor_playback_armed = False
# asyncio.Lock created lazily by the sync_peak_monitor_* coroutines.
peak_monitor_transition_lock = None
# Signature string of the context the peak monitor was last (re)started for,
# e.g. "player:...", "spotify:playing" or "bluetooth:...".
peak_monitor_context_signature = None
# asyncio.Lock created lazily before EasyEffects presets are bounced.
easyeffects_preset_load_lock = None
source_transition_lock = None
external_input_loopback_module_id = None
external_input_loopback_source_name = None
bluetooth_input_source_name = None
bluetooth_monitor_task = None
bluetooth_agent_process = None
current_source_mode = SOURCE_MODE_APP_PLAYBACK
latest_spotify_state = None
# Track dict of what is playing now / what was most recently selected.
current_track_info = None
last_track_info = None
# Active queue as a list of track dicts (possibly shuffled); empty when fewer
# than two tracks are queued.
playback_queue = []
# The same queue in its pre-shuffle order, restored when shuffle is switched off.
playback_queue_original = []
# Position inside playback_queue; -1 means no position is set.
playback_queue_index = -1
# Either "app_replace" or "mpv_native".
playback_queue_mode = "app_replace"
queue_advancing = False
# URL the player has been asked to load for a queue transition, or None.
queue_transition_target_url = None
playback_queue_loop = False
playback_queue_shuffle = False
# Loop flag used when only a single local track (no queue) is active.
single_track_loop = False
# WebSocket connection manager
class ConnectionManager:
    """Registry of open WebSocket connections with JSON broadcast support."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept ``websocket`` and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)
        logger.info(f"WebSocket connected: {len(self.active_connections)} active")

    def disconnect(self, websocket: WebSocket) -> bool:
        """Stop tracking ``websocket``; return whether it was being tracked."""
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Not registered (or already removed): nothing to do.
            return False
        logger.info(f"WebSocket disconnected: {len(self.active_connections)} active")
        return True

    async def broadcast(self, message: dict):
        """Send ``message`` as JSON text to every tracked connection.

        Connections that are not in the CONNECTED state, or whose send fails,
        are dropped from the registry once the loop has finished.
        """
        payload = json.dumps(message)
        stale = []
        # Iterate over a snapshot so the registry can change while we await.
        for socket in tuple(self.active_connections):
            try:
                if socket.client_state.name == "CONNECTED":
                    await socket.send_text(payload)
                else:
                    stale.append(socket)
            except Exception as e:
                logger.debug(f"WebSocket send failed: {e}")
                stale.append(socket)
        for socket in set(stale):
            self.disconnect(socket)


manager = ConnectionManager()
def _choose_unique_path(path: Path) -> Path:
if not path.exists():
return path
counter = 2
while True:
candidate = path.with_name(f"{path.stem}-{counter}{path.suffix}")
if not candidate.exists():
return candidate
counter += 1
def _choose_unique_dir(path: Path) -> Path:
if not path.exists():
return path
counter = 2
while True:
candidate = path.with_name(f"{path.name}-{counter}")
if not candidate.exists():
return candidate
counter += 1
def _is_safe_relative_zip_path(name: str) -> Optional[Path]:
    """Normalize a ZIP member name into a relative path, or reject it.

    Backslashes are converted to forward slashes and leading/trailing slashes
    are stripped before the checks run.

    Returns:
        The relative ``Path``, or ``None`` when the name is empty, absolute,
        contains an empty, ``.`` or ``..`` component, contains a component
        listed in ``ZIP_IGNORED_PARTS``, or has a lower-cased file name listed
        in ``ZIP_IGNORED_FILENAMES``.
    """
    cleaned = name.replace("\\", "/").strip("/")
    if not cleaned:
        return None
    relative = Path(cleaned)
    if relative.is_absolute():
        return None
    for component in relative.parts:
        if component in {"", ".", ".."} or component in ZIP_IGNORED_PARTS:
            return None
    if relative.name.lower() in ZIP_IGNORED_FILENAMES:
        return None
    return relative
def _extract_zip_album(zip_path: Path, target_root: Path) -> dict:
    """Extract the safe members of a ZIP archive below ``target_root``.

    Members rejected by ``_is_safe_relative_zip_path`` are recorded and
    skipped. Audio files never overwrite an existing file: they are written
    to a unique sibling name instead. Other files are written to their
    archive-relative location as-is.

    Args:
        zip_path: Location of the archive on disk.
        target_root: Directory that receives the extracted tree.

    Returns:
        A dict with ``audio_files`` (extracted paths with an audio suffix),
        ``extracted_files`` (every written path) and ``skipped_entries``
        (member names that were rejected).

    Raises:
        HTTPException: 400 when the archive is corrupt, fails its CRC check,
            is encrypted, or uses an unsupported compression method.
    """
    extracted_files = []
    skipped_entries = []
    try:
        with zipfile.ZipFile(zip_path) as archive:
            # testzip() returns the name of the first corrupt member, if any.
            if archive.testzip() is not None:
                raise HTTPException(status_code=400, detail="Invalid ZIP archive")
            for member in archive.infolist():
                safe_relative = _is_safe_relative_zip_path(member.filename)
                if safe_relative is None:
                    skipped_entries.append(member.filename)
                    continue
                if member.is_dir():
                    (target_root / safe_relative).mkdir(parents=True, exist_ok=True)
                    continue
                destination = target_root / safe_relative
                destination.parent.mkdir(parents=True, exist_ok=True)
                if destination.suffix.lower() in UPLOAD_AUDIO_EXTENSIONS:
                    destination = _choose_unique_path(destination)
                with archive.open(member) as source, destination.open("wb") as target:
                    shutil.copyfileobj(source, target)
                extracted_files.append(destination)
    except (zipfile.BadZipFile, RuntimeError, NotImplementedError) as exc:
        # zipfile raises RuntimeError for password-protected members and
        # NotImplementedError for unsupported compression methods; both mean
        # the upload cannot be used, so report them like a corrupt archive
        # instead of letting them surface as a server error.
        raise HTTPException(status_code=400, detail="Invalid ZIP archive") from exc
    audio_files = [path for path in extracted_files if path.suffix.lower() in UPLOAD_AUDIO_EXTENSIONS]
    return {
        "audio_files": audio_files,
        "extracted_files": extracted_files,
        "skipped_entries": skipped_entries,
    }
def _clear_playback_queue():
global playback_queue, playback_queue_original, playback_queue_index, playback_queue_mode, queue_transition_target_url, playback_queue_loop, playback_queue_shuffle, single_track_loop
playback_queue = []
playback_queue_original = []
playback_queue_index = -1
playback_queue_mode = "app_replace"
queue_transition_target_url = None
playback_queue_loop = False
playback_queue_shuffle = False
single_track_loop = False
def _queue_payload() -> dict:
    """Build the queue section of the playback payload from module state.

    ``active`` is true only when more than one track is queued, and ``loop``
    is true when either the queue loop or the single-track loop is enabled.
    Track dicts are shallow-copied.
    """
    track_count = len(playback_queue)
    track_copies = list(map(dict, playback_queue))
    loop_enabled = playback_queue_loop or single_track_loop
    return {
        "active": track_count > 1,
        "index": playback_queue_index,
        "count": track_count,
        "mode": playback_queue_mode,
        "tracks": track_copies,
        "loop": loop_enabled,
        "shuffle": playback_queue_shuffle,
    }
def _should_use_mpv_native_queue(ordered_tracks: list[dict]) -> bool:
if len(ordered_tracks) <= 1:
return False
sample_rates = set()
for track in ordered_tracks:
if track.get("source") != "local":
return False
if not track.get("url"):
return False
sample_rate_hz = track.get("sample_rate_hz")
if not isinstance(sample_rate_hz, int) or sample_rate_hz <= 0:
return False
sample_rates.add(sample_rate_hz)
return len(sample_rates) == 1
def _sync_track_context_from_queue_index(index: int) -> Optional[dict]:
    """Point the queue index and the current/last track info at ``index``.

    Returns:
        A shallow copy of the queue entry that is now current, or ``None``
        (leaving all state untouched) when ``index`` is out of range.
    """
    global current_track_info, last_track_info, playback_queue_index
    if not 0 <= index < len(playback_queue):
        return None
    playback_queue_index = index
    selected = dict(playback_queue[index])
    current_track_info = last_track_info = selected
    return selected
def _reset_mpv_loop_state() -> None:
    """Switch off playlist and file looping on the player, if it is running."""
    if player_instance and player_instance._running:
        player_instance.set_loop_playlist(False)
        player_instance.set_loop_file(False)
def _prime_mpv_native_queue(start_index: int) -> bool:
    """Load the whole playback queue into the player as one playlist.

    The first entry replaces whatever is loaded and the remaining entries are
    appended. The player is paused while the playlist is rebuilt and resumed
    afterwards. File looping is switched off and playlist looping follows
    ``playback_queue_loop``.

    Args:
        start_index: Playlist position to jump to; positions <= 0 keep the
            player on the first entry.

    Returns:
        ``True`` when the playlist was loaded; ``False`` when the queue has
        fewer than two entries or any entry lacks a URL. In the ``False``
        case the player is not touched.
    """
    if len(playback_queue) <= 1:
        return False
    # Validate every URL up front so that a bad entry in the middle of the
    # queue cannot leave the player paused with a half-built playlist.
    urls = [item.get("url") for item in playback_queue]
    if not all(urls):
        return False
    first_url, *remaining_urls = urls
    player_instance.set_pause(True)
    player_instance.loadfile(first_url, mode="replace")
    for item_url in remaining_urls:
        player_instance.loadfile(item_url, mode="append")
    if start_index > 0:
        player_instance.set_playlist_pos(start_index)
    player_instance.set_loop_playlist(playback_queue_loop)
    player_instance.set_loop_file(False)
    player_instance.set_pause(False)
    return True
def _trim_mpv_native_queue_to_current() -> None:
    """Remove every playlist entry except the current queue position.

    Does nothing when the player is not running or the queue index is not a
    non-negative integer. Playlist looping is switched off in all other
    cases, including when the playlist holds at most one entry.
    """
    if not (player_instance and player_instance._running):
        return
    keep_index = playback_queue_index
    entry_count = player_instance.get_property("playlist-count")
    if not (isinstance(keep_index, int) and keep_index >= 0):
        return
    if isinstance(entry_count, int) and entry_count > 1:
        # Walk backwards so removals do not shift the positions still to visit.
        for position in reversed(range(entry_count)):
            if position != keep_index:
                player_instance.remove_playlist_index(position)
    player_instance.set_loop_playlist(False)
def _should_apply_hard_handoff_for_requested_play(*, requested_source: str, previous_source: Optional[str], previous_file: Optional[str], next_url: Optional[str]) -> tuple[bool, Optional[str]]:
if not previous_file or not next_url or previous_file == next_url:
return False, None
if requested_source == "local" and previous_source == "local":
return True, "manual local track switch"
if requested_source in {"local", "radio"} and previous_source in {"local", "radio"} and requested_source != previous_source:
return True, f"source change {previous_source}->{requested_source}"
return False, None
def _current_track_matches(expected_track: dict | None) -> bool:
    """Return whether the live track has the same source, URL and id as ``expected_track``.

    An empty or missing ``expected_track`` never matches.
    """
    if not expected_track:
        return False
    live = current_track_info or {}
    return all(live.get(key) == expected_track.get(key) for key in ("source", "url", "id"))
def _get_player_audio_samplerate() -> Optional[int]:
    """Read the sample rate from the player's ``audio-params`` property.

    Returns:
        The positive integer ``samplerate`` value, or ``None`` when the
        player is not running, the property cannot be read, or the value is
        missing or not a positive integer.
    """
    if not (player_instance and player_instance._running):
        return None
    try:
        params = player_instance.get_property("audio-params")
    except Exception as exc:
        logger.debug("Failed to read mpv audio-params: %s", exc)
        return None
    rate = params.get("samplerate") if isinstance(params, dict) else None
    if isinstance(rate, int) and rate > 0:
        return rate
    return None
async def _maybe_renegotiate_radio_samplerate(expected_track: dict | None) -> None:
    """Bounce the EasyEffects preset when a radio stream and the sink disagree on sample rate.

    After ``RADIO_SAMPLERATE_RENEGOTIATE_DELAY_MS`` the mpv sample rate is
    compared with the ``active_rate`` reported by ``get_samplerate_status``.
    On a mismatch the active preset is briefly replaced by a bounce preset
    ("Neutral", or "Direct" when "Neutral" is already active) and then loaded
    again. The function returns silently whenever the track is no longer the
    expected one, the player or EasyEffects is unavailable, or the rates
    cannot be determined.

    Args:
        expected_track: Track dict the renegotiation was scheduled for; only
            tracks with source ``"radio"`` are processed.
    """
    global easyeffects_manager, easyeffects_preset_load_lock
    if not expected_track or expected_track.get("source") != "radio":
        return
    if not easyeffects_manager or not player_instance or not player_instance._running:
        return
    await asyncio.sleep(RADIO_SAMPLERATE_RENEGOTIATE_DELAY_MS / 1000)
    if not _current_track_matches(expected_track):
        return
    mpv_rate = _get_player_audio_samplerate()
    if not mpv_rate:
        return
    try:
        samplerate_status = get_samplerate_status()
    except Exception as exc:
        logger.debug("Radio samplerate renegotiation check failed: %s", exc)
        return
    sink_rate = samplerate_status.get("active_rate")
    if not isinstance(sink_rate, int) or sink_rate <= 0 or sink_rate == mpv_rate:
        return
    if easyeffects_preset_load_lock is None:
        easyeffects_preset_load_lock = asyncio.Lock()
    try:
        async with easyeffects_preset_load_lock:
            if not _current_track_matches(expected_track):
                return
            # Read the active preset while holding the lock so that a bounce
            # still in progress elsewhere is never mistaken for the user's
            # preset.
            active_preset = easyeffects_manager.get_active_preset()
            if not active_preset:
                return
            bounce_preset = "Neutral" if active_preset != "Neutral" else "Direct"
            logger.info(
                "Radio samplerate mismatch detected, bouncing EasyEffects preset via %s: preset=%s mpv_rate=%s sink_rate=%s track=%s",
                bounce_preset,
                active_preset,
                mpv_rate,
                sink_rate,
                expected_track.get("url"),
            )
            easyeffects_manager.load_preset(bounce_preset)
            try:
                await asyncio.sleep(RADIO_SAMPLERATE_PRESET_BOUNCE_DELAY_MS / 1000)
            finally:
                # Always put the original preset back, even when the track
                # changed or the task was cancelled during the delay;
                # otherwise the temporary bounce preset would stay active.
                easyeffects_manager.load_preset(active_preset)
            if not _current_track_matches(expected_track):
                return
            status = easyeffects_manager.get_status()
            await manager.broadcast({"type": "easyeffects", "data": status})
            await refresh_peak_monitor_after_effects_change("radio-samplerate-renegotiate")
            final_status = get_samplerate_status()
            logger.info(
                "Radio samplerate renegotiation finished: preset=%s final_sink_rate=%s mpv_rate=%s",
                active_preset,
                final_status.get("active_rate"),
                mpv_rate,
            )
    except Exception as exc:
        logger.warning("Radio samplerate renegotiation failed for %s: %s", expected_track.get("url"), exc)
def _prepare_local_queue(track_id: str, queue_track_ids: Optional[list[str]] = None, shuffle: bool = False, loop: bool = False):
    """Rebuild the module-level playback queue around ``track_id``.

    The queue state is reset first. The selected tracks are then taken from
    the library in library order, so the order of ``queue_track_ids`` itself
    is not preserved. With ``shuffle`` the requested track stays first and
    the rest is randomized. A queue is only kept when more than one track is
    selected; a single track with ``loop`` sets ``single_track_loop`` instead.

    Args:
        track_id: Track that should become current.
        queue_track_ids: Optional ids to queue; ids unknown to the library
            are ignored and ``track_id`` is added when it is missing.
        shuffle: Randomize the tracks that follow the requested one.
        loop: Enable queue looping (or single-track looping for one track).

    Returns:
        The dict of the track that is now current.

    Raises:
        HTTPException: 404 when none of the selected ids exists in the library.
    """
    global playback_queue, playback_queue_original, playback_queue_index, playback_queue_mode, playback_queue_loop, playback_queue_shuffle, single_track_loop
    playback_queue = []
    playback_queue_original = []
    playback_queue_index = -1
    playback_queue_mode = "app_replace"
    playback_queue_loop = False
    playback_queue_shuffle = False
    single_track_loop = False
    tracks = library_scanner.get_tracks()
    tracks_by_id = {track.id: track for track in tracks}
    selected_ids = [track_id]
    if queue_track_ids:
        selected_ids = [candidate for candidate in queue_track_ids if candidate in tracks_by_id]
        if track_id not in selected_ids:
            selected_ids.insert(0, track_id)
    # Build the lookup set once instead of once per library track.
    selected_lookup = set(selected_ids)
    ordered_tracks = [tracks_by_id[track.id].to_dict() for track in tracks if track.id in selected_lookup]
    if not ordered_tracks:
        raise HTTPException(status_code=404, detail="Track not found")
    # Keep an unshuffled copy so shuffle can later be switched off again.
    original_tracks = [dict(track) for track in ordered_tracks]
    has_queue = len(ordered_tracks) > 1
    if shuffle and has_queue:
        current_track = next((track for track in ordered_tracks if track.get("id") == track_id), ordered_tracks[0])
        remaining = [track for track in ordered_tracks if track.get("id") != current_track.get("id")]
        random.shuffle(remaining)
        ordered_tracks = [current_track] + remaining
    playback_queue = ordered_tracks if has_queue else []
    playback_queue_original = original_tracks if has_queue else []
    playback_queue_mode = "mpv_native" if _should_use_mpv_native_queue(playback_queue) else "app_replace"
    playback_queue_loop = bool(loop and has_queue)
    playback_queue_shuffle = bool(shuffle and has_queue)
    single_track_loop = bool(loop and not has_queue)
    if playback_queue:
        for index, item in enumerate(playback_queue):
            if item.get("id") == track_id:
                return _sync_track_context_from_queue_index(index)
        # Requested track is not part of the queue: start at the first entry.
        return _sync_track_context_from_queue_index(0)
    return ordered_tracks[0]
async def _load_queue_track(index: int, *, transition_reason: str = "queue navigation") -> bool:
    """Make queue entry ``index`` current and start playing it.

    In ``mpv_native`` mode the player jumps to the playlist position. In any
    other mode the file is loaded with ``replace``, preceded by a stop and a
    short settle delay when a hard handoff is required.

    Args:
        index: Position inside ``playback_queue``.
        transition_reason: Label used in the handoff log message.

    Returns:
        ``True`` when the player was told to play the entry; ``False`` when
        there is no queue, ``index`` is out of range, or the entry has no URL
        (in which case the queue is cleared).

    Raises:
        Exception: Whatever the player raises; ``queue_transition_target_url``
            is reset before the error propagates.
    """
    global queue_transition_target_url
    queue_length = len(playback_queue)
    if queue_length <= 1 or index < 0 or index >= queue_length:
        return False
    target = dict(playback_queue[index])
    target_url = target.get("url")
    if not target_url:
        _clear_playback_queue()
        return False
    # Capture what was playing before the track context is switched over.
    mpv_state = player_instance.state
    prior_context = dict(current_track_info or {})
    prior_file = mpv_state.get("current_file") or prior_context.get("url")
    prior_source = prior_context.get("source")
    if not _sync_track_context_from_queue_index(index):
        return False
    use_native_playlist = playback_queue_mode == "mpv_native"
    if not use_native_playlist:
        needs_handoff, handoff_reason = _should_apply_hard_handoff_for_requested_play(
            requested_source="local",
            previous_source=prior_source,
            previous_file=prior_file,
            next_url=target_url,
        )
        if needs_handoff:
            logger.info(
                "Applying hard handoff before %s (%s): %s -> %s",
                transition_reason,
                handoff_reason,
                prior_file,
                target_url,
            )
            player_instance.stop_playback()
            if handoff_reason == "manual local track switch":
                settle_ms = LOCAL_TRACK_SWITCH_SETTLE_MS
            else:
                settle_ms = SOURCE_HANDOFF_SETTLE_MS
            await asyncio.sleep(settle_ms / 1000)
    queue_transition_target_url = target_url
    try:
        if use_native_playlist:
            player_instance.set_playlist_pos(index)
        else:
            player_instance.loadfile(target_url, mode="replace")
        player_instance.set_pause(False)
    except Exception:
        queue_transition_target_url = None
        raise
    return True
async def _advance_playback_queue(*, transition_reason: str = "queue advance") -> bool:
    """Move to the next queue entry, wrapping around when queue looping is on.

    Returns:
        The result of ``_load_queue_track``, or ``False`` when there is no
        queue or the end was reached without looping (the queue is then
        cleared).
    """
    queue_length = len(playback_queue)
    if queue_length <= 1:
        return False
    target_index = playback_queue_index + 1
    if target_index >= queue_length:
        if not playback_queue_loop:
            _clear_playback_queue()
            return False
        target_index = 0
    return await _load_queue_track(target_index, transition_reason=transition_reason)
async def _rewind_playback_queue(*, transition_reason: str = "queue rewind") -> bool:
    """Move to the previous queue entry.

    Returns:
        The result of ``_load_queue_track``, or ``False`` when there is no
        queue or the current entry is already the first one.
    """
    target_index = playback_queue_index - 1
    if len(playback_queue) <= 1 or target_index < 0:
        return False
    return await _load_queue_track(target_index, transition_reason=transition_reason)
def _set_queue_shuffle(enabled: bool) -> bool:
    """Switch queue shuffling on or off.

    Enabling keeps everything up to and including the current entry in place
    and randomizes the entries after it. Disabling restores the order saved
    in ``playback_queue_original`` and re-locates the current track by id.
    In ``mpv_native`` mode with a running player the playlist is re-primed.

    Returns:
        ``False`` (with shuffle forced off) when there is no queue, otherwise
        ``True``.
    """
    # NOTE(review): the source lost its indentation; the re-prime after a
    # restore is assumed to belong to the "disable" branch - confirm.
    global playback_queue, playback_queue_index, playback_queue_shuffle
    if len(playback_queue) <= 1:
        playback_queue_shuffle = False
        return False
    playback_queue_shuffle = bool(enabled)
    index_in_range = 0 <= playback_queue_index < len(playback_queue)
    anchor_index = playback_queue_index if index_in_range else 0
    anchor_track_id = (playback_queue[anchor_index] or {}).get("id") if playback_queue else None
    if enabled:
        head = playback_queue[: anchor_index + 1]
        tail = playback_queue[anchor_index + 1 :]
        random.shuffle(tail)
        playback_queue = head + tail
        if playback_queue_mode == "mpv_native" and player_instance and player_instance._running:
            _prime_mpv_native_queue(anchor_index)
    elif playback_queue_original:
        playback_queue = [dict(track) for track in playback_queue_original]
        if anchor_track_id:
            playback_queue_index = next((index for index, track in enumerate(playback_queue) if track.get("id") == anchor_track_id), anchor_index)
        if playback_queue_mode == "mpv_native" and player_instance and player_instance._running:
            _prime_mpv_native_queue(playback_queue_index if playback_queue_index >= 0 else 0)
    return True
def _set_queue_loop(enabled: bool) -> bool:
    """Switch looping on or off for the active local playback.

    With a queue of more than one track the queue loop flag is set (and
    mirrored to the player in ``mpv_native`` mode); otherwise the
    single-track loop flag is set. Both flags are cleared when the current
    track is not a local one.

    Returns:
        ``False`` when no local track is current, otherwise ``True``.
    """
    global playback_queue_loop, single_track_loop
    track = current_track_info
    if not (track and track.get("source") == "local"):
        playback_queue_loop = False
        single_track_loop = False
        return False
    wants_loop = bool(enabled)
    if len(playback_queue) > 1:
        playback_queue_loop = wants_loop
        if playback_queue_mode == "mpv_native" and player_instance and player_instance._running:
            player_instance.set_loop_playlist(playback_queue_loop)
        single_track_loop = False
    else:
        single_track_loop = wants_loop
        playback_queue_loop = False
    return True
def _sync_active_local_queue_selection(queue_track_ids: Optional[list[str]] = None, shuffle: bool = False, loop: bool = False) -> dict:
    """Rebuild the queue around the local track that is currently loaded.

    A native mpv playlist is first trimmed down to the current entry. The
    queue is then rebuilt with ``_prepare_local_queue`` and a resulting
    multi-track queue is forced into ``app_replace`` mode.

    Returns:
        The playback payload built from the player state captured at entry.

    Raises:
        HTTPException: 409 when the current track is not a local track with
            an id, or when nothing is loaded or playback has ended.
    """
    # NOTE(review): the source lost its indentation; the mpv loop reset is
    # assumed to run regardless of the new queue length - confirm.
    global current_track_info, last_track_info, playback_queue_mode
    active_track = dict(current_track_info or {})
    if active_track.get("source") != "local" or not active_track.get("id"):
        raise HTTPException(status_code=409, detail="Local playback is not active")
    player_state = player_instance.state if player_instance else {}
    nothing_loaded = not player_state.get("current_file") or player_state.get("ended")
    if nothing_loaded:
        raise HTTPException(status_code=409, detail="Nothing is currently loaded to update")
    if playback_queue_mode == "mpv_native" and len(playback_queue) > 1:
        _trim_mpv_native_queue_to_current()
    refreshed_track = _prepare_local_queue(
        active_track["id"],
        queue_track_ids,
        shuffle=shuffle,
        loop=loop,
    )
    current_track_info = last_track_info = refreshed_track
    if len(playback_queue) > 1:
        playback_queue_mode = "app_replace"
    if player_instance and player_instance._running:
        _reset_mpv_loop_state()
    return build_playback_payload(player_state)
def ensure_local_source_volume() -> None:
    """Set the player's own volume to 100 when it is running.

    Failures are logged as warnings and otherwise ignored.
    """
    if not (player_instance and player_instance._running):
        return
    try:
        player_instance.set_volume(100)
    except Exception as exc:
        logger.warning("Failed to pin MPV source volume to 100%%: %s", exc)
def get_output_volume_safe(default: int = 100) -> int:
    """Return the output volume, or ``default`` when it cannot be read.

    A failed read is logged as a warning.
    """
    try:
        volume = get_output_volume()
    except Exception as exc:
        logger.warning("Failed to read output volume, using fallback %s: %s", default, exc)
        return default
    return volume
async def get_spotify_ui_state(data: Optional[dict] = None) -> dict:
    """Return the Spotify status with the volume fields used by the UI.

    Args:
        data: Status dict to use; when empty or ``None`` the status is
            fetched with ``spotify_get_status``.

    Returns:
        A copy of the status in which ``source_volume`` holds the rounded
        volume reported by the status (``None`` when it is not numeric) and
        ``volume`` holds the output volume. When the output volume cannot be
        read, ``volume`` falls back to the source volume, or to 100 when
        there is none.
    """
    status = dict(data or await spotify_get_status())
    reported_volume = status.get("volume")
    if isinstance(reported_volume, (int, float)):
        source_volume = int(round(float(reported_volume)))
    else:
        source_volume = None
    status["source_volume"] = source_volume
    # Compare against None explicitly: a source volume of 0 is a real value
    # and must not be replaced by the 100 fallback.
    fallback_volume = source_volume if source_volume is not None else 100
    status["volume"] = get_output_volume_safe(fallback_volume)
    return status
def build_playback_payload(state: Optional[dict] = None) -> dict:
    """Assemble the playback payload from player state and module state.

    Args:
        state: Player state to start from; when empty or ``None`` the state
            of ``player_instance`` is used (or an empty dict without player).

    Returns:
        A copy of the state extended with ``source_volume`` (when known or
        when a local/radio track is current), the output ``volume``,
        ``current_track``, ``queue``, ``live_title``, ``output_peak_warning``
        and, for radio tracks, ``metadata``.
    """
    playback_state = dict(state or (player_instance.state if player_instance else {}))
    reported_volume = playback_state.get("volume")
    if isinstance(reported_volume, (int, float)):
        source_volume = int(round(float(reported_volume)))
    else:
        source_volume = None
    plays_via_mpv = bool(current_track_info) and current_track_info.get("source") in {"local", "radio"}
    # Local/radio tracks always carry the key (possibly None); other sources
    # only get it when a numeric volume is available.
    if plays_via_mpv or source_volume is not None:
        playback_state["source_volume"] = source_volume
    playback_state["volume"] = get_output_volume_safe(source_volume if source_volume is not None else 100)
    playback_state["current_track"] = current_track_info
    playback_state["queue"] = _queue_payload()
    live_title = None
    if player_instance and current_track_info and current_track_info.get("source") == "radio":
        metadata = player_instance.get_metadata() if playback_state.get("current_file") else {}
        stream_title = (metadata.get("icy-title") or metadata.get("title") or "").strip()
        live_title = stream_title or None
        playback_state["metadata"] = metadata
    playback_state["live_title"] = live_title
    if peak_monitor:
        playback_state["output_peak_warning"] = peak_monitor.snapshot()
    else:
        playback_state["output_peak_warning"] = {
            "available": False,
            "detected": False,
            "hold_ms": 0,
            "threshold": 1.0,
            "vu_db": None,
            "target": None,
            "last_over_at": None,
            "last_error": None,
        }
    # Keep playback/status payloads lightweight. EasyEffects has dedicated
    # endpoints and websocket updates, and pulling full EasyEffects status here
    # can stall frequent /api/status polling during playback.
    return playback_state
async def on_peak_monitor_change(snapshot: dict):
    """Broadcast a peak-monitor snapshot to all WebSocket clients."""
    event = {"type": "playback_peak_warning", "data": snapshot}
    await manager.broadcast(event)
async def sync_peak_monitor_for_playback_state(state: dict):
    """Start, restart or stop the peak monitor to follow the player state.

    The monitor is restarted whenever active playback begins or its context
    signature (source plus current file) changes. It is stopped when playback
    becomes inactive, unless Spotify is reported as playing. Every change is
    followed by a snapshot broadcast.
    """
    global peak_monitor_playback_armed, peak_monitor_transition_lock, peak_monitor_context_signature
    if not peak_monitor:
        return
    if peak_monitor_transition_lock is None:
        peak_monitor_transition_lock = asyncio.Lock()
    async with peak_monitor_transition_lock:
        playing = bool(state.get("current_file") and not state.get("paused") and not state.get("ended"))
        if playing:
            source = (current_track_info or {}).get("source") or "unknown"
            desired_signature = f"player:{source}:{state.get('current_file') or ''}"
            if peak_monitor_playback_armed and peak_monitor_context_signature == desired_signature:
                # Already monitoring exactly this context.
                return
            peak_monitor_playback_armed = True
            peak_monitor_context_signature = desired_signature
            logger.info("Restarting peak monitor on playback context change to refresh PipeWire links: %s", desired_signature)
            await peak_monitor.restart()
            await manager.broadcast({"type": "playback_peak_warning", "data": peak_monitor.snapshot()})
            return
        if not peak_monitor_playback_armed:
            return
        spotify_state = await get_spotify_ui_state()
        if spotify_state.get("available") and spotify_state.get("status") == "Playing":
            # Spotify still produces audio, so keep the monitor running.
            return
        logger.info("Stopping peak monitor while playback is inactive")
        await peak_monitor.stop()
        peak_monitor_playback_armed = False
        peak_monitor_context_signature = None
        await manager.broadcast({"type": "playback_peak_warning", "data": peak_monitor.snapshot()})
async def sync_peak_monitor_for_spotify_state(data: dict):
    """Start or stop the peak monitor to follow the Spotify status.

    The monitor is (re)started when Spotify is playing and the monitor is not
    already armed with the Spotify signature. It is stopped when Spotify is
    not playing and the player has no active playback either. Every change is
    followed by a snapshot broadcast.
    """
    global peak_monitor_playback_armed, peak_monitor_transition_lock, peak_monitor_context_signature
    if not peak_monitor:
        return
    if peak_monitor_transition_lock is None:
        peak_monitor_transition_lock = asyncio.Lock()
    async with peak_monitor_transition_lock:
        player_state = player_instance.state if player_instance else {}
        spotify_playing = bool(data.get("available") and data.get("status") == "Playing")
        if spotify_playing:
            desired_signature = "spotify:playing"
            if peak_monitor_playback_armed and peak_monitor_context_signature == desired_signature:
                return
            peak_monitor_playback_armed = True
            peak_monitor_context_signature = desired_signature
            logger.info("Starting peak monitor for active Spotify playback")
            await peak_monitor.restart()
            await manager.broadcast({"type": "playback_peak_warning", "data": peak_monitor.snapshot()})
            return
        if not peak_monitor_playback_armed:
            return
        player_active = player_state.get("current_file") and not player_state.get("paused") and not player_state.get("ended")
        if player_active:
            # The player still produces audio, so keep the monitor running.
            return
        logger.info("Stopping peak monitor because Spotify is no longer actively playing")
        await peak_monitor.stop()
        peak_monitor_playback_armed = False
        peak_monitor_context_signature = None
        await manager.broadcast({"type": "playback_peak_warning", "data": peak_monitor.snapshot()})
async def sync_peak_monitor_for_source_mode_state(source_overview: dict | None = None):
    """Start or stop the peak monitor to follow Bluetooth input streaming.

    Args:
        source_overview: Audio source overview to evaluate; fetched with
            ``get_audio_source_overview`` when empty or ``None``.

    The monitor is (re)started when the Bluetooth input mode is streaming
    from a connected device and the context signature (device plus codec)
    changed. It is stopped when streaming ended, the monitor was armed for a
    Bluetooth context, and neither the player nor Spotify is playing.
    """
    global peak_monitor_playback_armed, peak_monitor_transition_lock, peak_monitor_context_signature
    if not peak_monitor:
        return
    if peak_monitor_transition_lock is None:
        peak_monitor_transition_lock = asyncio.Lock()
    async with peak_monitor_transition_lock:
        overview = source_overview or get_audio_source_overview()
        bluetooth = overview.get("bluetooth") or {}
        streaming = bool(
            overview.get("mode") == SOURCE_MODE_BLUETOOTH_INPUT
            and bluetooth.get("state") == "streaming"
            and bluetooth.get("connected_device")
        )
        if streaming:
            desired_signature = f"bluetooth:{bluetooth.get('connected_device')}:{bluetooth.get('active_codec') or ''}"
            if peak_monitor_playback_armed and peak_monitor_context_signature == desired_signature:
                return
            peak_monitor_playback_armed = True
            peak_monitor_context_signature = desired_signature
            logger.info("Starting peak monitor for active Bluetooth input: %s", desired_signature)
            await peak_monitor.restart()
            await manager.broadcast({"type": "playback_peak_warning", "data": peak_monitor.snapshot()})
            return
        armed_for_bluetooth = peak_monitor_playback_armed and str(peak_monitor_context_signature or "").startswith("bluetooth:")
        if not armed_for_bluetooth:
            return
        player_state = player_instance.state if player_instance else {}
        spotify_state = await get_spotify_ui_state()
        is_local_playing = bool(player_state.get("current_file") and not player_state.get("paused") and not player_state.get("ended"))
        is_spotify_playing = bool(spotify_state.get("available") and spotify_state.get("status") == "Playing")
        if is_local_playing or is_spotify_playing:
            # Another source still produces audio, so keep the monitor running.
            return
        logger.info("Stopping peak monitor because Bluetooth input is no longer actively streaming")
        await peak_monitor.stop()
        peak_monitor_playback_armed = False
        peak_monitor_context_signature = None
        await manager.broadcast({"type": "playback_peak_warning", "data": peak_monitor.snapshot()})
async def refresh_peak_monitor_after_effects_change(reason: str = "effects-change"):
    """Re-sync the peak monitor after the effects configuration changed.

    Args:
        reason: Short label included in the log line.

    Returns early unless a peak monitor exists, it is armed, and either local
    playback or Spotify is currently playing. Otherwise the stored context
    signature is cleared, the coroutine waits 0.1 s, and the sync helper for
    the active source is invoked (Spotify takes precedence over local
    playback).
    """
    global peak_monitor, peak_monitor_playback_armed, peak_monitor_context_signature, player_instance
    if not (peak_monitor and peak_monitor_playback_armed):
        return

    local_state = player_instance.state if player_instance else {}
    spotify_state = await get_spotify_ui_state()
    local_active = bool(
        local_state.get("current_file")
        and not (local_state.get("paused") or local_state.get("ended"))
    )
    spotify_active = bool(
        spotify_state.get("available") and spotify_state.get("status") == "Playing"
    )
    if not (local_active or spotify_active):
        return

    logger.info("Refreshing peak monitor after %s", reason)
    # NOTE(review): clearing the signature presumably makes the sync helper
    # treat the context as changed and restart the monitor - confirm.
    peak_monitor_context_signature = None
    await asyncio.sleep(0.1)
    if spotify_active:
        await sync_peak_monitor_for_spotify_state(spotify_state)
    else:
        # Reaching here without Spotify playing implies local playback is active.
        await sync_peak_monitor_for_playback_state(local_state)
# Callback functions
async def on_player_state_change(state: dict):
    """Handle a player state update: track the queue, auto-advance, broadcast.

    Args:
        state: Player state dict; the keys read here are ``current_file``,
            ``ended`` and ``playlist_pos``.

    Steps, in order:
      1. Clear the pending queue-transition target once the player has loaded
         it (and has not ended) or is playing some other file.
      2. In ``mpv_native`` queue mode with more than one queued item, mirror
         the player's playlist position into the queue index and the
         current/last track info.
      3. When a local track has ended with no file loaded, try to advance the
         queue, or reload the same track when single-track loop is on. Either
         path returns without broadcasting.
      4. Otherwise sync the peak monitor and broadcast a ``playback`` message.
    """
    global queue_advancing, playback_queue_index, current_track_info, last_track_info, queue_transition_target_url

    now_playing = state.get("current_file")
    has_ended = state.get("ended")

    if queue_transition_target_url:
        reached_target = now_playing == queue_transition_target_url and not has_ended
        moved_elsewhere = bool(now_playing) and now_playing != queue_transition_target_url
        if reached_target or moved_elsewhere:
            queue_transition_target_url = None

    if playback_queue_mode == "mpv_native" and len(playback_queue) > 1:
        position = state.get("playlist_pos")
        if not isinstance(position, int):
            # No usable playlist position: locate the current file in the queue.
            position = next(
                (idx for idx, entry in enumerate(playback_queue) if entry.get("url") == now_playing),
                None,
            )
        if isinstance(position, int) and 0 <= position < len(playback_queue):
            queued_track = playback_queue[position]
            out_of_sync = (
                playback_queue_index != position
                or (current_track_info or {}).get("id") != queued_track.get("id")
            )
            if out_of_sync:
                playback_queue_index = position
                current_track_info = dict(queued_track)
                last_track_info = dict(queued_track)

    local_track_finished = (
        not queue_advancing
        and not queue_transition_target_url
        and has_ended
        and not now_playing
        and current_track_info
        and current_track_info.get("source") == "local"
    )
    if local_track_finished:
        # queue_advancing guards against re-entrant advances while one is in flight.
        queue_advancing = True
        try:
            if len(playback_queue) > 1 and await _advance_playback_queue(transition_reason="queue auto-advance"):
                return
            if single_track_loop and current_track_info and current_track_info.get("url"):
                player_instance.loadfile(current_track_info["url"], mode="replace")
                return
        finally:
            queue_advancing = False

    await sync_peak_monitor_for_playback_state(state)
    await manager.broadcast({"type": "playback", "data": build_playback_payload(state)})
async def on_download_progress(progress):
    """Broadcast a download progress update and any terminal follow-up event.

    Args:
        progress: Either an object exposing ``to_dict()`` or an already
            serialisable payload (used as-is).

    Every update is broadcast as ``download``. A ``complete`` status also
    forces a library refresh (when a scanner exists) and broadcasts
    ``download_complete``; an ``error`` status broadcasts ``download_error``.
    """
    global library_scanner
    payload = progress.to_dict() if hasattr(progress, "to_dict") else progress
    await manager.broadcast({"type": "download", "data": payload})

    status = (payload or {}).get("status")
    if status == "error":
        await manager.broadcast({"type": "download_error", "data": payload})
        return
    if status != "complete":
        return

    if library_scanner:
        library_scanner.refresh(force=True)
    await manager.broadcast({"type": "download_complete", "data": payload})
async def broadcast_spotify_state(data=None):
    """Resolve the Spotify UI state, cache it, and push it to clients.

    Args:
        data: Optional value forwarded to ``get_spotify_ui_state``.

    Returns:
        The state returned by ``get_spotify_ui_state``, which is also stored
        in ``latest_spotify_state``, passed to the Spotify peak-monitor sync,
        and broadcast as a ``spotify`` message.
    """
    global latest_spotify_state
    ui_state = await get_spotify_ui_state(data)
    latest_spotify_state = ui_state
    await sync_peak_monitor_for_spotify_state(ui_state)
    await manager.broadcast({"type": "spotify", "data": ui_state})
    return ui_state
async def pause_spotify_for_local_playback_broadcast():
    """Pause Spotify via ``playerctl`` (best effort) and broadcast its state.

    The pause is skipped when ``playerctl`` is not on ``PATH``. The command is
    given 3 seconds to finish; on timeout the child process is killed and
    reaped so it does not linger. Failures never propagate to the caller;
    they are logged at debug level. The Spotify state is broadcast afterwards
    regardless of whether the pause succeeded.
    """
    import shutil

    playerctl = shutil.which("playerctl")
    if playerctl:
        proc = None
        try:
            proc = await asyncio.create_subprocess_exec(playerctl, "--player=spotify", "pause")
            await asyncio.wait_for(proc.communicate(), timeout=3)
        except asyncio.TimeoutError:
            logger.debug("playerctl pause timed out; terminating it")
            # wait_for() only cancels the communicate() call; the child keeps
            # running unless it is killed explicitly.
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the timeout and the kill.
                pass
            try:
                await proc.wait()
            except Exception as exc:
                logger.debug("Failed to reap playerctl after timeout: %s", exc)
        except Exception as exc:
            logger.debug("Failed to pause Spotify via playerctl: %s", exc)

    try:
        await broadcast_spotify_state()
    except Exception as exc:
        logger.debug("Failed to broadcast Spotify state after pause: %s", exc)
async def pause_local_playback_for_spotify_broadcast():
    """Stop local playback (best effort) so Spotify can take over.

    When a player instance exists and is running, playback is stopped, the
    resulting state is broadcast as a ``playback`` message, and the coroutine
    waits 0.2 s. Failures never propagate to the caller; they are logged at
    debug level instead of being silently discarded.
    """
    try:
        if player_instance and player_instance._running:
            player_instance.stop_playback()
            await manager.broadcast({"type": "playback", "data": build_playback_payload(player_instance.state)})
            # NOTE(review): the short pause presumably lets the stop settle
            # before the caller continues - confirm.
            await asyncio.sleep(0.2)
    except Exception as exc:
        logger.debug("Failed to stop local playback before Spotify takes over: %s", exc)
async def _run_pactl_command(*args: str) -> str:
    """Run ``pactl`` with *args* and return its stripped standard output.

    Args:
        *args: Command-line arguments passed to ``pactl`` verbatim.

    Returns:
        Decoded stdout (undecodable bytes ignored) with surrounding
        whitespace removed.

    Raises:
        RuntimeError: If ``pactl`` exits with a non-zero status. The message
            is the stripped stderr, or a generic fallback when stderr is empty.
    """
    process = await asyncio.create_subprocess_exec(
        "pactl",
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out_bytes, err_bytes = await process.communicate()
    if process.returncode == 0:
        return out_bytes.decode(errors="ignore").strip()
    detail = err_bytes.decode(errors="ignore").strip()
    raise RuntimeError(detail or f"pactl {' '.join(args)} failed")
async def _run_pw_link_command(*args: str) -> str:
    """Run ``pw-link`` with *args* and return its stripped standard output.

    Args:
        *args: Command-line arguments passed to ``pw-link`` verbatim.

    Returns:
        Decoded stdout (undecodable bytes ignored) with surrounding
        whitespace removed.

    Raises:
        RuntimeError: If ``pw-link`` exits with a non-zero status. The message
            is the stripped stderr, or a generic fallback when stderr is empty.
    """
    process = await asyncio.create_subprocess_exec(
        "pw-link",
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out_bytes, err_bytes = await process.communicate()
    if process.returncode == 0:
        return out_bytes.decode(errors="ignore").strip()
    detail = err_bytes.decode(errors="ignore").strip()
    raise RuntimeError(detail or f"pw-link {' '.join(args)} failed")
async def _disconnect_external_input_source(source_name: str | None) -> None:
    """Unlink a source's FL/FR capture ports from ``easyeffects_sink``.

    Args:
        source_name: Name of the source node; ``None`` or blank is a no-op.

    Each channel is disconnected with ``pw-link -d``. Errors are ignored per
    channel, so a failure on one channel does not prevent the attempt on the
    other.
    """
    node = (source_name or "").strip()
    if not node:
        return
    for side in ("FL", "FR"):
        capture_port = f"{node}:capture_{side}"
        playback_port = f"easyeffects_sink:playback_{side}"
        try:
            await _run_pw_link_command("-d", capture_port, playback_port)
        except Exception:
            # Best effort - presumably the link may simply not exist.
            continue
async def _disable_external_input_loopback() -> None:
    """Tear down external-input monitoring and reset its bookkeeping.

    If a legacy loopback module id is recorded, it is unloaded through
    ``pactl unload-module``; a failure there is logged as a warning and does
    not abort the teardown. The previously recorded source is then unlinked
    from the effects sink, and both the module id and the source name are
    reset to ``None``.
    """
    global external_input_loopback_module_id, external_input_loopback_source_name
    source_to_unlink = external_input_loopback_source_name

    if external_input_loopback_module_id is not None:
        try:
            await _run_pactl_command("unload-module", str(external_input_loopback_module_id))
        except Exception as exc:
            logger.warning("Failed to unload legacy external-input loopback module %s: %s", external_input_loopback_module_id, exc)
        else:
            logger.info("Disabled legacy external-input loopback module %s", external_input_loopback_module_id)

    await _disconnect_external_input_source(source_to_unlink)
    external_input_loopback_module_id = None
    external_input_loopback_source_name = None
async def _ensure_external_input_loopback(source_name: str) -> None:
    """Link a source's FL/FR capture ports to ``easyeffects_sink``.

    Args:
        source_name: Name of the source node to monitor.

    Raises:
        RuntimeError: If ``source_name`` is blank.
        Exception: Any ``pw-link`` failure whose message does not contain
            "exists" is re-raised.

    Does nothing when the requested source is already the recorded one.
    Otherwise any previous monitoring is torn down first, both channels are
    linked, and the source name is recorded (the legacy module id is cleared).
    """
    global external_input_loopback_module_id, external_input_loopback_source_name
    wanted = (source_name or "").strip()
    if not wanted:
        raise RuntimeError("Missing source name for external-input monitoring")
    if wanted == external_input_loopback_source_name:
        return

    await _disable_external_input_loopback()

    port_pairs = [
        (f"{wanted}:capture_{side}", f"easyeffects_sink:playback_{side}")
        for side in ("FL", "FR")
    ]
    for capture_port, playback_port in port_pairs:
        try:
            await _run_pw_link_command(capture_port, playback_port)
        except Exception as exc:
            # An "exists" error is tolerated - presumably it means the link is
            # already in place. This also covers the "file exists" wording.
            if "exists" not in str(exc).lower():
                raise

    external_input_loopback_module_id = None
    external_input_loopback_source_name = wanted
    logger.info("Enabled direct external-input monitoring from %s to easyeffects_sink", wanted)
async def _sync_external_input_monitoring(source_overview: dict | None = None) -> dict:
overview = source_overview or get_audio_source_overview()
if overview.get("mode") != SOURCE_MODE_EXTERNAL_INPUT:
await _disable_external_input_loopback()
return overview
current_input = overview.get("selected_input") or overview.get("current_input") or {}
source_name = current_input.get("source_key") or current_input.get("name")