|
| 1 | +"""AGT + Cognitive Attestation: policy enforcement plus signed interpretability. |
| 2 | +
|
| 3 | +AGT decides whether an action is allowed. Cognitive Attestation signs the |
| 4 | +interpretable decomposition of model state behind the decision, so an auditor |
| 5 | +can verify what the reasoning substrate looked like when the action fired, |
| 6 | +not just whether the policy rule matched. |
| 7 | +
|
| 8 | +Paper: Cognitive Attestation (Zenodo DOI 10.5281/zenodo.19646276) |
| 9 | +Reference implementation: github.com/aeoess/agent-passport-system |
| 10 | +Community-contributed, experimental, Apache 2.0. |
| 11 | +""" |
| 12 | + |
| 13 | +from __future__ import annotations |
| 14 | + |
| 15 | +import hashlib |
| 16 | +import json |
| 17 | +from dataclasses import dataclass, field |
| 18 | +from typing import Any |
| 19 | + |
| 20 | +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( |
| 21 | + Ed25519PrivateKey, |
| 22 | + Ed25519PublicKey, |
| 23 | +) |
| 24 | +from cryptography.hazmat.primitives.serialization import ( |
| 25 | + Encoding, |
| 26 | + NoEncryption, |
| 27 | + PrivateFormat, |
| 28 | + PublicFormat, |
| 29 | +) |
| 30 | + |
| 31 | +# --------------------------------------------------------------------------- |
| 32 | +# RFC 8785 JCS canonicalization (minimal subset sufficient for this envelope) |
| 33 | +# --------------------------------------------------------------------------- |
| 34 | + |
def canonicalize_jcs(value: Any) -> bytes:
    """Return the canonical JSON serialization of *value* as UTF-8 bytes.

    The text form is produced by ``_encode``; this wrapper only performs the
    final UTF-8 encoding so the result can be hashed or signed directly.
    """
    canonical_text = _encode(value)
    return bytes(canonical_text, "utf-8")
| 38 | + |
| 39 | + |
| 40 | +def _encode(value: Any) -> str: |
| 41 | + if value is None: |
| 42 | + return "null" |
| 43 | + if value is True: |
| 44 | + return "true" |
| 45 | + if value is False: |
| 46 | + return "false" |
| 47 | + if isinstance(value, (int, float)): |
| 48 | + if isinstance(value, float) and value == int(value): |
| 49 | + return str(int(value)) |
| 50 | + return json.dumps(value) |
| 51 | + if isinstance(value, str): |
| 52 | + return json.dumps(value, ensure_ascii=False) |
| 53 | + if isinstance(value, list): |
| 54 | + return "[" + ",".join(_encode(v) for v in value) + "]" |
| 55 | + if isinstance(value, dict): |
| 56 | + keys = sorted(value.keys()) |
| 57 | + return "{" + ",".join( |
| 58 | + f"{json.dumps(k, ensure_ascii=False)}:{_encode(value[k])}" for k in keys |
| 59 | + ) + "}" |
| 60 | + raise TypeError(f"Not JSON-serializable: {type(value).__name__}") |
| 61 | + |
| 62 | + |
| 63 | +# --------------------------------------------------------------------------- |
| 64 | +# Envelope types |
| 65 | +# --------------------------------------------------------------------------- |
| 66 | + |
@dataclass
class FeatureActivation:
    """One feature activation entry to be included in an attestation envelope."""

    # Identifier of the feature; primary sort key in build_envelope.
    feature_id: str
    # Activation value recorded for the feature; secondary sort key.
    activation_statistic: float
    # Optional human-readable label; defaults to the empty string.
    label: str = ""
| 72 | + |
| 73 | + |
@dataclass
class CognitiveAttestation:
    """Typed record with the same field names as the signed envelope dict.

    NOTE(review): the functions visible in this file build and pass plain
    dicts; this class is not instantiated in the visible code.
    """

    spec_version: str = "1.0"
    # "sha256:<hex>" digest of the canonicalized action (see build_envelope).
    action_ref: str = ""
    dictionary_ref: str = ""
    feature_activations: list[FeatureActivation] = field(default_factory=list)
    # "sha256:<hex>" digest of the canonicalized unsigned envelope.
    canonical_hash: str = ""
    signer_role: str = "agent"
    # Raw Ed25519 public key, hex-encoded (see sign_envelope).
    signer_pubkey_hex: str = ""
    # Base64-encoded Ed25519 signature (see sign_envelope).
    signature_b64: str = ""
