Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/agentmesh-integrations/langchain-agentmesh/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,24 @@ agent = create_agent(callbacks=[callback])
| `audit_logging` | False | Enable audit trail |
| `cache_ttl` | 900 | Verification cache TTL (seconds) |

### Scope Chain Verification

`TrustHandshake` now performs strict cryptographic verification for `scope_chain` when present.

- Canonical payload verification is used for delegation signatures.
- Delegation signatures must use `verification-Ed25519`.
- Chain integrity is enforced end-to-end (root anchoring, linkage, and peer termination).
- Expired delegations are rejected.
- Replay-hardening is applied through signature freshness checks and replay-window detection.

Policy knobs available in `TrustPolicy`:

- `max_delegation_signature_age_seconds` (default: 900)
- `max_signature_clock_skew_seconds` (default: 60)
- `replay_window_seconds` (default: 300)

Important: integrations that previously relied on warning-only behavior for invalid scope chains must provide valid chains to pass verification.

## Related

- [AgentMesh](https://github.com/microsoft/agent-governance-toolkit) - Core trust mesh platform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,56 @@
import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from hashlib import sha256
from typing import Any, Dict, List, Optional

from langchain_agentmesh.identity import VerificationIdentity, VerificationSignature, UserContext


def _canonical_json(data: Dict[str, Any]) -> str:
"""Serialize JSON deterministically for signing and verification."""
return json.dumps(data, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


def _delegation_signing_payload(
delegator: str,
delegatee: str,
capabilities: List[str],
expires_at: Optional[datetime],
) -> str:
"""Build the canonical delegation payload used for signatures."""
payload = {
"delegator": delegator,
"delegatee": delegatee,
"capabilities": sorted(capabilities),
"expires_at": expires_at.isoformat() if expires_at else None,
}
return _canonical_json(payload)


def _scope_chain_fingerprint(scope_chain: List["Delegation"]) -> str:
"""Generate deterministic digest for replay-window tracking."""
payload = []
for delegation in scope_chain:
payload.append({
"delegator": delegation.delegator,
"delegatee": delegation.delegatee,
"capabilities": sorted(delegation.capabilities),
"expires_at": delegation.expires_at.isoformat() if delegation.expires_at else None,
"signature_algorithm": delegation.signature.algorithm if delegation.signature else None,
"signature_public_key": (
delegation.signature.public_key if delegation.signature else None
),
"signature": delegation.signature.signature if delegation.signature else None,
"signature_timestamp": (
delegation.signature.timestamp.isoformat()
if delegation.signature and delegation.signature.timestamp
else None
),
})
return sha256(_canonical_json({"scope_chain": payload}).encode("utf-8")).hexdigest()


@dataclass
class TrustPolicy:
"""Policy configuration for trust verification."""
Expand All @@ -24,6 +69,9 @@ class TrustPolicy:
audit_all_calls: bool = False
block_unverified: bool = True
cache_ttl_seconds: int = 900 # 15 minutes
max_delegation_signature_age_seconds: int = 900
max_signature_clock_skew_seconds: int = 60
replay_window_seconds: int = 300


@dataclass
Expand Down Expand Up @@ -208,6 +256,7 @@ def __init__(
self.policy = policy or TrustPolicy()
self._verified_peers: Dict[str, tuple[TrustVerificationResult, datetime]] = {}
self._cache_ttl = timedelta(seconds=self.policy.cache_ttl_seconds)
self._seen_scope_chains: Dict[str, datetime] = {}

def _get_cached_result(self, did: str) -> Optional[TrustVerificationResult]:
"""Get cached verification result if still valid."""
Expand All @@ -222,6 +271,109 @@ def _cache_result(self, did: str, result: TrustVerificationResult) -> None:
"""Cache a verification result."""
self._verified_peers[did] = (result, datetime.now(timezone.utc))

def _prune_seen_scope_chains(self, now: datetime) -> None:
"""Remove scope chain fingerprints outside replay window."""
replay_window = timedelta(seconds=self.policy.replay_window_seconds)
expired = [
fingerprint
for fingerprint, seen_at in self._seen_scope_chains.items()
if now - seen_at > replay_window
]
for fingerprint in expired:
del self._seen_scope_chains[fingerprint]

def _verify_scope_chain(
self,
peer_card: TrustedAgentCard,
) -> tuple[bool, str]:
"""Verify cryptographic validity and integrity of a peer scope chain.

The chain is anchored to this agent's identity as trust root and must
terminate at the peer card DID.
"""
if not peer_card.scope_chain or not peer_card.identity:
return True, ""

now = datetime.now(timezone.utc)
delegations = peer_card.scope_chain

# Anchor trust to this verifier's identity.
first = delegations[0]
if first.delegator != self.my_identity.did:
return False, "Scope chain error: root delegator does not match verifier identity"

# Chain must terminate at the advertised peer identity.
if delegations[-1].delegatee != peer_card.identity.did:
return False, "Scope chain error: chain does not terminate at peer identity"

scope_chain_fingerprint = _scope_chain_fingerprint(delegations)
self._prune_seen_scope_chains(now)
if scope_chain_fingerprint in self._seen_scope_chains:
return False, "Scope chain error: replay detected within replay window"

known_public_keys: Dict[str, str] = {self.my_identity.did: self.my_identity.public_key}

for i, delegation in enumerate(delegations):
if not delegation.delegator.startswith("did:verification:"):
return False, f"Scope chain error at index {i}: invalid delegator DID"

if not delegation.delegatee.startswith("did:verification:"):
return False, f"Scope chain error at index {i}: invalid delegatee DID"

if delegation.expires_at and delegation.expires_at < now:
return False, f"Scope chain error at index {i}: delegation is expired"

if not delegation.signature:
return False, f"Scope chain error at index {i}: missing signature"

if delegation.signature.algorithm != "verification-Ed25519":
return False, f"Scope chain error at index {i}: unsupported signature algorithm"

signature_timestamp = delegation.signature.timestamp
if signature_timestamp.tzinfo is None:
signature_timestamp = signature_timestamp.replace(tzinfo=timezone.utc)

if signature_timestamp > now + timedelta(
seconds=self.policy.max_signature_clock_skew_seconds
):
return False, f"Scope chain error at index {i}: signature timestamp is in the future"

if now - signature_timestamp > timedelta(
seconds=self.policy.max_delegation_signature_age_seconds
):
return False, f"Scope chain error at index {i}: signature is stale"

# Enforce deterministic linkage between adjacent delegations.
if i > 0:
prev = delegations[i - 1]
if delegation.delegator != prev.delegatee:
return False, f"Scope chain error at index {i}: delegation linkage broken"

expected_public_key = known_public_keys.get(delegation.delegator)
if expected_public_key and delegation.signature.public_key != expected_public_key:
return False, f"Scope chain error at index {i}: delegator public key mismatch"

delegation_data = _delegation_signing_payload(
delegation.delegator,
delegation.delegatee,
delegation.capabilities,
delegation.expires_at,
)

delegator_identity = VerificationIdentity(
did=delegation.delegator,
agent_name=f"delegator-{i}",
public_key=delegation.signature.public_key,
)
if not delegator_identity.verify_signature(delegation_data, delegation.signature):
return False, f"Scope chain error at index {i}: invalid delegation signature"

known_public_keys[delegation.delegator] = delegation.signature.public_key

self._seen_scope_chains[scope_chain_fingerprint] = now

return True, ""

def verify_peer(
self,
peer_card: TrustedAgentCard,
Expand Down Expand Up @@ -293,12 +445,15 @@ def verify_peer(

# Check scope chain if present
if peer_card.scope_chain:
# TODO: A full cryptographic verification of the scope chain is needed.
# This should verify the signature of each delegation and the integrity of the
# entire chain. The current check for expiration is insufficient.
warnings.append(
"Scope chain is present but its cryptographic validity is not verified."
)
scope_chain_valid, scope_chain_error = self._verify_scope_chain(peer_card)
if not scope_chain_valid:
return TrustVerificationResult(
trusted=False,
trust_score=peer_card.trust_score,
reason=scope_chain_error,
verified_capabilities=verified_caps,
warnings=warnings,
)

# All checks passed
result = TrustVerificationResult(
Expand All @@ -317,6 +472,7 @@ def verify_peer(
def clear_cache(self) -> None:
"""Clear all cached verification results."""
self._verified_peers.clear()
self._seen_scope_chains.clear()


class DelegationChain:
Expand Down Expand Up @@ -356,7 +512,9 @@ def add_delegation(
ValueError: If delegatee lacks identity
"""
if not delegatee.identity:
raise ValueError("Delegatee must have a VerificationIdentity to be part of a delegation")
raise ValueError(
"Delegatee must have a VerificationIdentity to be part of a delegation"
)

delegator = delegator_identity or self.root_identity
delegatee_did = delegatee.identity.did
Expand All @@ -366,12 +524,12 @@ def add_delegation(
expires_at = datetime.now(timezone.utc) + timedelta(hours=expires_in_hours)

# Create delegation data for signing
delegation_data = json.dumps({
"delegator": delegator.did,
"delegatee": delegatee_did,
"capabilities": sorted(capabilities),
"expires_at": expires_at.isoformat() if expires_at else None,
}, sort_keys=True)
delegation_data = _delegation_signing_payload(
delegator.did,
delegatee_did,
capabilities,
expires_at,
)

# Sign with delegator's identity
signature = delegator.sign(delegation_data)
Expand Down Expand Up @@ -415,12 +573,15 @@ def verify(self) -> bool:
return False

# Verify delegation signature
delegation_data = json.dumps({
"delegator": delegation.delegator,
"delegatee": delegation.delegatee,
"capabilities": sorted(delegation.capabilities),
"expires_at": delegation.expires_at.isoformat() if delegation.expires_at else None,
}, sort_keys=True)
if delegation.signature.algorithm != "verification-Ed25519":
return False

delegation_data = _delegation_signing_payload(
delegation.delegator,
delegation.delegatee,
delegation.capabilities,
delegation.expires_at,
)

if not delegator_identity.verify_signature(delegation_data, delegation.signature):
return False
Expand Down
Loading
Loading