1212
1313TAG_ID = b"\x05 "
1414
15+
1516def get_argparser ():
1617 parser = argparse .ArgumentParser (description = "Manage ledger ble devices." )
17- parser .add_argument ("--show" , help = "Show currently selected ledger device." , action = 'store_true' )
18- parser .add_argument ("--demo" , help = "Get version demo (connect to ble device, send get version, print response and disconnect)." , action = 'store_true' )
18+ parser .add_argument (
19+ "--show" , help = "Show currently selected ledger device." , action = "store_true"
20+ )
21+ parser .add_argument (
22+ "--demo" ,
23+ help = "Get version demo (connect to ble device, send get version, print response and disconnect)." ,
24+ action = "store_true" ,
25+ )
1926 return parser
2027
28+
2129class NoLedgerDeviceDetected (Exception ):
2230 pass
2331
32+
2433class BleScanner (object ):
2534 def __init__ (self ):
2635 self .devices = []
2736
2837 def __scan_callback (self , device : BLEDevice , advertisement_data : AdvertisementData ):
29- if LEDGER_SERVICE_UUID_STAX in advertisement_data .service_uuids or LEDGER_SERVICE_UUID_NANOX in advertisement_data .service_uuids or LEDGER_SERVICE_UUID_EUROPA in advertisement_data .service_uuids :
38+ if (
39+ LEDGER_SERVICE_UUID_STAX in advertisement_data .service_uuids
40+ or LEDGER_SERVICE_UUID_NANOX in advertisement_data .service_uuids
41+ or LEDGER_SERVICE_UUID_EUROPA in advertisement_data .service_uuids
42+ ):
3043 device_is_in_list = False
3144 for dev in self .devices :
3245 if device .address == dev [0 ]:
@@ -45,12 +58,15 @@ async def scan(self):
4558 counter += 1
4659 await scanner .stop ()
4760
61+
4862queue : asyncio .Queue = asyncio .Queue ()
4963
64+
5065def callback (sender , data ):
5166 response = bytes (data )
5267 queue .put_nowait (response )
5368
69+
5470async def _get_client (address : str ) -> BleakClient :
5571 # Connect to client
5672 client = BleakClient (address )
@@ -60,7 +76,11 @@ async def _get_client(address: str) -> BleakClient:
6076 characteristic_write_with_rsp = None
6177 characteristic_write_cmd = None
6278 for service in client .services :
63- if service .uuid in [LEDGER_SERVICE_UUID_NANOX , LEDGER_SERVICE_UUID_STAX , LEDGER_SERVICE_UUID_EUROPA ]:
79+ if service .uuid in [
80+ LEDGER_SERVICE_UUID_NANOX ,
81+ LEDGER_SERVICE_UUID_STAX ,
82+ LEDGER_SERVICE_UUID_EUROPA ,
83+ ]:
6484 for char in service .characteristics :
6585 if "0001" in char .uuid :
6686 characteristic_notify = char
@@ -86,44 +106,45 @@ async def _get_client(address: str) -> BleakClient:
86106 # Get MTU value
87107 await client .write_gatt_char (characteristic_write .uuid , bytes .fromhex ("0800000000" ))
88108 response = await queue .get ()
89- mtu = int .from_bytes (response [5 :6 ], ' big' )
109+ mtu = int .from_bytes (response [5 :6 ], " big" )
90110
91111 print ("[BLE] MTU {:d}" .format (mtu ))
92112
93113 return client , mtu , characteristic_write
94114
115+
95116async def _read (mtu ) -> bytes :
96117 response = await queue .get ()
97118 assert len (response ) >= 5
98119 assert response [0 ] == TAG_ID [0 ]
99- sequence = int .from_bytes (response [1 :3 ], ' big' )
120+ sequence = int .from_bytes (response [1 :3 ], " big" )
100121 assert sequence == 0
101122 total_size = int .from_bytes (response [3 :5 ], "big" )
102123
103124 total_size_bkup = total_size
104- if total_size >= (mtu - 5 ):
125+ if total_size >= (mtu - 5 ):
105126 apdu = response [5 :mtu ]
106- total_size -= ( mtu - 5 )
127+ total_size -= mtu - 5
107128
108129 response = await queue .get ()
109130 assert response [0 ] == TAG_ID [0 ]
110131 assert len (response ) >= 3
111- next_sequence = int .from_bytes (response [1 :3 ], ' big' )
112- assert next_sequence == sequence + 1
132+ next_sequence = int .from_bytes (response [1 :3 ], " big" )
133+ assert next_sequence == sequence + 1
113134 sequence = next_sequence
114135
115- while total_size >= (mtu - 3 ):
136+ while total_size >= (mtu - 3 ):
116137 apdu += response [3 :mtu ]
117- total_size -= ( mtu - 3 )
138+ total_size -= mtu - 3
118139 response = await queue .get ()
119140 assert response [0 ] == TAG_ID [0 ]
120141 assert len (response ) >= 3
121- sequence = int .from_bytes (response [1 :3 ], ' big' )
122- assert next_sequence == sequence + 1
142+ sequence = int .from_bytes (response [1 :3 ], " big" )
143+ assert next_sequence == sequence + 1
123144 sequence = next_sequence
124145
125146 if total_size > 0 :
126- apdu += response [3 : 3 + total_size ]
147+ apdu += response [3 : 3 + total_size ]
127148 else :
128149 apdu = response [5 :]
129150
@@ -162,7 +183,9 @@ def __init__(self, address):
162183
163184 def open (self ):
164185 self .loop = asyncio .new_event_loop ()
165- self .client , self .mtu , self .write_characteristic = self .loop .run_until_complete (_get_client (self .address ))
186+ self .client , self .mtu , self .write_characteristic = self .loop .run_until_complete (
187+ _get_client (self .address )
188+ )
166189 self .opened = True
167190
168191 def close (self ):
@@ -172,7 +195,9 @@ def close(self):
172195 self .loop .close ()
173196
174197 def __write (self , data : bytes ):
175- self .loop .run_until_complete (_write (self .client , data , self .write_characteristic , self .mtu ))
198+ self .loop .run_until_complete (
199+ _write (self .client , data , self .write_characteristic , self .mtu )
200+ )
176201
177202 def __read (self ) -> bytes :
178203 return self .loop .run_until_complete (_read (self .mtu ))
@@ -181,18 +206,25 @@ def exchange(self, data: bytes, timeout=1000) -> bytes:
181206 self .__write (data )
182207 return self .__read ()
183208
209+
184210if __name__ == "__main__" :
185211 args = get_argparser ().parse_args ()
186212 try :
187213 if args .show :
188- print (f"Environment variable LEDGER_BLE_MAC currently set to '{ os .environ ['LEDGER_BLE_MAC' ]} '" )
214+ print (
215+ f"Environment variable LEDGER_BLE_MAC currently set to '{ os .environ ['LEDGER_BLE_MAC' ]} '"
216+ )
189217 elif args .demo :
190218 try :
191- ble_device = BleDevice (os .environ [' LEDGER_BLE_MAC' ])
219+ ble_device = BleDevice (os .environ [" LEDGER_BLE_MAC" ])
192220 except Exception as ex :
193- print (f"Please run 'python -m ledgerblue.BleComm' to select wich device to connect to" )
221+ print (
222+ f"Please run 'python -m ledgerblue.BleComm' to select wich device to connect to"
223+ )
194224 raise ex
195- print ("-----------------------------Get version BLE demo------------------------------" )
225+ print (
226+ "-----------------------------Get version BLE demo------------------------------"
227+ )
196228 ble_device .open ()
197229 print (f"Connected to { ble_device .address } " )
198230 get_version_apdu = bytes .fromhex ("e001000000" )
@@ -201,22 +233,25 @@ def exchange(self, data: bytes, timeout=1000) -> bytes:
201233 print (f"[BLE] <= { result .hex ()} " )
202234 ble_device .close ()
203235 print (f"Disconnected from { ble_device .address } " )
204- print (79 * "-" )
236+ print (79 * "-" )
205237 else :
206238 scanner = BleScanner ()
207239 asyncio .run (scanner .scan ())
208240 devices_str = ""
209241 device_idx = 0
210242 if len (scanner .devices ):
211243 for device in scanner .devices :
212- devices_str += f" -{ device_idx + 1 } - mac=\" { device [0 ]} \" name=\" { device [1 ]} \" \n "
244+ devices_str += (
245+ f' -{ device_idx + 1 } - mac="{ device [0 ]} " name="{ device [1 ]} "\n '
246+ )
213247 device_idx += 1
214248 index = int (input (f"Select device by index\n { devices_str } " ))
215- print (f"Please run 'export LEDGER_BLE_MAC=\" { scanner .devices [index - 1 ][0 ]} \" ' to select which device to connect to" )
249+ print (
250+ f"Please run 'export LEDGER_BLE_MAC=\" { scanner .devices [index - 1 ][0 ]} \" ' to select which device to connect to"
251+ )
216252 else :
217253 raise NoLedgerDeviceDetected
218254 except NoLedgerDeviceDetected as ex :
219255 print (ex )
220256 except Exception as ex :
221257 raise ex
222-
0 commit comments