|
| 1 | +""" |
| 2 | +Tests for Chain of Custody (Delegation Chain) functionality. |
| 3 | +
|
| 4 | +Tests the ability for agents to prove they are acting on behalf of |
| 5 | +a user or another agent through recursive delegation chains. |
| 6 | +""" |
| 7 | + |
| 8 | +import pytest |
| 9 | +from vouch import Signer, Verifier, generate_identity, DelegationLink |
| 10 | + |
| 11 | + |
| 12 | +class TestDelegationChain: |
| 13 | + """Test delegation chain functionality.""" |
| 14 | + |
| 15 | + @pytest.fixture |
| 16 | + def user_identity(self): |
| 17 | + """Create a user identity.""" |
| 18 | + return generate_identity(domain="user.example.com") |
| 19 | + |
| 20 | + @pytest.fixture |
| 21 | + def agent_a_identity(self): |
| 22 | + """Create Agent A identity.""" |
| 23 | + return generate_identity(domain="agent-a.ai") |
| 24 | + |
| 25 | + @pytest.fixture |
| 26 | + def agent_b_identity(self): |
| 27 | + """Create Agent B identity.""" |
| 28 | + return generate_identity(domain="agent-b.ai") |
| 29 | + |
| 30 | + @pytest.fixture |
| 31 | + def agent_c_identity(self): |
| 32 | + """Create Agent C identity.""" |
| 33 | + return generate_identity(domain="agent-c.ai") |
| 34 | + |
| 35 | + def test_single_hop_no_delegation(self, user_identity): |
| 36 | + """Test that tokens without delegation have empty chain.""" |
| 37 | + signer = Signer( |
| 38 | + private_key=user_identity.private_key_jwk, |
| 39 | + did=user_identity.did |
| 40 | + ) |
| 41 | + |
| 42 | + token = signer.sign({'action': 'read_file'}) |
| 43 | + |
| 44 | + is_valid, passport = Verifier.verify( |
| 45 | + token, |
| 46 | + public_key_jwk=user_identity.public_key_jwk |
| 47 | + ) |
| 48 | + |
| 49 | + assert is_valid |
| 50 | + assert passport.delegation_chain == [] |
| 51 | + assert passport.iss == user_identity.did |
| 52 | + |
| 53 | + def test_two_hop_delegation(self, user_identity, agent_a_identity): |
| 54 | + """Test User -> Agent A delegation chain.""" |
| 55 | + # User creates initial token authorizing Agent A |
| 56 | + user_signer = Signer( |
| 57 | + private_key=user_identity.private_key_jwk, |
| 58 | + did=user_identity.did |
| 59 | + ) |
| 60 | + user_token = user_signer.sign({'action': 'analyze_data'}) |
| 61 | + |
| 62 | + # Agent A creates token with parent delegation |
| 63 | + agent_a_signer = Signer( |
| 64 | + private_key=agent_a_identity.private_key_jwk, |
| 65 | + did=agent_a_identity.did |
| 66 | + ) |
| 67 | + agent_a_token = agent_a_signer.sign( |
| 68 | + {'action': 'query_database'}, |
| 69 | + parent_token=user_token |
| 70 | + ) |
| 71 | + |
| 72 | + # Verify Agent A's token |
| 73 | + is_valid, passport = Verifier.verify( |
| 74 | + agent_a_token, |
| 75 | + public_key_jwk=agent_a_identity.public_key_jwk |
| 76 | + ) |
| 77 | + |
| 78 | + assert is_valid |
| 79 | + assert len(passport.delegation_chain) == 1 |
| 80 | + assert passport.delegation_chain[0].iss == user_identity.did |
| 81 | + assert passport.delegation_chain[0].sub == agent_a_identity.did |
| 82 | + |
| 83 | + def test_three_hop_delegation(self, user_identity, agent_a_identity, agent_b_identity): |
| 84 | + """Test User -> Agent A -> Agent B (3-hop chain).""" |
| 85 | + # Step 1: User authorizes Agent A |
| 86 | + user_signer = Signer( |
| 87 | + private_key=user_identity.private_key_jwk, |
| 88 | + did=user_identity.did |
| 89 | + ) |
| 90 | + user_token = user_signer.sign({'action': 'analyze_data'}) |
| 91 | + |
| 92 | + # Step 2: Agent A delegates to Agent B |
| 93 | + agent_a_signer = Signer( |
| 94 | + private_key=agent_a_identity.private_key_jwk, |
| 95 | + did=agent_a_identity.did |
| 96 | + ) |
| 97 | + agent_a_token = agent_a_signer.sign( |
| 98 | + {'action': 'fetch_records'}, |
| 99 | + parent_token=user_token |
| 100 | + ) |
| 101 | + |
| 102 | + # Step 3: Agent B acts with delegated authority |
| 103 | + agent_b_signer = Signer( |
| 104 | + private_key=agent_b_identity.private_key_jwk, |
| 105 | + did=agent_b_identity.did |
| 106 | + ) |
| 107 | + agent_b_token = agent_b_signer.sign( |
| 108 | + {'action': 'query_api'}, |
| 109 | + parent_token=agent_a_token |
| 110 | + ) |
| 111 | + |
| 112 | + # Verify Agent B's token |
| 113 | + is_valid, passport = Verifier.verify( |
| 114 | + agent_b_token, |
| 115 | + public_key_jwk=agent_b_identity.public_key_jwk |
| 116 | + ) |
| 117 | + |
| 118 | + assert is_valid |
| 119 | + assert len(passport.delegation_chain) == 2 |
| 120 | + |
| 121 | + # First link: User -> Agent A |
| 122 | + assert passport.delegation_chain[0].iss == user_identity.did |
| 123 | + assert passport.delegation_chain[0].sub == agent_a_identity.did |
| 124 | + |
| 125 | + # Second link: Agent A -> Agent B |
| 126 | + assert passport.delegation_chain[1].iss == agent_a_identity.did |
| 127 | + assert passport.delegation_chain[1].sub == agent_b_identity.did |
| 128 | + |
| 129 | + def test_max_depth_enforcement( |
| 130 | + self, user_identity, agent_a_identity, agent_b_identity, agent_c_identity |
| 131 | + ): |
| 132 | + """Test that max chain depth (5) is enforced.""" |
| 133 | + # Build a chain of 6 agents (which creates 5 delegation links - the max) |
| 134 | + identities = [ |
| 135 | + generate_identity(domain=f"agent-{i}.ai") |
| 136 | + for i in range(7) |
| 137 | + ] |
| 138 | + |
| 139 | + # Start with first agent (no parent - no chain yet) |
| 140 | + current_token = Signer( |
| 141 | + private_key=identities[0].private_key_jwk, |
| 142 | + did=identities[0].did |
| 143 | + ).sign({'action': 'step_0'}) |
| 144 | + |
| 145 | + # Build 5 more delegations (creates 5 links - the max allowed) |
| 146 | + for i in range(1, 6): |
| 147 | + current_token = Signer( |
| 148 | + private_key=identities[i].private_key_jwk, |
| 149 | + did=identities[i].did |
| 150 | + ).sign({'action': f'step_{i}'}, parent_token=current_token) |
| 151 | + |
| 152 | + # This should work (5 links) |
| 153 | + is_valid, passport = Verifier.verify( |
| 154 | + current_token, |
| 155 | + public_key_jwk=identities[5].public_key_jwk |
| 156 | + ) |
| 157 | + assert is_valid |
| 158 | + assert len(passport.delegation_chain) == 5 # Exactly at max |
| 159 | + |
| 160 | + # Trying to add 6th link should fail |
| 161 | + with pytest.raises(ValueError, match="max depth"): |
| 162 | + Signer( |
| 163 | + private_key=identities[6].private_key_jwk, |
| 164 | + did=identities[6].did |
| 165 | + ).sign({'action': 'step_6'}, parent_token=current_token) |
| 166 | + |
| 167 | + def test_delegation_chain_with_reputation(self, user_identity, agent_a_identity): |
| 168 | + """Test delegation chain combined with reputation score.""" |
| 169 | + user_signer = Signer( |
| 170 | + private_key=user_identity.private_key_jwk, |
| 171 | + did=user_identity.did |
| 172 | + ) |
| 173 | + user_token = user_signer.sign( |
| 174 | + {'action': 'authorize'}, |
| 175 | + reputation_score=90 |
| 176 | + ) |
| 177 | + |
| 178 | + agent_a_signer = Signer( |
| 179 | + private_key=agent_a_identity.private_key_jwk, |
| 180 | + did=agent_a_identity.did |
| 181 | + ) |
| 182 | + agent_a_token = agent_a_signer.sign( |
| 183 | + {'action': 'execute'}, |
| 184 | + parent_token=user_token, |
| 185 | + reputation_score=75 |
| 186 | + ) |
| 187 | + |
| 188 | + is_valid, passport = Verifier.verify( |
| 189 | + agent_a_token, |
| 190 | + public_key_jwk=agent_a_identity.public_key_jwk |
| 191 | + ) |
| 192 | + |
| 193 | + assert is_valid |
| 194 | + assert len(passport.delegation_chain) == 1 |
| 195 | + assert passport.reputation_score == 75 |
| 196 | + |
| 197 | + def test_delegation_link_has_intent(self, user_identity, agent_a_identity): |
| 198 | + """Test that delegation links capture the intent.""" |
| 199 | + user_signer = Signer( |
| 200 | + private_key=user_identity.private_key_jwk, |
| 201 | + did=user_identity.did |
| 202 | + ) |
| 203 | + user_token = user_signer.sign({'action': 'manage_files'}) |
| 204 | + |
| 205 | + agent_a_signer = Signer( |
| 206 | + private_key=agent_a_identity.private_key_jwk, |
| 207 | + did=agent_a_identity.did |
| 208 | + ) |
| 209 | + intent = {'action': 'read_file', 'path': '/data/report.pdf'} |
| 210 | + agent_a_token = agent_a_signer.sign(intent, parent_token=user_token) |
| 211 | + |
| 212 | + is_valid, passport = Verifier.verify( |
| 213 | + agent_a_token, |
| 214 | + public_key_jwk=agent_a_identity.public_key_jwk |
| 215 | + ) |
| 216 | + |
| 217 | + assert is_valid |
| 218 | + assert len(passport.delegation_chain) == 1 |
| 219 | + |
| 220 | + # The intent should be captured in the link |
| 221 | + link = passport.delegation_chain[0] |
| 222 | + assert 'read_file' in link.intent |
| 223 | + assert '/data/report.pdf' in link.intent |
| 224 | + |
| 225 | + |
| 226 | +class TestDelegationLinkDataclass: |
| 227 | + """Test DelegationLink dataclass.""" |
| 228 | + |
| 229 | + def test_delegation_link_creation(self): |
| 230 | + """Test creating a DelegationLink.""" |
| 231 | + link = DelegationLink( |
| 232 | + iss="did:web:alice.com", |
| 233 | + sub="did:web:agent.ai", |
| 234 | + intent='{"action": "read"}', |
| 235 | + iat=1704268800, |
| 236 | + signature="abc123" |
| 237 | + ) |
| 238 | + |
| 239 | + assert link.iss == "did:web:alice.com" |
| 240 | + assert link.sub == "did:web:agent.ai" |
| 241 | + assert link.intent == '{"action": "read"}' |
| 242 | + assert link.iat == 1704268800 |
| 243 | + assert link.signature == "abc123" |
0 commit comments