Common implementation patterns for amplifier-foundation modules.
async def mount(coordinator: Any, config: dict) -> dict[str, Any]:
"""Mount the module."""
async def my_function(arg: str) -> str:
return arg.upper()
return {"my_function": my_function}async def mount(coordinator: Any, config: dict) -> dict[str, Any]:
"""Mount with config validation."""
# Validate required config
if "api_key" not in config:
raise ValueError("Missing required config: api_key")
# Provide defaults
timeout = config.get("timeout", 30)
max_retries = config.get("max_retries", 3)
async def my_function():
pass
return {"my_function": my_function}class Service:
def __init__(self, config: dict):
self._config = config
self._cache = {}
async def my_function(self, arg: str) -> str:
if arg in self._cache:
return self._cache[arg]
result = arg.upper()
self._cache[arg] = result
return result
async def mount(coordinator: Any, config: dict) -> dict[str, Any]:
"""Mount with stateful service."""
service = Service(config)
return {"my_function": service.my_function}def get_schema() -> dict:
"""Return JSON schema."""
return {
"function_name": {
"description": "What this function does",
"parameters": {
"type": "object",
"properties": {
"arg": {"type": "string", "description": "Argument description"}
},
"required": ["arg"]
}
}
}def get_schema() -> dict:
return {
"search": {
"description": "Search for text",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"limit": {"type": "integer", "description": "Max results", "default": 10}
},
"required": ["query"] # limit is optional
}
}
}class ModuleError(Exception):
"""Base exception for module."""
pass
class ValidationError(ModuleError):
"""Invalid input."""
pass
async def my_function(arg: str) -> str:
if not arg:
raise ValidationError("Argument cannot be empty")
return arg.upper()async def my_function(url: str) -> dict:
try:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
except httpx.HTTPError as e:
raise ModuleError(f"HTTP request failed: {e}") from easync def process_batch(items: list[str]) -> list[str]:
"""Process items concurrently."""
import asyncio
tasks = [process_one(item) for item in items]
return await asyncio.gather(*tasks)class Connection:
async def __aenter__(self):
# Setup
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Cleanup
pass
async def mount(coordinator, config):
async def get_connection():
return Connection()
return {"get_connection": get_connection}async def stream_results(query: str):
"""Stream results."""
for i in range(10):
await asyncio.sleep(0.1)
yield {"item": i, "query": query}DEFAULT_CONFIG = {
"timeout": 30,
"max_retries": 3,
"batch_size": 100
}
async def mount(coordinator, config):
# Merge with defaults
final_config = {**DEFAULT_CONFIG, **config}from typing import TypedDict
class Config(TypedDict):
api_key: str
timeout: int
max_retries: int
def validate_config(config: dict) -> Config:
"""Validate and type config."""
if "api_key" not in config:
raise ValueError("Missing api_key")
return {
"api_key": config["api_key"],
"timeout": config.get("timeout", 30),
"max_retries": config.get("max_retries", 3)
}from typing import TypedDict
class SearchResult(TypedDict):
file: str
line_num: int
content: str
async def search(query: str) -> list[SearchResult]:
return [{"file": "test.py", "line_num": 1, "content": "hello"}]from typing import TypeVar, Generic
T = TypeVar("T")
async def cache_get(key: str) -> T | None:
"""Get cached value."""
passclass ResourceManager:
def __init__(self):
self._resources = []
async def acquire(self):
resource = await create_resource()
self._resources.append(resource)
return resource
async def cleanup(self):
for resource in self._resources:
await resource.close()
self._resources.clear()
async def mount(coordinator, config):
manager = ResourceManager()
async def get_resource():
return await manager.acquire()
async def cleanup():
await manager.cleanup()
return {
"get_resource": get_resource,
"cleanup": cleanup
}async def mount(coordinator, config):
async def process_file(path: str):
# Get filesystem tool from coordinator
fs = await coordinator.get_tool("filesystem")
content = await fs["read_file"](path)
return content.upper()
return {"process_file": process_file}async def mount(coordinator, config):
# Try to get optional tool
try:
logger = await coordinator.get_hook("logging")
except KeyError:
logger = None
async def my_function():
if logger:
await logger["log"]("Function called")
# Continue regardlessFor complete examples, see EXAMPLES.md.