| 84 | + |
| 85 | + |
def build_envelope(
    action: dict[str, Any],
    features: list[FeatureActivation],
    dictionary_ref: str,
    signer_role: str = "agent",
) -> dict[str, Any]:
    """Assemble the unsigned attestation envelope for *action*.

    The action is referenced by the SHA-256 of its canonical form, the
    features are emitted in (feature_id, activation_statistic) order, and a
    ``canonical_hash`` over the canonicalized envelope (computed before that
    key is added) is attached last.

    Returns:
        The envelope dict, ready to be passed to ``sign_envelope``.
    """

    def _sha256_ref(payload: bytes) -> str:
        # Digest rendered in the "sha256:<hex>" form used by the envelope.
        return "sha256:" + hashlib.sha256(payload).hexdigest()

    action_digest = _sha256_ref(canonicalize_jcs(action))

    # Canonical feature order: feature_id first, then activation_statistic.
    ordered = list(features)
    ordered.sort(key=lambda item: (item.feature_id, item.activation_statistic))

    activations = []
    for item in ordered:
        activations.append(
            {
                "feature_id": item.feature_id,
                "activation_statistic": item.activation_statistic,
                "label": item.label,
            }
        )

    result: dict[str, Any] = {
        "spec_version": "1.0",
        "action_ref": action_digest,
        "dictionary_ref": dictionary_ref,
        "feature_activations": activations,
        "signer_role": signer_role,
    }
    # The right-hand side is evaluated before the key exists, so the hash
    # covers the envelope without "canonical_hash".
    result["canonical_hash"] = _sha256_ref(canonicalize_jcs(result))
    return result
| 119 | + |
| 120 | + |
def sign_envelope(unsigned: dict[str, Any], sk: Ed25519PrivateKey) -> dict[str, Any]:
    """Attach the signer's public key and an Ed25519 signature to an envelope.

    The signature covers the canonical form of every field except
    ``signature_b64``, which is exactly what ``verify_envelope``
    canonicalizes. Any ``signature_b64`` already present in *unsigned* (for
    example when re-signing a signed envelope) is dropped before signing;
    otherwise the stale value would be signed over and the result could
    never verify.

    Args:
        unsigned: Envelope dict, typically from ``build_envelope``. Not mutated.
        sk: Ed25519 private key used to sign.

    Returns:
        A new dict with ``signer_pubkey_hex`` and ``signature_b64`` set.
    """
    import base64

    pk_hex = sk.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw).hex()

    # Build the exact dict that the verifier will canonicalize.
    to_sign = {k: v for k, v in unsigned.items() if k != "signature_b64"}
    to_sign["signer_pubkey_hex"] = pk_hex
    sig = sk.sign(canonicalize_jcs(to_sign))

    signed = dict(to_sign)
    signed["signature_b64"] = base64.b64encode(sig).decode("ascii")
    return signed
| 137 | + |
| 138 | + |
def verify_envelope(signed: dict[str, Any]) -> bool:
    """Check the Ed25519 signature on a signed envelope.

    The signature is verified over the canonical form of every field except
    ``signature_b64``, using the public key embedded in
    ``signer_pubkey_hex``. A ``True`` result therefore only shows that the
    holder of that embedded key signed this content; deciding whether the
    key is trusted is left to the caller.

    Envelopes with a missing or undecodable signature or public key are
    treated as unverifiable and yield ``False`` instead of raising.

    Raises:
        TypeError, ValueError: if the envelope body cannot be canonicalized.
    """
    import base64
    from cryptography.exceptions import InvalidSignature

    try:
        sig = base64.b64decode(signed["signature_b64"])
        pk = Ed25519PublicKey.from_public_bytes(
            bytes.fromhex(signed["signer_pubkey_hex"])
        )
    except (KeyError, TypeError, ValueError):
        # Missing field, bad base64/hex, or a key of the wrong length.
        return False

    to_verify = {k: v for k, v in signed.items() if k != "signature_b64"}
    canonical = canonicalize_jcs(to_verify)
    try:
        pk.verify(sig, canonical)
    except InvalidSignature:
        return False
    return True
