This document provides a deep dive into all five amplifier-foundation module types, their protocols, implementation patterns, and testing strategies.
Orchestrator modules control the agent execution loop. They manage:
- Turn-taking between user and agent
- Tool call execution and result injection
- Streaming or batch response delivery
- Error recovery and retry logic
Build an orchestrator when you need:
- Custom execution patterns (e.g., parallel tool calls)
- Specialized streaming logic
- Event-driven architectures
- Custom retry or error recovery
Note: Most applications use existing orchestrators (loop-basic, loop-streaming). Only build custom orchestrators for specialized needs.
```python
async def mount(coordinator: Any, config: dict) -> dict[str, Any]:
    """Mount the orchestrator.

    Args:
        coordinator: The amplifier coordinator
        config: Configuration dictionary

    Returns:
        Dict with 'execute_turn' function
    """
    async def execute_turn(session: Any, user_message: str) -> AsyncIterator[dict]:
        """Execute one turn of conversation.

        Args:
            session: Session object with state and history
            user_message: User's input message

        Yields:
            Event dicts with type, data, etc.
        """
        # Implementation
        pass

    return {"execute_turn": execute_turn}
```

execute_turn(session, user_message) -> AsyncIterator[dict]
Must yield events:
- {"type": "turn_start", "turn_id": str}
- {"type": "message_delta", "delta": str} (streaming chunks)
- {"type": "tool_call", "name": str, "args": dict}
- {"type": "tool_result", "name": str, "result": Any}
- {"type": "turn_complete", "message": str}
- {"type": "error", "error": str} (on errors)
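For illustration, a consumer of this event stream might collect the final message like this; the `run_turn` helper is a sketch, not part of the protocol:

```python
async def run_turn(orchestrator: dict, session, user_message: str) -> str:
    """Consume an orchestrator's event stream and return the final message.

    Illustrative only: assumes the event contract listed above.
    """
    final_message = ""
    async for event in orchestrator["execute_turn"](session, user_message):
        if event["type"] == "message_delta":
            final_message += event["delta"]   # accumulate streamed text
        elif event["type"] == "turn_complete":
            final_message = event["message"]  # authoritative final text
        elif event["type"] == "error":
            raise RuntimeError(event["error"])
    return final_message
```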
```yaml
config:
  max_turns: 10          # Maximum turns per session
  stream_chunks: true    # Stream vs batch responses
  parallel_tools: false  # Allow parallel tool execution
  retry_on_error: true   # Retry failed tool calls
  max_retries: 3         # Maximum retry attempts
```

- Implement execute_turn() async generator
- Yield turn_start event at beginning
- Handle user message injection
- Execute tool calls through coordinator
- Inject tool results back to provider
- Stream or batch response chunks
- Yield turn_complete event at end
- Handle errors gracefully
- Respect max_turns limit
- Support cancellation (AsyncIterator cleanup)
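The last three requirements (graceful errors, the turn limit, cancellation) can be sketched as follows; the `session.turn_count` attribute and the stubbed provider reply are assumptions for illustration:

```python
import asyncio

async def execute_turn(session, user_message, max_turns=10):
    """Sketch: turn-limit check, error events, and cancellation cleanup.

    `session` is assumed to expose `turn_count`; the provider call is stubbed.
    """
    if session.turn_count >= max_turns:
        yield {"type": "error", "error": f"max_turns ({max_turns}) reached"}
        return
    yield {"type": "turn_start", "turn_id": f"turn-{session.turn_count}"}
    try:
        reply = f"(stubbed reply to {user_message!r})"  # provider call goes here
        yield {"type": "turn_complete", "message": reply}
    except asyncio.CancelledError:
        raise  # consumer closed the generator: release resources, then re-raise
    except Exception as exc:
        yield {"type": "error", "error": str(exc)}  # never crash the agent silently
```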
```python
async def mount(coordinator, config):
    """Mount basic orchestrator."""
    max_turns = config.get("max_turns", 10)

    async def execute_turn(session, user_message):
        """Execute one conversation turn."""
        turn_id = f"turn-{session.turn_count}"

        # Start turn
        yield {"type": "turn_start", "turn_id": turn_id}

        # Add user message to history
        session.add_message("user", user_message)

        # Get provider
        provider = await coordinator.get_provider()

        # Request completion
        async for event in provider.complete(session.messages):
            if event["type"] == "text_delta":
                yield {"type": "message_delta", "delta": event["delta"]}
            elif event["type"] == "tool_call":
                # Announce, then execute the tool
                yield {"type": "tool_call", "name": event["name"], "args": event["args"]}
                tool = await coordinator.get_tool(event["name"])
                result = await tool[event["name"]](**event["args"])
                # Surface the result
                yield {"type": "tool_result", "name": event["name"], "result": result}
            elif event["type"] == "complete":
                # Add assistant message
                session.add_message("assistant", event["message"])
                yield {"type": "turn_complete", "message": event["message"]}

    return {"execute_turn": execute_turn}
```

Unit Tests: Test event generation logic
```python
@pytest.mark.asyncio
async def test_turn_start_event():
    """Test turn_start event is yielded."""
    orchestrator = await mount(coordinator=mock_coordinator, config={})
    events = []
    async for event in orchestrator["execute_turn"](mock_session, "hello"):
        events.append(event)
    assert events[0]["type"] == "turn_start"
```

Integration Tests: Test with real provider and tools
```python
@pytest.mark.asyncio
async def test_with_real_provider():
    """Test orchestrator with real provider."""
    from amplifier_foundation import Coordinator

    coordinator = Coordinator()
    # Load provider and tools
    # Test complete workflow
```

- Forgetting to yield turn_complete: Always yield completion event
- Not handling errors: Wrap in try/except and yield error events
- Blocking on I/O: Use async/await for all I/O operations
- Infinite loops: Respect max_turns limit
Provider modules connect to AI model APIs and abstract vendor-specific details. They:
- Send prompts to model APIs
- Stream or batch responses
- Execute tool calls (function calling)
- Count tokens and handle rate limits
Build a provider when you want to:
- Support a new AI model API (e.g., Cohere, Hugging Face)
- Use a custom model deployment
- Implement specialized model routing logic
```python
async def mount(coordinator: Any, config: dict) -> dict[str, Any]:
    """Mount the provider.

    Args:
        coordinator: The amplifier coordinator
        config: Configuration with api_key, model, etc.

    Returns:
        Dict with 'complete', 'stream', 'count_tokens' functions
    """
    async def complete(messages: list[dict]) -> dict:
        """Generate completion for messages.

        Args:
            messages: List of message dicts with role/content

        Returns:
            Dict with 'message', 'tool_calls', 'usage', etc.
        """
        pass

    async def stream(messages: list[dict]) -> AsyncIterator[dict]:
        """Stream completion chunks.

        Args:
            messages: List of message dicts

        Yields:
            Event dicts with type and data
        """
        pass

    def count_tokens(messages: list[dict]) -> int:
        """Count tokens in messages.

        Args:
            messages: List of message dicts

        Returns:
            Token count
        """
        pass

    return {
        "complete": complete,
        "stream": stream,
        "count_tokens": count_tokens
    }
```

complete(messages) -> dict
- Send messages to API
- Return completion with message, tool_calls, usage
stream(messages) -> AsyncIterator[dict]
- Stream completion chunks
- Yield text_delta, tool_call, complete events
count_tokens(messages) -> int
- Count tokens in message list
- Use provider's tokenizer
```yaml
config:
  api_key: "sk-..."           # API key
  model: "claude-3-5-sonnet"  # Model identifier
  max_tokens: 4096            # Max completion tokens
  temperature: 1.0            # Sampling temperature
  timeout: 60                 # Request timeout (seconds)
  base_url: null              # Custom API endpoint
```

- Implement complete() for batch completions
- Implement stream() for streaming completions
- Implement count_tokens() using tokenizer
- Handle authentication (API keys, headers)
- Map vendor message format to common format
- Parse tool calls from responses
- Handle rate limiting (retry with backoff)
- Implement timeout handling
- Log API errors with context
- Support custom base URLs
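One way to satisfy the rate-limiting requirement is an exponential-backoff wrapper around the raw API call. This is a sketch: `RateLimitError` stands in for whatever exception the vendor SDK raises, and `with_backoff` is an illustrative helper name:

```python
import asyncio
import random

class RateLimitError(Exception):
    """Placeholder for the vendor SDK's rate-limit exception."""

async def with_backoff(call, *, max_retries=3, base_delay=1.0):
    """Retry `call` with exponential backoff plus jitter on rate limits."""
    for attempt in range(max_retries + 1):
        try:
            return await call()
        except RateLimitError:
            if attempt == max_retries:
                raise
            # base_delay, 2x, 4x, ... plus jitter to avoid thundering herds
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)
```

A provider's `complete()` would then wrap its HTTP call, e.g. `await with_backoff(lambda: client.post(...))` (the `client` object here is assumed).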
```python
import asyncio

async def mount(coordinator, config):
    """Mount mock provider for testing."""
    async def complete(messages):
        """Return mock completion."""
        # Simulate API delay
        await asyncio.sleep(0.1)
        user_message = messages[-1]["content"]
        return {
            "message": f"Mock response to: {user_message}",
            "tool_calls": [],
            "usage": {
                "input_tokens": 10,
                "output_tokens": 5,
                "total_tokens": 15
            }
        }

    async def stream(messages):
        """Stream mock completion."""
        chunks = ["Mock ", "streamed ", "response"]
        for chunk in chunks:
            await asyncio.sleep(0.05)
            yield {"type": "text_delta", "delta": chunk}
        yield {
            "type": "complete",
            "message": "Mock streamed response",
            "usage": {"total_tokens": 15}
        }

    def count_tokens(messages):
        """Count tokens (mock)."""
        return sum(len(m["content"].split()) for m in messages)

    return {
        "complete": complete,
        "stream": stream,
        "count_tokens": count_tokens
    }
```

Unit Tests: Test message formatting and parsing
```python
def test_message_format():
    """Test message format conversion."""
    messages = [{"role": "user", "content": "hello"}]
    formatted = _format_messages(messages)
    assert formatted[0]["role"] == "user"
```

Integration Tests: Test with real API (use test mode)
```python
@pytest.mark.asyncio
async def test_real_api_completion():
    """Test with real API in test mode."""
    provider = await mount(None, {"api_key": TEST_KEY})
    result = await provider["complete"]([
        {"role": "user", "content": "Say 'test'"}
    ])
    assert "test" in result["message"].lower()
```

Mocking Tests: Mock HTTP responses
```python
@pytest.mark.asyncio
async def test_with_mocked_http(mock_http_client):
    """Test with mocked HTTP responses."""
    mock_http_client.post.return_value = {"message": "response"}
    provider = await mount(None, {"api_key": "test"})
    result = await provider["complete"]([...])
    assert result["message"] == "response"
```

- Not handling rate limits: Implement exponential backoff
- Hardcoded timeouts: Make timeout configurable
- Ignoring streaming errors: Catch exceptions in async generators
- Token counting inaccuracy: Use provider's official tokenizer
Tool modules extend agent capabilities by providing callable functions. They:
- Perform operations (file I/O, API calls, calculations)
- Validate inputs and return structured outputs
- Provide JSON schemas for function calling
- Handle errors gracefully
Build a tool when you want agents to:
- Interact with external systems (databases, APIs, file systems)
- Perform calculations or data transformations
- Access specialized libraries or services
```python
async def mount(coordinator: Any, config: dict) -> dict[str, Any]:
    """Mount the tool.

    Args:
        coordinator: The amplifier coordinator
        config: Configuration dictionary

    Returns:
        Dict mapping function names to callable functions
    """
    async def my_function(arg1: str, arg2: int) -> dict:
        """Tool function.

        Args:
            arg1: First argument
            arg2: Second argument

        Returns:
            Result dictionary
        """
        pass

    return {
        "my_function": my_function
    }

def get_schema() -> dict:
    """Return JSON schemas for tool functions.

    Returns:
        Dict mapping function names to JSON schemas
    """
    return {
        "my_function": {
            "description": "Function description",
            "parameters": {
                "type": "object",
                "properties": {
                    "arg1": {"type": "string", "description": "..."},
                    "arg2": {"type": "integer", "description": "..."}
                },
                "required": ["arg1", "arg2"]
            }
        }
    }
```

mount(coordinator, config) -> dict
- Return dict of callable functions
- Functions should be async
- Include all tools provided by this module
get_schema() -> dict
- Return JSON schema for each function
- Follow OpenAI function calling format
- Include descriptions and parameter types
```yaml
config:
  max_file_size: 1048576   # 1MB max file size
  allowed_paths: ["/data"] # Restrict paths
  timeout: 30              # Operation timeout
  cache_results: true      # Cache function results
```

- Implement mount() returning dict of functions
- Implement get_schema() with JSON schemas
- Validate all inputs (types, ranges, formats)
- Handle errors with clear messages
- Make all functions async
- Use type hints for all parameters
- Document each function with docstrings
- Implement timeout handling
- Handle resource cleanup (files, connections)
- Add security checks (path traversal, injection)
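Timeout handling can be factored into a small wrapper around `asyncio.wait_for`; `ToolTimeoutError` and `with_timeout` are illustrative names, not part of the protocol:

```python
import asyncio

class ToolTimeoutError(Exception):
    """Illustrative exception raised when a tool operation exceeds its deadline."""

async def with_timeout(coro, seconds: float, operation: str):
    """Run `coro` with a deadline, converting timeouts into a clear tool error."""
    try:
        return await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        # Include the operation name so the agent's error message is actionable
        raise ToolTimeoutError(f"{operation} timed out after {seconds}s") from None
```

A tool function would then wrap its slow work, e.g. `await with_timeout(fetch(url), config["timeout"], "fetch")` (the `fetch` coroutine here is assumed).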
```python
from pathlib import Path
from typing import Any

class FileError(Exception):
    """Base exception for file tool errors."""
    pass

async def mount(coordinator: Any, config: dict) -> dict[str, Any]:
    """Mount file read tool."""
    max_size = config.get("max_file_size", 1048576)
    # Resolve allowed paths so the containment check compares absolute paths
    allowed_paths = [Path(p).resolve() for p in config.get("allowed_paths", ["."])]

    async def read_file(path: str) -> dict:
        """Read file contents.

        Args:
            path: File path to read

        Returns:
            Dict with 'content' and 'size' keys

        Raises:
            FileError: If file doesn't exist or access denied
        """
        file_path = Path(path).resolve()

        # Security: Check path is allowed
        if not any(file_path.is_relative_to(allowed) for allowed in allowed_paths):
            raise FileError(f"Access denied to {path}")

        # Check file exists
        if not file_path.exists():
            raise FileError(f"File not found: {path}")

        # Check file size
        size = file_path.stat().st_size
        if size > max_size:
            raise FileError(f"File too large: {size} bytes (max {max_size})")

        # Read file
        try:
            content = file_path.read_text()
            return {
                "content": content,
                "size": size,
                "path": str(file_path)
            }
        except Exception as e:
            raise FileError(f"Failed to read file: {e}") from e

    return {"read_file": read_file}

def get_schema() -> dict:
    """Return schema for file tool."""
    return {
        "read_file": {
            "description": "Read contents of a text file",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "Path to file to read"
                    }
                },
                "required": ["path"]
            }
        }
    }
```

Unit Tests: Test with temporary files
```python
@pytest.mark.asyncio
async def test_read_file_success(tmp_path):
    """Test reading file succeeds."""
    # Create temp file
    test_file = tmp_path / "test.txt"
    test_file.write_text("content")
    # Mount tool
    tool = await mount(None, {"allowed_paths": [str(tmp_path)]})
    # Test
    result = await tool["read_file"](str(test_file))
    assert result["content"] == "content"
```

Error Tests: Test error conditions
```python
@pytest.mark.asyncio
async def test_file_not_found():
    """Test file not found error."""
    # Allow the whole filesystem so the access check passes first
    tool = await mount(None, {"allowed_paths": ["/"]})
    with pytest.raises(FileError, match="not found"):
        await tool["read_file"]("/nonexistent.txt")
```

Security Tests: Test path traversal protection
```python
@pytest.mark.asyncio
async def test_path_traversal_blocked():
    """Test path traversal attack is blocked."""
    tool = await mount(None, {"allowed_paths": ["/safe"]})
    with pytest.raises(FileError, match="Access denied"):
        await tool["read_file"]("/etc/passwd")
```

- No input validation: Always validate inputs
- Path traversal vulnerabilities: Resolve and check paths
- Not handling large files: Implement size limits
- Blocking I/O: Use async file operations
- Poor error messages: Include context in exceptions
Context modules manage conversation state and memory. They:
- Store conversation history
- Inject relevant context into prompts
- Manage context windows
- Persist memory across sessions
Build a context module when you need:
- Custom memory management
- Specialized context injection logic
- Integration with external knowledge bases
- Complex conversation state tracking
```python
async def mount(coordinator: Any, config: dict) -> dict[str, Any]:
    """Mount the context manager.

    Args:
        coordinator: The amplifier coordinator
        config: Configuration dictionary

    Returns:
        Dict with 'add_message', 'get_messages', 'inject_context' functions
    """
    async def add_message(role: str, content: str, metadata: dict = None):
        """Add message to context.

        Args:
            role: Message role (user, assistant, system)
            content: Message content
            metadata: Optional metadata dict
        """
        pass

    async def get_messages() -> list[dict]:
        """Get current message history.

        Returns:
            List of message dicts
        """
        pass

    async def inject_context(messages: list[dict]) -> list[dict]:
        """Inject additional context into messages.

        Args:
            messages: Current message list

        Returns:
            Enhanced message list with injected context
        """
        pass

    return {
        "add_message": add_message,
        "get_messages": get_messages,
        "inject_context": inject_context
    }
```

add_message(role, content, metadata)
- Store message in context
- Update conversation state
- Optional: Trigger summarization
get_messages() -> list[dict]
- Return current message history
- Apply context window limits
- Include metadata if needed
inject_context(messages) -> list[dict]
- Enhance messages with additional context
- Add system messages with relevant info
- Respect token limits
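As a concrete sketch, `inject_context()` might prepend a system message built from retrieved notes. The `NOTES` store and `lookup_notes` helper below are hypothetical stand-ins for a real knowledge base, and a production version would also count tokens before injecting:

```python
NOTES = {"widgets": "Widgets ship in boxes of 12."}  # stand-in knowledge base

def lookup_notes(query: str) -> str:
    """Hypothetical retrieval helper; a real module would query a store."""
    return " ".join(text for key, text in NOTES.items() if key in query.lower())

async def inject_context(messages: list[dict]) -> list[dict]:
    """Prepend a system message with retrieved notes (sketch only)."""
    if not messages:
        return messages
    notes = lookup_notes(messages[-1]["content"])
    if not notes:
        return messages  # nothing relevant found; pass through unchanged
    context_msg = {"role": "system", "content": f"Relevant notes:\n{notes}"}
    return [context_msg, *messages]
```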
```yaml
config:
  max_messages: 100        # Max messages to retain
  max_tokens: 100000       # Max total tokens
  summarize_threshold: 50  # Summarize after N messages
  persistence: true        # Persist to disk/DB
  storage_path: "./memory" # Where to persist
```

- Implement add_message() to store messages
- Implement get_messages() with limits
- Implement inject_context() for enhancement
- Handle context window limits (token counting)
- Implement persistence (optional)
- Support message metadata
- Implement summarization (optional)
- Thread-safe message access
- Efficient message retrieval
- Memory cleanup on session end
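Thread-safe message access can be sketched with an `asyncio.Lock` guarding the store; the `MessageStore` class name is illustrative:

```python
import asyncio

class MessageStore:
    """Sketch: serialise concurrent access to the message list with a lock."""

    def __init__(self) -> None:
        self._messages: list[dict] = []
        self._lock = asyncio.Lock()

    async def add(self, role: str, content: str) -> None:
        async with self._lock:  # one writer at a time
            self._messages.append({"role": role, "content": content})

    async def snapshot(self) -> list[dict]:
        async with self._lock:  # return a copy so callers can't mutate state
            return list(self._messages)
```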
```python
from typing import Any, Optional

async def mount(coordinator: Any, config: dict) -> dict[str, Any]:
    """Mount simple context manager."""
    max_messages = config.get("max_messages", 100)

    # In-memory message store
    messages: list[dict] = []

    async def add_message(
        role: str,
        content: str,
        metadata: Optional[dict] = None
    ):
        """Add message to history."""
        message = {
            "role": role,
            "content": content,
            "metadata": metadata or {}
        }
        messages.append(message)

        # Enforce message limit
        if len(messages) > max_messages:
            # Drop oldest non-system messages; always keep system messages
            non_system = [m for m in messages if m["role"] != "system"]
            system = [m for m in messages if m["role"] == "system"]
            keep = max(max_messages - len(system), 0)
            messages.clear()
            messages.extend(system)
            if keep:
                messages.extend(non_system[-keep:])

    async def get_messages() -> list[dict]:
        """Get current messages."""
        return [
            {"role": m["role"], "content": m["content"]}
            for m in messages
        ]

    async def inject_context(messages_list: list[dict]) -> list[dict]:
        """Inject context (no-op for simple context)."""
        return messages_list

    return {
        "add_message": add_message,
        "get_messages": get_messages,
        "inject_context": inject_context
    }
```

Unit Tests: Test message storage and limits
```python
@pytest.mark.asyncio
async def test_add_message():
    """Test adding messages."""
    context = await mount(None, {"max_messages": 3})
    await context["add_message"]("user", "hello")
    messages = await context["get_messages"]()
    assert len(messages) == 1
    assert messages[0]["content"] == "hello"

@pytest.mark.asyncio
async def test_message_limit():
    """Test message limit enforcement."""
    context = await mount(None, {"max_messages": 3})
    for i in range(5):
        await context["add_message"]("user", f"message {i}")
    messages = await context["get_messages"]()
    assert len(messages) == 3  # Limited to 3
```

Integration Tests: Test with real sessions
```python
@pytest.mark.asyncio
async def test_with_session():
    """Test context with real session."""
    from amplifier_foundation import create_session

    session = create_session(context_config={"max_messages": 10})
    # Test adding messages through session
```

- Not limiting context: Implement message/token limits
- Memory leaks: Clear context when sessions end
- Thread safety: Use locks for concurrent access
- Losing metadata: Preserve metadata in get_messages
Hook modules observe lifecycle events without blocking execution. They:
- Listen to turn start/end events
- Monitor tool calls and results
- Log events for debugging
- Trigger side effects (notifications, metrics)
Build a hook when you want to:
- Log agent behavior for debugging
- Track metrics (latency, token usage)
- Implement approval gates
- Send notifications
- Redact sensitive information
```python
async def mount(coordinator: Any, config: dict) -> dict[str, Any]:
    """Mount the hook.

    Args:
        coordinator: The amplifier coordinator
        config: Configuration dictionary

    Returns:
        Dict with hook functions
    """
    async def on_turn_start(turn_id: str, user_message: str):
        """Called when turn starts.

        Args:
            turn_id: Unique turn identifier
            user_message: User's input message
        """
        pass

    async def on_turn_end(turn_id: str, assistant_message: str):
        """Called when turn completes.

        Args:
            turn_id: Turn identifier
            assistant_message: Agent's response
        """
        pass

    async def on_tool_call(tool_name: str, args: dict):
        """Called before tool execution.

        Args:
            tool_name: Name of tool being called
            args: Tool arguments
        """
        pass

    async def on_error(error: Exception, context: dict):
        """Called on errors.

        Args:
            error: The exception that occurred
            context: Context dict with turn_id, etc.
        """
        pass

    return {
        "on_turn_start": on_turn_start,
        "on_turn_end": on_turn_end,
        "on_tool_call": on_tool_call,
        "on_error": on_error
    }
```

Hooks can implement any subset of these lifecycle methods:
- on_turn_start(turn_id, user_message)
- on_turn_end(turn_id, assistant_message)
- on_tool_call(tool_name, args)
- on_tool_result(tool_name, result)
- on_error(error, context)
All hook methods are optional - only implement what you need.
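On the caller's side, optional methods are typically handled by invoking only what a hook actually provides. This dispatcher is a sketch of that pattern, not the actual amplifier-foundation implementation:

```python
async def emit(hooks: list[dict], event: str, *args) -> None:
    """Invoke `event` on every hook that implements it (sketch).

    Hook errors are swallowed here (a real system would log them) so that
    an observer can never crash the agent.
    """
    for hook in hooks:
        fn = hook.get(event)  # absent methods are simply skipped
        if fn is None:
            continue
        try:
            await fn(*args)
        except Exception:
            pass  # a real coordinator would log this to a separate channel
```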
```yaml
config:
  enabled: true          # Enable/disable hook
  log_file: "agent.log"  # Log file path
  log_level: "INFO"      # Logging level
  async_execution: true  # Run async (don't block)
```

- Implement only needed lifecycle methods
- Make all hooks async
- Don't block execution (use fire-and-forget)
- Handle errors gracefully (don't crash agent)
- Log hook errors separately
- Make hooks configurable (enable/disable)
- Minimize performance impact
- Test hooks in isolation
- Document when each hook fires
- Support async cleanup
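One way to satisfy the fire-and-forget requirement is to schedule hook coroutines as background tasks, keeping a reference until each finishes so the event loop doesn't garbage-collect them mid-flight; a sketch:

```python
import asyncio

_background: set = set()  # strong references keep tasks alive until done

def fire_and_forget(coro) -> None:
    """Run a hook coroutine without blocking the turn (sketch)."""
    task = asyncio.create_task(coro)
    _background.add(task)
    task.add_done_callback(_background.discard)
```

The caller continues immediately after `fire_and_forget(hook["on_turn_end"](turn_id, msg))`; any exception inside the task should be handled (or logged) by the hook itself.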
```python
import logging
from pathlib import Path
from typing import Any

async def mount(coordinator: Any, config: dict) -> dict[str, Any]:
    """Mount logging hook."""
    log_file = config.get("log_file", "agent.log")
    log_level = config.get("log_level", "INFO")

    # Setup logger
    logger = logging.getLogger("amplifier.agent")
    logger.setLevel(log_level)
    handler = logging.FileHandler(log_file)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    ))
    logger.addHandler(handler)

    async def on_turn_start(turn_id: str, user_message: str):
        """Log turn start."""
        logger.info(f"Turn {turn_id} started. User: {user_message[:100]}")

    async def on_turn_end(turn_id: str, assistant_message: str):
        """Log turn end."""
        logger.info(f"Turn {turn_id} completed. Assistant: {assistant_message[:100]}")

    async def on_tool_call(tool_name: str, args: dict):
        """Log tool call."""
        logger.info(f"Tool call: {tool_name} with args {args}")

    async def on_error(error: Exception, context: dict):
        """Log error."""
        logger.error(f"Error in turn {context.get('turn_id')}: {error}", exc_info=True)

    return {
        "on_turn_start": on_turn_start,
        "on_turn_end": on_turn_end,
        "on_tool_call": on_tool_call,
        "on_error": on_error
    }
```

Unit Tests: Test hook logic in isolation
```python
@pytest.mark.asyncio
async def test_on_turn_start(tmp_path):
    """Test turn start hook logs correctly."""
    log_file = tmp_path / "test.log"
    hook = await mount(None, {"log_file": str(log_file)})
    await hook["on_turn_start"]("turn-1", "hello")
    # Check log file
    logs = log_file.read_text()
    assert "turn-1" in logs
    assert "hello" in logs
```

Integration Tests: Test hooks with agent
```python
@pytest.mark.asyncio
async def test_hook_with_agent(tmp_path):
    """Test hook integration with agent."""
    log_file = tmp_path / "agent.log"
    # Create agent with logging hook
    # Execute turns
    # Verify logs contain expected events
```

Performance Tests: Ensure hooks don't slow agent
```python
@pytest.mark.asyncio
async def test_hook_performance():
    """Test hook doesn't impact performance."""
    import time

    hook = await mount(None, {})
    start = time.time()
    for i in range(100):
        await hook["on_turn_start"](f"turn-{i}", "message")
    duration = time.time() - start
    assert duration < 0.1  # Should be very fast
```

- Blocking execution: Don't await slow operations
- Not handling errors: Catch exceptions in hooks
- Performance impact: Keep hooks lightweight
- Modifying state: Hooks observe, don't modify
- Coupling to implementation: Use coordinator APIs
| Feature | Orchestrator | Provider | Tool | Context | Hook |
|---|---|---|---|---|---|
| Purpose | Control execution | Connect to models | Extend capabilities | Manage state | Observe events |
| Async | Required | Required | Required | Required | Required |
| Stateful | Usually yes | Usually no | Can be | Always | Usually no |
| Entry Point | amplifier.orchestrators | amplifier.providers | amplifier.tools | amplifier.contexts | amplifier.hooks |
| Schema | No | No | Yes (get_schema) | No | No |
| Side Effects | Yes | Yes | Yes | Yes | Yes (observe only) |
| Blocks Execution | Yes | Yes | Yes | No | No |
| Testing Complexity | High | Medium | Low | Medium | Low |
| Common Examples | loop-streaming | anthropic | filesystem | memory | logging |
- Read DEVELOPMENT_WORKFLOW.md for step-by-step development process
- Study EXAMPLES.md for complete working code
- Review TESTING_GUIDE.md for testing strategies
- Check API_PATTERNS.md for common implementation patterns