|
53 | 53 | "AttestationDependencyError", |
54 | 54 | "AttestationError", |
55 | 55 | "Ed25519Signer", |
| 56 | + "VerificationResult", |
56 | 57 | "action_executed", |
57 | 58 | "artifact_published", |
58 | 59 | "attest_post", |
|
62 | 63 | "capability_coverage", |
63 | 64 | "coverage", |
64 | 65 | "did_key_identity", |
| 66 | + "did_key_to_public_key", |
65 | 67 | "evidence_commit_hash", |
66 | 68 | "evidence_immutable_uri", |
67 | 69 | "evidence_platform_receipt", |
|
73 | 75 | "validity_perpetual", |
74 | 76 | "validity_revocation_checked", |
75 | 77 | "validity_time_bounded", |
| 78 | + "verify", |
76 | 79 | ] |
77 | 80 |
|
78 | 81 | #: Spec version this producer emits. Pinned to the frozen wire format. |
@@ -629,3 +632,196 @@ def attest_post( |
629 | 632 | api_base_url=api_base_url, |
630 | 633 | display_name=display_name, |
631 | 634 | ) |
| 635 | + |
| 636 | + |
| 637 | +# --------------------------------------------------------------------------- # |
| 638 | +# Consumer side — offline verification |
| 639 | +# --------------------------------------------------------------------------- # |
| 640 | +def did_key_to_public_key(did_key: str) -> bytes: |
| 641 | + """Inverse of :func:`public_key_to_did_key` — raw 32-byte ed25519 key from a ``did:key``.""" |
| 642 | + if not isinstance(did_key, str) or not did_key.startswith("did:key:z"): |
| 643 | + raise AttestationError(f"not a base58btc did:key: {did_key!r}") |
| 644 | + try: |
| 645 | + import base58 |
| 646 | + except ImportError as exc: |
| 647 | + raise AttestationDependencyError( |
| 648 | + "did:key decoding needs the 'base58' package — install with: pip install colony-sdk[attestation]" |
| 649 | + ) from exc |
| 650 | + decoded = base58.b58decode(did_key[len("did:key:") + 1 :]) |
| 651 | + if decoded[:2] != _ED25519_MULTICODEC: |
| 652 | + raise AttestationError("did:key multicodec is not ed25519 (0xed01)") |
| 653 | + pub = decoded[2:] |
| 654 | + if len(pub) != 32: |
| 655 | + raise AttestationError(f"ed25519 public key must be 32 bytes, got {len(pub)}") |
| 656 | + return pub |
| 657 | + |
| 658 | + |
| 659 | +@dataclass(frozen=True) |
| 660 | +class VerificationResult: |
| 661 | + """Outcome of :func:`verify`. |
| 662 | +
|
| 663 | + - ``ok`` — the cryptographically + temporally meaningful checks passed: every |
| 664 | + signature in the chain verifies over its peeled JCS bytes, and the validity |
| 665 | + window is satisfied. Truthy via ``__bool__``, so ``if verify(env): ...`` works. |
| 666 | + - ``issuer_bound`` — whether ``sigchain[0]``'s key cryptographically binds to |
| 667 | + the declared issuer. Only ``did:key`` issuers can close this in v0.1; for |
| 668 | + other schemes the signature is still valid but the binding is UNBINDABLE |
| 669 | + (treat as "key K signed this", not "issuer I signed this"). Kept separate |
| 670 | + from ``ok`` so the caller chooses how strict to be. |
| 671 | + - ``reasons`` — why ``ok`` is False (empty when ``ok``). |
| 672 | + - ``notes`` — informational: binding result, and offline-skipped checks |
| 673 | + (revocation / evidence resolution are the caller's responsibility — this |
| 674 | + verifier never touches the network). |
| 675 | + """ |
| 676 | + |
| 677 | + ok: bool |
| 678 | + issuer_bound: bool |
| 679 | + reasons: tuple[str, ...] |
| 680 | + notes: tuple[str, ...] |
| 681 | + |
| 682 | + def __bool__(self) -> bool: |
| 683 | + return self.ok |
| 684 | + |
| 685 | + |
| 686 | +_REQUIRED_FIELDS = ("issuer", "subject", "witnessed_claim", "evidence", "validity", "sigchain") |
| 687 | + |
| 688 | + |
| 689 | +def verify(envelope: Mapping[str, Any], *, now: datetime | None = None) -> VerificationResult: |
| 690 | + """Offline-verify a v0.1.1 attestation envelope. |
| 691 | +
|
| 692 | + Runs the deterministic, network-free subset of the spec's verifier: |
| 693 | +
|
| 694 | + 1. **structural** — required fields present, `envelope_version == "0.1"`, |
| 695 | + evidence non-empty, sigchain non-empty. |
| 696 | + 2. **sigchain** — peel-and-verify each ed25519 signature over |
| 697 | + ``JCS(envelope with sigchain = sigchain[0..i-1])`` (the spec's |
| 698 | + peel-not-replace rule). |
| 699 | + 3. **validity** — `time_bounded` window vs ``now``; `perpetual` always passes; |
| 700 | + `revocation_checked` cannot be confirmed offline (noted, not failed). |
| 701 | + 4. **issuer binding** — for `did:key` issuers, `sigchain[0].key_id == issuer.id`. |
| 702 | +
|
| 703 | + Evidence resolution and revocation are intentionally **out of scope** — this |
| 704 | + function never makes a network call. Resolve `evidence[].uri`, check |
| 705 | + `content_hash`, and query `validity.revocation_uri` yourself if your trust |
| 706 | + model needs them. Needs the optional crypto extra (`pip install |
| 707 | + colony-sdk[attestation]`). |
| 708 | + """ |
| 709 | + reasons: list[str] = [] |
| 710 | + notes: list[str] = [] |
| 711 | + |
| 712 | + if not isinstance(envelope, Mapping): |
| 713 | + return VerificationResult(False, False, ("envelope is not an object",), ()) |
| 714 | + |
| 715 | + if envelope.get("envelope_version") != SPEC_VERSION: |
| 716 | + reasons.append(f"unsupported envelope_version {envelope.get('envelope_version')!r} (expected {SPEC_VERSION!r})") |
| 717 | + for field in _REQUIRED_FIELDS: |
| 718 | + if field not in envelope: |
| 719 | + reasons.append(f"missing required field: {field}") |
| 720 | + |
| 721 | + evidence = envelope.get("evidence") |
| 722 | + if not isinstance(evidence, list) or not evidence: |
| 723 | + reasons.append("evidence must be a non-empty list (self-signed claims are not evidence)") |
| 724 | + |
| 725 | + chain = envelope.get("sigchain") |
| 726 | + if not isinstance(chain, list) or not chain: |
| 727 | + reasons.append("sigchain must be a non-empty list") |
| 728 | + |
| 729 | + # Structural failures are fatal — don't attempt crypto on a malformed envelope. |
| 730 | + if reasons: |
| 731 | + return VerificationResult(False, False, tuple(reasons), tuple(notes)) |
| 732 | + |
| 733 | + assert isinstance(chain, list) # narrowed by the structural checks above |
| 734 | + sig_ok = _verify_sigchain(envelope, chain, reasons, notes) |
| 735 | + val_ok = _verify_validity(envelope["validity"], now, reasons, notes) |
| 736 | + issuer_bound = _check_issuer_binding(chain[0], envelope["issuer"], notes) |
| 737 | + return VerificationResult(sig_ok and val_ok, issuer_bound, tuple(reasons), tuple(notes)) |
| 738 | + |
| 739 | + |
| 740 | +def _verify_sigchain(envelope: Mapping[str, Any], chain: list[Any], reasons: list[str], notes: list[str]) -> bool: |
| 741 | + import base64 |
| 742 | + |
| 743 | + try: |
| 744 | + import nacl.exceptions |
| 745 | + import nacl.signing |
| 746 | + except ImportError as exc: |
| 747 | + raise AttestationDependencyError( |
| 748 | + "envelope verification needs the 'pynacl' package — install with: pip install colony-sdk[attestation]" |
| 749 | + ) from exc |
| 750 | + |
| 751 | + ok = True |
| 752 | + if chain[0].get("role") not in (None, "issuer"): |
| 753 | + reasons.append(f"sigchain[0].role must be 'issuer' or unset, got {chain[0].get('role')!r}") |
| 754 | + ok = False |
| 755 | + |
| 756 | + for i, entry in enumerate(chain): |
| 757 | + if not isinstance(entry, Mapping) or entry.get("alg") != "ed25519": |
| 758 | + reasons.append(f"sigchain[{i}]: unsupported or missing alg (v0.1 = ed25519 only)") |
| 759 | + ok = False |
| 760 | + continue |
| 761 | + stripped = {**envelope, "sigchain": chain[:i]} |
| 762 | + message = canonicalize(stripped) |
| 763 | + try: |
| 764 | + pub = did_key_to_public_key(entry.get("key_id", "")) |
| 765 | + except AttestationError as exc: |
| 766 | + reasons.append(f"sigchain[{i}]: key_id not a resolvable ed25519 did:key ({exc})") |
| 767 | + ok = False |
| 768 | + continue |
| 769 | + sig_str = entry.get("sig", "") |
| 770 | + try: |
| 771 | + sig = base64.urlsafe_b64decode(sig_str + "=" * (-len(sig_str) % 4)) |
| 772 | + nacl.signing.VerifyKey(pub).verify(message, sig) |
| 773 | + except (nacl.exceptions.BadSignatureError, ValueError, TypeError) as exc: |
| 774 | + reasons.append(f"sigchain[{i}]: signature does not verify ({type(exc).__name__})") |
| 775 | + ok = False |
| 776 | + continue |
| 777 | + notes.append(f"sigchain[{i}] ({entry.get('role', '?')}) verified against {entry['key_id'][:24]}…") |
| 778 | + return ok |
| 779 | + |
| 780 | + |
| 781 | +def _verify_validity(validity: Any, now: datetime | None, reasons: list[str], notes: list[str]) -> bool: |
| 782 | + if not isinstance(validity, Mapping): |
| 783 | + reasons.append("validity is not an object") |
| 784 | + return False |
| 785 | + model = validity.get("validity_model") |
| 786 | + now = now or _now() |
| 787 | + |
| 788 | + def _parse(ts: str) -> datetime: |
| 789 | + return datetime.fromisoformat(ts.replace("Z", "+00:00")) |
| 790 | + |
| 791 | + if model == "perpetual": |
| 792 | + notes.append("validity: perpetual (not_after is informational)") |
| 793 | + return True |
| 794 | + if model == "time_bounded": |
| 795 | + try: |
| 796 | + nb, na = _parse(validity["not_before"]), _parse(validity["not_after"]) |
| 797 | + except (KeyError, ValueError, AttributeError, TypeError) as exc: |
| 798 | + reasons.append(f"validity: unparseable not_before/not_after ({type(exc).__name__})") |
| 799 | + return False |
| 800 | + if now < nb: |
| 801 | + reasons.append(f"validity: not yet valid (not_before {validity['not_before']})") |
| 802 | + return False |
| 803 | + if now > na: |
| 804 | + reasons.append(f"validity: expired (not_after {validity['not_after']})") |
| 805 | + return False |
| 806 | + notes.append(f"validity: time_bounded, within [{validity['not_before']}, {validity['not_after']}]") |
| 807 | + return True |
| 808 | + if model == "revocation_checked": |
| 809 | + notes.append("validity: revocation_checked — NOT confirmed offline; caller must query revocation_uri") |
| 810 | + return True |
| 811 | + reasons.append(f"validity: unknown validity_model {model!r}") |
| 812 | + return False |
| 813 | + |
| 814 | + |
| 815 | +def _check_issuer_binding(sig0: Mapping[str, Any], issuer: Any, notes: list[str]) -> bool: |
| 816 | + if not isinstance(issuer, Mapping): |
| 817 | + notes.append("issuer-binding: issuer is not an object") |
| 818 | + return False |
| 819 | + scheme = issuer.get("id_scheme") |
| 820 | + if scheme == "did:key": |
| 821 | + if sig0.get("key_id") == issuer.get("id"): |
| 822 | + notes.append("issuer-binding OK: did:key issuer, key_id == issuer.id (self-resolving)") |
| 823 | + return True |
| 824 | + notes.append("issuer-binding UNVERIFIED: did:key issuer but key_id != issuer.id") |
| 825 | + return False |
| 826 | + notes.append(f"issuer-binding UNBINDABLE: id_scheme {scheme!r} has no key-publication mechanism in v0.1") |
| 827 | + return False |
0 commit comments