| 152 | + |
| 153 | + |
| 154 | +# --------------------------------------------------------------------------- |
| 155 | +# Minimal AGT-style policy evaluator (stand-in so the example runs standalone) |
| 156 | +# --------------------------------------------------------------------------- |
| 157 | + |
def evaluate_policy(action: dict[str, Any], policy: dict[str, Any]) -> dict[str, Any]:
    """Return the decision of the first rule whose tool list names the action's tool.

    Rules are scanned in order; a rule matches when ``action["tool"]`` is in
    the rule's ``match.tool.one_of`` list. If none matches, the policy's
    ``default_action`` (``"deny"`` when absent) is returned with rule id
    ``"default"``. In production, use agent-governance-toolkit.
    """
    requested_tool = action.get("tool", "")
    winner = next(
        (
            candidate
            for candidate in policy.get("rules", [])
            if requested_tool
            in candidate.get("match", {}).get("tool", {}).get("one_of", [])
        ),
        None,
    )

    if winner is None:
        return {
            "decision": policy.get("default_action", "deny"),
            "rule_id": "default",
            "reason": "No matching rule.",
        }

    return {
        "decision": winner["action"],
        "rule_id": winner["id"],
        "reason": winner.get("reason", ""),
    }
| 175 | + |
| 176 | + |
| 177 | +# --------------------------------------------------------------------------- |
| 178 | +# Demo |
| 179 | +# --------------------------------------------------------------------------- |
| 180 | + |
def main() -> None:
    """Run the end-to-end demo: policy check, attestation, verify, tamper test."""
    # Step 1: the agent's Ed25519 signing key
    signing_key = Ed25519PrivateKey.generate()
    agent_pub = signing_key.public_key()
    pk_hex = agent_pub.public_bytes(Encoding.Raw, PublicFormat.Raw).hex()
    print(f"Agent pubkey: {pk_hex[:20]}...")

    # Step 2: policy (AGT-compatible shape)
    allow_read_rule = {
        "id": "allow-read",
        "match": {"tool": {"one_of": ["web_search", "file_read"]}},
        "action": "allow",
    }
    deny_destructive_rule = {
        "id": "deny-destructive",
        "match": {"tool": {"one_of": ["file_delete", "drop_database"]}},
        "action": "deny",
        "reason": "Destructive operations blocked.",
    }
    policy = {
        "version": 1,
        "name": "cogattest-demo-policy",
        "default_action": "deny",
        "rules": [allow_read_rule, deny_destructive_rule],
    }

    # Step 3: the action the agent proposes to take
    action = {
        "tool": "web_search",
        "params": {"query": "ed25519 signature properties"},
        "target": "mcp://search",
    }

    # Step 4: AGT-style policy decision
    verdict = evaluate_policy(action, policy)
    print(f"\nAGT policy decision: {verdict['decision']} "
          f"(rule={verdict['rule_id']})")

    if verdict["decision"] != "allow":
        print("Action blocked. No attestation produced.")
        return

    # Step 5: the agent's reasoning substrate, decomposed into SAE features.
    # In production this comes from running a sparse autoencoder over the
    # model's residual stream. A fixed demo dictionary and static features
    # are used here.
    demo_rows = [
        ("f_0412", 0.87, "search-intent"),
        ("f_1055", 0.54, "cryptography-topic"),
        ("f_0233", 0.33, "query-formulation"),
    ]
    features = [FeatureActivation(fid, stat, tag) for fid, stat, tag in demo_rows]
    dictionary_ref = "sae://neuronpedia/gpt2-small/" + "res-jb/12288/v1"

    # Step 6: build and sign the Cognitive Attestation envelope
    signed = sign_envelope(
        build_envelope(
            action=action,
            features=features,
            dictionary_ref=dictionary_ref,
            signer_role="agent",
        ),
        signing_key,
    )
    print(f"\nEnvelope action_ref: {signed['action_ref']}")
    print(f"Envelope canonical_hash: {signed['canonical_hash']}")
    print(f"Signer pubkey: {signed['signer_pubkey_hex'][:20]}...")
    print(f"Signature (b64): {signed['signature_b64'][:40]}...")
    print(f"Features attested: {len(signed['feature_activations'])}")

    # Step 7: offline verification of the untouched envelope
    genuine_ok = verify_envelope(signed)
    print(f"\nOffline verification: {'PASS' if genuine_ok else 'FAIL'}")

    # Step 8: tamper check on a JSON round-tripped copy, so the original
    # envelope is left unmodified
    forged = json.loads(json.dumps(signed))
    forged["feature_activations"][0]["activation_statistic"] = 0.99
    forged_ok = verify_envelope(forged)
    print(f"Tamper detection: {'PASS (rejected)' if not forged_ok else 'FAIL (accepted)'}")
| 260 | + |
| 261 | + |
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
0 commit comments