|
3 | 3 | import atexit |
4 | 4 | import base64 |
5 | 5 | import fcntl |
| 6 | +import hashlib |
6 | 7 | import math |
7 | 8 | import os |
8 | 9 | import requests |
|
43 | 44 | # TLV Tags |
44 | 45 | TAG_ICCID = 0x5A |
45 | 46 | TAG_STATUS = 0x80 |
46 | | -TAG_PROFILE_INFO_LIST = 0xBF2D |
47 | | -TAG_SET_NICKNAME = 0xBF29 |
48 | | -TAG_ENABLE_PROFILE = 0xBF31 |
49 | | -TAG_DELETE_PROFILE = 0xBF33 |
| 47 | +TAG_EUICC_INFO = 0xBF20 |
| 48 | +TAG_PREPARE_DOWNLOAD = 0xBF21 |
| 49 | +TAG_BPP_COMMAND = 0xBF23 |
| 50 | +TAG_PROFILE_METADATA = 0xBF25 |
| 51 | +TAG_INSTALL_RESULT_DATA = 0xBF27 |
50 | 52 | TAG_LIST_NOTIFICATION = 0xBF28 |
| 53 | +TAG_SET_NICKNAME = 0xBF29 |
51 | 54 | TAG_RETRIEVE_NOTIFICATION = 0xBF2B |
| 55 | +TAG_PROFILE_INFO_LIST = 0xBF2D |
| 56 | +TAG_EUICC_CHALLENGE = 0xBF2E |
52 | 57 | TAG_NOTIFICATION_METADATA = 0xBF2F |
53 | 58 | TAG_NOTIFICATION_SENT = 0xBF30 |
| 59 | +TAG_ENABLE_PROFILE = 0xBF31 |
| 60 | +TAG_DELETE_PROFILE = 0xBF33 |
| 61 | +TAG_BPP = 0xBF36 |
54 | 62 | TAG_PROFILE_INSTALL_RESULT = 0xBF37 |
| 63 | +TAG_AUTH_SERVER = 0xBF38 |
| 64 | +TAG_CANCEL_SESSION = 0xBF41 |
55 | 65 | TAG_OK = 0xA0 |
56 | 66 |
|
57 | 67 | PROFILE_OK = 0x00 |
|
63 | 73 | 0x03: "disallowedByPolicy", 0x04: "wrongProfileReenabling", |
64 | 74 | PROFILE_CAT_BUSY: "catBusy", 0x06: "undefinedError", |
65 | 75 | } |
| 76 | +AUTH_SERVER_ERROR_CODES = { |
| 77 | + 0x01: "eUICCVerificationFailed", 0x02: "eUICCCertificateExpired", |
| 78 | + 0x03: "eUICCCertificateRevoked", 0x05: "invalidServerSignature", |
| 79 | + 0x06: "euiccCiPKUnknown", 0x0A: "matchingIdRefused", |
| 80 | + 0x10: "insufficientMemory", |
| 81 | +} |
| 82 | +BPP_COMMAND_NAMES = { |
| 83 | + 0: "initialiseSecureChannel", 1: "configureISDP", 2: "storeMetadata", |
| 84 | + 3: "storeMetadata2", 4: "replaceSessionKeys", 5: "loadProfileElements", |
| 85 | +} |
| 86 | +BPP_ERROR_REASONS = { |
| 87 | + 1: "incorrectInputValues", 2: "invalidSignature", 3: "invalidTransactionId", |
| 88 | + 4: "unsupportedCrtValues", 5: "unsupportedRemoteOperationType", |
| 89 | + 6: "unsupportedProfileClass", 7: "scp03tStructureError", 8: "scp03tSecurityError", |
| 90 | + 9: "iccidAlreadyExistsOnEuicc", 10: "insufficientMemoryForProfile", |
| 91 | + 11: "installInterrupted", 12: "peProcessingError", 13: "dataMismatch", |
| 92 | + 14: "invalidNAA", |
| 93 | +} |
| 94 | +BPP_ERROR_MESSAGES = { |
| 95 | + 9: "This eSIM profile is already installed on this device.", |
| 96 | + 10: "Not enough memory on the eUICC to install this profile.", |
| 97 | + 12: "Profile installation failed. The QR code may have already been used.", |
| 98 | +} |
66 | 99 |
|
67 | 100 | # SGP.22 §5.2.6 — SM-DP+ reason/subject codes mapped to user-friendly messages |
68 | 101 | ES9P_ERROR_MESSAGES: dict[tuple[str, str], str] = { |
@@ -459,6 +492,222 @@ def process_notifications(client: AtClient) -> None: |
459 | 492 | print(f"notification {seq_number} failed: {e}", file=sys.stderr) |
460 | 493 |
|
461 | 494 |
|
| 495 | +# --- Authentication & Download --- |
| 496 | + |
| 497 | +def get_challenge_and_info(client: AtClient) -> tuple[bytes, bytes]: |
| 498 | + challenge_resp = es10x_command(client, encode_tlv(TAG_EUICC_CHALLENGE, b"")) |
| 499 | + challenge = require_tag(require_tag(challenge_resp, TAG_EUICC_CHALLENGE, "GetEuiccDataResponse"), |
| 500 | + TAG_STATUS, "challenge in response") |
| 501 | + info_resp = es10x_command(client, encode_tlv(TAG_EUICC_INFO, b"")) |
| 502 | + require_tag(info_resp, TAG_EUICC_INFO, "GetEuiccInfo1Response") |
| 503 | + return challenge, info_resp |
| 504 | + |
| 505 | + |
| 506 | +def authenticate_server(client: AtClient, b64_signed1: str, b64_sig1: str, b64_pk_id: str, b64_cert: str, matching_id: str) -> str: |
| 507 | + tac = bytes([0x35, 0x29, 0x06, 0x11]) |
| 508 | + device_info = encode_tlv(TAG_STATUS, tac) + encode_tlv(0xA1, b"") |
| 509 | + ctx_inner = encode_tlv(TAG_STATUS, matching_id.encode("utf-8")) + encode_tlv(0xA1, device_info) |
| 510 | + content = b64d(b64_signed1) + b64d(b64_sig1) + b64d(b64_pk_id) + b64d(b64_cert) + encode_tlv(0xA0, ctx_inner) |
| 511 | + response = es10x_command(client, encode_tlv(TAG_AUTH_SERVER, content)) |
| 512 | + root = require_tag(response, TAG_AUTH_SERVER, "AuthenticateServerResponse") |
| 513 | + error_tag = find_tag(root, 0xA1) |
| 514 | + if error_tag is not None: |
| 515 | + code = int.from_bytes(error_tag, "big") if error_tag else 0 |
| 516 | + raise RuntimeError(f"AuthenticateServer rejected by eUICC: {AUTH_SERVER_ERROR_CODES.get(code, 'unknown')} (0x{code:02X})") |
| 517 | + return b64e(response) |
| 518 | + |
| 519 | + |
| 520 | +def prepare_download(client: AtClient, b64_signed2: str, b64_sig2: str, b64_cert: str, cc: str | None = None) -> str: |
| 521 | + smdp_signed2 = b64d(b64_signed2) |
| 522 | + smdp_signature2 = b64d(b64_sig2) |
| 523 | + smdp_certificate = b64d(b64_cert) |
| 524 | + smdp_signed2_root = find_tag(smdp_signed2, 0x30) |
| 525 | + if smdp_signed2_root is None: |
| 526 | + raise RuntimeError("Invalid smdpSigned2") |
| 527 | + transaction_id = find_tag(smdp_signed2_root, TAG_STATUS) |
| 528 | + cc_required_flag = find_tag(smdp_signed2_root, 0x01) |
| 529 | + if transaction_id is None or cc_required_flag is None: |
| 530 | + raise RuntimeError("Invalid smdpSigned2") |
| 531 | + content = smdp_signed2 + smdp_signature2 |
| 532 | + if int.from_bytes(cc_required_flag, "big") != 0: |
| 533 | + if not cc: |
| 534 | + raise RuntimeError("Confirmation code required but not provided") |
| 535 | + content += encode_tlv(0x04, hashlib.sha256(hashlib.sha256(cc.encode("utf-8")).digest() + transaction_id).digest()) |
| 536 | + content += smdp_certificate |
| 537 | + response = es10x_command(client, encode_tlv(TAG_PREPARE_DOWNLOAD, content)) |
| 538 | + require_tag(response, TAG_PREPARE_DOWNLOAD, "PrepareDownloadResponse") |
| 539 | + return b64e(response) |
| 540 | + |
| 541 | + |
| 542 | +def _parse_tlv_header_len(data: bytes) -> int: |
| 543 | + tag_len = 2 if data[0] & 0x1F == 0x1F else 1 |
| 544 | + length_byte = data[tag_len] |
| 545 | + return tag_len + (1 + (length_byte & 0x7F) if length_byte & 0x80 else 1) |
| 546 | + |
| 547 | + |
| 548 | +def _split_bpp(bpp: bytes) -> list[bytes]: |
| 549 | + """Split a BoundProfilePackage into APDU chunks per SGP.22 §5.7.6.""" |
| 550 | + root_value = None |
| 551 | + for tag, value, start, end in iter_tlv(bpp, with_positions=True): |
| 552 | + if tag == TAG_BPP: |
| 553 | + root_value = value |
| 554 | + val_start = start + _parse_tlv_header_len(bpp[start:end]) |
| 555 | + break |
| 556 | + if root_value is None: |
| 557 | + raise RuntimeError("Invalid BoundProfilePackage") |
| 558 | + |
| 559 | + chunks: list[bytes] = [] |
| 560 | + for tag, value, start, end in iter_tlv(root_value, with_positions=True): |
| 561 | + if tag == TAG_BPP_COMMAND: |
| 562 | + chunks.append(bpp[0 : val_start + end]) |
| 563 | + elif tag in (0xA0, 0xA2): |
| 564 | + chunks.append(bpp[val_start + start : val_start + end]) |
| 565 | + elif tag in (0xA1, 0xA3): |
| 566 | + hdr_len = _parse_tlv_header_len(root_value[start:end]) |
| 567 | + chunks.append(bpp[val_start + start : val_start + start + hdr_len]) |
| 568 | + for _, _, cs, ce in iter_tlv(value, with_positions=True): |
| 569 | + chunks.append(value[cs:ce]) |
| 570 | + return chunks |
| 571 | + |
| 572 | + |
| 573 | +def _parse_install_result(response: bytes) -> dict[str, Any] | None: |
| 574 | + """Parse a ProfileInstallResult from an APDU response, or None if not present.""" |
| 575 | + root = find_tag(response, TAG_PROFILE_INSTALL_RESULT) |
| 576 | + if not root: |
| 577 | + return None |
| 578 | + result_data = find_tag(root, TAG_INSTALL_RESULT_DATA) |
| 579 | + if not result_data: |
| 580 | + return None |
| 581 | + result: dict[str, Any] = {"seqNumber": 0, "success": False, "bppCommandId": None, "errorReason": None} |
| 582 | + notif_meta = find_tag(result_data, TAG_NOTIFICATION_METADATA) |
| 583 | + if notif_meta: |
| 584 | + seq_num = find_tag(notif_meta, TAG_STATUS) |
| 585 | + if seq_num: |
| 586 | + result["seqNumber"] = int.from_bytes(seq_num, "big") |
| 587 | + final_result = find_tag(result_data, 0xA2) |
| 588 | + if final_result: |
| 589 | + for tag, value in iter_tlv(final_result): |
| 590 | + if tag == 0xA0: |
| 591 | + result["success"] = True |
| 592 | + elif tag == 0xA1: |
| 593 | + bpp_cmd = find_tag(value, TAG_STATUS) |
| 594 | + if bpp_cmd: |
| 595 | + result["bppCommandId"] = int.from_bytes(bpp_cmd, "big") |
| 596 | + err = find_tag(value, 0x81) |
| 597 | + if err: |
| 598 | + result["errorReason"] = int.from_bytes(err, "big") |
| 599 | + return result |
| 600 | + |
| 601 | + |
| 602 | +def load_bpp(client: AtClient, b64_bpp: str) -> dict: |
| 603 | + bpp = b64d(b64_bpp) |
| 604 | + result = None |
| 605 | + for chunk in _split_bpp(bpp): |
| 606 | + response = es10x_command(client, chunk) |
| 607 | + if response: |
| 608 | + result = _parse_install_result(response) or result |
| 609 | + |
| 610 | + if result is None: |
| 611 | + raise RuntimeError("Profile installation failed: no result from eUICC") |
| 612 | + if not result["success"] and result["errorReason"] is not None: |
| 613 | + msg = BPP_ERROR_MESSAGES.get(result["errorReason"]) |
| 614 | + if not msg: |
| 615 | + cmd_name = BPP_COMMAND_NAMES.get(result["bppCommandId"], f"unknown({result['bppCommandId']})") |
| 616 | + err_name = BPP_ERROR_REASONS.get(result["errorReason"], f"unknown({result['errorReason']})") |
| 617 | + msg = f"Profile installation failed at {cmd_name}: {err_name}" |
| 618 | + raise RuntimeError(msg) |
| 619 | + if not result["success"]: |
| 620 | + raise RuntimeError("Profile installation failed: no result from eUICC") |
| 621 | + return result |
| 622 | + |
| 623 | + |
| 624 | +def parse_metadata(b64_metadata: str) -> dict: |
| 625 | + root = find_tag(b64d(b64_metadata), TAG_PROFILE_METADATA) |
| 626 | + if root is None: |
| 627 | + raise RuntimeError("Invalid profileMetadata") |
| 628 | + return decode_struct(root, PROFILE) |
| 629 | + |
| 630 | + |
| 631 | +def cancel_session(client: AtClient, transaction_id: bytes, reason: int = 127) -> str: |
| 632 | + content = encode_tlv(0x80, transaction_id) + encode_tlv(0x81, bytes([reason])) |
| 633 | + response = es10x_command(client, encode_tlv(TAG_CANCEL_SESSION, content)) |
| 634 | + return b64e(response) |
| 635 | + |
| 636 | + |
| 637 | +def parse_lpa_activation_code(activation_code: str) -> tuple[str, str]: |
| 638 | + """Parse 'LPA:1$smdp.example.com$MATCHING-ID' into (smdp_address, matching_id).""" |
| 639 | + if not activation_code.startswith("LPA:"): |
| 640 | + raise ValueError("Invalid activation code format") |
| 641 | + parts = activation_code[4:].split("$") |
| 642 | + if len(parts) != 3: |
| 643 | + raise ValueError("Invalid activation code format") |
| 644 | + return parts[1], parts[2] |
| 645 | + |
| 646 | + |
| 647 | +def _b64_field(data: dict, key: str) -> str: |
| 648 | + return base64_trim(data[key]) |
| 649 | + |
| 650 | + |
| 651 | +def _cancel_session_safe(client: AtClient, smdp: str, tx_id: str, session: requests.Session) -> None: |
| 652 | + b64_cancel = "" |
| 653 | + try: |
| 654 | + b64_cancel = cancel_session(client, b64d(tx_id)) |
| 655 | + except Exception: |
| 656 | + pass |
| 657 | + try: |
| 658 | + es9p_request(smdp, "cancelSession", {"transactionId": tx_id, "cancelSessionResponse": b64_cancel}, "CancelSession", session=session) |
| 659 | + except Exception: |
| 660 | + pass |
| 661 | + |
| 662 | + |
| 663 | +def download_profile(client: AtClient, activation_code: str) -> str: |
| 664 | + """Download and install an eSIM profile. Returns the ICCID of the installed profile.""" |
| 665 | + if not system_time_valid(): |
| 666 | + raise RuntimeError("System time is not set; TLS certificate validation requires a valid clock") |
| 667 | + smdp, matching_id = parse_lpa_activation_code(activation_code) |
| 668 | + challenge, euicc_info = get_challenge_and_info(client) |
| 669 | + session = requests.Session() |
| 670 | + tx_id = None |
| 671 | + |
| 672 | + try: |
| 673 | + # step 1: initiate authentication |
| 674 | + auth = es9p_request(smdp, "initiateAuthentication", { |
| 675 | + "smdpAddress": smdp, "euiccChallenge": b64e(challenge), |
| 676 | + "euiccInfo1": b64e(euicc_info), "matchingId": matching_id, |
| 677 | + }, "Authentication", session=session) |
| 678 | + tx_id = _b64_field(auth, "transactionId") |
| 679 | + |
| 680 | + # step 2: authenticate server |
| 681 | + b64_auth = authenticate_server(client, |
| 682 | + _b64_field(auth, "serverSigned1"), _b64_field(auth, "serverSignature1"), |
| 683 | + _b64_field(auth, "euiccCiPKIdToBeUsed"), _b64_field(auth, "serverCertificate"), |
| 684 | + matching_id) |
| 685 | + |
| 686 | + # step 3: authenticate client + get metadata |
| 687 | + cli = es9p_request(smdp, "authenticateClient", { |
| 688 | + "transactionId": tx_id, "authenticateServerResponse": b64_auth, |
| 689 | + }, "Authentication", session=session) |
| 690 | + iccid = parse_metadata(_b64_field(cli, "profileMetadata"))["iccid"] |
| 691 | + |
| 692 | + # step 4: prepare download |
| 693 | + b64_prep = prepare_download(client, |
| 694 | + _b64_field(cli, "smdpSigned2"), _b64_field(cli, "smdpSignature2"), |
| 695 | + _b64_field(cli, "smdpCertificate")) |
| 696 | + |
| 697 | + # step 5: get and install bound profile package |
| 698 | + bpp = es9p_request(smdp, "getBoundProfilePackage", { |
| 699 | + "transactionId": tx_id, "prepareDownloadResponse": b64_prep, |
| 700 | + }, "GetBoundProfilePackage", session=session) |
| 701 | + load_bpp(client, _b64_field(bpp, "boundProfilePackage")) |
| 702 | + return iccid |
| 703 | + except Exception: |
| 704 | + if tx_id: |
| 705 | + _cancel_session_safe(client, smdp, tx_id, session) |
| 706 | + raise |
| 707 | + finally: |
| 708 | + session.close() |
| 709 | + |
| 710 | + |
462 | 711 | class TiciLPA(LPABase): |
463 | 712 | def __init__(self): |
464 | 713 | if hasattr(self, '_client'): |
@@ -515,7 +764,10 @@ def delete_profile(self, iccid: str) -> None: |
515 | 764 | raise LPAError(f"DeleteProfile failed: {PROFILE_ERROR_CODES.get(code, 'unknown')} (0x{code:02X})") |
516 | 765 |
|
517 | 766 | def download_profile(self, qr: str, nickname: str | None = None) -> None: |
518 | | - return None |
| 767 | + with self._acquire_channel(): |
| 768 | + iccid = download_profile(self._client, qr) |
| 769 | + if nickname and iccid: |
| 770 | + set_profile_nickname(self._client, iccid, nickname) |
519 | 771 |
|
520 | 772 | def nickname_profile(self, iccid: str, nickname: str) -> None: |
521 | 773 | with self._acquire_channel(): |
|
0 commit comments