-
Notifications
You must be signed in to change notification settings - Fork 265
Expand file tree
/
Copy pathcommand_builder.py
More file actions
625 lines (562 loc) · 24.6 KB
/
command_builder.py
File metadata and controls
625 lines (562 loc) · 24.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
# documentation about APDU format is available here:
# https://github.com/LedgerHQ/app-ethereum/blob/develop/doc/ethapp.adoc
import struct
import math
from enum import IntEnum
from typing import Optional, Union
from ragger.bip import pack_derivation_path
from .eip712.struct import EIP712FieldType, EIP712TypeDescOffset
class InsType(IntEnum):
    """Instruction codes used as the INS byte of the APDUs built in this file."""

    # Addresses, configuration and transaction / message signing
    GET_PUBLIC_ADDR = 0x02
    GET_ETH2_PUBLIC_ADDR = 0x0E
    SIGN = 0x04
    GET_APP_CONFIGURATION = 0x06
    PERSONAL_SIGN = 0x08

    # Metadata and plugin provisioning
    PROVIDE_ERC20_TOKEN_INFORMATION = 0x0A
    EXTERNAL_PLUGIN_SETUP = 0x12
    PROVIDE_NFT_INFORMATION = 0x14
    SET_PLUGIN = 0x16
    PERFORM_PRIVACY_OPERATION = 0x18

    # EIP-712
    EIP712_SEND_STRUCT_DEF = 0x1A
    EIP712_SEND_STRUCT_IMPL = 0x1C
    EIP712_SEND_FILTERING = 0x1E
    EIP712_SIGN = 0x0C

    # Challenge and TLV-based descriptors
    GET_CHALLENGE = 0x20
    PROVIDE_TRUSTED_NAME = 0x22
    PROVIDE_ENUM_VALUE = 0x24
    PROVIDE_TRANSACTION_INFO = 0x26
    PROVIDE_TRANSACTION_FIELD_DESC = 0x28
    PROVIDE_PROXY_INFO = 0x2A
    PROVIDE_NETWORK_INFORMATION = 0x30
    PROVIDE_TX_SIMULATION = 0x32
    SIGN_EIP7702_AUTHORIZATION = 0x34
    PROVIDE_SAFE_ACCOUNT = 0x36
    PROVIDE_GATING = 0x38
    PROVIDE_MAP_ENTRY = 0x3A
class P1Type(IntEnum):
    """Named values for the P1 byte of an APDU.

    Several names share a numeric value. ``IntEnum`` treats the later
    definitions as aliases of the first name with that value, so the
    declaration order below is significant and must be preserved.
    """

    # Whether more data follows for the same EIP-712 element
    COMPLETE_SEND = 0x00
    PARTIAL_SEND = 0x01

    # Chunk markers for the signing commands
    SIGN_FIRST_CHUNK = 0x00
    SIGN_SUBSQT_CHUNK = 0x80

    # Generic first / following chunk markers
    FIRST_CHUNK = 0x01
    FOLLOWING_CHUNK = 0x00

    # Transaction-simulation opt-in
    OPT_IN_TX_CHECK = 0x01
