Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 272 additions & 0 deletions third_party/ScopeBlind/signed_tool_call_receipts.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "00ace419",
"metadata": {},
"source": [
"# Signed receipts for every tool your agent runs\n",
"\n",
"A practical recipe for adding a **fail-closed policy gate** and **cryptographically signed, offline-verifiable receipts** to a Claude tool-use loop.\n",
"\n",
"Out of the box, an agent that calls tools leaves no audit trail a third party can independently verify: anything logged is operator-controlled and anything unsigned can be edited after the fact. This recipe wraps each tool call in two steps:\n",
"\n",
"1. **Gate** every tool call against a [Cedar](https://www.cedarpolicy.com/) policy with [`protect-mcp`](https://www.npmjs.com/package/protect-mcp). The gate fails **closed**: on any policy error it denies rather than allows.\n",
"2. **Sign** an Ed25519 receipt of each decision. Receipts are [JCS](https://datatracker.ietf.org/doc/html/rfc8785)-canonical and verifiable offline by anyone with the public key, with no vendor in the loop.\n",
"\n",
"We use the Anthropic SDK for a real tool-use loop, gate Claude's tool calls, sign the decisions, then verify the receipts offline and show that tampering breaks verification."
]
},
{
"cell_type": "markdown",
"id": "b95baf7e",
"metadata": {},
"source": [
"## Setup\n",
"\n",
"Requires Node.js 18+ (for `npx`) and an `ANTHROPIC_API_KEY`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f6db4623",
"metadata": {},
"outputs": [],
"source": [
"%pip install -q anthropic\n",
"import shutil\n",
"assert shutil.which(\"node\"), \"Node.js 18+ is required (provides npx)\"\n",
"print(\"node:\", shutil.which(\"node\"))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "81af0406",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import json\n",
"import subprocess\n",
"import pathlib\n",
"import anthropic\n",
"\n",
"MODEL = \"claude-sonnet-4-6\" # any current Claude model\n",
"client = anthropic.Anthropic() # reads ANTHROPIC_API_KEY from the environment\n",
"WORK = pathlib.Path(\"receipts_demo\")\n",
"WORK.mkdir(exist_ok=True)\n",
"os.chdir(WORK)"
]
},
{
"cell_type": "markdown",
"id": "a6fc637d",
"metadata": {},
"source": [
"## Step 1: a Cedar policy and a signing key\n",
"\n",
"The policy permits read-only tools and forbids destructive shell commands. Note the `.contains()` idiom: writing `context.command in [...]` is a Cedar type error that silently discards the rule, so we use set membership instead."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5ceaa27f",
"metadata": {},
"outputs": [],
"source": [
"policy = \"\"\"\n",
"permit(principal, action == Action::\"MCP::Tool::call\", resource == Tool::\"list_files\");\n",
"forbid(principal, action == Action::\"MCP::Tool::call\", resource == Tool::\"run_shell\")\n",
" when { [\"rm\", \"dd\", \"mkfs\", \"shutdown\"].contains(context.command) };\n",
"permit(principal, action == Action::\"MCP::Tool::call\", resource == Tool::\"run_shell\");\n",
"\"\"\"\n",
"pathlib.Path(\"cedar\").mkdir(exist_ok=True)\n",
"pathlib.Path(\"cedar/policy.cedar\").write_text(policy)\n",
"\n",
"subprocess.run([\"npx\", \"-y\", \"protect-mcp@0.7.1\", \"init\"], check=False)\n",
"print(\"key generated:\", pathlib.Path(\"keys/gateway.json\").exists())"
]
},
{
"cell_type": "markdown",
"id": "a7f82281",
"metadata": {},
"source": [
"## Step 2: gate and sign helpers\n",
"\n",
"`protect-mcp evaluate` exits `2` to deny and `0` to allow (a missing policy denies). `protect-mcp sign` appends an Ed25519 receipt; with no key it records an honest unsigned line rather than failing."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b460ccdb",
"metadata": {},
"outputs": [],
"source": [
"def gate(tool: str, tool_input: dict) -> bool:\n",
" \"\"\"Return True if the policy allows this call, False if it denies. Fails closed.\"\"\"\n",
" r = subprocess.run(\n",
" [\"npx\", \"-y\", \"protect-mcp@0.7.1\", \"evaluate\",\n",
" \"--cedar\", \"cedar\", \"--tool\", tool, \"--input\", json.dumps(tool_input)],\n",
" capture_output=True, text=True)\n",
" return r.returncode == 0 # 0 = allow, 2 = deny\n",
"\n",
"def sign(tool: str) -> None:\n",
" subprocess.run(\n",
" [\"npx\", \"-y\", \"protect-mcp@0.7.1\", \"sign\",\n",
" \"--tool\", tool, \"--receipts\", \"receipts\", \"--key\", \"keys/gateway.json\"],\n",
" capture_output=True, text=True)"
]
},
{
"cell_type": "markdown",
"id": "9ef7ee86",
"metadata": {},
"source": [
"## Step 3: the tool-use loop\n",
"\n",
"We give Claude two tools, `list_files` and `run_shell`. Before any tool runs, we gate it. Allowed calls execute and get a signed receipt; denied calls return a denial to Claude instead of running."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cc8262c3",
"metadata": {},
"outputs": [],
"source": [
"TOOLS = [\n",
" {\"name\": \"list_files\", \"description\": \"List files in the current directory.\",\n",
" \"input_schema\": {\"type\": \"object\", \"properties\": {}}},\n",
" {\"name\": \"run_shell\", \"description\": \"Run a shell command.\",\n",
" \"input_schema\": {\"type\": \"object\",\n",
" \"properties\": {\"command\": {\"type\": \"string\"}},\n",
" \"required\": [\"command\"]}},\n",
"]\n",
"\n",
"def execute(tool: str, tool_input: dict) -> str:\n",
" if tool == \"list_files\":\n",
" return \"\\n\".join(os.listdir(\".\"))\n",
" if tool == \"run_shell\":\n",
" return f\"(would run: {tool_input.get('command')})\"\n",
" return \"unknown tool\"\n",
"\n",
"def run_agent(prompt: str) -> None:\n",
" print(f\"USER: {prompt}\")\n",
" messages = [{\"role\": \"user\", \"content\": prompt}]\n",
" while True:\n",
" resp = client.messages.create(model=MODEL, max_tokens=1024, tools=TOOLS, messages=messages)\n",
" messages.append({\"role\": \"assistant\", \"content\": resp.content})\n",
" tool_uses = [b for b in resp.content if b.type == \"tool_use\"]\n",
" if not tool_uses:\n",
" text = \"\".join(b.text for b in resp.content if b.type == \"text\")\n",
" print(\"CLAUDE:\", text.strip())\n",
" return\n",
" results = []\n",
" for tu in tool_uses:\n",
" if gate(tu.name, tu.input):\n",
" out = execute(tu.name, tu.input)\n",
" sign(tu.name)\n",
" print(f\" ALLOW {tu.name} {tu.input} (receipt signed)\")\n",
" else:\n",
" out = \"DENIED by policy. This action was blocked before it ran.\"\n",
" print(f\" DENY {tu.name} {tu.input} (fail-closed gate)\")\n",
" results.append({\"type\": \"tool_result\", \"tool_use_id\": tu.id, \"content\": out})\n",
" messages.append({\"role\": \"user\", \"content\": results})"
]
},
{
"cell_type": "markdown",
"id": "31f41456",
"metadata": {},
"source": [
"## Step 4: watch it allow a safe call and block a dangerous one"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5da1fe36",
"metadata": {},
"outputs": [],
"source": [
"run_agent(\"List the files in the current directory.\")\n",
"print(\"-\" * 60)\n",
"run_agent(\"Run the shell command: rm -rf important_data\")"
]
},
{
"cell_type": "markdown",
"id": "a3e258aa",
"metadata": {},
"source": [
"## Step 5: verify the receipts offline\n",
"\n",
"The receipts verify with no network and no trust in the operator: the signature is checked against the public key embedded in each receipt."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4a6f7527",
"metadata": {},
"outputs": [],
"source": [
"subprocess.run([\"npx\", \"-y\", \"@veritasacta/verify\", \"receipts/receipts.jsonl\", \"--format\", \"jsonl\"])"
]
},
{
"cell_type": "markdown",
"id": "a8930321",
"metadata": {},
"source": [
"Now tamper with a receipt and verify again. The Ed25519 signature no longer matches the canonical bytes, so verification fails."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3153426e",
"metadata": {},
"outputs": [],
"source": [
"lines = pathlib.Path(\"receipts/receipts.jsonl\").read_text().splitlines()\n",
"if lines:\n",
" rec = json.loads(lines[0])\n",
" rec[\"tool\"] = \"tampered\"\n",
" lines[0] = json.dumps(rec)\n",
" pathlib.Path(\"receipts/receipts.jsonl\").write_text(\"\\n\".join(lines) + \"\\n\")\n",
"r = subprocess.run([\"npx\", \"-y\", \"@veritasacta/verify\", \"receipts/receipts.jsonl\", \"--format\", \"jsonl\"])\n",
"print(\"exit code:\", r.returncode, \"(non-zero = tamper detected)\")"
]
},
{
"cell_type": "markdown",
"id": "53444143",
"metadata": {},
"source": [
"## What you built\n",
"\n",
"Every tool call is now gated by a policy that fails closed, and every decision is an Ed25519 receipt anyone can verify offline. This is the building block for an auditable agent: who did what, under which policy, and whether it was allowed or denied, provable without trusting the operator.\n",
"\n",
"- [protect-mcp](https://www.npmjs.com/package/protect-mcp) (the gate and signer) and [@veritasacta/verify](https://www.npmjs.com/package/@veritasacta/verify) (the offline verifier)\n",
"- Receipt wire format: [draft-farley-acta-signed-receipts](https://datatracker.ietf.org/doc/draft-farley-acta-signed-receipts/) (IETF Internet-Draft)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 5
}