diff --git a/docs/tutorials/01-policy-engine.md b/docs/tutorials/01-policy-engine.md new file mode 100644 index 00000000..61d5d712 --- /dev/null +++ b/docs/tutorials/01-policy-engine.md @@ -0,0 +1,800 @@ +# Tutorial: Policy Engine + +The policy engine is the governance backbone of the Agent Governance Toolkit. It +evaluates declarative YAML rules against runtime context and returns +allow/deny/audit/block decisions—before an agent ever touches a tool or sends a +response. + +**What you'll learn:** + +| Section | Topic | +|---------|-------| +| [Quick Start](#quick-start) | Evaluate your first policy in 5 lines | +| [Policy YAML Syntax](#policy-yaml-syntax) | Full rule and operator reference | +| [GovernancePolicy Dataclass](#governancepolicy-dataclass) | Programmatic policy configuration | +| [Conflict Resolution](#conflict-resolution-strategies) | 4 strategies for competing rules | +| [Advanced Patterns](#advanced-patterns) | Regex/glob blocking, policy composition | +| [Middleware Integration](#integration-with-middleware) | Wire policies into an MAF agent | + +--- + +## Installation + +```bash +pip install agent-os-kernel # core package +pip install agent-os-kernel[nexus] # adds YAML policy support +pip install agent-os-kernel[full] # everything (recommended for tutorials) +``` + +--- + +## Quick Start + +```python +from agent_os.policies import PolicyEvaluator + +evaluator = PolicyEvaluator() +evaluator.load_policies("./policies/") # loads every .yaml/.yml in the dir +decision = evaluator.evaluate({"tool_name": "execute_code", "token_count": 500}) +print(decision.allowed, decision.reason) # False, "Code execution is blocked …" +``` + +That's it. Four moving parts: **load → build context → evaluate → act on +decision**. 
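"Act on decision" is where governance meets your application code. A minimal sketch of that dispatch, using a stand-in dataclass that mirrors the `PolicyDecision` fields documented later in this tutorial (the real class lives in `agent_os.policies`; the `Decision` stand-in and `act_on` helper are hypothetical names for illustration):

```python
from dataclasses import dataclass, field

@dataclass
class Decision:
    """Stand-in for PolicyDecision: only the fields used in this tutorial."""
    allowed: bool
    action: str                 # "allow" | "deny" | "audit" | "block"
    reason: str = ""
    audit_entry: dict = field(default_factory=dict)

def act_on(decision: Decision) -> str:
    """Translate a policy decision into a caller-side outcome."""
    if decision.action == "audit":
        # Permitted, but the audit entry must be persisted.
        print(f"audit: {decision.audit_entry}")
        return "proceed"
    if not decision.allowed:
        # Covers both "deny" and "block"; surface the reason to the caller.
        return f"rejected: {decision.reason}"
    return "proceed"

print(act_on(Decision(allowed=False, action="block", reason="Code execution is blocked")))
# rejected: Code execution is blocked
```

In a real integration, the "rejected" branch would raise or return an error response instead of a string.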
+
+---
+
+## Policy YAML Syntax
+
+Every policy file follows the same schema:
+
+```yaml
+version: "1.0"
+name: my-policy
+description: What this policy enforces
+
+rules:
+  - name: rule-name
+    condition:
+      field: <field-name>
+      operator: <operator>
+      value: <value>
+    action: allow | deny | audit | block
+    priority: 100              # higher = evaluated first
+    message: Human-readable explanation
+
+defaults:
+  action: allow                # fallback when no rule matches
+  max_tokens: 4096
+  max_tool_calls: 10
+  confidence_threshold: 0.8
+```
+
+### Actions
+
+| Action | Behaviour |
+|--------|-----------|
+| `allow` | Permit the request. `decision.allowed = True`. |
+| `deny` | Reject the request. `decision.allowed = False`. |
+| `audit` | Permit but log. `decision.allowed = True`, entry written to audit trail. |
+| `block` | Hard block with message. `decision.allowed = False`, message surfaced to caller. |
+
+### Operators — Complete Reference
+
+#### `eq` — Equality
+
+```yaml
+- name: block-code-execution
+  condition:
+    field: tool_name
+    operator: eq
+    value: execute_code
+  action: block
+  priority: 100
+  message: Code execution is blocked in production
+```
+
+#### `ne` — Not Equal
+
+```yaml
+- name: audit-all-tool-calls
+  condition:
+    field: tool_name
+    operator: ne
+    value: ""
+  action: audit
+  priority: 50
+  message: Auditing tool call for compliance
+```
+
+#### `gt` — Greater Than
+
+```yaml
+- name: token-limit
+  condition:
+    field: token_count
+    operator: gt
+    value: 4096
+  action: deny
+  priority: 100
+  message: Token count exceeds the default limit of 4096
+```
+
+#### `lt` — Less Than
+
+```yaml
+- name: low-confidence
+  condition:
+    field: confidence
+    operator: lt
+    value: 0.8
+  action: deny
+  priority: 90
+  message: Confidence score is below the minimum threshold of 0.8
+```
+
+#### `gte` — Greater Than or Equal
+
+```yaml
+- name: audit-all-messages
+  condition:
+    field: message_count
+    operator: gte
+    value: 0
+  action: audit
+  priority: 10
+  message: All agent actions are audit-logged
+```
+
+#### 
`lte` — Less Than or Equal + +```yaml +- name: allow-small-requests + condition: + field: token_count + operator: lte + value: 256 + action: allow + priority: 80 + message: Small requests are always allowed +``` + +#### `in` — Value In List + +```yaml +- name: allow-safe-tools + condition: + field: tool_name + operator: in + value: [web_search, read_file, summarize] + action: allow + priority: 70 + message: Tool is on the approved list +``` + +#### `contains` — Substring Match + +```yaml +- name: block-secrets-access + condition: + field: message + operator: contains + value: "secrets" + action: deny + priority: 100 + message: Access to secret resources is restricted by governance policy +``` + +#### `matches` — Regex Match + +```yaml +- name: block-sql-injection + condition: + field: message + operator: matches + value: "(?i)(drop|delete|truncate)\\s+table" + action: block + priority: 100 + message: Potential SQL injection detected +``` + +### Real-World Policy Files + +**Production — strict.yaml** + +```yaml +version: "1.0" +name: strict +description: Production safety policy with tight limits and audit requirements + +rules: + - name: max_tokens + condition: + field: token_count + operator: gt + value: 2048 + action: deny + priority: 100 + message: Token count exceeds production limit of 2048 + + - name: max_tool_calls + condition: + field: tool_call_count + operator: gt + value: 5 + action: deny + priority: 99 + message: Tool call count exceeds production limit of 5 + + - name: block_exec + condition: + field: tool_name + operator: eq + value: execute_code + action: block + priority: 98 + message: Code execution is blocked in production + + - name: block_shell + condition: + field: tool_name + operator: eq + value: run_shell + action: block + priority: 97 + message: Shell access is blocked in production + + - name: confidence_threshold + condition: + field: confidence + operator: lt + value: 0.95 + action: deny + priority: 90 + message: Confidence score is 
below the production threshold of 0.95 + + - name: audit_all_tool_calls + condition: + field: tool_name + operator: ne + value: "" + action: audit + priority: 50 + message: Auditing tool call for compliance + +defaults: + action: deny + max_tokens: 2048 + max_tool_calls: 5 + confidence_threshold: 0.95 +``` + +**Development — development.yaml** + +```yaml +version: "1.0" +name: development +description: Relaxed policy for local development and experimentation + +rules: + - name: max_tokens + condition: + field: token_count + operator: gt + value: 16384 + action: deny + priority: 100 + message: Token count exceeds generous dev limit of 16384 + + - name: max_tool_calls + condition: + field: tool_call_count + operator: gt + value: 50 + action: deny + priority: 99 + message: Tool call count exceeds dev limit of 50 + +defaults: + action: allow + max_tokens: 16384 + max_tool_calls: 50 + confidence_threshold: 0.5 +``` + +--- + +## Building Policies in Python + +You don't have to use YAML. Build policies programmatically when you need +dynamic rules: + +```python +from agent_os.policies import ( + PolicyDocument, + PolicyRule, + PolicyCondition, + PolicyAction, + PolicyOperator, + PolicyDefaults, + PolicyEvaluator, +) + +rule = PolicyRule( + name="block_code_execution", + condition=PolicyCondition( + field="tool_name", + operator=PolicyOperator.EQ, + value="execute_code", + ), + action=PolicyAction.DENY, + priority=100, + message="Code execution is blocked in production", +) + +policy = PolicyDocument( + name="production_safety", + description="Safe production policy", + rules=[rule], + defaults=PolicyDefaults( + action=PolicyAction.ALLOW, + max_tokens=2048, + max_tool_calls=5, + confidence_threshold=0.95, + ), +) + +# Serialize to YAML for version control +policy.to_yaml("policies/production_safety.yaml") + +# Or evaluate directly +evaluator = PolicyEvaluator([policy]) +decision = evaluator.evaluate({"tool_name": "execute_code"}) +assert not decision.allowed 
+print(decision.reason) # "Code execution is blocked in production" +``` + +### PolicyDecision Object + +Every call to `evaluator.evaluate()` returns a `PolicyDecision`: + +| Field | Type | Description | +|-------|------|-------------| +| `allowed` | `bool` | Whether the action is permitted. Default `True`. | +| `matched_rule` | `str \| None` | Name of the rule that fired. `None` if defaults applied. | +| `action` | `str` | The action taken: `allow`, `deny`, `audit`, or `block`. | +| `reason` | `str` | Human-readable explanation. | +| `audit_entry` | `dict` | Structured audit data (policy name, timestamp, context snapshot). | + +```python +decision = evaluator.evaluate(context) + +# Audit entry structure +# { +# "policy": "production_safety", +# "rule": "block_code_execution", +# "action": "deny", +# "context_snapshot": { ... }, +# "timestamp": "2025-01-15T10:30:00Z", +# "error": False, +# } +``` + +--- + +## GovernancePolicy Dataclass + +`GovernancePolicy` is a higher-level dataclass in `agent_os.integrations.base` +that bundles constraints, thresholds, and audit settings into a single +configuration object. Use it when you need more than rule-based evaluation— +tool allowlists, pattern blocking, drift detection, and concurrency controls. 
+ +```python +from agent_os.integrations.base import GovernancePolicy, PatternType + +policy = GovernancePolicy( + name="production", + max_tokens=2048, + max_tool_calls=5, + allowed_tools=["web_search", "read_file"], + blocked_patterns=[ + "password", # substring match (default) + ("rm\\s+-rf", PatternType.REGEX), # regex match + ("*.exe", PatternType.GLOB), # glob match + ], + require_human_approval=False, + timeout_seconds=120, + confidence_threshold=0.95, + drift_threshold=0.10, + log_all_calls=True, + checkpoint_frequency=3, + max_concurrent=5, + backpressure_threshold=4, + version="2.0.0", +) + +# Validate the policy (raises ValueError on invalid config) +policy.validate() +``` + +### Full Field Reference + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `name` | `str` | `"default"` | Policy identifier | +| `max_tokens` | `int` | `4096` | Max tokens per request (must be > 0) | +| `max_tool_calls` | `int` | `10` | Max tool invocations per request (≥ 0) | +| `allowed_tools` | `list[str]` | `[]` | Tool allowlist; empty = all allowed | +| `blocked_patterns` | `list[str \| tuple[str, PatternType]]` | `[]` | Content patterns to block | +| `require_human_approval` | `bool` | `False` | Require human sign-off before execution | +| `timeout_seconds` | `int` | `300` | Max wall-clock time (> 0) | +| `confidence_threshold` | `float` | `0.8` | Minimum confidence score \[0.0–1.0\] | +| `drift_threshold` | `float` | `0.15` | Maximum semantic drift \[0.0–1.0\] | +| `log_all_calls` | `bool` | `True` | Log every tool call to audit trail | +| `checkpoint_frequency` | `int` | `5` | Checkpoint every N tool calls (> 0) | +| `max_concurrent` | `int` | `10` | Max simultaneous executions (> 0) | +| `backpressure_threshold` | `int` | `8` | Start throttling at this level (> 0, < `max_concurrent`) | +| `version` | `str` | `"1.0.0"` | Semantic version for policy tracking | + +### PatternType Enum + +| Value | Behaviour | +|-------|-----------| +| 
`PatternType.SUBSTRING` | Simple substring match (default when you pass a plain `str`). | +| `PatternType.REGEX` | Compiled regex, case-insensitive. | +| `PatternType.GLOB` | Glob pattern (e.g., `*.exe`, `secret_*`). | + +### Key Methods + +```python +# Check for pattern matches in text +matches = policy.matches_pattern("please run rm -rf /tmp") +# Returns: ["rm\\s+-rf"] + +# Detect conflicting settings +warnings = policy.detect_conflicts() +# e.g., ["backpressure_threshold >= max_concurrent"] + +# Compare policies +base = GovernancePolicy() +print(policy.is_stricter_than(base)) # True +print(policy.format_diff(base)) # Human-readable diff + +# Serialize / deserialize +policy.save("policies/production.yaml") +loaded = GovernancePolicy.load("policies/production.yaml") + +yaml_str = policy.to_yaml() +restored = GovernancePolicy.from_yaml(yaml_str) +``` + +--- + +## Conflict Resolution Strategies + +When multiple policies apply to the same request, their rules can conflict. The +`PolicyConflictResolver` in `agentmesh.governance.conflict_resolution` resolves +these with one of four strategies. + +```python +from agentmesh.governance.conflict_resolution import ( + PolicyConflictResolver, + ConflictResolutionStrategy, + CandidateDecision, + PolicyScope, +) +``` + +### The Four Strategies + +#### 1. `deny_overrides` — Safety First + +Any deny wins. Among multiple denies, highest priority wins. + +**Use when:** You want a default-allow posture with hard deny guardrails. This +is the safest choice for most enterprise deployments. 
+ +```python +resolver = PolicyConflictResolver(ConflictResolutionStrategy.DENY_OVERRIDES) + +candidates = [ + CandidateDecision( + action="allow", priority=50, + scope=PolicyScope.GLOBAL, rule_name="allow_web_search", + ), + CandidateDecision( + action="deny", priority=10, + scope=PolicyScope.AGENT, rule_name="block_internal_access", + ), +] + +result = resolver.resolve(candidates) +assert result.winning_decision.action == "deny" # deny always wins +assert result.conflict_detected is True +``` + +#### 2. `allow_overrides` — Permissive with Exceptions + +Any allow wins. Among multiple allows, highest priority wins. + +**Use when:** Your baseline is deny-all and you grant explicit exceptions per +agent or team. + +```python +resolver = PolicyConflictResolver(ConflictResolutionStrategy.ALLOW_OVERRIDES) + +candidates = [ + CandidateDecision(action="deny", priority=100, scope=PolicyScope.GLOBAL, rule_name="deny_all"), + CandidateDecision(action="allow", priority=50, scope=PolicyScope.AGENT, rule_name="research_exception"), +] + +result = resolver.resolve(candidates) +assert result.winning_decision.action == "allow" # allow overrides +``` + +#### 3. `priority_first_match` — Highest Priority Wins + +The candidate with the highest numeric priority wins, regardless of action. +This is the **default strategy** and mirrors how `PolicyEvaluator` resolves +rules within a single policy. + +**Use when:** You want predictable, priority-ordered evaluation across +policies. + +```python +resolver = PolicyConflictResolver(ConflictResolutionStrategy.PRIORITY_FIRST_MATCH) + +candidates = [ + CandidateDecision(action="allow", priority=50, rule_name="general_allow"), + CandidateDecision(action="deny", priority=100, rule_name="high_priority_deny"), +] + +result = resolver.resolve(candidates) +assert result.winning_decision.rule_name == "high_priority_deny" +``` + +#### 4. 
`most_specific_wins` — Scope-Based Resolution + +More specific scopes override broader ones: **Agent > Tenant > Global**. +Priority breaks ties within the same scope. + +**Use when:** You have a multi-tenant setup where team-level or agent-level +policies should override organization-wide defaults. + +```python +resolver = PolicyConflictResolver(ConflictResolutionStrategy.MOST_SPECIFIC_WINS) + +candidates = [ + CandidateDecision( + action="deny", priority=100, + scope=PolicyScope.GLOBAL, rule_name="org_wide_deny", + ), + CandidateDecision( + action="allow", priority=50, + scope=PolicyScope.AGENT, rule_name="agent_exception", + ), +] + +result = resolver.resolve(candidates) +assert result.winning_decision.rule_name == "agent_exception" # agent scope wins +``` + +### ResolutionResult + +Every `resolve()` call returns a `ResolutionResult`: + +| Field | Type | Description | +|-------|------|-------------| +| `winning_decision` | `CandidateDecision` | The decision that prevailed. | +| `strategy_used` | `ConflictResolutionStrategy` | Which strategy was applied. | +| `candidates_evaluated` | `int` | Number of candidates considered. | +| `conflict_detected` | `bool` | `True` if there was a mix of allow and deny candidates. | +| `resolution_trace` | `list[str]` | Step-by-step log of the resolution logic. 
| + +```python +for line in result.resolution_trace: + print(line) +# "Evaluating 2 candidates with deny_overrides strategy" +# "Found 1 deny candidate(s) — deny overrides" +# "Winner: block_internal_access (deny, priority=10, scope=agent)" +``` + +### Strategy Selection Guide + +| Scenario | Recommended Strategy | +|----------|---------------------| +| Enterprise default with deny guardrails | `deny_overrides` | +| Zero-trust baseline with explicit grants | `allow_overrides` | +| Single-policy system or backward compat | `priority_first_match` | +| Multi-tenant with org → team → agent layering | `most_specific_wins` | + +--- + +## Advanced Patterns + +### Blocked Patterns with Regex and Glob + +`GovernancePolicy.blocked_patterns` accepts plain strings (substring), regex +tuples, and glob tuples. All three can be mixed in a single policy. + +```python +from agent_os.integrations.base import GovernancePolicy, PatternType + +policy = GovernancePolicy( + name="content-filter", + blocked_patterns=[ + # Substring — matches anywhere in text + "password", + "api_key", + + # Regex — case-insensitive compiled pattern + (r"Bearer\s+[A-Za-z0-9\-._~+/]+=*", PatternType.REGEX), # JWT tokens + (r"rm\s+-rf\s+/", PatternType.REGEX), # destructive commands + (r"(?i)drop\s+table", PatternType.REGEX), # SQL injection + + # Glob — shell-style wildcards + ("*.exe", PatternType.GLOB), + ("secret_*", PatternType.GLOB), + ], +) + +# Check if text triggers any pattern +matches = policy.matches_pattern("please delete Bearer eyJhbGciOi... from the cache") +print(matches) # ["Bearer\\s+[A-Za-z0-9\\-._~+/]+=*"] +``` + +### Policy Composition and Comparison + +Create a base policy and derive stricter variants. 
Use `diff()` and +`is_stricter_than()` to verify invariants in CI: + +```python +base = GovernancePolicy(name="base", max_tokens=4096, max_tool_calls=10) + +production = GovernancePolicy( + name="production", + max_tokens=2048, + max_tool_calls=5, + allowed_tools=["web_search", "read_file"], + confidence_threshold=0.95, + require_human_approval=True, +) + +# Verify production is stricter +assert production.is_stricter_than(base) + +# Show what changed +diff = production.diff(base) +for field, (prod_val, base_val) in diff.items(): + print(f" {field}: {base_val} → {prod_val}") +# max_tokens: 4096 → 2048 +# max_tool_calls: 10 → 5 +# confidence_threshold: 0.8 → 0.95 +# require_human_approval: False → True +# allowed_tools: [] → ['web_search', 'read_file'] +``` + +### Loading Policies from Multiple Directories + +`PolicyEvaluator.load_policies()` can be called multiple times. Rules from all +loaded documents are merged and sorted by priority: + +```python +evaluator = PolicyEvaluator() +evaluator.load_policies("./policies/global/") +evaluator.load_policies("./policies/team-specific/") +evaluator.load_policies("./policies/agent-overrides/") + +# All rules from all directories are evaluated together. +# Highest-priority rule across all files wins. +decision = evaluator.evaluate(context) +``` + +--- + +## Integration with Middleware + +The `GovernancePolicyMiddleware` plugs into the Microsoft Agent Framework (MAF) +middleware pipeline. Every agent invocation passes through the middleware stack +before execution. 
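The pipeline idea itself is simple: each middleware receives the next handler and can either short-circuit or pass the call through. A toy sketch of that wrapping pattern, purely illustrative rather than MAF's actual API (`Handler`, `governance_layer`, and `agent_core` are made-up names):

```python
from typing import Callable

Handler = Callable[[dict], str]

def governance_layer(next_handler: Handler) -> Handler:
    """Wrap a handler with a hard-coded deny rule (toy policy check)."""
    def handle(ctx: dict) -> str:
        if ctx.get("tool_name") == "execute_code":
            return "blocked: code execution denied"  # short-circuit: agent never runs
        return next_handler(ctx)                     # pass through to the next layer
    return handle

def agent_core(ctx: dict) -> str:
    """Innermost 'agent': runs only if every layer passes the call through."""
    return f"ran {ctx['tool_name']}"

pipeline = governance_layer(agent_core)
print(pipeline({"tool_name": "web_search"}))    # ran web_search
print(pipeline({"tool_name": "execute_code"}))  # blocked: code execution denied
```

The real stack composes the same way, with policy evaluation, capability guards, and audit logging as the layers.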
+ +### Quick Middleware Setup + +```python +from agent_os.policies import PolicyEvaluator +from agent_os.integrations.maf_adapter import GovernancePolicyMiddleware + +evaluator = PolicyEvaluator() +evaluator.load_policies("./policies/") + +middleware = GovernancePolicyMiddleware(evaluator=evaluator) +``` + +### Full Governance Stack with Factory + +`create_governance_middleware()` assembles the complete stack in the correct +order: + +```python +from agent_os.integrations.maf_adapter import create_governance_middleware + +stack = create_governance_middleware( + policy_directory="./policies/", + allowed_tools=["web_search", "read_file"], + denied_tools=["execute_code", "run_shell"], + agent_id="research-agent", + enable_rogue_detection=True, +) +``` + +The factory returns an ordered list of middleware (evaluated bottom-up): + +| Order | Middleware | Purpose | +|-------|-----------|---------| +| 1 | `AuditTrailMiddleware` | Pre/post execution audit entries with timing | +| 2 | `GovernancePolicyMiddleware` | Declarative YAML policy evaluation | +| 3 | `CapabilityGuardMiddleware` | Tool allow/deny list enforcement | +| 4 | `RogueDetectionMiddleware` | Anomaly detection on tool invocations | + +### Wiring into an Agent + +```python +from agent_framework import Agent +from agent_os.integrations.maf_adapter import create_governance_middleware + +stack = create_governance_middleware( + policy_directory="./policies/", + allowed_tools=["web_search", "read_file"], + denied_tools=["execute_code"], + agent_id="research-agent", + enable_rogue_detection=True, +) + +agent = Agent( + name="researcher", + instructions="You are a research assistant.", + middleware=stack, +) +``` + +### What Happens at Runtime + +1. Agent receives an invocation. +2. **AuditTrailMiddleware** writes a pre-execution audit entry. +3. 
**GovernancePolicyMiddleware** builds a context dict from the incoming + message (`{agent, message, timestamp, stream, message_count}`) and calls + `evaluator.evaluate(context)`. + - If **denied**: sets an `AgentResponse` with the denial reason, logs to + audit, and raises `MiddlewareTermination`. The agent never executes. + - If **allowed**: stores the `PolicyDecision` in + `context.metadata["governance_decision"]` and proceeds. +4. **CapabilityGuardMiddleware** checks each tool call against `allowed_tools` + and `denied_tools`. Denied tools are blocked (`denied_tools` takes + precedence over `allowed_tools`). +5. **RogueDetectionMiddleware** feeds each tool invocation to + `RogueAgentDetector` and blocks high-risk calls. +6. **AuditTrailMiddleware** writes a post-execution entry with timing. + +### Accessing Governance Decisions Downstream + +After middleware runs, the decision is available in context metadata: + +```python +decision = context.metadata.get("governance_decision") +if decision: + print(f"Policy: {decision.action}, Rule: {decision.matched_rule}") +``` + +--- + +## Source Files + +| Component | Location | +|-----------|----------| +| Schema models | `packages/agent-os/src/agent_os/policies/schema.py` | +| Evaluator | `packages/agent-os/src/agent_os/policies/evaluator.py` | +| GovernancePolicy | `packages/agent-os/src/agent_os/integrations/base.py` | +| MAF middleware | `packages/agent-os/src/agent_os/integrations/maf_adapter.py` | +| Conflict resolution | `packages/agent-mesh/src/agentmesh/governance/conflict_resolution.py` | +| Policy examples | `packages/agent-os/examples/policies/` | +| Research demo | `demo/policies/research_policy.yaml` | diff --git a/docs/tutorials/02-trust-and-identity.md b/docs/tutorials/02-trust-and-identity.md new file mode 100644 index 00000000..4a09f1aa --- /dev/null +++ b/docs/tutorials/02-trust-and-identity.md @@ -0,0 +1,685 @@ +# Tutorial 02 — Trust and Identity + +## Building Verifiable Agent Identity and Dynamic 
Trust + +**Prerequisites:** `pip install agentmesh-platform` +**Modules:** `agentmesh.identity`, `agentmesh.trust`, `agentmesh.governance` + +--- + +## 1. Introduction — Why Agent Identity Matters + +In a multi-agent system, any agent can claim to be anything. Without +cryptographic identity and continuous trust evaluation, you have no way to +answer three critical questions: + +1. **Who is this agent?** — Verified identity via decentralized identifiers (DIDs) +2. **Should I trust it?** — Dynamic trust scoring based on observed behavior +3. **What can it do?** — Capability-scoped, time-limited credentials + +The Agent Governance Toolkit solves this with three layers: + +| Layer | Purpose | Key Classes | +|-------|---------|-------------| +| **Identity** | Cryptographic agent IDs, Ed25519 keys, human sponsors | `AgentIdentity`, `AgentDID`, `HumanSponsor` | +| **Trust** | Handshake protocol, peer verification, capability scoping | `TrustHandshake`, `TrustBridge`, `CapabilityRegistry` | +| **Governance** | Policy enforcement, risk scoring, audit trail | `RiskScorer`, `TrustPolicy`, `AuditLog` | + +Every agent gets a `did:mesh:*` identifier bound to an Ed25519 key pair, +a human sponsor for accountability, and short-lived credentials (15-minute TTL) +that auto-rotate and can be revoked instantly. + +--- + +## 2. Quick Start — Register an Agent and Get a Trust Score + +```python +from agentmesh import AgentIdentity, RiskScorer + +# Create an agent with a human sponsor +agent = AgentIdentity.create( + name="DataProcessor", + sponsor="alice@company.com", + capabilities=["read:data", "write:reports"], + organization="Analytics", +) + +print(agent.did) # did:mesh:a1b2c3d4e5f6... 
+print(agent.public_key)  # Base64-encoded Ed25519 public key
+print(agent.status)      # "active"
+
+# Check the agent's risk score (0-1000, higher = safer)
+scorer = RiskScorer()
+score = scorer.get_score(str(agent.did))
+
+print(score.total_score)  # 500 (default starting score)
+print(score.risk_level)   # "medium"
+```
+
+That's it — your agent now has a cryptographic identity and a baseline trust
+score. The rest of this tutorial explains how each piece works.
+
+---
+
+## 3. DID and Key Management
+
+### 3.1 Decentralized Identifiers
+
+Every agent receives a DID in the format `did:mesh:<unique-id>`, where the
+unique ID is a SHA-256 hash derived from the agent's name and organization.
+
+```python
+from agentmesh import AgentDID
+
+# Generate a DID
+did = AgentDID.generate(name="ReportWriter", org="Analytics")
+print(did)  # did:mesh:7f3a...
+
+# Parse an existing DID string
+did = AgentDID.from_string("did:mesh:7f3a9b2c...")
+print(did.method)     # "mesh"
+print(did.unique_id)  # "7f3a9b2c..."
+```
+
+### 3.2 Ed25519 Key Pairs
+
+Agent identities are backed by Ed25519 elliptic curve keys. The private key
+never leaves the agent; only the public key is shared.
+
+```python
+agent = AgentIdentity.create(
+    name="Signer",
+    sponsor="bob@company.com",
+)
+
+# Sign data
+signature = agent.sign(b"payload to authenticate")
+
+# Verify signature (any party with the public key can do this)
+is_valid = agent.verify_signature(b"payload to authenticate", signature)
+print(is_valid)  # True
+
+# Export as JWK (JSON Web Key) for interoperability
+jwk = agent.to_jwk(include_private=False)
+
+# Export as a W3C DID Document
+did_doc = agent.to_did_document()
+```
+
+### 3.3 Human Sponsors
+
+Every agent must have a human sponsor — the person accountable for the agent's
+behavior. This prevents "orphan agents" from operating without oversight.
+ +```python +from agentmesh import HumanSponsor + +sponsor = HumanSponsor.create( + email="alice@company.com", + name="Alice", + organization="Analytics", + allowed_capabilities=["read:data", "write:reports", "execute:analysis"], +) + +# Verify the sponsor (typically via email or SSO) +sponsor.verify(method="email") + +# Sponsors have limits +print(sponsor.max_agents) # 10 +print(sponsor.max_delegation_depth) # 3 + +# Check before creating a new agent +if sponsor.can_sponsor_agent(): + agent = AgentIdentity.create( + name="NewAgent", + sponsor=sponsor.email, + capabilities=["read:data"], + ) +``` + +### 3.4 Delegation — Creating Child Agents + +An agent can delegate a subset of its capabilities to a child agent. +Capabilities can only **narrow**, never expand. + +```python +# Parent agent with broad capabilities +parent = AgentIdentity.create( + name="OrchestratorAgent", + sponsor="alice@company.com", + capabilities=["read:data", "write:reports", "execute:analysis"], +) + +# Delegate a narrower set to a child +child = parent.delegate( + name="ReportWriter", + capabilities=["write:reports"], # Must be a subset of parent's +) + +print(child.parent_did) # Parent's DID +print(child.has_capability("write:reports")) # True +print(child.has_capability("read:data")) # False — not delegated +``` + +--- + +## 4. Trust Scoring System + +### 4.1 Risk Score Components + +Trust is measured through a `RiskScore` on a 0–1000 scale (higher = safer). 
+The score is composed of four weighted dimensions: + +| Component | Weight | What It Measures | +|-----------|--------|-----------------| +| `identity_score` | 25% | Identity verification strength, sponsor status | +| `behavior_score` | 20% | Behavioral patterns, anomaly detection | +| `network_score` | 15% | Network activity, communication patterns | +| `compliance_score` | 25% | Regulatory compliance, policy adherence | + +```python +from agentmesh import RiskScorer, RiskScore + +scorer = RiskScorer() + +# Get or create a score for an agent +score = scorer.get_score("did:mesh:abc123...") + +print(score.total_score) # 500 (default) +print(score.identity_score) # 0-100 component +print(score.behavior_score) # 0-100 component +print(score.compliance_score) # 0-100 component +print(score.risk_level) # "critical" | "high" | "medium" | "low" | "minimal" +``` + +### 4.2 Trust Tiers (Ring Model) + +Scores map to access tiers that control what an agent can do: + +| Tier | Score Range | Trust Level | Access | +|------|------------|-------------|--------| +| **Ring 0** | ≥ 900 | `verified_partner` | Full access, cross-mesh federation | +| **Ring 1** | ≥ 700 | `trusted` | Standard operations, peer communication | +| **Ring 2** | ≥ 500 | `standard` | Limited operations, monitored | +| **Ring 3** | < 500 | `untrusted` / `probationary` | Restricted, requires approval | + +``` +Ring 0 (≥900) ████████████████████████████████████████ Full Trust +Ring 1 (≥700) ██████████████████████████████ Trusted +Ring 2 (≥500) ████████████████████ Standard +Ring 3 (<500) ██████████ Restricted + 0 200 400 600 800 1000 +``` + +Key thresholds from `agentmesh.constants`: + +```python +TRUST_SCORE_DEFAULT = 500 # Starting score +TRUST_WARNING_THRESHOLD = 500 # Below this triggers warnings +TRUST_REVOCATION_THRESHOLD = 300 # Below this triggers revocation +RISK_CRITICAL_THRESHOLD = 200 # Immediate action required +``` + +### 4.3 Risk Signals and Scoring + +Trust scores change based on **risk 
signals** — events that indicate positive +or negative agent behavior. Signals are weighted by severity: + +| Severity | Weight | Example | +|----------|--------|---------| +| `critical` | 1.0 | Credential compromise detected | +| `high` | 0.75 | Unauthorized resource access attempt | +| `medium` | 0.5 | Unusual request pattern | +| `low` | 0.25 | Minor policy deviation | +| `info` | 0.1 | Routine activity logged | + +```python +from agentmesh.identity import RiskSignal + +# Report a risk signal +scorer.add_signal( + agent_did="did:mesh:abc123...", + signal=RiskSignal( + signal_type="behavior.anomaly", + severity="high", + value=0.8, + source="anomaly_detector", + details="Unusual data access pattern detected", + ), +) + +# Score recalculates automatically (every ≤30 seconds) +updated_score = scorer.recalculate("did:mesh:abc123...") +print(updated_score.risk_level) # May have changed + +# Register an alert callback for critical events +def handle_alert(alert: dict): + if alert["type"] == "critical_risk": + print(f"ALERT: Agent {alert['agent_did']} is high risk!") + +scorer.on_alert(handle_alert) +``` + +### 4.4 Trust Decay + +Scores degrade over time without positive signals. An agent that stops +producing positive behavioral evidence will see its score drift downward. +The `RiskScorer` recalculates every 30 seconds, factoring in signal +recency and the absence of fresh positive signals. + +```python +# Find agents whose scores have decayed into high-risk territory +high_risk_agents = scorer.get_high_risk_agents(threshold=400) +for agent_score in high_risk_agents: + print(f"{agent_score.agent_did}: {agent_score.total_score}") + +# After remediation, clear old signals to allow recovery +scorer.clear_signals("did:mesh:abc123...") +``` + +--- + +## 5. Credential Lifecycle + +### 5.1 Issuing Credentials + +Credentials are short-lived tokens (default 15-minute TTL) scoped to specific +capabilities and resources. 
They are the runtime proof that an agent is
+authorized to act.
+
+```python
+from agentmesh import Credential, CredentialManager
+
+manager = CredentialManager(default_ttl=900)  # 15 minutes
+
+# Issue a credential
+cred = manager.issue(
+    agent_did="did:mesh:abc123...",
+    capabilities=["read:data"],
+    resources=["dataset_sales", "dataset_inventory"],
+    ttl_seconds=900,
+)
+
+print(cred.credential_id)     # "cred_a1b2c3..."
+print(cred.status)            # "active"
+print(cred.time_remaining())  # ~15 minutes
+print(cred.to_bearer_token()) # "Bearer <token>"
+```
+
+### 5.2 Validation
+
+Incoming tokens are validated by hashing and comparing against stored
+credential records:
+
+```python
+# Validate an incoming token (`request` here is whatever your web
+# framework hands you)
+incoming_token = request.headers["Authorization"].removeprefix("Bearer ")
+cred = manager.validate(incoming_token)
+
+if cred and cred.is_valid():
+    if cred.has_capability("read:data"):
+        if cred.can_access_resource("dataset_sales"):
+            # Authorized — proceed
+            ...
+```
+
+### 5.3 Rotation
+
+Credentials auto-rotate before expiry. The rotation threshold is 60 seconds —
+when a credential is within 60 seconds of expiring, calling `rotate_if_needed`
+issues a fresh one and marks the old credential as `"rotated"`.
+
+```python
+# Check and rotate if needed
+cred = manager.rotate_if_needed(cred.credential_id)
+# Returns the same credential if not expiring, or a new one if it is
+
+# Force rotation (e.g., after a capability change)
+new_cred = manager.rotate(cred.credential_id)
+print(new_cred.previous_credential_id)  # Links to the old credential
+print(new_cred.rotation_count)          # Incremented
+```
+
+### 5.4 Revocation
+
+Revocation is immediate and propagates within ≤5 seconds. Use it when an
+agent is compromised or a policy violation is detected.
+ +```python +# Revoke a single credential +manager.revoke(cred.credential_id, reason="Suspected compromise") + +# Revoke ALL credentials for a compromised agent +count = manager.revoke_all_for_agent( + agent_did="did:mesh:compromised...", + reason="Agent suspended pending investigation", +) +print(f"Revoked {count} credentials") + +# Register a callback for revocation events +def on_revocation(event): + print(f"Credential {event['credential_id']} revoked: {event['reason']}") + +manager.on_revocation(on_revocation) + +# Cleanup expired credentials periodically +expired_count = manager.cleanup_expired() +``` + +### 5.5 Credential Status Flow + +``` + issue() rotate() revoke() + │ │ │ + ▼ ▼ ▼ + ┌────────┐ ┌─────────┐ ┌─────────┐ + │ active │─────▶│ rotated │ │ revoked │ + └────────┘ └─────────┘ └─────────┘ + │ ▲ + │ TTL expires │ + ▼ │ + ┌─────────┐ policy violation ───────┘ + │ expired │ + └─────────┘ +``` + +--- + +## 6. Trust-Based Access Control + +### 6.1 Capability Grants + +Capabilities follow the format `action:resource[:qualifier]`. Grants are +scoped to specific agents and optionally limited to specific resource IDs. + +```python +from agentmesh import CapabilityRegistry + +registry = CapabilityRegistry() + +# Grant scoped capability +grant = registry.grant( + capability="read:data", + to_agent="did:mesh:child...", + from_agent="did:mesh:parent...", + resource_ids=["dataset_sales", "dataset_inventory"], +) + +# Check capability (with resource scoping) +can_read = registry.check( + agent_did="did:mesh:child...", + capability="read:data", + resource_id="dataset_sales", # Specific resource +) +print(can_read) # True + +# Agent cannot access ungranted resources +can_read_hr = registry.check( + agent_did="did:mesh:child...", + capability="read:data", + resource_id="dataset_hr", +) +print(can_read_hr) # False +``` + +### 6.2 Linking Trust Scores to Permissions + +Use trust policies to enforce score-based access control. 
Policies are +defined in YAML and evaluated at runtime: + +```yaml +# policies/trust-access.yaml +name: "Trust-Based Access Control" +version: "1.0" +description: "Map trust tiers to allowed operations" + +defaults: + min_trust_score: 500 + max_delegation_depth: 3 + require_handshake: true + +rules: + - name: "Block Low-Trust Agents" + description: "Deny agents below Ring 3 threshold" + condition: + field: "trust_score" + operator: "lt" + value: 400 + action: "deny" + priority: 10 + + - name: "Require Approval for PII Access" + description: "Ring 2+ agents need sponsor approval for PII" + condition: + field: "action.type" + operator: "eq" + value: "access_pii" + action: "require_approval" + priority: 20 + + - name: "Allow Ring 0 Full Access" + description: "Verified partners can access all resources" + condition: + field: "trust_score" + operator: "gte" + value: 900 + action: "allow" + priority: 5 +``` + +```python +from agentmesh.governance import TrustPolicy, PolicyEvaluator + +# Load policies from YAML +policy = TrustPolicy.from_yaml("policies/trust-access.yaml") +evaluator = PolicyEvaluator([policy]) + +# Evaluate an agent's request +decision = evaluator.evaluate({ + "trust_score": 450, + "action": {"type": "read_data"}, + "agent": {"did": "did:mesh:abc123..."}, +}) + +print(decision.allowed) # True (score ≥ 400) +print(decision.action) # "allow" +print(decision.matched_rules) # Rules that fired + +# Low-trust agent trying to access PII +decision = evaluator.evaluate({ + "trust_score": 350, + "action": {"type": "access_pii"}, +}) + +print(decision.allowed) # False +print(decision.action) # "deny" (score < 400, highest priority rule) +``` + +### 6.3 Scope Chains for Delegation Auditing + +When agents delegate to other agents, a `ScopeChain` tracks the full +delegation path and ensures capabilities only narrow: + +```python +from agentmesh import ScopeChain, DelegationLink + +# Create a root chain from a human sponsor +chain, root_link = ScopeChain.create_root( + 
sponsor_email="alice@company.com", + root_agent_did="did:mesh:root...", + capabilities=["read:data", "write:reports", "execute:analysis"], + sponsor_verified=True, +) + +# Verify the entire chain is intact +is_valid, error = chain.verify() +print(is_valid) # True + +# Trace how a specific capability was granted +trace = chain.trace_capability("write:reports") +# Returns the full delegation path for that capability +``` + +--- + +## 7. Multi-Agent Trust Mesh + +### 7.1 Trust Handshake Protocol + +Before two agents communicate, they perform a cryptographic handshake: +challenge-response with nonce verification, trust score exchange, and +capability negotiation. + +```python +import asyncio +from agentmesh import TrustHandshake, AgentIdentity + +# Agent A creates a handshake initiator +agent_a = AgentIdentity.create( + name="AgentA", + sponsor="alice@company.com", + capabilities=["read:data", "write:reports"], +) + +handshake_a = TrustHandshake( + agent_did=str(agent_a.did), + identity=agent_a, + timeout_seconds=30.0, +) + +# Agent A initiates a handshake with Agent B +result = asyncio.run(handshake_a.initiate( + peer_did="did:mesh:agent_b...", + required_trust_score=700, # Minimum Ring 1 + required_capabilities=["read:data"], +)) + +if result.verified: + print(f"Peer: {result.peer_did}") + print(f"Trust Level: {result.trust_level}") # "trusted" + print(f"Trust Score: {result.trust_score}") + print(f"Latency: {result.latency_ms}ms") + # Proceed with secure communication +else: + print(f"Rejected: {result.rejection_reason}") +``` + +### 7.2 Responding to Handshakes + +On the receiving side, the agent responds to the challenge: + +```python +# Agent B receives a challenge and responds +agent_b_handshake = TrustHandshake( + agent_did=str(agent_b.did), + identity=agent_b, +) + +response = asyncio.run(agent_b_handshake.respond( + challenge=incoming_challenge, + my_capabilities=["read:data", "execute:analysis"], + my_trust_score=800, + identity=agent_b, +)) +# Response is 
sent back to Agent A for verification +``` + +### 7.3 Trust Bridge for Persistent Peers + +For agents that communicate frequently, a `TrustBridge` maintains verified +peer state and caches handshake results (default 15-minute cache TTL): + +```python +from agentmesh import TrustBridge + +bridge = TrustBridge( + agent_did=str(agent_a.did), + default_trust_threshold=700, +) + +# Register known peers +bridge.register_peer( + peer_did="did:mesh:agent_b...", + peer_name="DataAnalyzer", + protocol="iatp", +) + +# Verify a peer (uses cached result if available) +result = asyncio.run(bridge.verify_peer( + peer_did="did:mesh:agent_b...", + required_trust_score=700, + required_capabilities=["read:data"], +)) + +# Quick trust check +is_trusted = asyncio.run(bridge.is_peer_trusted( + peer_did="did:mesh:agent_b...", + required_score=700, +)) +``` + +### 7.4 Agent Cards for Discovery + +`TrustedAgentCard` objects serve as verifiable business cards for agents — +other agents can discover and verify peers through a `CardRegistry`: + +```python +from agentmesh.trust import TrustedAgentCard, CardRegistry + +card_registry = CardRegistry() + +# Cards are created automatically during registration +# Other agents can look up cards to discover peers +``` + +### 7.5 Audit Trail + +Every trust operation is logged to a tamper-evident audit chain backed by +Merkle tree hashing: + +```python +from agentmesh import AuditLog + +log = AuditLog() + +# Log a trust handshake +entry = log.add_entry( + event_type="trust_handshake", + agent_did="did:mesh:agent_a...", + action="initiate_handshake", + resource="did:mesh:agent_b...", + data={"trust_score": 800, "protocol": "iatp"}, + outcome="success", +) + +# Verify the entire audit chain hasn't been tampered with +is_intact = log.verify_integrity() +print(is_intact) # True + +# Query audit history for an agent +entries = log.get_entries( + agent_did="did:mesh:agent_a...", + event_type="trust_handshake", + limit=50, +) +``` + +--- + +## Summary + +| 
Concept | Key Class | What It Does |
|---------|-----------|-------------|
| Agent DID | `AgentDID` | `did:mesh:*` identifier bound to Ed25519 keys |
| Identity | `AgentIdentity` | Creates agents, signs data, manages delegation |
| Sponsor | `HumanSponsor` | Human accountability for every agent |
| Credentials | `CredentialManager` | 15-min TTL tokens with auto-rotation |
| Risk Scoring | `RiskScorer` | 0–1000 continuous trust assessment |
| Trust Tiers | Ring 0–3 | Score-based access control |
| Handshake | `TrustHandshake` | Cryptographic peer verification |
| Trust Bridge | `TrustBridge` | Persistent peer trust with caching |
| Capabilities | `CapabilityRegistry` | Fine-grained, scoped permission grants |
| Policies | `TrustPolicy` | YAML-based declarative trust rules |
| Audit | `AuditLog` | Tamper-evident Merkle-chained event log |

**Next:** [Tutorial 03 — Wrapping AI Frameworks with Governance](03-framework-integrations.md)

diff --git a/docs/tutorials/03-framework-integrations.md b/docs/tutorials/03-framework-integrations.md
new file mode 100644
index 00000000..a92450ca
--- /dev/null
+++ b/docs/tutorials/03-framework-integrations.md
@@ -0,0 +1,751 @@
# Tutorial 03 — Wrapping AI Frameworks with Governance

Every adapter in Agent OS follows the same pattern: **create a policy, create a
kernel, wrap the framework object, use the governed object as normal**. The
kernel sits between your code and the LLM framework—intercepting calls,
enforcing limits, blocking disallowed tools, and logging everything.

```
┌─────────────┐      ┌──────────────┐      ┌───────────────┐
│  Your Code  │ ──►  │    Kernel    │ ──►  │   Framework   │
│             │ ◄──  │  (governance │ ◄──  │   (OpenAI,    │
│             │      │    layer)    │      │  LangChain…)  │
└─────────────┘      └──────────────┘      └───────────────┘
                       pre_execute()
                       tool interception
                       post_execute()
                       drift detection
                       audit log
```

All adapters live in `packages/agent-os/src/agent_os/integrations/` and inherit
from `BaseIntegration` (defined in `base.py`). 
Every kernel exposes: + +| Hook | When it fires | What it does | +|---|---|---| +| `pre_execute()` | Before the LLM call | Enforces token limits, timeout, blocked patterns | +| Tool interception | On each tool/function call | Validates against `allowed_tools` / `blocked_patterns` | +| `post_execute()` | After the LLM response | Drift detection, output scanning, audit entry | + +Violations raise `PolicyViolationError`. + +--- + +## Prerequisites + +```bash +pip install agent-os-kernel +``` + +Then install the framework package you need: + +```bash +pip install openai # for OpenAIKernel +pip install langchain-core # for LangChainKernel +pip install crewai # for CrewAIKernel +pip install anthropic # for AnthropicKernel +pip install google-generativeai # for GeminiKernel +pip install pyautogen # for AutoGenKernel +``` + +--- + +## 1. Quick Start — OpenAI in 5 Lines + +```python +from openai import OpenAI +from agent_os.integrations import OpenAIKernel, GovernancePolicy + +client = OpenAI() +assistant = client.beta.assistants.create( + name="analyst", + model="gpt-4o", + tools=[{"type": "code_interpreter"}], +) + +# 1. Define policy +policy = GovernancePolicy( + max_tokens=4096, + max_tool_calls=5, + allowed_tools=["code_interpreter"], + blocked_patterns=["rm -rf", "DROP TABLE"], + log_all_calls=True, +) + +# 2. Create kernel +kernel = OpenAIKernel(policy=policy) + +# 3. Wrap — returns a GovernedAssistant +governed = kernel.wrap(assistant, client) + +# 4. Use exactly like before +thread = client.beta.threads.create() +client.beta.threads.messages.create(thread.id, role="user", content="Summarize Q3 revenue") +run = governed.run(thread.id) +``` + +The `GovernedAssistant` proxies every run through the governance layer. +If the assistant tries to exceed `max_tool_calls` or matches a blocked pattern, +the kernel raises `PolicyViolationError` and logs the violation. 
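To see the kind of screening `pre_execute()` performs before a run is allowed to proceed, here is a self-contained sketch of blocked-pattern matching. The `BLOCKED_PATTERNS` list, the `screen()` helper, and the local `PolicyViolationError` class are illustrative stand-ins for this tutorial, not the actual `agent_os` implementation:

```python
import re

# Stand-in for GovernancePolicy.blocked_patterns: each entry is
# (pattern, kind) where kind is "substring" or "regex".
BLOCKED_PATTERNS = [
    ("rm -rf", "substring"),
    (r"\b\d{3}-\d{2}-\d{4}\b", "regex"),  # SSN-shaped strings
]


class PolicyViolationError(Exception):
    """Raised when content matches a blocked pattern (sketch only)."""


def screen(text: str) -> None:
    """Raise PolicyViolationError if any blocked pattern matches."""
    for pattern, kind in BLOCKED_PATTERNS:
        hit = pattern in text if kind == "substring" else re.search(pattern, text)
        if hit:
            raise PolicyViolationError(f"blocked pattern matched: {pattern!r}")


try:
    screen("please run rm -rf /tmp/scratch")
except PolicyViolationError as exc:
    print(f"violation: {exc}")  # prints the blocked-pattern message
```

In your own code, the equivalent is simply wrapping `governed.run(...)` in a `try/except PolicyViolationError` block and deciding whether to retry, alert, or fail.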
+ +### Inspecting execution state + +```python +ctx = governed.get_context() +print(ctx.call_count) # number of LLM round-trips +print(ctx.total_tokens) # cumulative token usage +print(ctx.tool_calls) # list of intercepted tool calls +``` + +--- + +## 2. LangChain Integration + +`LangChainKernel` wraps chains, agents, and runnables. It intercepts +`invoke()`, `ainvoke()`, `stream()`, `batch()`, and provides deep hooks into +the tool registry, memory writes, and sub-agent delegation. + +```python +from langchain_openai import ChatOpenAI +from langchain_core.prompts import ChatPromptTemplate +from langchain_core.output_parsers import StrOutputParser +from agent_os.integrations import LangChainKernel, GovernancePolicy + +llm = ChatOpenAI(model="gpt-4o") +chain = ChatPromptTemplate.from_template("Explain {topic}") | llm | StrOutputParser() + +policy = GovernancePolicy( + max_tokens=2048, + timeout_seconds=30, + blocked_patterns=[ + ("\\b\\d{3}-\\d{2}-\\d{4}\\b", "regex"), # block SSN patterns + ("password", "substring"), + ], + log_all_calls=True, +) + +kernel = LangChainKernel(policy=policy) +governed_chain = kernel.wrap(chain) + +# invoke() is now governed +result = governed_chain.invoke({"topic": "zero-trust architecture"}) +``` + +### Deep hooks + +LangChain's kernel intercepts more than just top-level calls: + +| Hook | What it catches | +|---|---| +| Tool registry | Every tool invocation is validated against `allowed_tools` | +| Memory writes | Detects and logs writes to conversation memory | +| Sub-agent spawning | Tracks when an agent delegates to another agent | +| PII detection | Built-in patterns catch SSNs, emails, secrets in output | + +### Async and streaming + +```python +# Async — same governance, async execution +result = await governed_chain.ainvoke({"topic": "mTLS"}) + +# Streaming — each chunk passes through post_execute +async for chunk in governed_chain.astream({"topic": "RBAC"}): + print(chunk, end="", flush=True) +``` + +### Wrapping an agent 
with tools + +```python +from langchain_core.tools import tool + +@tool +def query_database(sql: str) -> str: + """Run a read-only SQL query.""" + # ... + +policy = GovernancePolicy( + allowed_tools=["query_database"], + blocked_patterns=[ + ("DROP", "substring"), + ("DELETE", "substring"), + ("INSERT", "substring"), + ], + max_tool_calls=10, +) + +kernel = LangChainKernel(policy=policy) +governed_agent = kernel.wrap(agent_executor) +governed_agent.invoke({"input": "How many users signed up last week?"}) +``` + +--- + +## 3. CrewAI Integration + +`CrewAIKernel` wraps an entire crew, governing both `kickoff()` and +`kickoff_async()`. It also intercepts individual agent execution and tool +calls within the crew. + +```python +from crewai import Agent, Task, Crew +from agent_os.integrations import CrewAIKernel, GovernancePolicy + +researcher = Agent( + role="Researcher", + goal="Find accurate information", + tools=[search_tool, scrape_tool], +) +writer = Agent(role="Writer", goal="Write clear reports") + +task = Task( + description="Research and summarize recent AI governance frameworks", + agent=researcher, + expected_output="A 500-word summary", +) + +crew = Crew(agents=[researcher, writer], tasks=[task]) + +policy = GovernancePolicy( + allowed_tools=["search_tool", "scrape_tool"], + max_tool_calls=20, + timeout_seconds=600, + drift_threshold=0.15, + log_all_calls=True, +) + +kernel = CrewAIKernel(policy=policy) +governed_crew = kernel.wrap(crew) + +# kickoff() is now governed +result = governed_crew.kickoff() +``` + +### What the kernel intercepts + +- **kickoff() / kickoff_async()** — pre/post execution checks on the entire run +- **Individual agent steps** — each agent's step is validated +- **Tool calls** — every tool invocation checked against `allowed_tools` +- **Memory writes** — crew memory interactions are logged +- **Delegation** — when one agent delegates to another, the chain is tracked + +### Async crews + +```python +result = await 
governed_crew.kickoff_async() +``` + +--- + +## 4. Anthropic and Gemini + +### Anthropic — wrapping the client + +`AnthropicKernel` wraps the Anthropic client and intercepts every +`messages.create()` call. + +```python +from anthropic import Anthropic +from agent_os.integrations import AnthropicKernel, GovernancePolicy + +client = Anthropic() + +policy = GovernancePolicy( + max_tokens=4096, + blocked_patterns=["IGNORE PREVIOUS INSTRUCTIONS"], + allowed_tools=["get_weather"], + log_all_calls=True, +) + +kernel = AnthropicKernel(policy=policy, max_retries=3, timeout_seconds=120.0) +governed_client = kernel.wrap(client) # returns GovernedAnthropicClient + +# messages.create() is now governed +response = governed_client.messages.create( + model="claude-sonnet-4-20250514", + max_tokens=1024, + messages=[{"role": "user", "content": "Explain governance patterns"}], +) +``` + +Under the hood, `governed_client.messages` is a `_GovernedMessages` proxy that: +1. Runs `pre_execute()` — validates message content and tools +2. Calls the real `messages.create()` +3. Runs `post_execute()` — checks tool_use blocks, tracks tokens +4. Logs the full request/response to the audit trail + +```python +# Token tracking +usage = governed_client.get_token_usage() +print(usage) # {"input_tokens": 42, "output_tokens": 128, ...} + +# Cancel a long-running request +governed_client.sigkill(request_id="req_abc123") +``` + +There is also a convenience function: + +```python +from agent_os.integrations.anthropic_adapter import wrap_client + +governed = wrap_client(client, policy=policy) +``` + +### Gemini — wrapping a GenerativeModel + +`GeminiKernel` wraps Google's `GenerativeModel` and intercepts +`generate_content()`. 
+ +```python +import google.generativeai as genai +from agent_os.integrations import GeminiKernel, GovernancePolicy + +genai.configure(api_key="...") +model = genai.GenerativeModel("gemini-1.5-pro") + +policy = GovernancePolicy( + max_tokens=8192, + blocked_patterns=["execute_code"], + log_all_calls=True, +) + +kernel = GeminiKernel(policy=policy) +governed_model = kernel.wrap(model) # returns GovernedGeminiModel + +response = governed_model.generate_content("Explain AI safety principles") +``` + +The kernel intercepts function calls in Gemini responses and validates them +against `allowed_tools` and `blocked_patterns`. Token usage is extracted from +the response's `usage_metadata`. + +```python +from agent_os.integrations.gemini_adapter import wrap_model + +governed = wrap_model(model, policy=policy) +``` + +--- + +## 5. AutoGen — Multi-Agent Governance + +AutoGen is different: you have multiple agents chatting with each other. +`AutoGenKernel` uses `govern()` to patch multiple agents at once via +monkey-patching. 
+ +```python +from autogen import AssistantAgent, UserProxyAgent +from agent_os.integrations import AutoGenKernel, GovernancePolicy + +assistant = AssistantAgent("assistant", llm_config={"model": "gpt-4o"}) +user_proxy = UserProxyAgent("user_proxy", code_execution_config={"use_docker": True}) + +policy = GovernancePolicy( + blocked_patterns=[ + ("password", "substring"), + ("\\b\\d{3}-\\d{2}-\\d{4}\\b", "regex"), # SSN + ], + max_tool_calls=10, + timeout_seconds=300, + log_all_calls=True, +) + +kernel = AutoGenKernel( + policy=policy, + deep_hooks_enabled=True, + on_error=lambda exc, agent_id: print(f"[{agent_id}] Error: {exc}"), +) + +# govern() patches agents in-place and returns them +kernel.govern(assistant, user_proxy) + +# Initiate chat — all messages pass through governance +user_proxy.initiate_chat(assistant, message="Analyze this dataset") +``` + +### What govern() patches + +| Method | Behavior on violation | +|---|---| +| `initiate_chat()` | Raises `PolicyViolationError` | +| `generate_reply()` | Returns `[BLOCKED: reason]` string (keeps conversation flowing) | +| `receive()` | Guards inbound messages | +| Function call pipeline | Validates each function call (when `deep_hooks_enabled=True`) | +| GroupChat routing | Intercepts multi-agent message routing | +| State changes | Tracks and logs agent state transitions | + +### Unwrapping + +```python +# Remove governance from all agents +kernel.unwrap(assistant) +kernel.unwrap(user_proxy) +``` + +--- + +## 6. Microsoft Agent Framework (MAF) Middleware + +For MAF-based agents, Agent OS provides composable async middleware instead of a +kernel wrapper. 
+ +```python +from agent_os.integrations import ( + MAFGovernancePolicyMiddleware, + MAFCapabilityGuardMiddleware, + MAFAuditTrailMiddleware, + maf_create_governance_middleware, +) +``` + +### Quick setup with the factory + +```python +middlewares = maf_create_governance_middleware( + policy_directory="./policies", + allowed_tools=["search", "calculator"], + denied_tools=["shell_exec", "file_write"], + agent_id="my-agent", + enable_rogue_detection=True, + audit_log=my_audit_log, +) + +# Register with your MAF agent +for mw in middlewares: + agent.add_middleware(mw) +``` + +The factory assembles middleware in order: +1. `AuditTrailMiddleware` — tamper-proof pre/post execution logging +2. `GovernancePolicyMiddleware` — declarative policy evaluation +3. `CapabilityGuardMiddleware` — tool allow/deny enforcement +4. `RogueDetectionMiddleware` — anomaly-based rogue agent detection + +### Manual middleware composition + +```python +from agent_os.integrations import ( + MAFGovernancePolicyMiddleware, + MAFCapabilityGuardMiddleware, +) + +# Policy middleware — evaluates governance rules +policy_mw = MAFGovernancePolicyMiddleware( + evaluator=my_policy_evaluator, + audit_log=audit_log, +) + +# Capability guard — tool allow/deny lists +capability_mw = MAFCapabilityGuardMiddleware( + allowed_tools=["search", "summarize"], + denied_tools=["delete_record"], + audit_log=audit_log, +) + +# Each middleware follows the process(context, call_next) pattern +# On violation: sets error response, logs, raises MiddlewareTermination +# On allow: calls call_next() to continue the chain +``` + +--- + +## 7. Common GovernancePolicy Patterns + +`GovernancePolicy` is a dataclass with sensible defaults. Here are +battle-tested configurations for common scenarios. 
+ +### Read-only agent + +```python +readonly_policy = GovernancePolicy( + name="read-only", + max_tokens=4096, + max_tool_calls=10, + allowed_tools=["search", "retrieve", "summarize"], + blocked_patterns=[ + ("DELETE", "substring"), + ("DROP", "substring"), + ("INSERT", "substring"), + ("UPDATE", "substring"), + ("rm ", "substring"), + ("write_file", "substring"), + ], + require_human_approval=False, + log_all_calls=True, +) +``` + +### Production-strict + +```python +production_policy = GovernancePolicy( + name="production-strict", + max_tokens=2048, + max_tool_calls=5, + timeout_seconds=60, + allowed_tools=["approved_api_call", "read_database"], + blocked_patterns=[ + ("\\b\\d{3}-\\d{2}-\\d{4}\\b", "regex"), # SSN + ("\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}\\b", "regex"), # email + ("(?i)password\\s*[:=]\\s*\\S+", "regex"), # credentials + ("IGNORE PREVIOUS", "substring"), # prompt injection + ], + require_human_approval=True, + confidence_threshold=0.9, + drift_threshold=0.10, + max_concurrent=5, + backpressure_threshold=4, + checkpoint_frequency=3, + log_all_calls=True, + version="1.0.0", +) +``` + +### Dev-permissive + +```python +dev_policy = GovernancePolicy( + name="dev-permissive", + max_tokens=16384, + max_tool_calls=50, + timeout_seconds=600, + allowed_tools=[], # empty = allow all + blocked_patterns=[ + ("rm -rf /", "substring"), # just the truly dangerous stuff + ], + require_human_approval=False, + confidence_threshold=0.5, + drift_threshold=0.5, + log_all_calls=True, +) +``` + +### Serialization + +Policies serialize to YAML for version-controlled policy-as-code: + +```python +# Save +production_policy.to_yaml("policies/production.yaml") + +# Load +policy = GovernancePolicy.from_yaml("policies/production.yaml") + +# Compare +if dev_policy.is_stricter_than(production_policy): + print("Dev policy is stricter — unusual!") + +# Diff two policies +changes = production_policy.diff(dev_policy) +for field, (prod_val, dev_val) in changes.items(): 
+ print(f" {field}: {prod_val} → {dev_val}") +``` + +--- + +## 8. Building a Custom Adapter + +When your framework isn't covered by the 22 built-in adapters, extend +`BaseIntegration`. + +### Minimal adapter + +```python +from agent_os.integrations.base import ( + BaseIntegration, + GovernancePolicy, + ExecutionContext, + PolicyViolationError, +) + + +class MyFrameworkKernel(BaseIntegration): + """Governance adapter for MyFramework.""" + + def __init__(self, policy: GovernancePolicy | None = None) -> None: + super().__init__(policy) + + def wrap(self, agent): + """Wrap a MyFramework agent with governance.""" + ctx = self.create_context(agent_id=getattr(agent, "name", "unknown")) + return GovernedMyAgent(agent, self, ctx) + + def unwrap(self, governed_agent): + """Remove governance wrapper, return original agent.""" + return governed_agent._original +``` + +### The governed wrapper + +```python +class GovernedMyAgent: + """Transparent proxy that routes calls through governance.""" + + def __init__(self, original, kernel: MyFrameworkKernel, ctx: ExecutionContext): + self._original = original + self._kernel = kernel + self._ctx = ctx + + def run(self, prompt: str, **kwargs): + # Pre-execution check + allowed, reason = self._kernel.pre_execute(self._ctx, prompt) + if not allowed: + raise PolicyViolationError(reason) + + # Execute the real framework call + result = self._original.run(prompt, **kwargs) + + # Post-execution validation (drift detection, output scanning) + valid, reason = self._kernel.post_execute(self._ctx, result) + if not valid: + raise PolicyViolationError(reason) + + return result + + def get_context(self) -> ExecutionContext: + return self._ctx +``` + +### Adding tool call interception + +```python +from agent_os.integrations.base import ToolCallRequest, ToolCallResult, PolicyInterceptor + + +class GovernedMyAgent: + # ... 
(same as above) + + def call_tool(self, tool_name: str, arguments: dict): + request = ToolCallRequest( + tool_name=tool_name, + arguments=arguments, + agent_id=self._ctx.agent_id, + ) + + interceptor = PolicyInterceptor(self._kernel.policy) + result: ToolCallResult = interceptor.intercept(request) + + if not result.allowed: + raise PolicyViolationError( + f"Tool '{tool_name}' blocked: {result.reason}" + ) + + # Use modified arguments if the interceptor rewrote them + final_args = result.modified_arguments or arguments + return self._original.call_tool(tool_name, final_args) +``` + +### Adding event hooks + +```python +from agent_os.integrations.base import GovernanceEventType + + +class MyFrameworkKernel(BaseIntegration): + # ... (same as above) + + def wrap(self, agent): + ctx = self.create_context(agent_id=agent.name) + + # Emit event on wrap + self.emit(GovernanceEventType.POLICY_CHECK, { + "agent_id": agent.name, + "policy": self.policy.name, + "action": "wrap", + }) + + return GovernedMyAgent(agent, self, ctx) + + +# Usage — register listeners before wrapping +kernel = MyFrameworkKernel(policy=my_policy) + +kernel.on(GovernanceEventType.POLICY_VIOLATION, lambda data: ( + alert_ops_team(data) +)) + +kernel.on(GovernanceEventType.TOOL_CALL_BLOCKED, lambda data: ( + log_blocked_tool(data["tool_name"], data["reason"]) +)) + +governed = kernel.wrap(my_agent) +``` + +### Composing multiple interceptors + +```python +from agent_os.integrations.base import ( + CompositeInterceptor, + PolicyInterceptor, + ToolCallInterceptor, +) + + +class CustomRateLimitInterceptor: + """Example: rate-limit tool calls per minute.""" + + def intercept(self, request: ToolCallRequest) -> ToolCallResult: + if self._over_limit(request.agent_id): + return ToolCallResult(allowed=False, reason="Rate limit exceeded") + return ToolCallResult(allowed=True) + + +# Chain interceptors — all must allow the call +composite = CompositeInterceptor([ + PolicyInterceptor(policy), + 
CustomRateLimitInterceptor(), +]) + +result = composite.intercept(tool_request) +``` + +--- + +## Putting It All Together + +A real-world pattern: same policy, multiple frameworks, centralized audit. + +```python +from agent_os.integrations import ( + GovernancePolicy, + OpenAIKernel, + LangChainKernel, + AnthropicKernel, + GovernanceEventType, +) + +# One policy for the whole system +policy = GovernancePolicy.from_yaml("policies/production.yaml") + +# Centralized violation handler +def on_violation(data): + send_to_siem(data) + page_on_call(data["agent_id"], data["reason"]) + +# OpenAI assistant +oai_kernel = OpenAIKernel(policy=policy) +oai_kernel.on(GovernanceEventType.POLICY_VIOLATION, on_violation) +governed_assistant = oai_kernel.wrap(assistant, client) + +# LangChain RAG chain +lc_kernel = LangChainKernel(policy=policy) +lc_kernel.on(GovernanceEventType.POLICY_VIOLATION, on_violation) +governed_chain = lc_kernel.wrap(rag_chain) + +# Anthropic summarizer +anth_kernel = AnthropicKernel(policy=policy) +anth_kernel.on(GovernanceEventType.POLICY_VIOLATION, on_violation) +governed_claude = anth_kernel.wrap(anthropic_client) +``` + +Every call across all three frameworks is now governed by the same policy, +violations route to the same handler, and the audit trail is unified. 
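The fan-in of violations to one handler relies on nothing more than an event-listener registry: each kernel keeps a list of callbacks per event type and invokes them when it emits. The `Emitter` class below is an illustrative stand-in for the kernel's `on()`/event machinery, not the real `agent_os` implementation — it just shows the shape of the pattern:

```python
from collections import defaultdict
from typing import Any, Callable


class Emitter:
    """Minimal event registry mirroring the kernel.on(...) pattern (sketch)."""

    def __init__(self) -> None:
        self._listeners: dict[str, list[Callable[[dict], Any]]] = defaultdict(list)

    def on(self, event: str, handler: Callable[[dict], Any]) -> None:
        self._listeners[event].append(handler)

    def emit(self, event: str, data: dict) -> None:
        for handler in self._listeners[event]:
            handler(data)


violations: list[dict] = []


def on_violation(data: dict) -> None:
    # Stand-in for send_to_siem(...) / page_on_call(...)
    violations.append(data)


# Three independent "kernels" share one handler
kernels = [Emitter() for _ in range(3)]
for k in kernels:
    k.on("policy_violation", on_violation)

kernels[1].emit("policy_violation", {"agent_id": "rag-chain", "reason": "blocked pattern"})
print(len(violations))  # 1
```

Because every kernel calls the same callback, the handler becomes the single choke point for SIEM export, paging, and metrics regardless of which framework raised the violation.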
+ +--- + +## Next Steps + +- [Tutorial 01 — Getting Started](./01-getting-started.md) +- [Tutorial 02 — Policy Configuration](./02-policy-configuration.md) +- [OWASP Compliance Mapping](../OWASP-COMPLIANCE.md) +- [API Reference — `BaseIntegration`](../../packages/agent-os/src/agent_os/integrations/base.py) +- [All 22+ adapters](../../packages/agent-os/src/agent_os/integrations/) diff --git a/docs/tutorials/04-audit-and-compliance.md b/docs/tutorials/04-audit-and-compliance.md new file mode 100644 index 00000000..0a50fc43 --- /dev/null +++ b/docs/tutorials/04-audit-and-compliance.md @@ -0,0 +1,736 @@ +# Tutorial 4 — Audit Logging & Compliance + +Every action an AI agent takes — tool calls, policy decisions, trust +handshakes — must be recorded in a **tamper-proof** log. Without it, +you cannot answer the question every auditor will ask: *"What exactly did +this agent do, and who authorised it?"* + +The Agent Governance Toolkit gives you two complementary pieces: + +| Package | Install | Purpose | +|---------|---------|---------| +| `agentmesh-platform` | `pip install agentmesh-platform` | `AuditLog` with Merkle-chain integrity | +| `ai-agent-compliance` | `pip install ai-agent-compliance` | OWASP ASI 2026 compliance CLI | + +This tutorial walks through both, from a single log call to a CI/CD +compliance gate. 
+ +--- + +## 1 — Quick Start + +```python +from agentmesh.governance.audit import AuditLog + +# Create an in-memory audit log +audit = AuditLog() + +# Record a tool invocation +entry = audit.log( + event_type="tool_invocation", + agent_did="did:web:sales-assistant.example.com", + action="allow", + resource="/crm/contacts", + data={"tool": "crm_lookup", "query": "acme corp"}, + outcome="success", + trace_id="trace-7f3a", +) + +print(entry.entry_id) # unique UUID +print(entry.entry_hash) # SHA-256 hash of the entry +print(entry.timestamp) # UTC datetime + +# Verify nothing has been tampered with +is_valid, error = audit.verify_integrity() +assert is_valid, f"Chain broken: {error}" +print("✅ Audit chain intact") +``` + +Run it: + +```bash +pip install agentmesh-platform +python quickstart_audit.py +``` + +--- + +## 2 — AuditLog API Reference + +### 2.1 Creating an AuditLog + +```python +from agentmesh.governance.audit import AuditLog + +# In-memory only +audit = AuditLog() + +# With an external sink (see §6) +from agentmesh.governance.audit_backends import FileAuditSink + +sink = FileAuditSink(path="audit.jsonl", secret_key=b"my-hmac-secret") +audit = AuditLog(sink=sink) +``` + +### 2.2 `log()` — Record an Event + +```python +entry = audit.log( + event_type="tool_invocation", # see event types below + agent_did="did:web:agent.example.com", + action="allow", # allow | deny | audit | quarantine | warning + resource="/api/users", # what the agent accessed + data={"method": "GET"}, # arbitrary metadata (secrets are stripped) + outcome="success", # success | failure | denied | error + policy_decision="allowed", # human-readable policy result + trace_id="trace-abc123", # correlation ID for distributed tracing +) +``` + +**Event types:** + +| Event Type | When | +|---|---| +| `tool_invocation` | Agent successfully called a tool | +| `tool_blocked` | Policy denied a tool call | +| `policy_evaluation` | Policy engine evaluated a request | +| `policy_violation` | Agent violated 
a governance policy | +| `rogue_detection` | Anomaly detection flagged an agent | +| `agent_invocation` | Agent-to-agent delegation occurred | + +**Outcomes:** `success`, `failure`, `denied`, `error` + +**Actions:** `allow`, `deny`, `audit`, `quarantine`, `warning` + +### 2.3 `query()` — Search the Audit Trail + +```python +from datetime import datetime, timezone, timedelta + +now = datetime.now(timezone.utc) +one_hour_ago = now - timedelta(hours=1) + +entries = audit.query( + agent_did="did:web:agent.example.com", # filter by agent + event_type="tool_invocation", # filter by event type + start_time=one_hour_ago, # time range start + end_time=now, # time range end + outcome="success", # filter by outcome + limit=50, # max results (default 100) +) + +for e in entries: + print(f"{e.timestamp} | {e.action} | {e.resource}") +``` + +### 2.4 `get_entry()` — Look Up a Single Entry + +```python +entry = audit.get_entry(entry_id="some-uuid-here") +print(entry.event_type, entry.outcome) +``` + +### 2.5 `get_entries_for_agent()` and `get_entries_by_type()` + +Convenience shortcuts when you only need one filter: + +```python +# Everything agent X did (last 100 by default) +agent_entries = audit.get_entries_for_agent("did:web:agent.example.com", limit=200) + +# All policy violations +violations = audit.get_entries_by_type("policy_violation", limit=50) +``` + +### 2.6 `verify_integrity()` — Full Chain Verification + +```python +is_valid, error_msg = audit.verify_integrity() + +if not is_valid: + raise RuntimeError(f"Audit trail tampered: {error_msg}") +``` + +This verifies the entire Merkle chain and every entry hash. Call it +periodically or before exporting data. 
+ +### 2.7 `get_proof()` — Merkle Inclusion Proof + +```python +proof = audit.get_proof(entry.entry_id) + +print(proof["entry"]) # the AuditEntry +print(proof["merkle_root"]) # current Merkle root hash +print(proof["merkle_proof"]) # list of (hash, position) tuples +print(proof["verified"]) # True if the proof checks out +``` + +A third party can verify the proof against the published root hash +without needing the full log. + +### 2.8 `export()` and `export_cloudevents()` + +```python +# Plain dict export (entries + metadata) +data = audit.export(start_time=one_hour_ago, end_time=now) +print(data["entries"]) # list of entry dicts +print(data["metadata"]) # chain metadata + +# CloudEvents v1.0 JSON envelopes +events = audit.export_cloudevents(start_time=one_hour_ago) +for ce in events: + print(ce["type"]) # e.g. "ai.agentmesh.tool.invoked" + print(ce["source"]) # agent DID +``` + +--- + +## 3 — Merkle Chain Integrity + +### How It Works + +Every entry that enters `AuditLog` is added to an internal +`MerkleAuditChain`. The chain builds a **Merkle tree** over all +entries: + +``` + Root Hash + / \ + H(AB) H(CD) + / \ / \ + H(A) H(B) H(C) H(D) ← leaf = SHA-256 of entry +``` + +Key properties: + +* **Append-only** — entries cannot be removed or reordered. +* **Tamper-evident** — changing any entry changes the root hash. +* **Efficient proofs** — proving an entry exists requires only + O(log n) hashes, not the full log. 
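As a concrete illustration of these properties, here is a toy Merkle tree in plain `hashlib`. It is a sketch of the general technique, not `MerkleAuditChain`'s internals, but it shows why an inclusion proof needs only one sibling hash per level, in the same `(hash, position)` shape that `get_proof()` returns.

```python
import hashlib


def h(data: str) -> str:
    return hashlib.sha256(data.encode()).hexdigest()


def merkle_root(leaves: list[str]) -> str:
    """Pairwise-hash each level upward until a single root remains."""
    level = leaves[:]
    while len(level) > 1:
        if len(level) % 2:          # duplicate the last node on odd levels
            level.append(level[-1])
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


def merkle_proof(leaves: list[str], index: int) -> list[tuple[str, str]]:
    """Collect one sibling hash per level: O(log n) hashes total."""
    proof, level = [], leaves[:]
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])
        sibling = index ^ 1         # the node this one is paired with
        proof.append((level[sibling], "left" if sibling < index else "right"))
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
        index //= 2
    return proof


def verify(leaf_hash: str, proof: list[tuple[str, str]], root: str) -> bool:
    """Rebuild the root from the leaf plus sibling hashes; no full log needed."""
    acc = leaf_hash
    for sibling, position in proof:
        acc = h(sibling + acc) if position == "left" else h(acc + sibling)
    return acc == root


leaves = [h(f"entry-{i}") for i in range(4)]
root = merkle_root(leaves)
proof = merkle_proof(leaves, 2)

assert len(proof) == 2              # log2(4) sibling hashes, not all 4 entries
assert verify(leaves[2], proof, root)
```

Doubling the log adds only one hash to each proof, which is what makes third-party verification against a published root practical at scale.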
+
+### Verifying the Chain Programmatically
+
+```python
+from agentmesh.governance.audit import AuditLog
+
+audit = AuditLog()
+
+# Log several events (keep the last entry so we can prove its inclusion below)
+for i in range(100):
+    entry = audit.log(
+        event_type="tool_invocation",
+        agent_did=f"did:web:agent-{i % 5}.example.com",
+        action="allow",
+        resource=f"/api/resource/{i}",
+        outcome="success",
+    )
+
+# Full integrity check
+is_valid, error = audit.verify_integrity()
+print(f"Chain valid: {is_valid}")  # True
+
+# Get the Merkle root (publish this for external auditors)
+root = audit._chain.get_root_hash()
+print(f"Merkle root: {root}")
+
+# Prove a specific entry is in the log
+proof = audit.get_proof(entry.entry_id)
+assert proof["verified"], "Proof failed"
+```
+
+### Verifying a Proof Externally
+
+A verifier who only has the root hash can confirm inclusion:
+
+```python
+from agentmesh.governance.audit import MerkleAuditChain
+
+# Auditor receives: entry_hash, proof, and published root_hash
+verified = MerkleAuditChain.verify_proof(
+    entry_hash="abc123...",
+    proof=[("def456...", "left"), ("789aaa...", "right")],
+    root_hash="expected-root...",
+)
+print(f"Entry in log: {verified}")
+```
+
+---
+
+## 4 — Querying the Audit Trail
+
+### Find All Denied Tool Calls in the Last 24 Hours
+
+```python
+from datetime import datetime, timezone, timedelta
+
+yesterday = datetime.now(timezone.utc) - timedelta(days=1)
+
+denied = audit.query(
+    event_type="tool_blocked",
+    outcome="denied",
+    start_time=yesterday,
+    limit=200,
+)
+
+print(f"Blocked {len(denied)} tool calls in the last 24h")
+for e in denied:
+    print(f"  {e.agent_did} tried {e.resource} — {e.policy_decision}")
+```
+
+### Investigate a Specific Agent
+
+```python
+agent = "did:web:support-bot.example.com"
+
+# Everything this agent did
+all_actions = audit.get_entries_for_agent(agent, limit=500)
+
+# Only violations
+violations = audit.query(
+    agent_did=agent,
+    event_type="policy_violation",
+)
+
+# Rogue detection alerts
+alerts = audit.query(
+    
agent_did=agent, + event_type="rogue_detection", +) + +print(f"Agent {agent}:") +print(f" Total actions: {len(all_actions)}") +print(f" Violations: {len(violations)}") +print(f" Rogue alerts: {len(alerts)}") +``` + +### Trace a Request Across Agents + +Use `trace_id` to correlate entries across a multi-agent workflow: + +```python +trace = "trace-7f3a-b2c1" + +# query() doesn't filter by trace_id directly, so export and filter +all_entries = audit.export()["entries"] +trace_entries = [e for e in all_entries if e.get("trace_id") == trace] + +for e in trace_entries: + print(f"{e['timestamp']} | {e['agent_did']} | {e['action']} | {e['resource']}") +``` + +--- + +## 5 — External Sinks + +In-memory audit is fine for development. In production you need +durable storage. The toolkit provides `FileAuditSink` out of the box +and defines the `AuditSink` protocol so you can write your own. + +### 5.1 FileAuditSink — JSON-Lines on Disk + +```python +from agentmesh.governance.audit import AuditLog +from agentmesh.governance.audit_backends import FileAuditSink + +# Every entry is HMAC-signed and hash-chained +sink = FileAuditSink( + path="audit_trail.jsonl", + secret_key=b"change-me-to-a-real-secret", + max_file_size=50 * 1024 * 1024, # rotate at 50 MB (0 = no rotation) +) + +audit = AuditLog(sink=sink) + +# Log events as normal — they're persisted automatically +audit.log( + event_type="tool_invocation", + agent_did="did:web:agent.example.com", + action="allow", + resource="/api/data", + outcome="success", +) + +# Verify the on-disk chain independently +is_valid, error = sink.verify_integrity() +print(f"File chain valid: {is_valid}") + +# Read back signed entries +signed_entries = sink.read_entries() +for se in signed_entries: + print(f"{se.entry_id}: hash={se.content_hash[:16]}... sig={se.signature[:16]}...") + +# Always close when done +sink.close() +``` + +The output file (`audit_trail.jsonl`) contains one JSON object per +line. 
Each entry includes `content_hash`, `previous_hash`, and +an HMAC `signature`. + +### 5.2 Writing a Custom Sink + +Implement the `AuditSink` protocol to push entries to a database, +message queue, or cloud service: + +```python +from agentmesh.governance.audit import AuditEntry +from agentmesh.governance.audit_backends import AuditSink + +class PostgresSink: + """Push audit entries to a PostgreSQL table.""" + + def __init__(self, dsn: str): + import psycopg2 + self._conn = psycopg2.connect(dsn) + + def write(self, entry: AuditEntry) -> None: + with self._conn.cursor() as cur: + cur.execute( + """ + INSERT INTO audit_log + (entry_id, timestamp, event_type, agent_did, + action, resource, outcome, entry_hash, trace_id) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) + """, + ( + entry.entry_id, + entry.timestamp.isoformat(), + entry.event_type, + entry.agent_did, + entry.action, + entry.resource, + entry.outcome, + entry.entry_hash, + entry.trace_id, + ), + ) + self._conn.commit() + + def write_batch(self, entries: list[AuditEntry]) -> None: + for entry in entries: + self.write(entry) + + def verify_integrity(self) -> tuple[bool, str | None]: + # Implement chain verification against DB rows + return True, None + + def close(self) -> None: + self._conn.close() + + +# Use it +sink = PostgresSink(dsn="postgresql://user:pass@localhost/agents") +audit = AuditLog(sink=sink) +``` + +> **Tip:** The protocol is defined with `@runtime_checkable`, so you +> can verify your sink with `isinstance(my_sink, AuditSink)`. + +--- + +## 6 — OWASP ASI 2026 Compliance Checking + +The `ai-agent-compliance` package verifies that your deployment covers +all 10 OWASP ASI 2026 security controls. 
+ +### 6.1 Install + +```bash +pip install ai-agent-compliance +``` + +### 6.2 Verify Governance Coverage + +```bash +# Human-readable summary +agent-compliance verify +``` + +Output: + +``` +OWASP ASI 2026 Governance Verification +======================================= +✅ ASI-01 Prompt Injection PolicyInterceptor +✅ ASI-02 Insecure Tool Use ToolAliasRegistry +✅ ASI-03 Excessive Agency GovernancePolicy +✅ ASI-04 Unauthorized Escalation EscalationPolicy +✅ ASI-05 Trust Boundary Violation CardRegistry +✅ ASI-06 Insufficient Logging AuditChain +✅ ASI-07 Insecure Identity AgentIdentity +✅ ASI-08 Policy Bypass PolicyConflictResolver +✅ ASI-09 Supply Chain Integrity IntegrityVerifier +✅ ASI-10 Behavioral Anomaly ComplianceEngine + +Coverage: 10/10 (100%) +``` + +```bash +# Machine-readable JSON +agent-compliance verify --json +``` + +```bash +# Shields.io badge for your README +agent-compliance verify --badge +``` + +Output: + +```markdown +[![OWASP ASI 2026](https://img.shields.io/badge/OWASP%20ASI%202026-10%2F10-brightgreen)](https://owaspai.org/asi/) +``` + +### 6.3 The 10 ASI Controls + +| Control | Risk | Governance Component | +|---------|------|---------------------| +| ASI-01 | Prompt Injection | `PolicyInterceptor` in `agent_os.integrations.base` | +| ASI-02 | Insecure Tool Use | `ToolAliasRegistry` in `agent_os.integrations.tool_aliases` | +| ASI-03 | Excessive Agency | `GovernancePolicy` in `agent_os.integrations.base` | +| ASI-04 | Unauthorized Escalation | `EscalationPolicy` in `agent_os.integrations.escalation` | +| ASI-05 | Trust Boundary Violation | `CardRegistry` in `agentmesh.trust.cards` | +| ASI-06 | Insufficient Logging | `AuditChain` in `agentmesh.governance.audit` | +| ASI-07 | Insecure Identity | `AgentIdentity` in `agentmesh.identity.agent_id` | +| ASI-08 | Policy Bypass | `PolicyConflictResolver` in `agentmesh.governance.conflict_resolution` | +| ASI-09 | Supply Chain Integrity | `IntegrityVerifier` in `agent_compliance.integrity` | +| ASI-10 | 
Behavioral Anomaly | `ComplianceEngine` in `agentmesh.governance.compliance` | + +### 6.4 Verify Supply-Chain Integrity + +Check that governance module source files and critical functions haven't +been tampered with: + +```bash +# Generate a baseline manifest +agent-compliance integrity --generate integrity.json + +# Later, verify against it +agent-compliance integrity --manifest integrity.json +``` + +```bash +# JSON output for automation +agent-compliance integrity --manifest integrity.json --json +``` + +The integrity checker verifies: + +* **File hashes** — SHA-256 of every governance module source file. +* **Function bytecode hashes** — critical functions like + `PolicyEngine.evaluate`, `AuditChain.add_entry`, and + `CardRegistry.is_verified` are bytecode-hashed to detect patches. + +### 6.5 Programmatic Verification + +```python +from agent_compliance.verify import GovernanceVerifier +from agent_compliance.integrity import IntegrityVerifier + +# OWASP ASI coverage +verifier = GovernanceVerifier() +attestation = verifier.verify() + +print(f"Passed: {attestation.passed}") +print(f"Coverage: {attestation.coverage_pct()}%") +print(f"Hash: {attestation.attestation_hash}") + +# Print per-control results +print(attestation.summary()) + +# Get JSON for storage or CI artifacts +report_json = attestation.to_json() + +# Supply chain integrity +integrity = IntegrityVerifier(manifest_path="integrity.json") +report = integrity.verify() + +print(report.summary()) +print(f"Modules checked: {report.modules_checked}") +print(f"Missing modules: {report.modules_missing}") +``` + +--- + +## 7 — Compliance Reporting + +### 7.1 Generate an Audit Report for Auditors + +Combine audit export with compliance attestation into a single report: + +```python +import json +from datetime import datetime, timezone, timedelta +from pathlib import Path + +from agentmesh.governance.audit import AuditLog +from agentmesh.governance.audit_backends import FileAuditSink +from agent_compliance.verify 
import GovernanceVerifier +from agent_compliance.integrity import IntegrityVerifier + + +def generate_compliance_report( + audit: AuditLog, + output_path: str = "compliance_report.json", + days: int = 30, +) -> dict: + """Generate a compliance report covering the last N days.""" + + now = datetime.now(timezone.utc) + start = now - timedelta(days=days) + + # 1. Audit trail summary + export = audit.export(start_time=start, end_time=now) + entries = export["entries"] + + event_counts = {} + outcome_counts = {} + for e in entries: + event_counts[e["event_type"]] = event_counts.get(e["event_type"], 0) + 1 + outcome_counts[e["outcome"]] = outcome_counts.get(e["outcome"], 0) + 1 + + # 2. Chain integrity + chain_valid, chain_error = audit.verify_integrity() + + # 3. OWASP ASI attestation + attestation = GovernanceVerifier().verify() + + # 4. Supply chain integrity + try: + integrity = IntegrityVerifier(manifest_path="integrity.json") + integrity_report = integrity.verify() + integrity_passed = integrity_report.passed + except FileNotFoundError: + integrity_passed = None # no manifest on file + + # Assemble report + report = { + "report_generated": now.isoformat(), + "period_start": start.isoformat(), + "period_end": now.isoformat(), + "audit_trail": { + "total_entries": len(entries), + "events_by_type": event_counts, + "events_by_outcome": outcome_counts, + "chain_integrity_valid": chain_valid, + "chain_integrity_error": chain_error, + "merkle_root": audit._chain.get_root_hash(), + }, + "owasp_asi_2026": { + "passed": attestation.passed, + "controls_passed": attestation.controls_passed, + "controls_total": attestation.controls_total, + "coverage_pct": attestation.coverage_pct(), + "attestation_hash": attestation.attestation_hash, + }, + "supply_chain_integrity": { + "passed": integrity_passed, + }, + } + + Path(output_path).write_text( + json.dumps(report, indent=2, default=str), encoding="utf-8" + ) + print(f"📄 Report written to {output_path}") + return report + + +# 
Usage +audit = AuditLog() +# ... after logging events ... +report = generate_compliance_report(audit, days=30) +``` + +### 7.2 CI/CD Compliance Gate + +Add compliance checks to your GitHub Actions pipeline so a failing +check blocks deployment: + +```yaml +# .github/workflows/compliance.yml +name: Governance Compliance + +on: + push: + branches: [main] + pull_request: + +jobs: + compliance: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install governance packages + run: | + pip install agentmesh-platform ai-agent-compliance + + - name: Generate integrity manifest + run: agent-compliance integrity --generate integrity.json + + - name: Verify OWASP ASI 2026 coverage + run: agent-compliance verify --json > asi_report.json + + - name: Verify supply-chain integrity + run: agent-compliance integrity --manifest integrity.json --json > integrity_report.json + + - name: Upload compliance artifacts + if: always() + uses: actions/upload-artifact@v4 + with: + name: compliance-reports + path: | + asi_report.json + integrity_report.json + integrity.json +``` + +> **Tip:** `agent-compliance verify` exits with code **1** if any +> control is missing, so the pipeline step will fail automatically. 
+ +--- + +## 8 — AuditEntry Reference + +Every call to `audit.log()` returns an `AuditEntry` with these fields: + +| Field | Type | Description | +|-------|------|-------------| +| `entry_id` | `str` | Unique UUID | +| `timestamp` | `datetime` | UTC timestamp | +| `event_type` | `str` | One of the event types above | +| `agent_did` | `str` | DID of the acting agent | +| `action` | `str` | Policy action taken | +| `resource` | `str \| None` | Resource accessed | +| `target_did` | `str \| None` | DID of the target agent (for delegation) | +| `data` | `dict` | Arbitrary metadata | +| `outcome` | `str` | success / failure / denied / error | +| `policy_decision` | `str \| None` | Human-readable policy result | +| `matched_rule` | `str \| None` | ID of the policy rule that matched | +| `previous_hash` | `str` | Hash of the prior entry in the chain | +| `entry_hash` | `str` | SHA-256 hash of this entry | +| `trace_id` | `str \| None` | Distributed tracing correlation ID | +| `session_id` | `str \| None` | Session identifier | + +Key methods on `AuditEntry`: + +```python +entry.compute_hash() # recompute SHA-256 +entry.verify_hash() # True if stored hash matches computed hash +entry.to_cloudevent() # CloudEvents v1.0 JSON envelope +``` + +--- + +## Next Steps + +* **Tutorial 01–03** — Identity, trust, and policy (prerequisites for + a full governance stack). +* **[OWASP ASI 2026](https://owaspai.org/asi/)** — Read the full + specification for context on each control. +* **`examples/quickstart.py`** and **`examples/governed_agent.py`** in + `packages/agent-compliance/` — runnable demos you can adapt. 
diff --git a/docs/tutorials/05-agent-reliability.md b/docs/tutorials/05-agent-reliability.md new file mode 100644 index 00000000..dbe36ef7 --- /dev/null +++ b/docs/tutorials/05-agent-reliability.md @@ -0,0 +1,685 @@ +# Agent Reliability Engineering + +Site Reliability Engineering (SRE) practices adapted for autonomous AI agents — +rogue detection, circuit breakers, SLOs, chaos testing, and cost controls. + +> **Install:** `pip install agent-sre` +> +> **See also:** [Deployment Guides](../deployment/README.md) | [OWASP ASI Reference](https://owaspai.org/) + +--- + +## Table of Contents + +- [Why SRE for AI Agents?](#why-sre-for-ai-agents) +- [Quick Start: Rogue Detection](#quick-start-rogue-detection) +- [Circuit Breaker Pattern](#circuit-breaker-pattern) +- [SLO Tracking](#slo-tracking) +- [Chaos Testing](#chaos-testing) +- [Cost Controls](#cost-controls) +- [Putting It Together](#putting-it-together) + +--- + +## Why SRE for AI Agents? + +Traditional services fail in predictable ways — timeouts, crashes, resource +exhaustion. Agents add new failure modes: + +| Failure Mode | Traditional Service | AI Agent | +|---|---|---| +| **Runaway loops** | Process hangs | Agent calls tools in an infinite loop, burning tokens | +| **Behavioral drift** | Bug in new deploy | Model update changes decision patterns silently | +| **Cost explosion** | Resource leak | Single task consumes $500 in API calls | +| **Rogue behavior** | Compromised service | Agent uses unauthorized tools or exfiltrates data | +| **Cascading failure** | Service dependency down | Agent A fails → Agent B retries → Agent C overloaded | + +`agent-sre` gives you the building blocks to detect, contain, and recover from +all of these. + +--- + +## Quick Start: Rogue Detection + +The `RogueAgentDetector` (OWASP ASI-10) combines three signals to flag +compromised or malfunctioning agents: + +1. **Tool-call frequency** — z-score spike detection over a sliding window +2. 
**Action entropy** — flags both suspiciously repetitive and chaotic behavior +3. **Capability violations** — tools used outside the agent's allowed profile + +### Basic setup + +```python +from agent_sre.anomaly import RogueAgentDetector, RogueDetectorConfig, RiskLevel + +# Configure detection thresholds +config = RogueDetectorConfig( + frequency_window_seconds=60.0, # Sliding window for frequency analysis + frequency_z_threshold=2.5, # Standard deviations before flagging + entropy_low_threshold=0.3, # Too repetitive (possible loop) + entropy_high_threshold=3.5, # Too chaotic (possible compromise) + quarantine_risk_level=RiskLevel.HIGH, # Auto-quarantine at this level +) + +detector = RogueAgentDetector(config=config) +``` + +### Define what each agent is allowed to do + +```python +# Register allowed tools per agent +detector.register_capability_profile( + agent_id="support-agent", + allowed_tools=["search_kb", "create_ticket", "send_email"], +) + +detector.register_capability_profile( + agent_id="code-reviewer", + allowed_tools=["read_file", "search_code", "post_comment"], +) +``` + +### Feed actions and assess risk + +```python +import time + +# Simulate normal behavior +for i in range(20): + detector.record_action( + agent_id="support-agent", + action="search", + tool_name="search_kb", + timestamp=time.time() + i, + ) + +# Assess risk +assessment = detector.assess("support-agent") +print(f"Risk: {assessment.risk_level.value}") # "low" +print(f"Composite score: {assessment.composite_score}") # ~0.0 +print(f"Quarantine? {assessment.quarantine_recommended}") # False +``` + +### Detect a compromised agent + +```python +# Agent starts using unauthorized tools rapidly +base = time.time() + 100 +for i in range(50): + detector.record_action( + agent_id="support-agent", + action="exfiltrate", + tool_name="shell_exec", # Not in allowed tools! 
+ timestamp=base + i * 0.5, # 2 calls/second (frequency spike) + ) + +assessment = detector.assess("support-agent", timestamp=base + 25) +print(f"Risk: {assessment.risk_level.value}") # "high" or "critical" +print(f"Frequency score: {assessment.frequency_score}") # Elevated +print(f"Capability score: {assessment.capability_score}") # >0 (violations) +print(f"Quarantine? {assessment.quarantine_recommended}") # True + +if assessment.quarantine_recommended: + print(f"⚠ QUARANTINE agent '{assessment.agent_id}'") +``` + +--- + +## Circuit Breaker Pattern + +When an agent starts failing, you don't want it to keep hammering downstream +services. The circuit breaker isolates failing agents automatically. + +``` +CLOSED ──(failures >= threshold)──→ OPEN ──(timeout elapsed)──→ HALF_OPEN + ↑ │ + └──────────(success)────────────────────────────────────────────←─┘ + (failure) ──→ OPEN +``` + +### Using the cascade circuit breaker + +```python +from agent_sre.cascade.circuit_breaker import ( + CircuitBreaker, + CircuitBreakerConfig, + CircuitOpenError, +) + +# Configure: open after 3 failures, test recovery after 30s +config = CircuitBreakerConfig( + failure_threshold=3, + recovery_timeout_seconds=30.0, + half_open_max_calls=1, +) + +breaker = CircuitBreaker(agent_id="data-analyst", config=config) +``` + +### Wrap agent calls + +```python +def run_agent_task(task: dict) -> str: + """Your agent's main function.""" + # ... agent logic ... 
+ return "result" + +# The circuit breaker wraps the call +try: + result = breaker.call(run_agent_task, {"query": "revenue Q3"}) + print(f"Result: {result}") +except CircuitOpenError as e: + print(f"Agent isolated: {e}") + print(f"Retry after: {e.retry_after:.0f}s") +``` + +### Manual failure tracking + +```python +# If you manage the call yourself: +try: + result = run_agent_task(task) + breaker.record_success() +except Exception: + breaker.record_failure() + raise + +# Check state +print(f"State: {breaker.state}") # CLOSED, OPEN, or HALF_OPEN +print(f"Failures: {breaker.failure_count}") + +# Manual reset (e.g., after deploying a fix) +breaker.reset() +``` + +### Circuit breaker for multiple agents + +```python +class AgentFleetBreakers: + """Manage circuit breakers for a fleet of agents.""" + + def __init__(self, config: CircuitBreakerConfig | None = None): + self._config = config or CircuitBreakerConfig() + self._breakers: dict[str, CircuitBreaker] = {} + + def get(self, agent_id: str) -> CircuitBreaker: + if agent_id not in self._breakers: + self._breakers[agent_id] = CircuitBreaker(agent_id, self._config) + return self._breakers[agent_id] + + def open_circuits(self) -> list[str]: + return [aid for aid, cb in self._breakers.items() if cb.state == "OPEN"] + +fleet = AgentFleetBreakers( + config=CircuitBreakerConfig(failure_threshold=5, recovery_timeout_seconds=60.0), +) + +# Use per-agent breakers +cb = fleet.get("summarizer-agent") +cb.record_failure() +print(f"Open circuits: {fleet.open_circuits()}") +``` + +--- + +## SLO Tracking + +Define what "reliable" means for your agents with Service Level Objectives +backed by error budgets and burn rate alerts. 
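Before the API details, it helps to see the arithmetic behind burn rates. This is the standard SRE definition, sketched independently of `agent_sre` (the helper function and the numbers are illustrative):

```python
import math

# An SLO of 99.5% over 30 days leaves a 0.5% error budget. Burn rate compares
# the observed bad-event rate to the rate that would consume the budget exactly
# at the end of the window (illustrative helper, not part of agent_sre).


def burn_rate(bad: int, total: int, error_budget: float) -> float:
    """How many times faster than 'sustainable' the budget is being spent."""
    if total == 0:
        return 0.0
    return (bad / total) / error_budget


budget = 0.005  # 1 - 0.995

# 5 failures in 1000 tasks is a 0.5% error rate: a 1x burn, meaning the budget
# lasts exactly the 30-day window.
assert math.isclose(burn_rate(5, 1000, budget), 1.0)

# 50 failures in 1000 tasks is a 10x burn: the whole 30-day budget would be
# gone in about 3 days, which is why 10x is a common critical-alert threshold.
assert math.isclose(burn_rate(50, 1000, budget), 10.0)
```

The library computes this for you via `budget.burn_rate(...)` and the configured alert thresholds; the sketch only shows what the multiplier means.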
+ +### Available SLI types + +| SLI Type | What It Measures | Example Target | +|---|---|---| +| Latency | Task completion time | p99 < 10s | +| Error rate | Fraction of failed tasks | < 1% | +| Cost | Per-task spend | < $0.50/task | +| Token usage | Tokens per completion | < 4096 | +| Hallucination | Factual accuracy score | > 95% | +| Tool success | Tool call success rate | > 99% | +| Human feedback | User satisfaction score | > 4.0/5.0 | + +### Define an SLI and SLO + +```python +from agent_sre.slo import SLI, SLIValue, SLO, ErrorBudget +from agent_sre.slo.indicators import TimeWindow +from agent_sre.slo.objectives import BurnRateAlert, ExhaustionAction + +# Create a concrete SLI by subclassing +class TaskSuccessRateSLI(SLI): + """Tracks task success rate.""" + + def collect(self) -> SLIValue: + values = self.values_in_window() + if not values: + return self.record(1.0) + good = sum(1 for v in values if v.is_good) + return self.record(good / len(values)) + +# 99.5% success rate target over a 24h window +success_sli = TaskSuccessRateSLI( + name="task_success_rate", + target=0.995, + window="24h", +) + +# Create the SLO with error budget +slo = SLO( + name="code-reviewer-reliability", + indicators=[success_sli], + error_budget=ErrorBudget( + total=0.005, # 0.5% error budget (1 - 0.995) + window_seconds=2_592_000, # 30-day window + burn_rate_alert=2.0, # Warn at 2× burn rate + burn_rate_critical=10.0, # Critical at 10× burn rate + exhaustion_action=ExhaustionAction.THROTTLE, # Auto-throttle on exhaustion + ), + agent_id="code-reviewer", +) +``` + +### Record events and check status + +```python +# Record outcomes as they happen +for _ in range(95): + slo.error_budget.record_event(good=True) + +for _ in range(5): + slo.error_budget.record_event(good=False) + +# Check error budget +budget = slo.error_budget +print(f"Budget remaining: {budget.remaining_percent:.1f}%") +print(f"Exhausted: {budget.is_exhausted}") + +# Check burn rate (are we burning budget too fast?) 
+burn_rate = budget.burn_rate(window_seconds=3600) # Last hour +print(f"1h burn rate: {burn_rate:.2f}x") + +# Check for firing alerts +for alert in budget.firing_alerts(): + print(f"🔥 {alert.name}: burn rate {burn_rate:.1f}x (threshold: {alert.rate}x)") +``` + +### SLO status reporting + +```python +# Serialize for dashboards / alerting +status = slo.error_budget.to_dict() +# { +# "total": 0.005, +# "consumed": 5.0, +# "remaining_percent": ..., +# "is_exhausted": False, +# "burn_rate": ..., +# "exhaustion_action": "throttle", +# "firing_alerts": ["burn_rate_critical"] +# } +``` + +--- + +## Chaos Testing + +Inject faults into your agent pipeline to verify resilience *before* +production incidents find the gaps for you. + +### Define an experiment + +```python +from agent_sre.chaos import ( + ChaosExperiment, + Fault, + FaultType, + AbortCondition, + ResilienceScore, +) + +# Create faults to inject +faults = [ + Fault.latency_injection("openai-api", delay_ms=5000, rate=0.3), + Fault.error_injection("search_tool", error="timeout", rate=0.1), + Fault.timeout_injection("database", delay_ms=30000, rate=0.05), +] + +# Safety: abort if success rate drops below 50% +abort_conditions = [ + AbortCondition(metric="success_rate", threshold=0.5, comparator="lte"), +] + +experiment = ChaosExperiment( + name="llm-latency-resilience", + target_agent="code-reviewer", + faults=faults, + duration_seconds=1800, # 30 minutes + abort_conditions=abort_conditions, + blast_radius=0.3, # Affect 30% of traffic + description="Verify code-reviewer handles LLM latency gracefully", +) +``` + +### Run the experiment + +```python +# Start the experiment +experiment.start() +print(f"State: {experiment.state.value}") # "running" + +# In your agent middleware, inject faults +for fault in experiment.faults: + experiment.inject_fault(fault, applied=True) + +# Periodically check abort conditions +metrics = {"success_rate": 0.85, "latency_p99": 8500} +if experiment.check_abort(metrics): + 
print(f"Aborted: {experiment.abort_reason}") +else: + # Experiment completed normally + score = experiment.calculate_resilience( + baseline_success_rate=0.99, + experiment_success_rate=0.85, + ) + experiment.complete(resilience=score) + +print(f"Resilience: {experiment.resilience.overall:.0f}/100") +print(f"Passed: {experiment.resilience.passed}") +``` + +### Adversarial chaos testing + +Test your agent's security boundaries with adversarial fault types: + +```python +# Security-focused faults +security_faults = [ + Fault.prompt_injection("code-reviewer", technique="direct_override"), + Fault.privilege_escalation("code-reviewer", target_role="admin"), + Fault.tool_abuse("code-reviewer", tool_name="shell_exec"), +] + +security_experiment = ChaosExperiment( + name="security-boundary-test", + target_agent="code-reviewer", + faults=security_faults, + duration_seconds=600, + description="Verify agent rejects adversarial inputs", +) +``` + +### Use the chaos library for templates + +```python +from agent_sre.chaos import ChaosLibrary, ExperimentTemplate + +library = ChaosLibrary() + +# List available templates +for template in library.templates: + print(f" {template.name}: {template.description}") + +# Serialize experiment results for reporting +report = experiment.to_dict() +# Includes: experiment_id, state, faults, injection_count, resilience scores +``` + +--- + +## Cost Controls + +Prevent runaway spending with per-task budgets, auto-throttle, and a +kill-switch. 
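The escalation ladder is plain threshold arithmetic on budget utilization. This sketch is illustrative (the thresholds mirror the `CostGuard` configuration shown below, but the function itself is not part of the library):

```python
def budget_action(spent_usd: float, daily_limit_usd: float,
                  throttle_at: float = 0.85, kill_at: float = 0.95) -> str:
    """Map daily budget utilization to an action (illustrative, not CostGuard)."""
    utilization = spent_usd / daily_limit_usd
    if utilization >= kill_at:
        return "kill"      # stop the agent outright
    if utilization >= throttle_at:
        return "throttle"  # keep running, but slow the task rate
    return "ok"


assert budget_action(20.00, 50.00) == "ok"        # 40% of the daily limit
assert budget_action(43.00, 50.00) == "throttle"  # 86%: past the 85% threshold
assert budget_action(48.00, 50.00) == "kill"      # 96%: past the 95% kill switch
```

In production, `record_cost()` makes this decision for you and surfaces it as `BudgetAction` alerts; the sketch only shows where the cutoffs sit.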
+ +### Set up cost guard + +```python +from agent_sre.cost import CostGuard, AgentBudget, BudgetAction + +guard = CostGuard( + per_task_limit=2.00, # Max $2 per task + per_agent_daily_limit=50.00, # Max $50/day per agent + org_monthly_budget=5000.00, # Org-wide cap + auto_throttle=True, # Throttle at 85% daily budget + kill_switch_threshold=0.95, # Kill agent at 95% daily budget + anomaly_detection=True, # Detect cost spikes +) +``` + +### Pre-flight checks + +```python +# Before running an expensive task, check if budget allows it +allowed, reason = guard.check_task("research-agent", estimated_cost=1.50) +if not allowed: + print(f"Blocked: {reason}") +else: + # Run the task... + pass +``` + +### Record costs and handle alerts + +```python +# After each task, record the actual cost +alerts = guard.record_cost( + agent_id="research-agent", + task_id="task-001", + cost_usd=0.45, + breakdown={"input_tokens": 0.15, "output_tokens": 0.25, "tool_calls": 0.05}, +) + +for alert in alerts: + print(f"[{alert.severity.value}] {alert.message}") + if alert.action == BudgetAction.KILL: + print("🛑 Agent killed — stop all tasks immediately") + elif alert.action == BudgetAction.THROTTLE: + print("⚠ Agent throttled — reduce task rate") +``` + +### Monitor budget utilization + +```python +budget = guard.get_budget("research-agent") +print(f"Spent today: ${budget.spent_today_usd:.2f}") +print(f"Remaining: ${budget.remaining_today_usd:.2f}") +print(f"Utilization: {budget.utilization_percent:.0f}%") +print(f"Avg/task: ${budget.avg_cost_per_task:.4f}") +print(f"Throttled: {budget.throttled}") +print(f"Killed: {budget.killed}") +``` + +### Cost anomaly detection + +```python +from agent_sre.cost import CostAnomalyDetector, CostDataPoint + +anomaly_detector = CostAnomalyDetector() + +# Feed historical cost data to build a baseline +for cost in [0.40, 0.42, 0.38, 0.45, 0.41, 0.39, 0.43, 0.40, 0.42, 0.38]: + anomaly_detector.add_data_point(CostDataPoint(value=cost, 
agent_id="research-agent")) + +# Check a new data point for anomalies +result = anomaly_detector.analyze(CostDataPoint(value=4.50, agent_id="research-agent")) +if result.is_anomaly: + print(f"🚨 Cost anomaly: ${result.value:.2f} (expected {result.expected_range})") + print(f" Severity: {result.severity.value}, Score: {result.score:.1f}") +``` + +--- + +## Putting It Together + +Here's a production-ready SRE pipeline that combines all the components: + +```python +"""Production SRE pipeline for AI agents.""" + +import time + +from agent_sre.anomaly import RogueAgentDetector, RogueDetectorConfig, RiskLevel +from agent_sre.cascade.circuit_breaker import ( + CircuitBreaker, + CircuitBreakerConfig, + CircuitOpenError, +) +from agent_sre.slo import SLI, SLIValue, SLO, ErrorBudget +from agent_sre.slo.indicators import TimeWindow +from agent_sre.slo.objectives import ExhaustionAction +from agent_sre.cost import CostGuard, BudgetAction + + +# ── 1. Configure all components ───────────────────────────────────── + +AGENT_ID = "production-agent" + +# Rogue detection +rogue_detector = RogueAgentDetector( + config=RogueDetectorConfig( + frequency_z_threshold=3.0, + quarantine_risk_level=RiskLevel.HIGH, + ), +) +rogue_detector.register_capability_profile( + AGENT_ID, + allowed_tools=["search", "read_file", "write_file", "run_tests"], +) + +# Circuit breaker +breaker = CircuitBreaker( + agent_id=AGENT_ID, + config=CircuitBreakerConfig( + failure_threshold=5, + recovery_timeout_seconds=60.0, + ), +) + +# SLO tracking +class SuccessRateSLI(SLI): + def collect(self) -> SLIValue: + values = self.values_in_window() + if not values: + return self.record(1.0) + good = sum(1 for v in values if v.is_good) + return self.record(good / len(values)) + +slo = SLO( + name=f"{AGENT_ID}-reliability", + indicators=[SuccessRateSLI(name="success_rate", target=0.995, window="24h")], + error_budget=ErrorBudget( + total=0.005, + exhaustion_action=ExhaustionAction.CIRCUIT_BREAK, + ), + agent_id=AGENT_ID, 
+) + +# Cost guard +cost_guard = CostGuard( + per_task_limit=2.00, + per_agent_daily_limit=100.00, + auto_throttle=True, + kill_switch_threshold=0.95, +) + + +# ── 2. Agent execution wrapper ────────────────────────────────────── + +def execute_task(agent_id: str, task: dict) -> dict: + """Run a task through the full SRE pipeline.""" + + # Pre-flight: cost check + allowed, reason = cost_guard.check_task(agent_id, estimated_cost=task.get("est_cost", 0)) + if not allowed: + return {"status": "blocked", "reason": reason} + + # Pre-flight: rogue check + assessment = rogue_detector.assess(agent_id) + if assessment.quarantine_recommended: + return { + "status": "quarantined", + "risk_level": assessment.risk_level.value, + "score": assessment.composite_score, + } + + # Execute through circuit breaker + try: + result = breaker.call(_run_agent, agent_id, task) + except CircuitOpenError as e: + return {"status": "circuit_open", "retry_after": e.retry_after} + except Exception as exc: + slo.error_budget.record_event(good=False) + return {"status": "error", "error": str(exc)} + + # Post-flight: record success + cost + slo.error_budget.record_event(good=True) + cost_alerts = cost_guard.record_cost( + agent_id=agent_id, + task_id=task["id"], + cost_usd=result.get("cost", 0), + ) + + # Record action for rogue detection + for tool in result.get("tools_used", []): + rogue_detector.record_action(agent_id, action="tool_call", tool_name=tool) + + # Check for critical alerts + for alert in cost_alerts: + if alert.action == BudgetAction.KILL: + breaker.record_failure() # Trip the circuit breaker too + + return {"status": "success", "result": result, "alerts": [a.to_dict() for a in cost_alerts]} + + +def _run_agent(agent_id: str, task: dict) -> dict: + """Placeholder for your actual agent logic.""" + return { + "output": "task completed", + "cost": 0.35, + "tools_used": ["search", "read_file"], + } + + +# ── 3. 
Run tasks ──────────────────────────────────────────────────── + +tasks = [ + {"id": "t1", "query": "Review PR #42", "est_cost": 0.50}, + {"id": "t2", "query": "Summarize docs", "est_cost": 0.30}, + {"id": "t3", "query": "Run test suite", "est_cost": 1.00}, +] + +for task in tasks: + result = execute_task(AGENT_ID, task) + print(f"Task {task['id']}: {result['status']}") + +# Report SRE health +print(f"\nCircuit state: {breaker.state}") +print(f"Error budget remaining: {slo.error_budget.remaining_percent:.1f}%") +budget = cost_guard.get_budget(AGENT_ID) +print(f"Cost today: ${budget.spent_today_usd:.2f} / ${budget.daily_limit_usd:.2f}") +``` + +### What this pipeline gives you + +| Layer | Component | Protection | +|---|---|---| +| **Pre-flight** | `CostGuard.check_task` | Blocks tasks that would exceed budget | +| **Pre-flight** | `RogueAgentDetector.assess` | Quarantines compromised agents | +| **Execution** | `CircuitBreaker.call` | Isolates failing agents | +| **Post-flight** | `ErrorBudget.record_event` | Tracks reliability over time | +| **Post-flight** | `CostGuard.record_cost` | Detects cost anomalies, auto-throttles | +| **Post-flight** | `RogueAgentDetector.record_action` | Builds behavioral baseline | + +--- + +## Next Steps + +- **Progressive delivery:** Use `agent_sre.delivery.BlueGreenManager` to + safely roll out new agent versions with validation and auto-rollback. +- **Alerting:** Connect `agent_sre.alerts.AlertManager` to your notification + system (PagerDuty, Slack, Teams). +- **Dashboards:** Export SLO data via `agent_sre.slo.dashboard` for real-time + visibility. +- **Scheduled chaos:** Use `agent_sre.chaos.ChaosScheduler` for recurring + resilience tests with blackout windows. 
diff --git a/docs/tutorials/06-execution-sandboxing.md b/docs/tutorials/06-execution-sandboxing.md new file mode 100644 index 00000000..f5a51537 --- /dev/null +++ b/docs/tutorials/06-execution-sandboxing.md @@ -0,0 +1,1006 @@ +# 🛡️ Tutorial 06 — Execution Sandboxing + +**Isolate AI agents at runtime using privilege rings, saga transactions, and kill switches.** + +See also: [Deployment Guide](../deployment/README.md) | [Agent Hypervisor README](../../packages/agent-hypervisor/README.md) + +--- + +## Table of Contents + +1. [Introduction](#1-introduction) +2. [Quick Start: Ring-Based Access Control](#2-quick-start-ring-based-access-control) +3. [The 4-Ring Model](#3-the-4-ring-model) +4. [Capability Guards](#4-capability-guards) +5. [Saga Orchestration](#5-saga-orchestration) +6. [Session Isolation](#6-session-isolation) +7. [Emergency Controls](#7-emergency-controls) +8. [Production Deployment](#8-production-deployment) + +--- + +## 1. Introduction + +AI agents that can read files, call APIs, and execute code need strict boundaries. +Without sandboxing, a misbehaving agent can: + +- **Exfiltrate data** — read secrets and send them to external endpoints. +- **Corrupt state** — write to databases or files it should never touch. +- **Consume resources** — spin up infinite loops that exhaust CPU and memory. +- **Cascade failures** — a failed step in a multi-agent workflow leaves the system in a broken half-finished state. 
+ +The **Agent Hypervisor** (`pip install agent-hypervisor`) solves this with four +layers of defense: + +``` +┌─────────────────────────────────────────────────┐ +│ Execution Ring Model │ +│ Ring 0 (Root) → Ring 3 (Sandbox) │ +├─────────────────────────────────────────────────┤ +│ Capability Guards │ +│ Per-agent tool allow/deny lists │ +├─────────────────────────────────────────────────┤ +│ Saga Orchestration │ +│ Multi-step transactions with auto-rollback │ +├─────────────────────────────────────────────────┤ +│ Session Isolation │ +│ VFS namespacing, vector clocks, intent locks │ +├─────────────────────────────────────────────────┤ +│ Emergency Controls │ +│ Kill switch, rate limiting, breach detection │ +└─────────────────────────────────────────────────┘ +``` + +### Prerequisites + +- Python ≥ 3.11 +- `pip install agent-hypervisor` (v2.0.2+) +- For capability guards: `pip install agent-os` + +--- + +## 2. Quick Start: Ring-Based Access Control + +Get sandboxing running in under 20 lines: + +```python +from hypervisor import Hypervisor, ExecutionRing +from hypervisor.rings.classifier import ActionClassifier +from hypervisor.rings.enforcer import RingEnforcer + +# 1. Create the hypervisor +hv = Hypervisor() + +# 2. Classify an action — the classifier maps actions to rings +classifier = ActionClassifier() +result = classifier.classify_action_id("file.read") +print(result.ring) # ExecutionRing.RING_3_SANDBOX +print(result.risk_weight) # 0.1 + +result = classifier.classify_action_id("deploy.k8s") +print(result.ring) # ExecutionRing.RING_1_PRIVILEGED +print(result.risk_weight) # 0.9 + +# 3. Enforce the ring — block agents that lack privilege +enforcer = RingEnforcer() +agent_ring = ExecutionRing.from_eff_score(eff_score=0.72) +print(agent_ring) # ExecutionRing.RING_2_STANDARD + +# Agent in Ring 2 tries a Ring 1 action → blocked +# Agent in Ring 2 tries a Ring 3 action → allowed +``` + +That's it. 
The classifier decides which ring an action belongs to, and the +enforcer checks whether the agent's effective score grants sufficient privilege. + +--- + +## 3. The 4-Ring Model + +The hypervisor uses a hardware-inspired 4-ring privilege model. Each ring +defines what an agent can do, how many calls it can make, and what level of +trust is required. + +``` + ┌───────────────────────┐ + │ Ring 0 — Root │ eff_score: N/A (SRE Witness required) + │ Hypervisor config, │ Penalty/slashing operations + │ penalty ops │ Rate: unlimited + ├───────────────────────┤ + │ Ring 1 — Privileged │ eff_score ≥ 0.95 + consensus + │ Non-reversible ops │ Write, deploy, delete + │ (deploy, delete) │ Rate: 1000 calls/min + ├───────────────────────┤ + │ Ring 2 — Standard │ eff_score ≥ 0.60 + │ Reversible actions │ Read + limited write + │ (write files, APIs) │ Rate: 100 calls/min + ├───────────────────────┤ + │ Ring 3 — Sandbox │ Default for unknown agents + │ Read-only, research │ No network, no writes + │ (safe exploration) │ Rate: 10 calls/min + └───────────────────────┘ +``` + +### 3.1 Ring Assignment from Effective Score + +The `ExecutionRing` enum maps directly from an agent's **effective score** +(`eff_score`), which combines trust, reputation, and behavioral signals: + +```python +from hypervisor.models import ExecutionRing + +# Ring assignment is automatic based on eff_score +ring = ExecutionRing.from_eff_score(eff_score=0.98, has_consensus=True) +assert ring == ExecutionRing.RING_1_PRIVILEGED + +ring = ExecutionRing.from_eff_score(eff_score=0.75) +assert ring == ExecutionRing.RING_2_STANDARD + +ring = ExecutionRing.from_eff_score(eff_score=0.40) +assert ring == ExecutionRing.RING_3_SANDBOX +``` + +> **Note:** Ring 0 is never assigned by score alone — it requires an SRE +> Witness attestation and is reserved for hypervisor-level configuration. 
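For intuition, the score-to-ring mapping above boils down to two threshold checks. Here is a standalone sketch of that logic (a hypothetical reimplementation for illustration only; the library's `ExecutionRing.from_eff_score` may differ in details):

```python
def ring_from_eff_score(eff_score: float, has_consensus: bool = False) -> int:
    """Map an effective score to a ring number per the thresholds above.

    Ring 0 is deliberately absent: it is never granted by score alone,
    only via an SRE Witness attestation.
    """
    if eff_score >= 0.95 and has_consensus:
        return 1  # Privileged: high trust plus consensus approval
    if eff_score >= 0.60:
        return 2  # Standard: reversible actions only
    return 3      # Sandbox: default for unknown or low-trust agents

print(ring_from_eff_score(0.98, has_consensus=True))  # 1
print(ring_from_eff_score(0.75))                      # 2
print(ring_from_eff_score(0.40))                      # 3
```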
+ +### 3.2 Action Classification + +Every action is classified by **risk weight** and **reversibility** to determine +which ring it requires: + +```python +from hypervisor.rings.classifier import ActionClassifier, ClassificationResult +from hypervisor.models import ReversibilityLevel + +classifier = ActionClassifier() + +# Read operations → Ring 3 (low risk, fully reversible) +result = classifier.classify_action_id("file.read") +assert result.ring == ExecutionRing.RING_3_SANDBOX +assert result.reversibility == ReversibilityLevel.REVERSIBLE + +# Write operations → Ring 2 (medium risk, reversible with effort) +result = classifier.classify_action_id("file.write") +assert result.ring == ExecutionRing.RING_2_STANDARD + +# Deployments → Ring 1 (high risk, non-reversible) +result = classifier.classify_action_id("deploy.k8s") +assert result.ring == ExecutionRing.RING_1_PRIVILEGED +assert result.reversibility == ReversibilityLevel.NON_REVERSIBLE + +# Override classification for custom actions +classifier.set_override("my_custom.action", ring=ExecutionRing.RING_2_STANDARD, risk_weight=0.5) +``` + +### 3.3 Ring Elevation (Privilege Escalation) + +Sometimes an agent needs temporary access to a higher ring. 
The +`RingElevationManager` handles time-bounded privilege escalation: + +```python +from hypervisor.rings.elevation import ( + RingElevationManager, + RingElevation, + ElevationDenialReason, +) + +manager = RingElevationManager() + +# Request elevation from Ring 2 → Ring 1 +elevation = manager.request_elevation( + agent_did="did:example:agent-42", + session_id="session-001", + current_ring=ExecutionRing.RING_2_STANDARD, + target_ring=ExecutionRing.RING_1_PRIVILEGED, + ttl_seconds=300, # 5-minute window (max: 3600s) + reason="Deploying approved release v2.1.0", + attestation="signed-approval-token-from-sre", +) + +if elevation.is_active: + # Agent now has Ring 1 privileges for 5 minutes + effective = manager.get_effective_ring( + agent_did="did:example:agent-42", + session_id="session-001", + base_ring=ExecutionRing.RING_2_STANDARD, + ) + assert effective == ExecutionRing.RING_1_PRIVILEGED + +# Revoke early if needed +manager.revoke_elevation(elevation.elevation_id) +``` + +> **Community Edition:** Elevation requests are always denied. The denial +> reason is `ElevationDenialReason.COMMUNITY_EDITION`. Upgrade to Enterprise +> for dynamic ring escalation. + +### 3.4 Breach Detection + +The `RingBreachDetector` monitors for agents attempting actions above their +ring level: + +```python +from hypervisor.rings.breach_detector import ( + RingBreachDetector, + BreachEvent, + BreachSeverity, +) + +detector = RingBreachDetector() + +# The detector fires events when an agent in Ring 3 attempts a Ring 1 action +# Severity depends on the gap between agent ring and action ring: +# 1-ring gap → WARNING +# 2-ring gap → HIGH +# 3-ring gap → CRITICAL (Ring 3 agent trying Ring 0 action) +``` + +--- + +## 4. Capability Guards + +While rings control *privilege levels*, **Capability Guards** control *which +specific tools* an agent can call. This is a second, orthogonal layer of defense. 
+ +The `CapabilityGuardMiddleware` (from `agent-os`) enforces per-agent tool +allow/deny lists: + +```python +from agent_os.integrations.maf_adapter import ( + CapabilityGuardMiddleware, + GovernancePolicyMiddleware, + create_governance_middleware, +) + +# Option 1: Explicit allow list (whitelist) — only these tools are permitted +guard = CapabilityGuardMiddleware( + allowed_tools=["web_search", "file_read", "calculator"], +) + +# Option 2: Deny list (blacklist) — everything except these tools +guard = CapabilityGuardMiddleware( + denied_tools=["execute_code", "delete_file", "send_email"], +) + +# Option 3: Factory function for full governance stack +middleware = create_governance_middleware( + policy_directory="policies/", + allowed_tools=["web_search", "file_read"], + denied_tools=["execute_code", "delete_file"], + enable_rogue_detection=True, +) +``` + +### 4.1 Per-Ring Tool Restrictions + +Combine rings with capability guards for defense-in-depth: + +```python +from hypervisor.models import ExecutionRing +from agent_os.integrations.maf_adapter import CapabilityGuardMiddleware + +# Define tool sets per ring +RING_TOOL_POLICIES = { + ExecutionRing.RING_3_SANDBOX: CapabilityGuardMiddleware( + allowed_tools=["web_search", "file_read"], + ), + ExecutionRing.RING_2_STANDARD: CapabilityGuardMiddleware( + allowed_tools=["web_search", "file_read", "file_write", "api_call"], + denied_tools=["delete_file", "execute_code"], + ), + ExecutionRing.RING_1_PRIVILEGED: CapabilityGuardMiddleware( + denied_tools=["drop_database"], # everything else allowed + ), + ExecutionRing.RING_0_ROOT: CapabilityGuardMiddleware( + # No restrictions — full access + ), +} + +def get_guard_for_agent(eff_score: float) -> CapabilityGuardMiddleware: + """Return the capability guard matching an agent's privilege ring.""" + ring = ExecutionRing.from_eff_score(eff_score) + return RING_TOOL_POLICIES[ring] +``` + +### 4.2 Integrating with an Agent Framework + +```python +from 
agent_os.integrations.maf_adapter import ( + create_governance_middleware, + AuditTrailMiddleware, + RogueDetectionMiddleware, +) + +# Full governance middleware stack: policy + capability guard + audit + rogue detection +middleware = create_governance_middleware( + policy_directory="policies/", + allowed_tools=["web_search", "file_read"], + denied_tools=["execute_code"], + enable_rogue_detection=True, +) + +# Attach to your agent framework — the middleware intercepts every tool call +# and blocks anything not in the allow list (or in the deny list) +``` + +--- + +## 5. Saga Orchestration + +Multi-step agent workflows are dangerous: if step 3 of 5 fails, you're left +with a half-finished state. The **Saga Orchestrator** wraps multi-step +workflows in transactions with automatic compensation (rollback). + +### 5.1 Core Concepts + +``` +Step 1: Create PR ──→ Compensate: Close PR +Step 2: Run tests ──→ Compensate: Cancel test run +Step 3: Deploy to staging ──→ Compensate: Rollback deployment +Step 4: Notify team ──→ Compensate: Send failure notice + +If Step 3 fails: + → Compensate Step 2 (cancel tests) + → Compensate Step 1 (close PR) + → Saga state: COMPENSATING → FAILED +``` + +### 5.2 Creating a Saga + +```python +from hypervisor.saga.orchestrator import SagaOrchestrator +from hypervisor.saga.state_machine import SagaState, StepState + +orchestrator = SagaOrchestrator() + +# Create a new saga for this session +saga = orchestrator.create_saga(session_id="session-deploy-42") + +# Add steps with execute and undo APIs +orchestrator.add_step( + saga_id=saga.saga_id, + action_id="pr.create", + agent_did="did:example:dev-agent", + execute_api="/api/pr/create", + undo_api="/api/pr/close", # compensation action + timeout_seconds=60, + max_retries=2, +) + +orchestrator.add_step( + saga_id=saga.saga_id, + action_id="tests.run", + agent_did="did:example:ci-agent", + execute_api="/api/tests/run", + undo_api="/api/tests/cancel", + timeout_seconds=300, + max_retries=1, +) + 
+orchestrator.add_step( + saga_id=saga.saga_id, + action_id="deploy.staging", + agent_did="did:example:deploy-agent", + execute_api="/api/deploy/staging", + undo_api="/api/deploy/rollback", + timeout_seconds=600, +) +``` + +### 5.3 Step and Saga State Machines + +Each step transitions through a well-defined state machine: + +``` +StepState flow: + PENDING → EXECUTING → COMMITTED + ↘ FAILED → COMPENSATING → COMPENSATED + ↘ COMPENSATION_FAILED +``` + +The saga itself tracks the aggregate state: + +```python +from hypervisor.saga.state_machine import SagaState, StepState, STEP_TRANSITIONS + +# Valid step transitions are enforced — invalid transitions raise errors +step = SagaStep(step_id="s1", action_id="pr.create", ...) +step.transition(StepState.EXECUTING) # PENDING → EXECUTING ✓ +step.transition(StepState.COMMITTED) # EXECUTING → COMMITTED ✓ +# step.transition(StepState.PENDING) # COMMITTED → PENDING ✗ (raises error) +``` + +Saga-level states: + +| State | Meaning | +|-------|---------| +| `RUNNING` | Steps are being executed sequentially | +| `COMPENSATING` | A step failed; compensation is running in reverse | +| `COMPLETED` | All steps committed successfully | +| `FAILED` | All compensation finished (or some compensation failed) | +| `ESCALATED` | Compensation itself failed; human intervention required | + +### 5.4 Declarative Sagas with the DSL + +For complex workflows, define sagas declaratively: + +```python +from hypervisor.saga.dsl import SagaDSLParser, SagaDefinition + +saga_yaml = """ +saga: + id: deploy-pipeline + steps: + - id: create-pr + action_id: pr.create + agent: did:example:dev-agent + execute_api: /api/pr/create + undo_api: /api/pr/close + timeout: 60 + retries: 2 + + - id: run-tests + action_id: tests.run + agent: did:example:ci-agent + execute_api: /api/tests/run + undo_api: /api/tests/cancel + timeout: 300 + depends_on: [create-pr] + + - id: deploy-staging + action_id: deploy.staging + agent: did:example:deploy-agent + execute_api: 
/api/deploy/staging + undo_api: /api/deploy/rollback + timeout: 600 + depends_on: [run-tests] + checkpoint_goal: "Staging deployment matches PR diff" +""" + +parser = SagaDSLParser() +definition: SagaDefinition = parser.parse(saga_yaml) +``` + +### 5.5 Semantic Checkpoints + +Checkpoints verify that each step actually achieved its goal, not just that +it returned HTTP 200: + +```python +from hypervisor.saga.checkpoint import CheckpointManager, SemanticCheckpoint + +checkpoint_mgr = CheckpointManager() + +# After a deploy step, verify the deployment actually happened +checkpoint = SemanticCheckpoint( + step_id="deploy-staging", + goal="Staging deployment matches PR diff", +) +# The checkpoint manager evaluates whether the goal was met +``` + +### 5.6 Fan-Out Orchestration + +For parallel step execution (e.g., deploy to multiple regions simultaneously): + +```python +from hypervisor.saga.fan_out import FanOutOrchestrator, FanOutPolicy + +fan_out = FanOutOrchestrator() + +# Execute the same action across multiple agents in parallel +# with configurable failure policies (fail-fast, best-effort, quorum) +``` + +--- + +## 6. Session Isolation + +When multiple agents collaborate in a shared session, each agent gets an +**isolated view** of the workspace. No agent can read or modify another agent's +files without explicit sharing. 
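The core idea is that every path is keyed by the agent that wrote it. A toy model of that namespacing (purely illustrative; the real `SessionVFS` shown below adds permission errors, explicit sharing, and session scoping):

```python
class NamespacedStore:
    """Toy per-agent keyed store illustrating VFS-style namespacing."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], str] = {}

    def write(self, path: str, agent: str, value: str) -> None:
        # The key includes the agent identity, so writes never collide
        self._data[(agent, path)] = value

    def read(self, path: str, agent: str) -> str:
        key = (agent, path)
        if key not in self._data:
            # Reading another agent's entry looks like a missing key
            raise PermissionError(f"{agent} has no entry for {path}")
        return self._data[key]

store = NamespacedStore()
store.write("/workspace/plan.md", "agent-a", "# My Plan")
store.write("/workspace/plan.md", "agent-b", "# Different Plan")
print(store.read("/workspace/plan.md", "agent-a"))  # # My Plan
```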
+ +### 6.1 Virtual File System (VFS) Namespacing + +The `SessionVFS` provides per-agent isolated file views within a shared session: + +```python +from hypervisor.session.sso import SessionVFS, VFSPermissionError + +vfs = SessionVFS() + +# Agent A writes a file — only Agent A can see it +vfs.write(path="/workspace/plan.md", agent_did="did:agent-a", value="# My Plan") + +# Agent A reads its own file — works fine +content = vfs.read(path="/workspace/plan.md", agent_did="did:agent-a") +assert content == "# My Plan" + +# Agent B tries to read Agent A's file — blocked +try: + vfs.read(path="/workspace/plan.md", agent_did="did:agent-b") +except VFSPermissionError: + print("Access denied: Agent B cannot read Agent A's namespace") + +# Agent B writes to the same path — it gets its own copy +vfs.write(path="/workspace/plan.md", agent_did="did:agent-b", value="# Different Plan") + +# Each agent sees its own version +assert vfs.read("/workspace/plan.md", "did:agent-a") == "# My Plan" +assert vfs.read("/workspace/plan.md", "did:agent-b") == "# Different Plan" + +# Delete is also scoped +vfs.delete(path="/workspace/plan.md", agent_did="did:agent-a") +``` + +### 6.2 Isolation Levels + +Choose the right isolation level based on your consistency requirements: + +```python +from hypervisor.session.isolation import IsolationLevel + +# Snapshot — each agent sees a consistent snapshot (cheapest) +level = IsolationLevel.SNAPSHOT +assert not level.requires_vector_clocks +assert not level.requires_intent_locks +assert level.allows_concurrent_writes +assert level.coordination_cost == "low" + +# Read Committed — agents see committed writes from others +level = IsolationLevel.READ_COMMITTED +assert level.requires_vector_clocks +assert not level.requires_intent_locks + +# Serializable — strongest consistency (most expensive) +level = IsolationLevel.SERIALIZABLE +assert level.requires_vector_clocks +assert level.requires_intent_locks +assert not level.allows_concurrent_writes +assert 
level.coordination_cost == "high" +``` + +### 6.3 Vector Clocks for Causal Ordering + +When agents produce concurrent writes, vector clocks establish a causal order: + +```python +from hypervisor.session.vector_clock import VectorClockManager, CausalViolationError + +clock_mgr = VectorClockManager() + +# Each agent gets its own logical clock +clock_a = clock_mgr.create_clock("did:agent-a") +clock_b = clock_mgr.create_clock("did:agent-b") + +# Agent A performs an action +clock_mgr.increment("did:agent-a") + +# Check causal ordering — did A's action happen before B's? +happened_before = clock_mgr.happens_before(clock_a, clock_b) +``` + +### 6.4 Intent Locks for Concurrency Control + +Prevent conflicting concurrent operations with intent locks: + +```python +from hypervisor.session.intent_locks import IntentLockManager, LockIntent, DeadlockError + +lock_mgr = IntentLockManager() + +# Agent A acquires a write lock on the session +lock_mgr.acquire_lock( + session_id="session-001", + agent_did="did:agent-a", + intent=LockIntent.WRITE, +) + +# Agent B tries an exclusive lock — blocked until A releases +try: + lock_mgr.acquire_lock( + session_id="session-001", + agent_did="did:agent-b", + intent=LockIntent.EXCLUSIVE, + ) +except DeadlockError: + print("Deadlock detected — aborting Agent B's operation") + +# Release when done +lock_mgr.release_lock(session_id="session-001", agent_did="did:agent-a") +``` + +### 6.5 Full Session Configuration + +Bring it all together with a `SharedSessionObject`: + +```python +from hypervisor.session import SharedSessionObject +from hypervisor.models import SessionConfig, ConsistencyMode + +config = SessionConfig( + consistency_mode=ConsistencyMode.SERIALIZABLE, + max_participants=5, + max_duration_seconds=3600, # 1 hour + min_eff_score=0.60, # minimum trust to join +) + +session = SharedSessionObject( + session_id="session-deploy-42", + config=config, +) + +# Session provides: +# session.vfs — SessionVFS (isolated file views) +# 
session.vector_clocks — VectorClockManager (causal ordering) +# session.intent_locks — IntentLockManager (concurrency control) +``` + +--- + +## 7. Emergency Controls + +When an agent goes rogue, you need to stop it *immediately* — not after the +next polling interval. + +### 7.1 Kill Switch + +The `KillSwitch` terminates an agent and triggers saga compensation for any +in-flight work: + +```python +from hypervisor.security.kill_switch import KillSwitch, KillReason, KillResult + +kill_switch = KillSwitch() + +# Immediate termination — all in-flight saga steps are compensated +result: KillResult = kill_switch.kill( + agent_did="did:example:rogue-agent", + session_id="session-001", + reason=KillReason.BEHAVIORAL_DRIFT, + details="Agent started accessing files outside its namespace", +) + +print(f"Kill ID: {result.kill_id}") +print(f"Compensation triggered: {result.compensation_triggered}") +print(f"Handoffs succeeded: {result.handoff_success_count}") +print(f"Timestamp: {result.timestamp}") +``` + +Available kill reasons: + +| Reason | When to use | +|--------|-------------| +| `BEHAVIORAL_DRIFT` | Agent deviates from expected behavior patterns | +| `RATE_LIMIT` | Agent exceeded its rate limit repeatedly | +| `RING_BREACH` | Agent attempted actions above its ring level | +| `MANUAL` | Human operator triggered the kill | +| `QUARANTINE_TIMEOUT` | Agent was quarantined and didn't recover | +| `SESSION_TIMEOUT` | Session exceeded its `max_duration_seconds` | + +### 7.2 Graceful Shutdown with Handoff + +Before killing, you can register substitute agents to take over in-flight work: + +```python +from hypervisor.security.kill_switch import KillSwitch, HandoffStatus + +kill_switch = KillSwitch() + +# Register a substitute agent that can take over work +kill_switch.register_substitute( + session_id="session-001", + agent_did="did:example:backup-agent", +) + +# Now when the primary agent is killed, its saga steps are handed off +result = kill_switch.kill( + 
agent_did="did:example:primary-agent", + session_id="session-001", + reason=KillReason.MANUAL, + details="Planned maintenance rotation", +) + +# Check handoff results +for handoff in result.handoffs: + print(f"Step {handoff.step_id}: {handoff.status}") + # HandoffStatus: PENDING, HANDED_OFF, FAILED, COMPENSATED + +# Review kill history +history = kill_switch.get_kill_history(agent_did="did:example:primary-agent") +``` + +### 7.3 Rate Limiting + +Prevent resource exhaustion with per-agent rate limits: + +```python +from hypervisor.security.rate_limiter import AgentRateLimiter, RateLimitExceeded + +# Ring 3 agents: 10 calls per minute +sandbox_limiter = AgentRateLimiter( + window_seconds=60.0, + max_calls=10, +) + +# Ring 2 agents: 100 calls per minute +standard_limiter = AgentRateLimiter( + window_seconds=60.0, + max_calls=100, +) + +# Check before each action +status = sandbox_limiter.check_rate_limit(agent_did="did:example:new-agent") +if not status.allowed: + print(f"Rate limited — retry after {status.retry_after_seconds}s") + +# Reset limits (e.g., after an agent is promoted) +sandbox_limiter.reset(agent_did="did:example:new-agent") +``` + +### 7.4 Quarantine + +Quarantine isolates an agent without killing it — useful for investigation: + +```python +from hypervisor.liability.quarantine import QuarantineManager, QuarantineReason + +quarantine = QuarantineManager() + +# Quarantine a suspect agent — it can't take new actions but existing +# saga steps are preserved for forensic analysis +``` + +### 7.5 Breach Detection Pipeline + +Wire breach detection into your kill switch for automated response: + +```python +from hypervisor.rings.breach_detector import RingBreachDetector, BreachSeverity +from hypervisor.security.kill_switch import KillSwitch, KillReason +from hypervisor.security.rate_limiter import AgentRateLimiter + +detector = RingBreachDetector() +kill_switch = KillSwitch() +limiter = AgentRateLimiter(window_seconds=60.0, max_calls=100) + +async def 
on_agent_action(agent_did: str, session_id: str, action_id: str): + """Example enforcement pipeline for every agent action.""" + + # Layer 1: Rate limit check + status = limiter.check_rate_limit(agent_did) + if not status.allowed: + kill_switch.kill(agent_did, session_id, KillReason.RATE_LIMIT) + return + + # Layer 2: Ring enforcement (breach detection) + # If a breach is CRITICAL severity → kill immediately + # If WARNING → log and allow (the agent might be testing boundaries) + + # Layer 3: Capability guard check (handled by middleware) + # Layer 4: Saga step execution (handled by orchestrator) +``` + +--- + +## 8. Production Deployment + +### 8.1 Running the Hypervisor API Server + +The hypervisor includes a FastAPI server for HTTP-based enforcement: + +```bash +# Install with API extras +pip install "agent-hypervisor[api]" + +# Start the server +hypervisor serve --host 0.0.0.0 --port 8000 +``` + +### 8.2 Docker Container + +```dockerfile +FROM python:3.11-slim + +WORKDIR /app + +RUN pip install "agent-hypervisor[full,api]" + +EXPOSE 8000 + +CMD ["hypervisor", "serve", "--host", "0.0.0.0", "--port", "8000"] +``` + +Build and run: + +```bash +docker build -t agent-hypervisor:latest . 
+docker run -p 8000:8000 agent-hypervisor:latest +``` + +### 8.3 Kubernetes Deployment + +Deploy the hypervisor as a sidecar alongside your agent pods: + +```yaml +# hypervisor-deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: agent-hypervisor + labels: + app: agent-hypervisor +spec: + replicas: 2 + selector: + matchLabels: + app: agent-hypervisor + template: + metadata: + labels: + app: agent-hypervisor + spec: + containers: + - name: hypervisor + image: agent-hypervisor:latest + ports: + - containerPort: 8000 + resources: + requests: + memory: "256Mi" + cpu: "250m" + limits: + memory: "512Mi" + cpu: "500m" + readinessProbe: + httpGet: + path: /health + port: 8000 + initialDelaySeconds: 5 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /health + port: 8000 + initialDelaySeconds: 10 + periodSeconds: 30 + env: + - name: HYPERVISOR_LOG_LEVEL + value: "INFO" +--- +apiVersion: v1 +kind: Service +metadata: + name: agent-hypervisor +spec: + selector: + app: agent-hypervisor + ports: + - port: 8000 + targetPort: 8000 +``` + +### 8.4 Sidecar Pattern + +For fine-grained per-pod enforcement, run the hypervisor as a sidecar: + +```yaml +# agent-pod-with-sidecar.yaml +apiVersion: v1 +kind: Pod +metadata: + name: agent-worker +spec: + containers: + # Your agent container + - name: agent + image: my-agent:latest + env: + - name: HYPERVISOR_URL + value: "http://localhost:8000" + + # Hypervisor sidecar — enforces sandboxing for this pod + - name: hypervisor-sidecar + image: agent-hypervisor:latest + ports: + - containerPort: 8000 + resources: + requests: + memory: "128Mi" + cpu: "100m" + limits: + memory: "256Mi" + cpu: "250m" +``` + +### 8.5 Helm Chart Values + +Create a values file for parameterized deployments: + +```yaml +# values.yaml +replicaCount: 2 + +image: + repository: agent-hypervisor + tag: "latest" + pullPolicy: IfNotPresent + +service: + type: ClusterIP + port: 8000 + +resources: + requests: + memory: "256Mi" + cpu: "250m" + limits: + 
memory: "512Mi" + cpu: "500m" + +hypervisor: + logLevel: INFO + rateLimiting: + ring3MaxCalls: 10 + ring2MaxCalls: 100 + ring1MaxCalls: 1000 + windowSeconds: 60 + session: + maxDurationSeconds: 3600 + maxParticipants: 10 + defaultIsolation: snapshot + saga: + defaultTimeoutSeconds: 300 + maxRetries: 2 +``` + +### 8.6 Observability + +Monitor your hypervisor with the built-in event bus: + +```python +from hypervisor.observability.event_bus import HypervisorEventBus, EventType +from hypervisor.observability.causal_trace import CausalTraceId + +event_bus = HypervisorEventBus() + +# Subscribe to security events +@event_bus.subscribe(EventType.RING_BREACH) +async def on_breach(event): + print(f"BREACH: {event.agent_did} attempted {event.action_id}") + +@event_bus.subscribe(EventType.KILL_SWITCH) +async def on_kill(event): + print(f"KILLED: {event.agent_did} — reason: {event.reason}") + +# Trace causality across distributed saga steps +trace_id = CausalTraceId.generate() +``` + +--- + +## Summary + +| Layer | Component | What It Does | +|-------|-----------|--------------| +| **Privilege** | `ExecutionRing` | 4-tier access model based on trust score | +| **Privilege** | `ActionClassifier` | Maps actions to rings by risk/reversibility | +| **Privilege** | `RingElevationManager` | Temporary privilege escalation with TTL | +| **Detection** | `RingBreachDetector` | Alerts on ring boundary violations | +| **Tools** | `CapabilityGuardMiddleware` | Per-agent tool allow/deny lists | +| **Transactions** | `SagaOrchestrator` | Multi-step workflows with auto-rollback | +| **Isolation** | `SessionVFS` | Per-agent virtual file system namespacing | +| **Isolation** | `IntentLockManager` | Concurrency control with intent locks | +| **Isolation** | `VectorClockManager` | Causal ordering of concurrent operations | +| **Emergency** | `KillSwitch` | Immediate agent termination | +| **Emergency** | `AgentRateLimiter` | Per-agent call rate enforcement | +| **Emergency** | `QuarantineManager` 
| Agent isolation for investigation | +| **Observability** | `HypervisorEventBus` | Real-time event streaming | + +--- + +## Next Steps + +- **Audit trails:** Explore `CommitmentEngine` and `DeltaEngine` for hash-chained, tamper-evident logging. +- **Liability:** See `LiabilityMatrix`, `CausalAttributor`, and `SlashingEngine` for agent accountability. +- **Deployment:** Read the [Azure Container Apps guide](../deployment/azure-container-apps.md) for cloud-native deployment patterns.
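As a closing illustration of the audit-trail bullet above, the core idea behind hash-chained, tamper-evident logging fits in a few lines: each entry's hash covers the previous entry's hash, so editing any historical entry breaks every link after it. The sketch below is illustrative only — `append_entry` and `verify` are hypothetical helpers, not the actual `CommitmentEngine` API:

```python
import hashlib
import json

GENESIS = "0" * 64  # placeholder hash for the first entry's predecessor

def append_entry(chain: list, event: dict) -> None:
    """Append an audit entry whose hash covers the previous entry's hash."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS
    payload = json.dumps(event, sort_keys=True)
    entry_hash = hashlib.sha256((prev_hash + payload).encode()).hexdigest()
    chain.append({"event": event, "prev_hash": prev_hash, "hash": entry_hash})

def verify(chain: list) -> bool:
    """Recompute every link; any tampered entry breaks the chain."""
    prev_hash = GENESIS
    for entry in chain:
        payload = json.dumps(entry["event"], sort_keys=True)
        expected = hashlib.sha256((prev_hash + payload).encode()).hexdigest()
        if entry["prev_hash"] != prev_hash or entry["hash"] != expected:
            return False
        prev_hash = entry["hash"]
    return True

chain: list = []
append_entry(chain, {"agent_did": "did:agent:worker-1", "action": "tool_call"})
append_entry(chain, {"agent_did": "did:agent:worker-1", "action": "kill_switch"})
print(verify(chain))                  # → True
chain[0]["event"]["action"] = "noop"  # tamper with history
print(verify(chain))                  # → False
```

The real engine adds persistence, signatures, and delta encoding on top, but the verification property is the same: a verifier only needs the chain itself to detect tampering.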