class P2Type(IntEnum):
    """Named values for the P2 byte of an APDU.

    Many names share a numeric value. ``IntEnum`` treats the later
    definitions as aliases of the first name with that value, so the
    declaration order below is significant and must be preserved.
    """

    # EIP-712 struct definition / implementation
    STRUCT_NAME = 0x00
    STRUCT_FIELD = 0xFF
    ARRAY = 0x0F

    # EIP-712 signing implementation selector
    LEGACY_IMPLEM = 0x00
    NEW_IMPLEM = 0x01

    # EIP-712 filtering sub-commands
    FILTERING_ACTIVATE = 0x00
    FILTERING_DISCARDED_PATH = 0x01
    FILTERING_MESSAGE_INFO = 0x0F
    FILTERING_CALLDATA_SPENDER = 0xF4
    FILTERING_CALLDATA_AMOUNT = 0xF5
    FILTERING_CALLDATA_SELECTOR = 0xF6
    FILTERING_CALLDATA_CHAIN_ID = 0xF7
    FILTERING_CALLDATA_CALLEE = 0xF8
    FILTERING_CALLDATA_VALUE = 0xF9
    FILTERING_CALLDATA_INFO = 0xFA
    FILTERING_TRUSTED_NAME = 0xFB
    FILTERING_DATETIME = 0xFC
    FILTERING_TOKEN_ADDR_CHECK = 0xFD
    FILTERING_AMOUNT_FIELD = 0xFE
    FILTERING_RAW = 0xFF

    # Network information payload kind
    NETWORK_CONFIG = 0x00
    NETWORK_ICON = 0x01

    # Signing modes (presumably passed as the ``mode`` of the SIGN command;
    # confirm against callers)
    SIGN_PROCESS_START = 0x00
    SIGN_STORE = 0x01
    SIGN_START = 0x02
