Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion packages/agent-mesh/src/agentmesh/trust/cards.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,18 +186,28 @@ class CardRegistry:
Registry for trusted agent cards.

Provides discovery and caching of verified cards.
Optionally integrates with a ``RevocationList`` to reject cards
whose agent DID has been revoked.
"""

def __init__(self, cache_ttl_seconds: int = 900):
def __init__(
self,
cache_ttl_seconds: int = 900,
revocation_list: Optional["RevocationList"] = None,
):
"""Initialise the card registry.

Args:
cache_ttl_seconds: Time-to-live in seconds for the
verification cache. Defaults to 900 (15 minutes).
revocation_list: Optional revocation list to check during
verification. When set, revoked agent DIDs fail
``is_verified()`` even if their signatures are valid.
"""
self._cards: Dict[str, TrustedAgentCard] = {}
self._verified_cache: Dict[str, tuple[bool, datetime]] = {}
self._cache_ttl = timedelta(seconds=cache_ttl_seconds)
self._revocation_list = revocation_list

def register(self, card: TrustedAgentCard) -> bool:
"""
Expand Down Expand Up @@ -229,12 +239,29 @@ def get(self, agent_did: str) -> Optional[TrustedAgentCard]:
"""
return self._cards.get(agent_did)

@property
def revocation_list(self) -> Optional["RevocationList"]:
"""The attached revocation list, if any."""
return self._revocation_list

@revocation_list.setter
def revocation_list(self, value: Optional["RevocationList"]) -> None:
self._revocation_list = value
self.clear_cache()

def is_verified(self, agent_did: str) -> bool:
"""
Check if a card is verified (with caching).

Uses TTL-based caching to avoid repeated verification.
Returns False if the agent DID is on the revocation list,
even if the cryptographic signature is valid.
"""
# Revocation check always runs (not cached — revocations are instant)
if self._revocation_list and self._revocation_list.is_revoked(agent_did):
self._verified_cache.pop(agent_did, None)
return False

if agent_did in self._verified_cache:
verified, timestamp = self._verified_cache[agent_did]
if datetime.now(timezone.utc) - timestamp < self._cache_ttl:
Expand Down
148 changes: 148 additions & 0 deletions packages/agent-mesh/tests/test_revocation_rotation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Tests for revocation integration with CardRegistry."""

import pytest
from datetime import datetime, timezone

from agentmesh.identity.agent_id import AgentIdentity
from agentmesh.identity.revocation import RevocationList, RevocationEntry
from agentmesh.trust.cards import TrustedAgentCard, CardRegistry


@pytest.fixture
def identity():
return AgentIdentity.create("test-agent", sponsor="test@example.com")


@pytest.fixture
def signed_card(identity):
card = TrustedAgentCard.from_identity(identity)
return card


class TestRevocationList:
def test_revoke_and_check(self):
rl = RevocationList()
rl.revoke("did:agent:bad", reason="compromised")
assert rl.is_revoked("did:agent:bad") is True
assert rl.is_revoked("did:agent:good") is False

def test_unrevoke(self):
rl = RevocationList()
rl.revoke("did:agent:temp", reason="testing")
assert rl.unrevoke("did:agent:temp") is True
assert rl.is_revoked("did:agent:temp") is False

def test_temporary_revocation_expires(self):
rl = RevocationList()
rl.revoke("did:agent:temp", reason="timeout", ttl_seconds=0)
# TTL=0 means it expires immediately
assert rl.is_revoked("did:agent:temp") is False

def test_list_revoked(self):
rl = RevocationList()
rl.revoke("did:agent:a", reason="r1")
rl.revoke("did:agent:b", reason="r2")
assert len(rl.list_revoked()) == 2

def test_cleanup_expired(self):
rl = RevocationList()
rl.revoke("did:agent:a", reason="r", ttl_seconds=0)
removed = rl.cleanup_expired()
assert removed == 1
assert len(rl) == 0


class TestCardRegistryRevocation:
def test_registry_without_revocation(self, signed_card):
registry = CardRegistry()
assert registry.register(signed_card) is True
assert registry.is_verified(signed_card.agent_did) is True

def test_registry_blocks_revoked_agent(self, signed_card):
rl = RevocationList()
rl.revoke(signed_card.agent_did, reason="compromised key")
registry = CardRegistry(revocation_list=rl)
registry.register(signed_card)
assert registry.is_verified(signed_card.agent_did) is False

