11import html
22from hashlib import sha256
3+ from asyncio import FIRST_COMPLETED , Task , create_task , wait
4+ from typing import AsyncIterator , Collection , TypeVar
35
46from aiohttp import ClientSession
57from cache import AsyncLRU
8+ from pyytlounge .wrapper import api_base
69
7- from . import constants , dial_client
10+ from . import chromecast_client , constants , dial_client
811from .conditional_ttl_cache import AsyncConditionalTTL
912
1013
14+ _T = TypeVar ("_T" )
15+
16+
1117def list_to_tuple (function ):
1218 def wrapper (* args ):
1319 args = [tuple (x ) if isinstance (x , list ) else x for x in args ]
@@ -18,6 +24,33 @@ def wrapper(*args):
1824 return wrapper
1925
2026
27+ async def _await_next (iterator : AsyncIterator [_T ]) -> _T :
28+ return await iterator .__anext__ ()
29+
30+
31+ def _as_task (iterator : AsyncIterator [_T ]) -> Task [_T ]:
32+ return create_task (_await_next (iterator ))
33+
34+
35+ async def merge_iterators (iterators : Collection [AsyncIterator [_T ]]) -> AsyncIterator [_T ]:
36+ pending_tasks = {_as_task (iterator ): iterator for iterator in iterators }
37+ try :
38+ while pending_tasks :
39+ done , _ = await wait (pending_tasks , return_when = FIRST_COMPLETED )
40+ for task in done :
41+ iterator = pending_tasks .pop (task )
42+ try :
43+ yield task .result ()
44+ except StopAsyncIteration :
45+ continue
46+ except Exception :
47+ continue
48+ pending_tasks [_as_task (iterator )] = iterator
49+ finally :
50+ for task in pending_tasks :
51+ task .cancel ()
52+
53+
2154# Class that handles all the api calls and their cache
2255class ApiHelper :
2356 def __init__ (self , config , web_session : ClientSession ) -> None :
@@ -29,6 +62,37 @@ def __init__(self, config, web_session: ClientSession) -> None:
2962 self .num_devices = len (config .devices )
3063 self .minimum_skip_length = config .minimum_skip_length
3164
65+ @staticmethod
66+ def _normalize_pairing_code (pairing_code ):
67+ return str (pairing_code ).replace ("-" , "" ).replace (" " , "" )
68+
69+ async def pair_with_code (self , pairing_code ):
70+ normalized_code = self ._normalize_pairing_code (pairing_code )
71+ pair_url = f"{ api_base } /pairing/get_screen"
72+ pair_data = {"pairing_code" : normalized_code }
73+
74+ async with self .web_session .post (url = pair_url , data = pair_data ) as response :
75+ if response .status != 200 :
76+ return None
77+ try :
78+ pair_response = await response .json ()
79+ except BaseException :
80+ return None
81+
82+ screen = pair_response .get ("screen" )
83+ if not screen :
84+ return None
85+
86+ screen_id = screen .get ("screenId" )
87+ if not screen_id :
88+ return None
89+
90+ return {
91+ "screen_id" : screen_id ,
92+ "name" : screen .get ("name" ),
93+ "lounge_token" : screen .get ("loungeToken" ),
94+ }
95+
3296 # Not used anymore, maybe it can stay here a little longer
3397 @AsyncLRU (maxsize = 10 )
3498 async def get_vid_id (self , title , artist , api_key , web_session ):
@@ -204,8 +268,21 @@ async def mark_viewed_segments(self, uuids):
204268 params = {"UUID" : i }
205269 await self .web_session .post (url , params = params )
206270
207- async def discover_youtube_devices_dial (self ):
208- """Discovers YouTube devices using DIAL"""
209- dial_screens = await dial_client .discover (self .web_session )
210- # print(dial_screens)
211- return dial_screens
271+ async def discover_youtube_devices_dial (self , active = True ):
272+ """Discovers YouTube devices using DIAL and Chromecast discovery.
273+
274+ Yields devices as they are discovered instead of waiting for the full
275+ discovery cycle to finish.
276+ """
277+ seen_screen_ids = set ()
278+ async for device in merge_iterators (
279+ [
280+ dial_client .discover (self .web_session , self , active = active ),
281+ chromecast_client .discover (active = active ),
282+ ]
283+ ):
284+ screen_id = device .get ("screen_id" )
285+ if not screen_id or screen_id in seen_screen_ids :
286+ continue
287+ seen_screen_ids .add (screen_id )
288+ yield device
0 commit comments