class CommandBuilder:
    """Build raw APDUs (as ``bytes``) for the Ethereum application.

    Methods return a single serialized APDU, or a list of APDUs when the
    payload has to be split into chunks of at most 255 bytes (the APDU
    length field written by ``_serialize`` is a single byte).
    """

    _CLA: int = 0xE0
    # Largest data field a single APDU built by _serialize can carry.
    _MAX_CHUNK_SIZE: int = 0xff

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------

    def _intToBytes(self, i: int) -> bytes:
        """Return the shortest big-endian encoding of *i* (one zero byte for 0).

        Raises:
            OverflowError: if *i* is negative.
        """
        # max(1, ...) makes zero encode as a single 0x00 byte.
        return i.to_bytes(max(1, (i.bit_length() + 7) // 8), 'big')

    def _serialize(self,
                   ins: InsType,
                   p1: int,
                   p2: int,
                   cdata: Union[bytes, bytearray] = bytes()) -> bytes:
        """Assemble one APDU: CLA, INS, P1, P2, data length, then the data.

        Raises:
            ValueError: if any header value or ``len(cdata)`` does not fit
                in one byte.
        """
        header = bytearray((self._CLA, ins, p1, p2, len(cdata)))
        return bytes(header + cdata)

    @staticmethod
    def _length_prefixed(value: Union[str, bytes, bytearray]) -> bytes:
        """Return *value* preceded by a one-byte length.

        Strings are encoded (UTF-8) first so the prefix is the number of
        bytes actually sent, not the number of characters.

        Raises:
            ValueError: if the encoded value is longer than 255 bytes.
        """
        raw = value.encode() if isinstance(value, str) else bytes(value)
        return bytes([len(raw)]) + raw

    def _split(self, payload: Union[bytes, bytearray]) -> list[bytes]:
        """Cut *payload* into consecutive pieces of at most 255 bytes.

        An empty payload yields an empty list.
        """
        size = self._MAX_CHUNK_SIZE
        return [bytes(payload[off:off + size]) for off in range(0, len(payload), size)]

    # ------------------------------------------------------------------
    # EIP-712: struct definitions and implementations
    # ------------------------------------------------------------------

    def eip712_send_struct_def_struct_name(self, name: str) -> bytes:
        """Build the APDU that starts the definition of struct *name*."""
        return self._serialize(InsType.EIP712_SEND_STRUCT_DEF,
                               P1Type.COMPLETE_SEND,
                               P2Type.STRUCT_NAME,
                               name.encode())

    def get_app_configuration(self) -> bytes:
        """Build the GET_APP_CONFIGURATION APDU (no data)."""
        return self._serialize(InsType.GET_APP_CONFIGURATION,
                               0x00,
                               0x00)

    def eip712_send_struct_def_struct_field(self,
                                            field_type: EIP712FieldType,
                                            type_name: str,
                                            type_size: Optional[int],
                                            array_levels: list,
                                            key_name: str) -> bytes:
        """Build the APDU that defines one field of the current struct.

        Args:
            field_type: type of the field; ``type_name`` is only sent for
                ``EIP712FieldType.CUSTOM``.
            type_name: name of the custom type.
            type_size: size byte, or ``None`` when the type has no size.
            array_levels: one entry per array level; ``None`` marks a level
                with no fixed size, an integer is sent as that level's size.
            key_name: name of the field.
        """
        data = bytearray()
        # Type descriptor byte: array flag, size flag and the type itself.
        typedesc = 0
        typedesc |= (len(array_levels) > 0) << EIP712TypeDescOffset.ARRAY
        typedesc |= (type_size is not None) << EIP712TypeDescOffset.SIZE
        typedesc |= field_type << EIP712TypeDescOffset.TYPE
        data.append(typedesc)
        if field_type == EIP712FieldType.CUSTOM:
            data += self._length_prefixed(type_name)
        if type_size is not None:
            data.append(type_size)
        if len(array_levels) > 0:
            data.append(len(array_levels))
            for level in array_levels:
                # 0 when the level has no size, otherwise 1 followed by the size.
                data.append(0 if level is None else 1)
                if level is not None:
                    data.append(level)
        data += self._length_prefixed(key_name)
        return self._serialize(InsType.EIP712_SEND_STRUCT_DEF,
                               P1Type.COMPLETE_SEND,
                               P2Type.STRUCT_FIELD,
                               data)

    def eip712_send_struct_impl_root_struct(self, name: str) -> bytes:
        """Build the APDU that selects struct *name* as the root to fill in."""
        return self._serialize(InsType.EIP712_SEND_STRUCT_IMPL,
                               P1Type.COMPLETE_SEND,
                               P2Type.STRUCT_NAME,
                               name.encode())

    def eip712_send_struct_impl_array(self, size: int) -> bytes:
        """Build the APDU announcing an array of *size* elements."""
        return self._serialize(InsType.EIP712_SEND_STRUCT_IMPL,
                               P1Type.COMPLETE_SEND,
                               P2Type.ARRAY,
                               bytes([size]))

    def eip712_send_struct_impl_struct_field(self, data: bytearray) -> list[bytes]:
        """Build the APDU(s) carrying one field value.

        The value is prefixed with its length as a 16-bit big-endian integer
        and split into chunks; every chunk but the last is flagged
        ``PARTIAL_SEND``.
        """
        pieces = self._split(struct.pack(">H", len(data)) + bytes(data))
        last = len(pieces) - 1
        return [self._serialize(InsType.EIP712_SEND_STRUCT_IMPL,
                                P1Type.COMPLETE_SEND if idx == last else P1Type.PARTIAL_SEND,
                                P2Type.STRUCT_FIELD,
                                piece)
                for idx, piece in enumerate(pieces)]

    # ------------------------------------------------------------------
    # EIP-712: signing
    # ------------------------------------------------------------------

    def eip712_sign_new(self, bip32_path: str) -> bytes:
        """Build the EIP712_SIGN APDU for the new implementation."""
        data = pack_derivation_path(bip32_path)
        return self._serialize(InsType.EIP712_SIGN,
                               P1Type.COMPLETE_SEND,
                               P2Type.NEW_IMPLEM,
                               data)

    def eip712_sign_legacy(self,
                           bip32_path: str,
                           domain_hash: bytes,
                           message_hash: bytes) -> bytes:
        """Build the EIP712_SIGN APDU for the legacy implementation.

        The data is the packed derivation path followed by the domain hash
        and the message hash.
        """
        data = pack_derivation_path(bip32_path)
        data += domain_hash
        data += message_hash
        return self._serialize(InsType.EIP712_SIGN,
                               P1Type.COMPLETE_SEND,
                               P2Type.LEGACY_IMPLEM,
                               data)

    # ------------------------------------------------------------------
    # EIP-712: filtering
    # ------------------------------------------------------------------

    def eip712_filtering_activate(self) -> bytes:
        """Build the APDU that activates EIP-712 filtering (no data)."""
        return self._serialize(InsType.EIP712_SEND_FILTERING,
                               P1Type.COMPLETE_SEND,
                               P2Type.FILTERING_ACTIVATE,
                               bytearray())

    def _eip712_filtering_send_name(self, name: str, sig: bytes) -> bytes:
        """Return the filter payload: length-prefixed name, then length-prefixed signature."""
        return self._length_prefixed(name) + self._length_prefixed(sig)

    def eip712_filtering_discarded_path(self, path: str) -> bytes:
        """Build the filtering APDU carrying a discarded *path*."""
        return self._serialize(InsType.EIP712_SEND_FILTERING,
                               P1Type.COMPLETE_SEND,
                               P2Type.FILTERING_DISCARDED_PATH,
                               self._length_prefixed(path))

    def eip712_filtering_message_info(self, name: str, filters_count: int, sig: bytes) -> bytes:
        """Build the filtering APDU carrying the message name and filter count."""
        data = bytearray(self._length_prefixed(name))
        data.append(filters_count)
        data += self._length_prefixed(sig)
        return self._serialize(InsType.EIP712_SEND_FILTERING,
                               P1Type.COMPLETE_SEND,
                               P2Type.FILTERING_MESSAGE_INFO,
                               data)

    def eip712_filtering_amount_join_token(self, token_idx: int, sig: bytes, discarded: bool) -> bytes:
        """Build the token-address-check filter APDU; P1 carries ``discarded``."""
        data = bytearray()
        data.append(token_idx)
        data += self._length_prefixed(sig)
        return self._serialize(InsType.EIP712_SEND_FILTERING,
                               int(discarded),
                               P2Type.FILTERING_TOKEN_ADDR_CHECK,
                               data)

    def eip712_filtering_amount_join_value(self, token_idx: int, name: str, sig: bytes, discarded: bool) -> bytes:
        """Build the amount-field filter APDU; P1 carries ``discarded``."""
        data = bytearray(self._length_prefixed(name))
        data.append(token_idx)
        data += self._length_prefixed(sig)
        return self._serialize(InsType.EIP712_SEND_FILTERING,
                               int(discarded),
                               P2Type.FILTERING_AMOUNT_FIELD,
                               data)

    def eip712_filtering_datetime(self, name: str, sig: bytes, discarded: bool) -> bytes:
        """Build the date-time filter APDU; P1 carries ``discarded``."""
        return self._serialize(InsType.EIP712_SEND_FILTERING,
                               int(discarded),
                               P2Type.FILTERING_DATETIME,
                               self._eip712_filtering_send_name(name, sig))

    def eip712_filtering_trusted_name(self,
                                      name: str,
                                      name_types: list[int],
                                      name_sources: list[int],
                                      sig: bytes,
                                      discarded: bool) -> bytes:
        """Build the trusted-name filter APDU; P1 carries ``discarded``.

        ``name_types`` and ``name_sources`` are each sent as a one-byte
        count followed by one byte per entry.
        """
        data = bytearray(self._length_prefixed(name))
        data += self._length_prefixed(bytes(name_types))
        data += self._length_prefixed(bytes(name_sources))
        data += self._length_prefixed(sig)
        return self._serialize(InsType.EIP712_SEND_FILTERING,
                               int(discarded),
                               P2Type.FILTERING_TRUSTED_NAME,
                               data)

    def eip712_filtering_calldata_info(self,
                                       index: int,
                                       value_filter_flag: bool,
                                       callee_filter_flag: int,
                                       chain_id_filter_flag: bool,
                                       selector_filter_flag: bool,
                                       amount_filter_flag: bool,
                                       spender_filter_flag: int,
                                       sig: bytes) -> bytes:
        """Build the calldata-info filter APDU.

        The index and the six flags are each sent as one unsigned byte, in
        parameter order, followed by the length-prefixed signature.
        """
        # Every field is packed as an unsigned byte ("B"). The integer flags
        # must not use the "?" format: it would collapse any non-zero value
        # to 0x01 and lose the distinction between flag values.
        data = bytearray(struct.pack(">7B",
                                     index,
                                     value_filter_flag,
                                     callee_filter_flag,
                                     chain_id_filter_flag,
                                     selector_filter_flag,
                                     amount_filter_flag,
                                     spender_filter_flag))
        data += self._length_prefixed(sig)
        return self._serialize(InsType.EIP712_SEND_FILTERING,
                               int(False),
                               P2Type.FILTERING_CALLDATA_INFO,
                               data)

    def _eip712_filtering_calldata_field(self,
                                         p2: int,
                                         index: int,
                                         sig: bytes,
                                         discarded: bool) -> bytes:
        """Build a calldata filter APDU: index byte, then length-prefixed signature."""
        data = bytearray(struct.pack(">B", index))
        data += self._length_prefixed(sig)
        return self._serialize(InsType.EIP712_SEND_FILTERING,
                               int(discarded),
                               p2,
                               data)

    def eip712_filtering_calldata_value(self,
                                        index: int,
                                        sig: bytes,
                                        discarded: bool) -> bytes:
        """Build the calldata *value* filter APDU."""
        return self._eip712_filtering_calldata_field(P2Type.FILTERING_CALLDATA_VALUE,
                                                     index, sig, discarded)

    def eip712_filtering_calldata_callee(self,
                                         index: int,
                                         sig: bytes,
                                         discarded: bool) -> bytes:
        """Build the calldata *callee* filter APDU."""
        return self._eip712_filtering_calldata_field(P2Type.FILTERING_CALLDATA_CALLEE,
                                                     index, sig, discarded)

    def eip712_filtering_calldata_chain_id(self,
                                           index: int,
                                           sig: bytes,
                                           discarded: bool) -> bytes:
        """Build the calldata *chain id* filter APDU."""
        return self._eip712_filtering_calldata_field(P2Type.FILTERING_CALLDATA_CHAIN_ID,
                                                     index, sig, discarded)

    def eip712_filtering_calldata_selector(self,
                                           index: int,
                                           sig: bytes,
                                           discarded: bool) -> bytes:
        """Build the calldata *selector* filter APDU."""
        return self._eip712_filtering_calldata_field(P2Type.FILTERING_CALLDATA_SELECTOR,
                                                     index, sig, discarded)

    def eip712_filtering_calldata_amount(self,
                                         index: int,
                                         sig: bytes,
                                         discarded: bool) -> bytes:
        """Build the calldata *amount* filter APDU."""
        return self._eip712_filtering_calldata_field(P2Type.FILTERING_CALLDATA_AMOUNT,
                                                     index, sig, discarded)

    def eip712_filtering_calldata_spender(self,
                                          index: int,
                                          sig: bytes,
                                          discarded: bool) -> bytes:
        """Build the calldata *spender* filter APDU."""
        return self._eip712_filtering_calldata_field(P2Type.FILTERING_CALLDATA_SPENDER,
                                                     index, sig, discarded)

    def eip712_filtering_raw(self, name: str, sig: bytes, discarded: bool) -> bytes:
        """Build the raw-field filter APDU; P1 carries ``discarded``."""
        return self._serialize(InsType.EIP712_SEND_FILTERING,
                               int(discarded),
                               P2Type.FILTERING_RAW,
                               self._eip712_filtering_send_name(name, sig))

    # ------------------------------------------------------------------
    # Plugins, signing, addresses
    # ------------------------------------------------------------------

    def set_external_plugin(self, plugin_name: str, contract_address: bytes, selector: bytes, sig: bytes) -> bytes:
        """Build the EXTERNAL_PLUGIN_SETUP APDU.

        The signature is appended as-is, without a length prefix.
        """
        data = bytearray(self._length_prefixed(plugin_name))
        data += contract_address
        data += selector
        data += sig
        return self._serialize(InsType.EXTERNAL_PLUGIN_SETUP,
                               P1Type.COMPLETE_SEND,
                               0x00,
                               data)

    def sign(self,
             mode: int,
             bip32_path: Optional[str] = None,
             rlp_data: Optional[bytes] = None) -> list[bytes]:
        """Build the SIGN APDU(s); *mode* is sent as P2.

        The payload is the packed derivation path (if given) followed by
        ``rlp_data`` (if given). At least one APDU is always produced, even
        when the payload is empty.
        """
        payload = bytearray()
        if bip32_path is not None:
            payload = bytearray(pack_derivation_path(bip32_path))
        if rlp_data is not None:
            payload += rlp_data
        # An empty payload still results in a single, data-less APDU.
        pieces = self._split(payload) or [b""]
        return [self._serialize(InsType.SIGN,
                                P1Type.SIGN_FIRST_CHUNK if idx == 0 else P1Type.SIGN_SUBSQT_CHUNK,
                                mode,
                                piece)
                for idx, piece in enumerate(pieces)]

    def get_challenge(self) -> bytes:
        """Build the GET_CHALLENGE APDU (no data)."""
        return self._serialize(InsType.GET_CHALLENGE, 0x00, 0x00)

    def provide_trusted_name(self, tlv_payload: bytes) -> list[bytes]:
        """Build the PROVIDE_TRUSTED_NAME APDU(s) for *tlv_payload*.

        P1 is 1 on the first chunk and 0 on the following ones; P2 is 0.
        """
        return self.common_tlv_serialize(InsType.PROVIDE_TRUSTED_NAME,
                                         tlv_payload,
                                         p1l=[0x01, 0x00],
                                         p2l=[0x00])

    def get_public_addr(self,
                        display: bool,
                        chaincode: bool,
                        bip32_path: str,
                        chain_id: Optional[int]) -> bytes:
        """Build the GET_PUBLIC_ADDR APDU.

        P1 carries ``display`` and P2 carries ``chaincode``. When
        ``chain_id`` is not ``None`` it is appended as a 64-bit big-endian
        integer.
        """
        payload = pack_derivation_path(bip32_path)
        if chain_id is not None:
            payload += struct.pack(">Q", chain_id)
        return self._serialize(InsType.GET_PUBLIC_ADDR,
                               int(display),
                               int(chaincode),
                               payload)

    def get_eth2_public_addr(self,
                             display: bool,
                             bip32_path: str) -> bytes:
        """Build the GET_ETH2_PUBLIC_ADDR APDU; P1 carries ``display``."""
        payload = pack_derivation_path(bip32_path)
        return self._serialize(InsType.GET_ETH2_PUBLIC_ADDR,
                               int(display),
                               0x00,
                               payload)

    def perform_privacy_operation(self,
                                  display: bool,
                                  bip32_path: str,
                                  pubkey: bytes) -> bytes:
        """Build the PERFORM_PRIVACY_OPERATION APDU.

        P2 is 1 when a non-empty ``pubkey`` is supplied, 0 otherwise.
        """
        payload = pack_derivation_path(bip32_path)
        return self._serialize(InsType.PERFORM_PRIVACY_OPERATION,
                               int(display),
                               0x01 if pubkey else 0x00,
                               payload + pubkey)

    def set_plugin(self,
                   type_: int,
                   version: int,
                   plugin_name: str,
                   contract_addr: bytes,
                   selector: bytes,
                   chain_id: int,
                   key_id: int,
                   algo_id: int,
                   sig: bytes) -> bytes:
        """Build the SET_PLUGIN APDU (chain id sent as 64-bit big-endian)."""
        payload = bytearray()
        payload.append(type_)
        payload.append(version)
        payload += self._length_prefixed(plugin_name)
        payload += contract_addr
        payload += selector
        payload += struct.pack(">Q", chain_id)
        payload.append(key_id)
        payload.append(algo_id)
        payload += self._length_prefixed(sig)
        return self._serialize(InsType.SET_PLUGIN, 0x00, 0x00, payload)

    def provide_nft_information(self,
                                type_: int,
                                version: int,
                                collection_name: str,
                                addr: bytes,
                                chain_id: int,
                                key_id: int,
                                algo_id: int,
                                sig: bytes) -> bytes:
        """Build the PROVIDE_NFT_INFORMATION APDU (chain id sent as 64-bit big-endian)."""
        payload = bytearray()
        payload.append(type_)
        payload.append(version)
        payload += self._length_prefixed(collection_name)
        payload += addr
        payload += struct.pack(">Q", chain_id)
        payload.append(key_id)
        payload.append(algo_id)
        payload += self._length_prefixed(sig)
        return self._serialize(InsType.PROVIDE_NFT_INFORMATION, 0x00, 0x00, payload)

    def personal_sign(self, path: str, msg: bytes) -> list[bytes]:
        """Build the PERSONAL_SIGN APDU(s).

        The payload is the packed derivation path, the message length as a
        32-bit big-endian integer, then the message itself.
        """
        payload = pack_derivation_path(path)
        payload += struct.pack(">I", len(msg))
        payload += msg
        return [self._serialize(InsType.PERSONAL_SIGN,
                                P1Type.SIGN_FIRST_CHUNK if idx == 0 else P1Type.SIGN_SUBSQT_CHUNK,
                                0x00,
                                piece)
                for idx, piece in enumerate(self._split(payload))]

    def provide_erc20_token_information(self,
                                        ticker: str,
                                        addr: bytes,
                                        decimals: int,
                                        chain_id: int,
                                        sig: bytes) -> bytes:
        """Build the PROVIDE_ERC20_TOKEN_INFORMATION APDU.

        ``decimals`` and ``chain_id`` are sent as 32-bit big-endian integers;
        the signature is appended without a length prefix.
        """
        payload = bytearray(self._length_prefixed(ticker))
        payload += addr
        payload += struct.pack(">I", decimals)
        payload += struct.pack(">I", chain_id)
        payload += sig
        return self._serialize(InsType.PROVIDE_ERC20_TOKEN_INFORMATION,
                               0x00,
                               0x00,
                               payload)

    # ------------------------------------------------------------------
    # TLV-based commands
    # ------------------------------------------------------------------

    def common_tlv_serialize(self,
                             ins: InsType,
                             tlv_payload: bytes,
                             p1l: Optional[list[int]] = None,
                             p2l: Optional[list[int]] = None,
                             payload: bytes = bytes()) -> list[bytes]:
        """Build the chunked APDUs for a TLV payload.

        The data sent is ``payload`` (optional prefix), then the length of
        ``tlv_payload`` as a 16-bit big-endian integer, then ``tlv_payload``.

        Args:
            ins: instruction to use for every chunk.
            tlv_payload: the TLV-encoded data.
            p1l: one or two P1 values: first chunk, then following chunks.
                Defaults to ``[0x01, 0x00]``.
            p2l: one or two P2 values, same convention. Defaults to ``[0x00]``.
            payload: bytes to send before the length-prefixed TLV data.
        """
        p1_values = [0x01, 0x00] if p1l is None else p1l
        p2_values = [0x00] if p2l is None else p2l
        assert len(p1_values) in [1, 2]
        assert len(p2_values) in [1, 2]
        # Build a fresh buffer so a caller-supplied bytearray is never mutated.
        buffer = bytes(payload) + struct.pack(">H", len(tlv_payload)) + bytes(tlv_payload)
        # Index -1 works whether the list holds one or two items.
        return [self._serialize(ins,
                                p1_values[0] if idx == 0 else p1_values[-1],
                                p2_values[0] if idx == 0 else p2_values[-1],
                                piece)
                for idx, piece in enumerate(self._split(buffer))]

    def provide_network_information(self,
                                    tlv_payload: bytes,
                                    icon: Optional[bytes] = None) -> list[bytes]:
        """Build the PROVIDE_NETWORK_INFORMATION APDU(s).

        The TLV configuration chunks come first; when a non-empty ``icon``
        is given, its raw bytes follow in chunks flagged ``NETWORK_ICON``.
        """
        chunks = self.common_tlv_serialize(InsType.PROVIDE_NETWORK_INFORMATION,
                                           tlv_payload,
                                           p2l=[P2Type.NETWORK_CONFIG])
        if icon:
            chunks.extend(self._serialize(InsType.PROVIDE_NETWORK_INFORMATION,
                                          P1Type.FIRST_CHUNK if idx == 0 else P1Type.FOLLOWING_CHUNK,
                                          P2Type.NETWORK_ICON,
                                          piece)
                          for idx, piece in enumerate(self._split(icon)))
        return chunks

    def sign_eip7702_authorization(self, bip32_path: str, tlv_payload: bytes) -> list[bytes]:
        """Build the SIGN_EIP7702_AUTHORIZATION APDU(s); the derivation path precedes the TLV data."""
        return self.common_tlv_serialize(InsType.SIGN_EIP7702_AUTHORIZATION,
                                         tlv_payload,
                                         payload=pack_derivation_path(bip32_path))

    def provide_enum_value(self, tlv_payload: bytes) -> list[bytes]:
        """Build the PROVIDE_ENUM_VALUE APDU(s)."""
        return self.common_tlv_serialize(InsType.PROVIDE_ENUM_VALUE, tlv_payload)

    def provide_map_entry(self, tlv_payload: bytes) -> list[bytes]:
        """Build the PROVIDE_MAP_ENTRY APDU(s)."""
        return self.common_tlv_serialize(InsType.PROVIDE_MAP_ENTRY, tlv_payload)

    def provide_transaction_info(self, tlv_payload: bytes) -> list[bytes]:
        """Build the PROVIDE_TRANSACTION_INFO APDU(s)."""
        return self.common_tlv_serialize(InsType.PROVIDE_TRANSACTION_INFO, tlv_payload)

    def provide_transaction_field_desc(self, tlv_payload: bytes) -> list[bytes]:
        """Build the PROVIDE_TRANSACTION_FIELD_DESC APDU(s)."""
        return self.common_tlv_serialize(InsType.PROVIDE_TRANSACTION_FIELD_DESC, tlv_payload)

    def opt_in_tx_simulation(self) -> bytes:
        """Build the data-less PROVIDE_TX_SIMULATION APDU with P1 set to ``OPT_IN_TX_CHECK``."""
        return self._serialize(InsType.PROVIDE_TX_SIMULATION, P1Type.OPT_IN_TX_CHECK, 0x00)

    def provide_tx_simulation(self, tlv_payload: bytes) -> list[bytes]:
        """Build the PROVIDE_TX_SIMULATION APDU(s); P2 is 1 on the first chunk, 0 afterwards."""
        return self.common_tlv_serialize(InsType.PROVIDE_TX_SIMULATION, tlv_payload, p1l=[0x00], p2l=[0x01, 0x00])

    def provide_proxy_info(self, tlv_payload: bytes) -> list[bytes]:
        """Build the PROVIDE_PROXY_INFO APDU(s)."""
        return self.common_tlv_serialize(InsType.PROVIDE_PROXY_INFO, tlv_payload)

    def provide_safe_account(self, tlv_payload: bytes, p2: int) -> list[bytes]:
        """Build the PROVIDE_SAFE_ACCOUNT APDU(s), using *p2* for every chunk."""
        return self.common_tlv_serialize(InsType.PROVIDE_SAFE_ACCOUNT, tlv_payload, p2l=[p2])

    def provide_gating(self, tlv_payload: bytes) -> list[bytes]:
        """Build the PROVIDE_GATING APDU(s)."""
        return self.common_tlv_serialize(InsType.PROVIDE_GATING, tlv_payload)

    def provide_token_info(self, tlv_payload: bytes) -> list[bytes]:
        """Build the TLV form of PROVIDE_ERC20_TOKEN_INFORMATION.

        P1 is 1 on every chunk; P2 is 1 on the first chunk, 0 afterwards.
        """
        return self.common_tlv_serialize(InsType.PROVIDE_ERC20_TOKEN_INFORMATION,
                                         tlv_payload,
                                         p1l=[0x01],
                                         p2l=[0x01, 0x00])