def test_registry_allows_after_unrevoke(self, signed_card):
rl = RevocationList()
rl.revoke(signed_card.agent_did, reason="temporary")
registry = CardRegistry(revocation_list=rl)
registry.register(signed_card)
assert registry.is_verified(signed_card.agent_did) is False
rl.unrevoke(signed_card.agent_did)
assert registry.is_verified(signed_card.agent_did) is True

def test_setting_revocation_list_clears_cache(self, signed_card):
registry = CardRegistry()
registry.register(signed_card)
assert registry.is_verified(signed_card.agent_did) is True
rl = RevocationList()
rl.revoke(signed_card.agent_did, reason="late revoke")
registry.revocation_list = rl
assert registry.is_verified(signed_card.agent_did) is False

def test_revocation_list_property(self):
rl = RevocationList()
registry = CardRegistry(revocation_list=rl)
assert registry.revocation_list is rl


class TestKeyRotationManager:
def test_rotate_preserves_did(self):
from agentmesh.identity.rotation import KeyRotationManager

identity = AgentIdentity.create("rotate-agent", sponsor="test@example.com")
original_did = str(identity.did)
mgr = KeyRotationManager(identity, rotation_ttl_seconds=0)
mgr.rotate()
assert str(identity.did) == original_did

def test_rotate_changes_public_key(self):
from agentmesh.identity.rotation import KeyRotationManager

identity = AgentIdentity.create("rotate-agent", sponsor="test@example.com")
original_key = identity.public_key
mgr = KeyRotationManager(identity, rotation_ttl_seconds=0)
mgr.rotate()
assert identity.public_key != original_key

def test_key_history_tracked(self):
from agentmesh.identity.rotation import KeyRotationManager

identity = AgentIdentity.create("rotate-agent", sponsor="test@example.com")
mgr = KeyRotationManager(identity, rotation_ttl_seconds=0)
mgr.rotate()
mgr.rotate()
history = mgr.get_key_history()
assert len(history) == 2

def test_rotation_proof_valid(self):
from agentmesh.identity.rotation import KeyRotationManager

identity = AgentIdentity.create("rotate-agent", sponsor="test@example.com")
old_key = identity.public_key
mgr = KeyRotationManager(identity, rotation_ttl_seconds=0)
mgr.rotate()
new_key = identity.public_key
proof = mgr.get_rotation_proof()
assert KeyRotationManager.verify_rotation(old_key, new_key, proof) is True

def test_needs_rotation_respects_ttl(self):
from agentmesh.identity.rotation import KeyRotationManager

identity = AgentIdentity.create("rotate-agent", sponsor="test@example.com")
mgr = KeyRotationManager(identity, rotation_ttl_seconds=86400)
assert mgr.needs_rotation() is False

def test_max_history_trimmed(self):
from agentmesh.identity.rotation import KeyRotationManager

identity = AgentIdentity.create("rotate-agent", sponsor="test@example.com")
mgr = KeyRotationManager(identity, rotation_ttl_seconds=0, max_history=2)
for _ in range(5):
mgr.rotate()
assert len(mgr.get_key_history()) == 2
27 changes: 27 additions & 0 deletions packages/agent-os/src/agent_os/integrations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@
)
from .config import AgentOSConfig, get_config, reset_config
from .dry_run import DryRunCollector, DryRunDecision, DryRunPolicy, DryRunResult
from .escalation import (
ApprovalBackend,
DefaultTimeoutAction,
EscalationDecision,
EscalationHandler,
EscalationPolicy,
EscalationRequest,
EscalationResult,
InMemoryApprovalQueue,
WebhookApprovalBackend,
)
from .compat import CompatReport, check_compatibility, doctor, warn_on_import
from .health import ComponentHealth, HealthChecker, HealthReport, HealthStatus
from .logging import GovernanceLogger, JSONFormatter, get_logger
from .policy_compose import PolicyHierarchy, compose_policies, override_policy
Expand Down Expand Up @@ -167,6 +179,21 @@
"DryRunResult",
"DryRunDecision",
"DryRunCollector",
# Escalation (Human-in-the-Loop)
"EscalationPolicy",
"EscalationHandler",
"EscalationRequest",
"EscalationResult",
"EscalationDecision",
"DefaultTimeoutAction",
"ApprovalBackend",
"InMemoryApprovalQueue",
"WebhookApprovalBackend",
# Version Compatibility
"doctor",
"check_compatibility",
"CompatReport",
"warn_on_import",
# Rate Limiting
"RateLimiter",
"RateLimitStatus",
Expand Down
Loading
Loading