Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,68 @@ def _cache_result(self, did: str, result: TrustVerificationResult) -> None:
"""Cache a verification result."""
self._verified_peers[did] = (result, datetime.now(timezone.utc))

def _verify_scope_chain(
self,
peer_card: TrustedAgentCard,
) -> tuple[bool, str]:
"""Verify cryptographic validity and integrity of a peer scope chain.

The chain is anchored to this agent's identity as trust root and must
terminate at the peer card DID.
"""
if not peer_card.scope_chain or not peer_card.identity:
return True, ""

now = datetime.now(timezone.utc)
delegations = peer_card.scope_chain

# Anchor trust to this verifier's identity.
first = delegations[0]
if first.delegator != self.my_identity.did:
return False, "Scope chain root delegator does not match verifier identity"

# Chain must terminate at the advertised peer identity.
if delegations[-1].delegatee != peer_card.identity.did:
return False, "Scope chain does not terminate at peer identity"

known_public_keys: Dict[str, str] = {self.my_identity.did: self.my_identity.public_key}

for i, delegation in enumerate(delegations):
if delegation.expires_at and delegation.expires_at < now:
return False, f"Scope delegation at index {i} is expired"

if not delegation.signature:
return False, f"Scope delegation at index {i} is missing signature"

# Enforce deterministic linkage between adjacent delegations.
if i > 0:
prev = delegations[i - 1]
if delegation.delegator != prev.delegatee:
return False, f"Scope delegation linkage broken at index {i}"

expected_public_key = known_public_keys.get(delegation.delegator)
if expected_public_key and delegation.signature.public_key != expected_public_key:
return False, f"Delegator public key mismatch at index {i}"

delegation_data = json.dumps({
"delegator": delegation.delegator,
"delegatee": delegation.delegatee,
"capabilities": sorted(delegation.capabilities),
"expires_at": delegation.expires_at.isoformat() if delegation.expires_at else None,
}, sort_keys=True)

delegator_identity = VerificationIdentity(
did=delegation.delegator,
agent_name=f"delegator-{i}",
public_key=delegation.signature.public_key,
)
if not delegator_identity.verify_signature(delegation_data, delegation.signature):
return False, f"Invalid scope delegation signature at index {i}"

known_public_keys[delegation.delegator] = delegation.signature.public_key

return True, ""

def verify_peer(
self,
peer_card: TrustedAgentCard,
Expand Down Expand Up @@ -293,12 +355,15 @@ def verify_peer(

# Check scope chain if present
if peer_card.scope_chain:
# TODO: A full cryptographic verification of the scope chain is needed.
# This should verify the signature of each delegation and the integrity of the
# entire chain. The current check for expiration is insufficient.
warnings.append(
"Scope chain is present but its cryptographic validity is not verified."
)
scope_chain_valid, scope_chain_error = self._verify_scope_chain(peer_card)
if not scope_chain_valid:
return TrustVerificationResult(
trusted=False,
trust_score=peer_card.trust_score,
reason=scope_chain_error,
verified_capabilities=verified_caps,
warnings=warnings,
)

# All checks passed
result = TrustVerificationResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,135 @@ def test_cache_ttl(self):

assert result1.trusted == result2.trusted

def test_verify_valid_peer_scope_chain(self):
"""Valid cryptographic scope chain should be trusted."""
my_identity = VerificationIdentity.generate("my-agent")
peer_identity = VerificationIdentity.generate("peer-agent", ["required_cap"])

peer_card = TrustedAgentCard(
name="Peer Agent",
description="A peer",
capabilities=["required_cap"],
)
peer_card.sign(peer_identity)

chain = DelegationChain(my_identity)
chain.add_delegation(
delegatee=peer_card,
capabilities=["required_cap"],
expires_in_hours=24,
)
peer_card.scope_chain = chain.delegations

handshake = TrustHandshake(my_identity)
result = handshake.verify_peer(
peer_card,
required_capabilities=["required_cap"],
)

assert result.trusted
assert result.reason == "Verification successful"

def test_verify_peer_scope_chain_tampered_signature(self):
"""Tampering delegation payload should invalidate signature."""
my_identity = VerificationIdentity.generate("my-agent")
peer_identity = VerificationIdentity.generate("peer-agent", ["required_cap"])

peer_card = TrustedAgentCard(
name="Peer Agent",
description="A peer",
capabilities=["required_cap"],
)
peer_card.sign(peer_identity)

chain = DelegationChain(my_identity)
chain.add_delegation(
delegatee=peer_card,
capabilities=["required_cap"],
expires_in_hours=24,
)

# Mutate signed content after signature generation.
chain.delegations[0].capabilities.append("admin")
peer_card.scope_chain = chain.delegations

handshake = TrustHandshake(my_identity)
result = handshake.verify_peer(peer_card)

assert not result.trusted
assert "Invalid scope delegation signature" in result.reason

def test_verify_peer_scope_chain_tampered_linkage(self):
"""Broken delegation linkage should fail verification."""
my_identity = VerificationIdentity.generate("my-agent")
mid_identity = VerificationIdentity.generate("mid-agent")
peer_identity = VerificationIdentity.generate("peer-agent", ["required_cap"])

mid_card = TrustedAgentCard(
name="Mid Agent",
description="Delegation intermediary",
capabilities=["delegator"],
)
mid_card.sign(mid_identity)

peer_card = TrustedAgentCard(
name="Peer Agent",
description="Final peer",
capabilities=["required_cap"],
)
peer_card.sign(peer_identity)

chain = DelegationChain(my_identity)
chain.add_delegation(
delegatee=mid_card,
capabilities=["delegate"],
expires_in_hours=24,
)
chain.add_delegation(
delegatee=peer_card,
capabilities=["required_cap"],
expires_in_hours=24,
delegator_identity=mid_identity,
)

# Break did->did linkage between first delegatee and second delegator.
chain.delegations[1].delegator = "did:verification:maliciousdelegator"
peer_card.scope_chain = chain.delegations

handshake = TrustHandshake(my_identity)
result = handshake.verify_peer(peer_card)

assert not result.trusted
assert "Scope delegation linkage broken" in result.reason

def test_verify_peer_scope_chain_expired(self):
"""Expired delegations should fail verification."""
my_identity = VerificationIdentity.generate("my-agent")
peer_identity = VerificationIdentity.generate("peer-agent", ["required_cap"])

peer_card = TrustedAgentCard(
name="Peer Agent",
description="A peer",
capabilities=["required_cap"],
)
peer_card.sign(peer_identity)

chain = DelegationChain(my_identity)
chain.add_delegation(
delegatee=peer_card,
capabilities=["required_cap"],
expires_in_hours=24,
)

chain.delegations[0].expires_at = datetime.now(timezone.utc) - timedelta(hours=1)
peer_card.scope_chain = chain.delegations

handshake = TrustHandshake(my_identity)
result = handshake.verify_peer(peer_card)

assert not result.trusted
assert "is expired" in result.reason


class TestDelegationChain:
"""Tests for DelegationChain class."""
Expand Down
Loading