A production-ready implementation of the 2389 Agent Protocol in Rust, enabling interoperable AI agents that communicate via MQTT with comprehensive observability and robust error handling.
This is the initial release of the 2389 Agent Protocol Rust implementation.
What's Included:
- Complete v1.0 protocol implementation with all required message types
- Production-ready agent runtime with lifecycle management
- MQTT transport layer with QoS 1 and proper error handling
- Tool system with JSON schema validation
- Multi-LLM provider support (OpenAI, Anthropic)
- Comprehensive test suite (286 tests)
- Docker deployment support
What's Experimental:
- Dynamic routing (v2.0) features - 80% complete, marked as experimental
- See DYNAMIC_ROUTING_ANALYSIS.md for current implementation status
Known Limitations:
- Dynamic routing capability matching needs additional testing
- Hot-reload configuration not yet implemented
- Prometheus metrics export in development
The 2389 Agent Protocol enables AI agents to work together in distributed systems using standardized MQTT communication patterns. This Rust implementation prioritizes correctness, performance, and protocol compliance through strong typing and comprehensive testing.
Current Status: v0.1.0
- ✅ Core protocol (v1.0) fully implemented and tested
- ✅ 9-step task processing algorithm complete
- ✅ MQTT transport with QoS 1 messaging
- ✅ Agent discovery system with capability matching
- 🚧 Dynamic routing (v2.0) - Experimental (see DYNAMIC_ROUTING_ANALYSIS.md)
- ✅ 286 tests passing (213 unit + 64 integration + 9 doc)
- Production-Ready Agent Runtime - Complete lifecycle management with proper startup/shutdown sequences
- MQTT-Based Communication - QoS 1 messaging with Last Will Testament and topic canonicalization
- Extensible Tool System - JSON schema-validated tools with built-in security and sandboxing
- Multiple LLM Support - Pluggable providers for OpenAI, Anthropic Claude, and custom implementations
- Comprehensive Observability - Structured logging, metrics collection, and health monitoring
- Container-First Design - Docker and Kubernetes deployment with health checks and monitoring
- Async-first design with Tokio runtime
- 1000+ messages/second throughput per agent
- Sub-100ms processing latency for simple tasks
- Memory-efficient with bounded resource usage
- Environment variable credential injection
- Tool parameter validation against JSON schemas
- Process isolation for tool execution
- No sensitive data in logs or error messages
- Trait-based tool system supporting custom implementations
- Plugin architecture for LLM providers
- Configurable pipeline depth and processing limits
- Hot-reloadable configuration (planned)
- Structured JSON logging with contextual spans
- Thread-safe metrics collection with percentiles
- HTTP health endpoints for orchestration platforms
- Prometheus-compatible metrics export
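The "metrics collection with percentiles" idea can be sketched with a mutex-guarded sample buffer and a nearest-rank percentile readout. This is an illustrative sketch; the type and method names here are hypothetical, not the crate's actual metrics API.

```rust
use std::sync::Mutex;

/// Thread-safe latency collection with a nearest-rank percentile
/// readout. Names are illustrative, not the crate's metrics API.
struct LatencyHistogram {
    samples_ms: Mutex<Vec<f64>>,
}

impl LatencyHistogram {
    fn new() -> Self {
        Self { samples_ms: Mutex::new(Vec::new()) }
    }

    /// Record one latency sample; safe to call from any thread.
    fn record(&self, ms: f64) {
        self.samples_ms.lock().unwrap().push(ms);
    }

    /// Nearest-rank percentile, e.g. `percentile(95.0)` for p95.
    fn percentile(&self, p: f64) -> Option<f64> {
        let mut s = self.samples_ms.lock().unwrap().clone();
        if s.is_empty() {
            return None;
        }
        s.sort_by(|a, b| a.partial_cmp(b).unwrap());
        let rank = ((p / 100.0) * s.len() as f64).ceil() as usize;
        Some(s[rank.saturating_sub(1).min(s.len() - 1)])
    }
}
```

A real implementation would likely use a fixed-size or decaying reservoir rather than an unbounded `Vec`, to keep memory bounded under sustained load.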
- Rust 1.75+
- MQTT broker (e.g., Mosquitto)
- LLM provider API key (OpenAI or Anthropic)
```bash
cargo install agent2389
```

Or build from source:

```bash
git clone https://github.com/2389-research/2389-agent-rust
cd 2389-agent-rust
cargo build --release
./target/release/agent2389 --help
```

Or pull the Docker image:

```bash
docker pull agent2389:latest
```

- Start an MQTT broker:

```bash
docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto:2
```

- Create agent configuration:
```toml
# agent.toml
[agent]
id = "my-agent-001"
description = "My first 2389 agent"
capabilities = ["general-purpose", "task-execution"]

[mqtt]
broker_url = "mqtt://localhost:1883"
username_env = "MQTT_USERNAME"
password_env = "MQTT_PASSWORD"

[llm]
provider = "openai"
model = "gpt-4o"
api_key_env = "OPENAI_API_KEY"
system_prompt = "You are a helpful AI assistant."
temperature = 0.7
max_tokens = 4000

[budget]
max_tool_calls = 15
max_iterations = 8

[tools]
web_search = "builtin"
http_request = "builtin"
```

- Set environment variables:
```bash
export MQTT_USERNAME=your_mqtt_user
export MQTT_PASSWORD=your_mqtt_password
export OPENAI_API_KEY=sk-your-openai-key
```

- Run the agent:

```bash
agent2389 run agent.toml
```

- Verify it's working:

```bash
curl http://localhost:8080/health
```

| Variable | Description | Required |
|---|---|---|
| `MQTT_USERNAME` | MQTT broker authentication username | Yes |
| `MQTT_PASSWORD` | MQTT broker authentication password | Yes |
| `OPENAI_API_KEY` | OpenAI API key (starts with `sk-`) | Yes* |
| `ANTHROPIC_API_KEY` | Anthropic API key (starts with `sk-ant-`) | Yes* |
| `LOG_LEVEL` | Logging level (ERROR, WARN, INFO, DEBUG) | No |
| `LOG_FORMAT` | Log format (`json` or `pretty`) | No |

*Choose one LLM provider.
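The `*_env` indirection (the config file names a variable, never a secret) amounts to a small environment lookup at startup. This is a hedged sketch; `resolve_credential` is a hypothetical helper name, and the crate's real loader may report errors differently.

```rust
use std::env;

/// Resolve a credential named by a `*_env` config key (for example
/// `api_key_env = "OPENAI_API_KEY"`). Hypothetical helper; the real
/// config loader may report errors differently.
fn resolve_credential(env_key: &str) -> Result<String, String> {
    match env::var(env_key) {
        // Present and non-blank: use it.
        Ok(v) if !v.trim().is_empty() => Ok(v),
        // Present but blank: almost always a deployment mistake.
        Ok(_) => Err(format!("{env_key} is set but empty")),
        Err(_) => Err(format!("{env_key} is not set")),
    }
}
```

This keeps secrets out of the TOML file itself; only the variable *name* ever appears in configuration or version control.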
The budget system prevents infinite loops and resource exhaustion by limiting the number of tool calls and iterations per task:
```toml
[budget]
max_tool_calls = 15  # Maximum tool calls per task (prevents infinite loops)
max_iterations = 8   # Maximum processing iterations (stops runaway tasks)
```

Budget Parameters:
- `max_tool_calls`: Total number of tool executions allowed per task, including HTTP requests, file operations, and custom tools. Prevents agents from making excessive API calls.
- `max_iterations`: Number of processing rounds allowed per task. Each iteration may include LLM calls and tool executions. Prevents infinite task loops.
Recommended Values by Agent Type:
```toml
# Research-focused agents (need more web requests and HTTP calls)
[budget]
max_tool_calls = 25
max_iterations = 12

# Writing agents (fewer tool calls, more thinking)
[budget]
max_tool_calls = 15
max_iterations = 8

# Editor agents (minimal tool usage)
[budget]
max_tool_calls = 10
max_iterations = 6
```

Budget Enforcement:
- Tool calls are counted per task execution
- Processing stops gracefully when limits are reached
- Budget exhaustion is logged and reported via MQTT
- No partial results are lost - agents provide best-effort responses
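A minimal sketch of how such per-task counters might be enforced. The struct and method names are illustrative assumptions, not the crate's actual types.

```rust
/// Per-task budget counters mirroring the `[budget]` config keys.
/// Illustrative sketch, not the crate's actual implementation.
struct Budget {
    max_tool_calls: u32,
    max_iterations: u32,
    tool_calls: u32,
    iterations: u32,
}

impl Budget {
    fn new(max_tool_calls: u32, max_iterations: u32) -> Self {
        Self { max_tool_calls, max_iterations, tool_calls: 0, iterations: 0 }
    }

    /// Record one tool execution; returns false once the budget is spent,
    /// at which point the task should stop gracefully.
    fn try_tool_call(&mut self) -> bool {
        if self.tool_calls >= self.max_tool_calls {
            return false;
        }
        self.tool_calls += 1;
        true
    }

    /// Record one processing iteration; returns false once exhausted.
    fn try_iteration(&mut self) -> bool {
        if self.iterations >= self.max_iterations {
            return false;
        }
        self.iterations += 1;
        true
    }
}
```

Checking the counter *before* each tool call (rather than after) is what lets the agent stop gracefully and still return a best-effort response.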
```toml
[agent]
id = "advanced-agent"
description = "Advanced configuration example"

[mqtt]
broker_url = "mqtts://secure-mqtt.example.com:8883"
username_env = "MQTT_USERNAME"
password_env = "MQTT_PASSWORD"
qos = 1
keep_alive = 60
ca_cert_path = "/app/certs/ca.crt"
client_cert_path = "/app/certs/client.crt"
client_key_path = "/app/certs/client.key"

[llm]
provider = "anthropic"
model = "claude-3-sonnet-20240229"
api_key_env = "ANTHROPIC_API_KEY"
system_prompt = "You are an expert software engineer."
temperature = 0.7
max_tokens = 4000

[budget]
max_tool_calls = 20
max_iterations = 10

[tools.http_request]
impl = "builtin"
config = { max_response_size = 1048576, timeout_seconds = 30 }

[tools.custom_tool]
impl = "external"
command = "/usr/local/bin/my-tool"
config = { working_dir = "/tmp", max_execution_time = 60 }

# Performance tuning
max_pipeline_depth = 16
task_timeout = 300
max_output_size = 2097152
```

```rust
use agent2389::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize logging
    agent2389::observability::init_default_logging();

    // Load configuration
    let config = AgentConfig::from_file("agent.toml")?;

    // Create and start agent
    let mut agent = AgentRunner::new(config);
    agent.startup().await?;
    println!("Agent started successfully!");

    // Run until shutdown signal
    agent.run().await?;
    Ok(())
}
```

```rust
use agent2389::tools::*;
use async_trait::async_trait;
use serde_json::{json, Value};

struct CustomTool;

#[async_trait]
impl Tool for CustomTool {
    async fn describe(&self) -> ToolDescription {
        ToolDescription {
            name: "custom_operation".to_string(),
            description: "Performs custom business logic".to_string(),
            parameters: json!({
                "type": "object",
                "properties": {
                    "input": {
                        "type": "string",
                        "description": "Input data to process"
                    }
                },
                "required": ["input"]
            }),
        }
    }

    async fn initialize(&mut self, _config: &Value) -> Result<(), ToolError> {
        // Initialize tool with configuration
        Ok(())
    }

    async fn execute(&self, parameters: &Value) -> Result<Value, ToolError> {
        let input = parameters["input"]
            .as_str()
            .ok_or(ToolError::InvalidParameter("input must be string"))?;

        // Perform custom operation
        let result = format!("Processed: {}", input);
        Ok(json!({"result": result}))
    }
}
```

```bash
# Check overall health status
curl http://localhost:8080/health | jq

# Get complete metrics
curl http://localhost:8080/metrics | jq '.tasks'

# Monitor processing in real-time
watch 'curl -s http://localhost:8080/metrics | jq "{tasks_processing: .tasks.tasks_processing, mqtt_connected: .mqtt.connected}"'
```

The 2389 Agent Protocol implementation follows a modular, async-first architecture:
```mermaid
graph TD
    A["MQTT Broker<br/>Message Queue"]
    B["Agent Runtime<br/>Task Processor"]
    C["LLM Provider<br/>OpenAI/Claude"]
    D["Tool System"]
    E["HTTP Tool"]
    F["File Tool"]
    G["Exec Tool"]
    A --- B
    B --- C
    B --> D
    D --> E
    D --> F
    D --> G
```
- Agent Lifecycle Manager - Handles startup, shutdown, and state transitions
- MQTT Transport Layer - QoS 1 messaging with proper topic handling
- Task Processing Pipeline - 9-step algorithm for protocol compliance
- Tool Execution System - Secure, validated tool execution
- Observability System - Logging, metrics, and health monitoring
This implementation is 100% compliant with the 2389 Agent Protocol specification:
- ✅ Complete 9-step task processing algorithm
- ✅ Proper MQTT topic canonicalization
- ✅ Pipeline depth enforcement (max 16)
- ✅ Idempotency handling with task deduplication
- ✅ Error publishing to conversation topics
- ✅ Last Will Testament for availability status
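Topic canonicalization can be pictured as normalizing a raw topic string before publishing or subscribing. The following is a hypothetical sketch (trim whitespace, collapse repeated slashes, drop a trailing slash); the spec's actual canonicalization rules may differ.

```rust
/// Hypothetical sketch of MQTT topic canonicalization: trims
/// surrounding whitespace, collapses repeated slashes, and removes
/// a trailing slash. The spec's actual rules may differ.
fn canonicalize_topic(raw: &str) -> String {
    let mut out = String::new();
    let mut prev_slash = false;
    for c in raw.trim().chars() {
        if c == '/' {
            // Collapse runs of '/' into a single separator.
            if !prev_slash {
                out.push('/');
            }
            prev_slash = true;
        } else {
            out.push(c);
            prev_slash = false;
        }
    }
    // Drop a trailing slash (but keep a lone "/").
    if out.len() > 1 && out.ends_with('/') {
        out.pop();
    }
    out
}
```

Normalizing on both the publish and subscribe paths is what makes two agents agree on the same topic even when their configs spell it slightly differently.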
```bash
# Run with environment variables
docker run -d \
  --name agent2389 \
  --restart unless-stopped \
  -p 8080:8080 \
  -e AGENT_ID=prod-agent-001 \
  -e MQTT_HOST=mqtt.example.com \
  -e OPENAI_API_KEY=sk-your-key \
  -v $(pwd)/config:/app/config:ro \
  agent2389:latest
```

```yaml
version: '3.8'
services:
  agent2389:
    image: agent2389:latest
    restart: unless-stopped
    ports:
      - "8080:8080"
    environment:
      - AGENT_ID=prod-agent-001
      - MQTT_HOST=mqtt-broker
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LOG_FORMAT=json
      - LOG_LEVEL=INFO
    volumes:
      - ./config/agent.toml:/app/config/agent.toml:ro
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
    depends_on:
      - mqtt-broker

  mqtt-broker:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf
```

```bash
# Clone repository
git clone https://github.com/2389-research/2389-agent-rust
cd 2389-agent-rust

# Install development tools
cargo install cargo-watch cargo-nextest cargo-tarpaulin

# Run development loop
cargo watch -x fmt -x "clippy --fix --allow-dirty" -x test
```

```bash
# Format code
cargo fmt

# Lint and fix issues
cargo clippy --fix --allow-dirty

# Run tests
cargo test

# Run tests with coverage
cargo tarpaulin --fail-under 80

# Generate documentation
cargo doc --no-deps --open

# Run with debug logging
LOG_LEVEL=DEBUG LOG_FORMAT=pretty cargo run -- run agent.toml
```

The project uses comprehensive testing strategies:
- Unit Tests - Test individual components and functions
- Property-Based Tests - Use `proptest` for edge-case discovery
- Integration Tests - Test with a real MQTT broker (localhost:1883 in dev, Mosquitto container in CI)
- Protocol Compliance Tests - Verify adherence to specification requirements
```bash
# Run all tests
cargo test

# Run specific test module
cargo test protocol::messages::tests

# Run integration tests with real MQTT
cargo test --test integration_test

# Run property-based tests
cargo test --test property_tests
```

Before submitting contributions, ensure all quality gates pass:

```bash
# Pre-commit quality check
./scripts/quality-check.sh

# Or run manually:
cargo fmt --check
cargo clippy --all-targets -- -D warnings
cargo test --lib
cargo test --test integration_test
```

This project includes several Rust-based development tools:
Monitor MQTT topics with real-time display and filtering:
```bash
# Monitor default agent
cargo run --bin mqtt-monitor

# Monitor specific agent with filtering
cargo run --bin mqtt-monitor -- --agent-id researcher-agent

# Connect to remote broker
cargo run --bin mqtt-monitor -- \
  --broker-url mqtt://mqtt.example.com:1883 \
  --username myuser \
  --password mypass
```

Create and send TaskEnvelope v2.0 messages with dynamic routing:
```bash
# Send task with smart agent discovery
cargo run --bin dynamic-injector -- \
  --query "Research AI developments" \
  --discovery-timeout 5

# Preview routing decision without sending
cargo run --bin dynamic-injector -- \
  --query "Process urgent customer request" \
  --preview-only

# Send to specific agent
cargo run --bin dynamic-injector -- \
  --query "Test message" \
  --target-agent my-agent-001
```

Send multi-agent pipeline tasks:
```bash
# Send research → write → edit pipeline
cargo run --bin pipeline-injector -- \
  --topic "Rust async programming best practices"

# Custom pipeline
cargo run --bin pipeline-injector -- \
  --topic "Custom topic" \
  --first-agent researcher-agent
```

Simple message injection for testing:
```bash
# Send basic task
cargo run --bin inject-message -- \
  --agent-id test-agent \
  --message "Test task content"
```

- Type Safety: No more string-manipulation errors from shell scripts
- Cross-Platform: Works on Windows, macOS, and Linux
- Protocol Compliant: Generates valid TaskEnvelope messages
- Async Operations: Concurrent operations with Tokio
- Self-Documenting: Built-in `--help` for all commands
- Technical Requirements - Complete implementation specification
- System Architecture - Design decisions and component interactions
- TaskEnvelope Protocol - Protocol v1.0 and v2.0 specification
- Agent Capabilities - Agent discovery and capability system
- V2 Routing Architecture - Router-based workflow decisions
- Agent System Prompt Guidelines - Writing routing-agnostic agents
- Dynamic Routing Analysis - Current state of v2.0 routing implementation
- Testing Guide - Comprehensive testing procedures
- Test Coverage - Current test coverage metrics
- Deployment Guide - Production deployment patterns
- Observability Guide - Logging, metrics, and monitoring
- Task Injector Guide - Using message injection tools
Generate and view the complete API documentation:
```bash
cargo doc --no-deps --open
```

The documentation includes:
- Complete API reference with examples
- Protocol message format specifications
- Configuration options and validation rules
- Error types and handling patterns
- Performance characteristics and benchmarks
- `/health` - Comprehensive health with component checks
- `/ready` - Kubernetes readiness probe (MQTT connectivity)
- `/live` - Kubernetes liveness probe (basic responsiveness)
- `/metrics` - Complete metrics in JSON format
```json
{
  "tasks": {
    "tasks_completed": 1250,
    "tasks_processing": 3,
    "avg_processing_time_ms": 847.2,
    "processing_time_p95_ms": 2100.0
  },
  "mqtt": {
    "connected": true,
    "messages_published": 2450,
    "connection_failures": 1
  },
  "tools": {
    "total_executions": 890,
    "avg_execution_time_ms": 425.1
  }
}
```

Production logging uses structured JSON format:
```json
{
  "timestamp": "2024-01-01T12:00:00Z",
  "level": "INFO",
  "target": "agent2389::agent::processor",
  "span": {
    "name": "task_processing",
    "task_id": "550e8400-e29b-41d4-a716-446655440000",
    "agent_id": "prod-agent-001"
  },
  "fields": {
    "message": "Task completed successfully",
    "duration_ms": 1247
  }
}
```

Task Processing:
- Simple tasks: ~10ms average latency
- Complex LLM tasks: 500ms-5s (depends on provider)
- Throughput: 1000+ messages/second per agent
Resource Usage:
- Baseline memory: ~50MB
- Per-task memory: ~1-10MB
- CPU: Low baseline, spikes during LLM calls
- Horizontal: Deploy multiple agents with unique IDs
- Vertical: Single agent handles high message volume
- Resource Limits: Configurable memory and processing bounds
- Credential Management - Environment variable injection, no secrets in config
- Input Validation - JSON schema validation for all tool parameters
- Process Isolation - Tools execute in separate processes with timeouts
- Output Sanitization - No sensitive data in logs or error messages
- TLS Support - MQTT over TLS with client certificate authentication
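The process-isolation-with-timeout idea can be sketched in plain `std`: spawn the tool in its own OS process, collect its output on a helper thread, and kill it at a deadline. `run_isolated` is a hypothetical helper for illustration, not the crate's actual tool runner.

```rust
use std::io::Read;
use std::process::{Command, Stdio};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Run an external tool in its own OS process and kill it if it
/// exceeds `timeout`. Hypothetical helper, not the crate's tool runner.
fn run_isolated(program: &str, args: &[&str], timeout: Duration) -> Result<String, String> {
    let mut child = Command::new(program)
        .args(args)
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        .spawn()
        .map_err(|e| format!("spawn failed: {e}"))?;

    // Read the child's stdout on a helper thread so the parent can
    // enforce a deadline with `recv_timeout`.
    let mut stdout = child.stdout.take().expect("stdout was piped");
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut out = String::new();
        let _ = stdout.read_to_string(&mut out); // returns when the pipe closes
        let _ = tx.send(out);
    });

    match rx.recv_timeout(timeout) {
        Ok(out) => {
            let status = child.wait().map_err(|e| e.to_string())?;
            if status.success() {
                Ok(out)
            } else {
                Err(format!("tool failed: {status}"))
            }
        }
        Err(_) => {
            // Deadline exceeded: terminate the tool process.
            let _ = child.kill();
            let _ = child.wait();
            Err("tool timed out".to_string())
        }
    }
}
```

Because the tool runs in a separate process, a misbehaving tool cannot corrupt agent memory, and `kill` reliably reclaims its resources on timeout.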
- Use TLS for MQTT connections (port 8883)
- Store API keys in proper secret management systems
- Enable client certificate authentication
- Configure network policies and firewall rules
- Regular security audits with `cargo audit`
- Container scanning for vulnerabilities
- MQTT Connection Failed

  ```bash
  # Test connectivity
  telnet mqtt-broker 1883

  # Verify credentials
  mosquitto_pub -h mqtt-broker -t test -m hello -u username -P password
  ```

- High Memory Usage

  ```bash
  # Check pipeline depth
  curl http://localhost:8080/metrics | jq '.tasks.current_pipeline_depth'
  ```

- LLM API Errors

  ```bash
  # Test API key validity
  curl -H "Authorization: Bearer $OPENAI_API_KEY" https://api.openai.com/v1/models
  ```
```bash
# Enable detailed logging
LOG_LEVEL=DEBUG LOG_SPANS=true ./agent2389 run

# Monitor in real-time
tail -f logs/agent.log | jq 'select(.level=="ERROR")'
```

- Check the troubleshooting guide
- Review logs with `LOG_LEVEL=DEBUG`
- Use health endpoints to diagnose issues
- Consult the API documentation
We welcome contributions! Please see our contributing guidelines for details.
- Fork the repository
- Create a feature branch
- Write tests for new functionality
- Ensure all quality gates pass
- Submit a pull request
- All code must pass `cargo clippy` with no warnings
- Test coverage must be ≥80%
- Documentation required for all public APIs
- Follow existing code style and patterns
This project is licensed under either of
- Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.
- 📖 Documentation: Complete guides in the docs/ directory
- 🐛 Bug Reports: GitHub Issues
- 💬 Discussions: GitHub Discussions
- 📧 Security Issues: [email protected]
Built with ❤️ in Rust | Production-Ready | Protocol Compliant | Fully Observable