06.1 Channel Architecture
Relevant source files
The following files were used as context for generating this wiki page:
- src/channels/cli.rs
- src/channels/dingtalk.rs
- src/channels/discord.rs
- src/channels/email_channel.rs
- src/channels/imessage.rs
- src/channels/lark.rs
- src/channels/matrix.rs
- src/channels/mod.rs
- src/channels/slack.rs
- src/channels/telegram.rs
- src/channels/traits.rs
- src/channels/whatsapp.rs
- src/config/schema.rs
- src/onboard/wizard.rs
This page documents the core channel architecture in ZeroClaw, including the Channel trait interface, message dispatching system, supervised listener lifecycle, and streaming response support. For details on individual channel implementations (Telegram, Discord, Slack, etc.), see Channel Implementations. For security features like allowlists and pairing authentication, see Channel Security. For runtime model switching capabilities, see Runtime Model Switching.
All messaging platform integrations implement the Channel trait, which provides a unified interface for sending and receiving messages. The trait is defined at src/channels/traits.rs:47-103 and requires implementations to be Send + Sync for safe concurrent access.
| Method | Purpose | Required |
|---|---|---|
| `name()` | Returns channel identifier (e.g. `"telegram"`, `"discord"`) | Yes |
| `send()` | Sends a message to a recipient | Yes |
| `listen()` | Long-running loop that receives messages via mpsc channel | Yes |
| `health_check()` | Returns channel health status | Optional (default: `true`) |
| Method | Purpose | Default |
|---|---|---|
| `supports_draft_updates()` | Whether channel supports progressive edits | `false` |
| `send_draft()` | Send initial draft message, returns message ID | No-op |
| `update_draft()` | Update draft with accumulated text | No-op |
| `finalize_draft()` | Replace draft with final formatted message | No-op |
| `start_typing()` | Show typing indicator | No-op |
| `stop_typing()` | Hide typing indicator | No-op |
Sources: src/channels/traits.rs:47-103
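The shape of the trait can be sketched as follows. This is a simplified, hypothetical surface: the real trait at src/channels/traits.rs:47-103 also defines the async `send()`, `listen()`, `health_check()`, and draft/typing methods, which are omitted here.

```rust
// Simplified sketch of the Channel trait surface; the async methods
// (send, listen, health_check, draft/typing hooks) are omitted.
// Not the exact trait definition from src/channels/traits.rs.
pub trait Channel: Send + Sync {
    /// Channel identifier, e.g. "telegram" or "discord".
    fn name(&self) -> &str;

    /// Whether the channel supports progressive draft edits.
    /// Defaults to false, so most channels need not override it.
    fn supports_draft_updates(&self) -> bool {
        false
    }
}

/// A stub implementation, the kind of thing a test might use.
pub struct MockChannel;

impl Channel for MockChannel {
    fn name(&self) -> &str {
        "mock"
    }
}

fn main() {
    let ch = MockChannel;
    assert_eq!(ch.name(), "mock");
    assert!(!ch.supports_draft_updates()); // default implementation applies
    println!("ok");
}
```

The `Send + Sync` bound is what allows `Arc<dyn Channel>` values to be shared across the spawned listener and worker tasks.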
Represents an incoming message from a channel. Defined at src/channels/traits.rs:4-12:
```rust
ChannelMessage {
    id: String,           // Platform-specific message ID
    sender: String,       // User identifier (username, phone, etc.)
    reply_target: String, // Where to send replies (chat ID, thread ID, etc.)
    content: String,      // Message text
    channel: String,      // Channel name ("telegram", "discord", etc.)
    timestamp: u64,       // Unix timestamp
}
```
Represents an outgoing message. Defined at src/channels/traits.rs:15-44:
```rust
SendMessage {
    content: String,          // Message text
    recipient: String,        // Destination identifier
    subject: Option<String>,  // Optional subject (used by email channel)
}
```
Sources: src/channels/traits.rs:4-44
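A minimal example of turning an incoming `ChannelMessage` into an outgoing `SendMessage` (field names follow the struct listings above; the `make_reply` helper is illustrative and not part of the codebase):

```rust
// Struct shapes mirror the documented fields; make_reply is hypothetical.
pub struct ChannelMessage {
    pub id: String,
    pub sender: String,
    pub reply_target: String,
    pub content: String,
    pub channel: String,
    pub timestamp: u64,
}

pub struct SendMessage {
    pub content: String,
    pub recipient: String,
    pub subject: Option<String>,
}

/// Build an outgoing message addressed at the incoming message's reply target.
fn make_reply(incoming: &ChannelMessage, content: &str) -> SendMessage {
    SendMessage {
        content: content.to_string(),
        // Replies go to the chat/thread (reply_target), not the sender id.
        recipient: incoming.reply_target.clone(),
        subject: None,
    }
}

fn main() {
    let msg = ChannelMessage {
        id: "42".into(),
        sender: "alice".into(),
        reply_target: "chat-123".into(),
        content: "hello".into(),
        channel: "telegram".into(),
        timestamp: 0,
    };
    let reply = make_reply(&msg, "hi!");
    assert_eq!(reply.recipient, "chat-123");
    println!("ok");
}
```

The distinction between `sender` and `reply_target` matters on platforms like Telegram and Slack, where replies are addressed to a chat or thread rather than to the user directly.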
graph TB
subgraph "Channel Implementations"
TelegramChannel["TelegramChannel"]
DiscordChannel["DiscordChannel"]
SlackChannel["SlackChannel"]
EmailChannel["EmailChannel"]
MatrixChannel["MatrixChannel"]
WhatsAppChannel["WhatsAppChannel"]
OtherChannels["+ 7 more channels"]
end
subgraph "Listener Supervision"
spawn_supervised_listener["spawn_supervised_listener()"]
BackoffLogic["Exponential Backoff<br/>initial=2s, max=60s"]
HealthTracking["health::mark_component_ok()<br/>health::mark_component_error()"]
end
subgraph "Message Pipeline"
ChannelMessageMpsc["tokio::sync::mpsc<br/>Sender<ChannelMessage>"]
DispatchLoop["run_message_dispatch_loop()"]
Semaphore["Semaphore<br/>max_in_flight_messages"]
ProcessWorker["process_channel_message()"]
end
subgraph "Per-Message Processing"
RuntimeContext["ChannelRuntimeContext"]
BuildHistory["Build conversation<br/>history from cache"]
MemoryRecall["build_memory_context()"]
ToolCallLoop["run_tool_call_loop()"]
StreamingDraft["Draft update loop<br/>(if supported)"]
end
TelegramChannel -->|".listen(tx)"| spawn_supervised_listener
DiscordChannel -->|".listen(tx)"| spawn_supervised_listener
SlackChannel -->|".listen(tx)"| spawn_supervised_listener
EmailChannel -->|".listen(tx)"| spawn_supervised_listener
MatrixChannel -->|".listen(tx)"| spawn_supervised_listener
WhatsAppChannel -->|"webhook mode"| spawn_supervised_listener
OtherChannels -->|".listen(tx)"| spawn_supervised_listener
spawn_supervised_listener -->|"tx.send()"| ChannelMessageMpsc
spawn_supervised_listener -.->|"auto-restart"| BackoffLogic
spawn_supervised_listener -.->|"report status"| HealthTracking
ChannelMessageMpsc --> DispatchLoop
DispatchLoop --> Semaphore
Semaphore -->|"acquire permit"| ProcessWorker
ProcessWorker --> RuntimeContext
RuntimeContext --> BuildHistory
RuntimeContext --> MemoryRecall
RuntimeContext --> ToolCallLoop
ProcessWorker -->|"if supports_draft_updates()"| StreamingDraft
ProcessWorker -->|".send()"| TelegramChannel
ProcessWorker -->|".send()"| DiscordChannel
ProcessWorker -->|".send()"| SlackChannel
Diagram: Channel message flow from listener supervision through processing pipeline
Sources: src/channels/mod.rs:471-844, src/channels/traits.rs:47-103
Each channel's listen() method runs in a supervised task that automatically restarts on failure. The supervisor is implemented at src/channels/mod.rs:471-509.
stateDiagram-v2
[*] --> Running
Running --> CheckResult: listen() returns
CheckResult --> CleanExit: tx.is_closed()
CheckResult --> UnexpectedExit: Ok(()) but tx open
CheckResult --> ErrorExit: Err(e)
UnexpectedExit --> MarkError: mark_component_error()
ErrorExit --> MarkError
MarkError --> BumpRestart: bump_component_restart()
BumpRestart --> Sleep: sleep(backoff)
Sleep --> DoubleBackoff: backoff *= 2
DoubleBackoff --> Running: capped at max_backoff
CleanExit --> [*]
Running --> MarkOk: mark_component_ok()
MarkOk --> Running
Diagram: Supervised listener state machine with exponential backoff
| Parameter | Default | Purpose |
|---|---|---|
| `DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS` | 2 | Initial retry delay |
| `DEFAULT_CHANNEL_MAX_BACKOFF_SECS` | 60 | Maximum retry delay |
On clean exit (e.g., channel closed), backoff resets to initial value. On error, backoff doubles up to the maximum.
Sources: src/channels/mod.rs:61-62, src/channels/mod.rs:471-509
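The backoff progression can be sketched as a pure function (illustrative; the real supervisor at src/channels/mod.rs:471-509 keeps this state inline in its loop):

```rust
const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2;
const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60;

/// Next retry delay after a failed listen(): double, capped at the maximum.
fn next_backoff(current_secs: u64) -> u64 {
    (current_secs * 2).min(DEFAULT_CHANNEL_MAX_BACKOFF_SECS)
}

fn main() {
    let mut backoff = DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS;
    let mut schedule = vec![backoff];
    for _ in 0..6 {
        backoff = next_backoff(backoff);
        schedule.push(backoff);
    }
    // Doubles each failure, then saturates at the 60s cap.
    assert_eq!(schedule, vec![2, 4, 8, 16, 32, 60, 60]);
    println!("{schedule:?}");
}
```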
The dispatch loop coordinates concurrent message processing with bounded parallelism. Implementation at src/channels/mod.rs:816-844.
sequenceDiagram
participant Listener as spawn_supervised_listener
participant Mpsc as tokio::sync::mpsc
participant Dispatcher as run_message_dispatch_loop
participant Semaphore as Semaphore
participant Worker as process_channel_message
Listener->>Mpsc: tx.send(ChannelMessage)
Mpsc->>Dispatcher: rx.recv()
Dispatcher->>Semaphore: acquire_owned()
Note over Semaphore: Blocks if max_in_flight<br/>reached
Semaphore-->>Dispatcher: Permit
Dispatcher->>Worker: spawn(process_channel_message)
Note over Worker: Permit released<br/>on drop
Worker->>Worker: Build history
Worker->>Worker: Recall memory context
Worker->>Worker: run_tool_call_loop()
Worker->>Listener: channel.send(response)
Diagram: Message dispatch coordination with semaphore-based flow control
The maximum in-flight messages is computed dynamically based on channel count at src/channels/mod.rs:511-518:
```rust
max_in_flight = (channel_count * CHANNEL_PARALLELISM_PER_CHANNEL)
    .clamp(CHANNEL_MIN_IN_FLIGHT_MESSAGES, CHANNEL_MAX_IN_FLIGHT_MESSAGES)
```
| Constant | Default | Purpose |
|---|---|---|
| `CHANNEL_PARALLELISM_PER_CHANNEL` | 4 | Messages per channel in parallel |
| `CHANNEL_MIN_IN_FLIGHT_MESSAGES` | 8 | Minimum total capacity |
| `CHANNEL_MAX_IN_FLIGHT_MESSAGES` | 64 | Maximum total capacity |
Sources: src/channels/mod.rs:66-68, src/channels/mod.rs:511-518, src/channels/mod.rs:816-844
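The capacity computation can be reproduced directly from the constants above (a self-contained sketch, not the exact source):

```rust
const CHANNEL_PARALLELISM_PER_CHANNEL: usize = 4;
const CHANNEL_MIN_IN_FLIGHT_MESSAGES: usize = 8;
const CHANNEL_MAX_IN_FLIGHT_MESSAGES: usize = 64;

/// Total in-flight message capacity for a given number of active channels.
fn max_in_flight(channel_count: usize) -> usize {
    (channel_count * CHANNEL_PARALLELISM_PER_CHANNEL)
        .clamp(CHANNEL_MIN_IN_FLIGHT_MESSAGES, CHANNEL_MAX_IN_FLIGHT_MESSAGES)
}

fn main() {
    assert_eq!(max_in_flight(1), 8);   // floor applies: 1 * 4 = 4 -> 8
    assert_eq!(max_in_flight(3), 12);  // 3 * 4 = 12, inside the band
    assert_eq!(max_in_flight(20), 64); // ceiling applies: 20 * 4 = 80 -> 64
    println!("ok");
}
```

So a deployment with a single channel still gets eight concurrent workers, while a large multi-channel deployment never exceeds sixty-four.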
Each message is processed within a ChannelRuntimeContext that provides access to all necessary subsystems. Defined at src/channels/mod.rs:102-123.
classDiagram
class ChannelRuntimeContext {
+Arc~HashMap~channels_by_name
+Arc~dyn Provider~ provider
+Arc~String~ default_provider
+Arc~dyn Memory~ memory
+Arc~Vec~Tool~~ tools_registry
+Arc~dyn Observer~ observer
+Arc~String~ system_prompt
+Arc~String~ model
+f64 temperature
+bool auto_save_memory
+usize max_tool_iterations
+f64 min_relevance_score
+ConversationHistoryMap conversation_histories
+ProviderCacheMap provider_cache
+RouteSelectionMap route_overrides
+Option~String~ api_key
+Option~String~ api_url
+Arc~ReliabilityConfig~ reliability
+ProviderRuntimeOptions provider_runtime_options
+Arc~PathBuf~ workspace_dir
}
class ConversationHistoryMap {
Arc~Mutex~HashMap~String Vec~ChatMessage~~~~
}
class ProviderCacheMap {
Arc~Mutex~HashMap~String Arc~dyn Provider~~~~
}
class RouteSelectionMap {
Arc~Mutex~HashMap~String ChannelRouteSelection~~~
}
ChannelRuntimeContext --> ConversationHistoryMap
ChannelRuntimeContext --> ProviderCacheMap
ChannelRuntimeContext --> RouteSelectionMap
Diagram: ChannelRuntimeContext structure with per-sender state
The context maintains three per-sender maps:
- `conversation_histories`: stores recent message history (up to `MAX_CHANNEL_HISTORY` = 50 messages) keyed by `"{channel}_{sender}"` (src/channels/mod.rs:54-56)
- `provider_cache`: caches initialized provider instances to avoid repeated initialization (src/channels/mod.rs:73)
- `route_overrides`: stores per-sender provider and model selections for runtime switching (src/channels/mod.rs:74)
Sources: src/channels/mod.rs:54-123
The process_channel_message() function at src/channels/mod.rs:556-814 implements the full message processing pipeline.
flowchart TD
Start["process_channel_message(ctx, msg)"] --> CheckCommand{Runtime command?}
CheckCommand -->|"/models or /model"| HandleCommand["handle_runtime_command_if_needed()"]
HandleCommand --> End([Return])
CheckCommand -->|Regular message| GetRoute["get_route_selection(sender_key)"]
GetRoute --> CreateProvider["get_or_create_provider(provider_name)"]
CreateProvider -->|Error| SendError["Send error message"]
SendError --> End
CreateProvider -->|Success| BuildMemory["build_memory_context(msg.content)"]
BuildMemory --> AutoSave{auto_save_memory?}
AutoSave -->|Yes| StoreMsg["memory.store(msg.content)"]
AutoSave -->|No| EnrichMsg
StoreMsg --> EnrichMsg["Enrich message with<br/>memory context"]
EnrichMsg --> BuildHistory["Build conversation history<br/>from cache"]
BuildHistory --> CheckStreaming{supports_draft_updates()?}
CheckStreaming -->|Yes| SendDraft["channel.send_draft()"]
CheckStreaming -->|No| StartTyping
SendDraft --> SpawnUpdater["Spawn draft updater task"]
SpawnUpdater --> StartTyping["channel.start_typing()"]
StartTyping --> RunLoop["run_tool_call_loop()<br/>with timeout"]
RunLoop --> WaitUpdater["Wait for draft updater"]
WaitUpdater --> StopTyping["channel.stop_typing()"]
StopTyping --> CheckResult{Success?}
CheckResult -->|Ok| SaveHistory["Save to conversation_histories"]
CheckResult -->|Err or Timeout| SendErrMsg["Send error message"]
SaveHistory --> CheckDraft{Has draft_id?}
CheckDraft -->|Yes| FinalizeDraft["channel.finalize_draft()"]
CheckDraft -->|No| SendMsg["channel.send()"]
SendErrMsg --> End
FinalizeDraft --> End
SendMsg --> End
Diagram: Message processing pipeline with streaming support
All message processing is bounded by CHANNEL_MESSAGE_TIMEOUT_SECS=300 seconds src/channels/mod.rs:65. This timeout accounts for:
- On-device LLM latency (Ollama)
- Multiple tool execution rounds
- Network delays
Sources: src/channels/mod.rs:556-814, src/channels/mod.rs:65
Channels whose supports_draft_updates() implementation returns true receive progressive response updates as the LLM generates text. This is primarily used by Telegram and Discord.
sequenceDiagram
participant Process as process_channel_message
participant Channel as Channel Implementation
participant DeltaTx as delta_tx (mpsc)
participant Updater as Draft Updater Task
participant ToolLoop as run_tool_call_loop
Process->>Channel: supports_draft_updates()?
Channel-->>Process: true
Process->>DeltaTx: Create channel (capacity=64)
Process->>Channel: send_draft("...")
Channel-->>Process: Some(draft_message_id)
Process->>Updater: spawn draft updater
Note over Updater: Accumulates deltas,<br/>calls update_draft()
Process->>ToolLoop: run_tool_call_loop(delta_tx)
loop LLM generates tokens
ToolLoop->>DeltaTx: tx.send(delta)
DeltaTx->>Updater: rx.recv()
Updater->>Updater: accumulated += delta
Updater->>Channel: update_draft(accumulated)
end
ToolLoop-->>Process: Final response
Process->>DeltaTx: Drop tx (closes channel)
Updater->>Updater: Exit loop
Process->>Channel: finalize_draft(final_text)
Diagram: Streaming response flow with draft message updates
The draft updater task at src/channels/mod.rs:664-686 accumulates text deltas and calls update_draft() on the channel:
```rust
// Simplified from src/channels/mod.rs:672-683
let mut accumulated = String::new();
while let Some(delta) = rx.recv().await {
    accumulated.push_str(&delta);
    if let Err(e) = channel
        .update_draft(&reply_target, &draft_id, &accumulated)
        .await
    {
        tracing::debug!("Draft update failed: {e}");
    }
}
```

Channels implement rate-limiting to avoid API limits. For example, Telegram's implementation at src/channels/telegram.rs:302-308 uses:
- `stream_mode`: controls draft behavior
- `draft_update_interval_ms`: minimum time between edits (default: 1000ms)
- `last_draft_edit`: tracks the last edit timestamp per message
Sources: src/channels/mod.rs:630-686, src/channels/telegram.rs:302-349
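The edit throttle can be sketched as a simple interval check. This is illustrative: the field names mirror the Telegram configuration described above, but `should_edit_draft` is a hypothetical helper, not the actual implementation.

```rust
use std::time::{Duration, Instant};

/// Allow a draft edit only if at least `interval` has elapsed since the
/// previous edit (or if no edit has happened yet).
fn should_edit_draft(last_edit: Option<Instant>, now: Instant, interval: Duration) -> bool {
    match last_edit {
        None => true, // first edit is always allowed
        Some(prev) => now.duration_since(prev) >= interval,
    }
}

fn main() {
    let interval = Duration::from_millis(1000); // draft_update_interval_ms default
    let start = Instant::now();
    assert!(should_edit_draft(None, start, interval));
    // 200ms after an edit: too soon, skip this delta batch
    assert!(!should_edit_draft(Some(start), start + Duration::from_millis(200), interval));
    // 1200ms after an edit: allowed
    assert!(should_edit_draft(Some(start), start + Duration::from_millis(1200), interval));
    println!("ok");
}
```

Skipped edits are harmless because the updater sends the full accumulated text each time, so the next permitted edit catches up.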
Channels can display typing indicators during message processing via start_typing() and stop_typing(). The typing task is spawned at src/channels/mod.rs:526-554.
stateDiagram-v2
[*] --> CheckSupport: process_channel_message()
CheckSupport --> SpawnTask: target_channel exists
CheckSupport --> SkipTyping: no channel
SpawnTask --> TypingLoop: spawn_scoped_typing_task()
TypingLoop --> WaitTick: interval.tick()
WaitTick --> CheckCancelled{cancelled?}
CheckCancelled -->|No| SendTyping: channel.start_typing()
CheckCancelled -->|Yes| StopTyping: channel.stop_typing()
SendTyping --> TypingLoop
StopTyping --> [*]
SkipTyping --> [*]
Diagram: Typing indicator lifecycle with cancellation token
The typing task refreshes the indicator every CHANNEL_TYPING_REFRESH_INTERVAL_SECS=4 seconds src/channels/mod.rs:69 until the cancellation token fires.
Sources: src/channels/mod.rs:526-554, src/channels/mod.rs:69, src/channels/mod.rs:688-727
Per-sender conversation history is maintained in ConversationHistoryMap to provide context continuity across messages.
History is keyed by "{channel}_{sender}" (function at src/channels/mod.rs:129-131):
```rust
fn conversation_history_key(msg: &traits::ChannelMessage) -> String {
    format!("{}_{}", msg.channel, msg.sender)
}
```

flowchart LR
Incoming["New message arrives"] --> GetKey["Generate history_key"]
GetKey --> Lookup["conversation_histories.lock()"]
Lookup --> Check{Entry exists?}
Check -->|Yes| LoadHist["Load Vec<ChatMessage>"]
Check -->|No| EmptyHist["Create empty Vec"]
LoadHist --> BuildMsg["Append system + history + user"]
EmptyHist --> BuildMsg
BuildMsg --> ProcessMsg["run_tool_call_loop()"]
ProcessMsg --> Success{Success?}
Success -->|Yes| AppendTurn["Append user + assistant"]
Success -->|No| SkipSave["Skip save"]
AppendTurn --> Trim{len > MAX_CHANNEL_HISTORY?}
Trim -->|Yes| RemoveOld["Remove oldest messages"]
Trim -->|No| Store
RemoveOld --> Store["Save to conversation_histories"]
Store --> Done([Done])
SkipSave --> Done
Diagram: Conversation history lifecycle with trimming
The history is trimmed to `MAX_CHANNEL_HISTORY` (50) messages (src/channels/mod.rs:56) using simple FIFO eviction at src/channels/mod.rs:740-743.
Sources: src/channels/mod.rs:54-56, src/channels/mod.rs:129-131, src/channels/mod.rs:614-744
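The FIFO trimming step can be sketched as follows (illustrative; the real code at src/channels/mod.rs:740-743 operates on `Vec<ChatMessage>` rather than strings):

```rust
const MAX_CHANNEL_HISTORY: usize = 50;

/// Drop the oldest entries until the history fits the cap (FIFO eviction).
fn trim_history(history: &mut Vec<String>, max_len: usize) {
    if history.len() > max_len {
        let excess = history.len() - max_len;
        history.drain(0..excess); // remove oldest entries from the front
    }
}

fn main() {
    let mut history: Vec<String> = (0..60).map(|i| format!("msg-{i}")).collect();
    trim_history(&mut history, MAX_CHANNEL_HISTORY);
    assert_eq!(history.len(), 50);
    assert_eq!(history[0], "msg-10"); // the ten oldest messages were evicted
    println!("ok");
}
```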
Some channels require special message formatting. The system injects channel-specific instructions into the prompt at src/channels/mod.rs:133-140.
| Channel | Instruction | Purpose |
|---|---|---|
| `telegram` | Media marker syntax | Use `[IMAGE:<path>]`, `[DOCUMENT:<path>]`, etc. for attachments |
Example Telegram instruction:
When responding on Telegram, include media markers for files or URLs that should be
sent as attachments. Use one marker per attachment with this exact syntax:
[IMAGE:<path-or-url>], [DOCUMENT:<path-or-url>], [VIDEO:<path-or-url>],
[AUDIO:<path-or-url>], or [VOICE:<path-or-url>]. Keep normal user-facing text
outside markers and never wrap markers in code fences.
These instructions are appended to the message history before sending to the LLM at src/channels/mod.rs:626-628.
Sources: src/channels/mod.rs:133-140, src/channels/mod.rs:626-628
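A sketch of how such markers could be extracted from a reply (hypothetical parser; the marker syntax follows the table above, but the actual Telegram implementation may differ):

```rust
/// Extract media markers like [IMAGE:/tmp/a.png] from response text.
/// Returns (kind, path) pairs; only the documented marker kinds match.
fn extract_media_markers(text: &str) -> Vec<(String, String)> {
    let kinds = ["IMAGE", "DOCUMENT", "VIDEO", "AUDIO", "VOICE"];
    let mut markers = Vec::new();
    let mut rest = text;
    while let Some(start) = rest.find('[') {
        let after = &rest[start + 1..];
        if let Some(end) = after.find(']') {
            // inner is e.g. "IMAGE:/tmp/chart.png"
            let inner = &after[..end];
            if let Some((kind, path)) = inner.split_once(':') {
                if kinds.contains(&kind) && !path.is_empty() {
                    markers.push((kind.to_string(), path.to_string()));
                }
            }
            rest = &after[end + 1..];
        } else {
            break; // unclosed bracket: stop scanning
        }
    }
    markers
}

fn main() {
    let reply = "Here is the chart. [IMAGE:/tmp/chart.png] Report: [DOCUMENT:/tmp/report.pdf]";
    let markers = extract_media_markers(reply);
    assert_eq!(markers.len(), 2);
    assert_eq!(markers[0], ("IMAGE".to_string(), "/tmp/chart.png".to_string()));
    println!("ok");
}
```

Splitting on the first `:` keeps URLs intact, since `[IMAGE:https://example.com/a.png]` yields the full URL as the path.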
The system prompt is built once during initialization using build_system_prompt() at src/channels/mod.rs:888-1042. Key components:
- Tool Descriptions - Generated from registered tools
- Safety Guidelines - No data exfiltration, prefer trash over rm
- Skills - Loaded on-demand from workspace
- Workspace Context - Working directory
- Bootstrap Files - AGENTS.md, SOUL.md, IDENTITY.md, etc.
- Hardware Instructions - If GPIO/Arduino tools present
- Date/Time - Current timezone
- Runtime Info - Host, OS, model name
The prompt is stored in ChannelRuntimeContext.system_prompt as Arc<String> for efficient sharing across messages.
Sources: src/channels/mod.rs:888-1042
Before processing each message, relevant memory entries are recalled and prepended to the message content at src/channels/mod.rs:443-469.
flowchart TD
Start["process_channel_message()"] --> Recall["build_memory_context(msg.content)"]
Recall --> Query["memory.recall(query, limit=5)"]
Query --> Filter["Filter by min_relevance_score"]
Filter --> Check{Entries found?}
Check -->|No| Return["Return empty string"]
Check -->|Yes| Format["Format as '[Memory context]\\n- key: content'"]
Format --> Enrich["Prepend to msg.content"]
Enrich --> Process["Continue processing"]
Return --> Process
Diagram: Memory context integration before LLM call
Entries without a score (non-vector backends) are included regardless of threshold.
Sources: src/channels/mod.rs:443-469, src/channels/mod.rs:587-608
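The formatting step can be sketched as follows. The entry shape and threshold handling follow the description above, but the `MemoryEntry` struct and function body are hypothetical, not the actual implementation.

```rust
/// Hypothetical stand-in for a recalled memory entry.
struct MemoryEntry {
    key: String,
    content: String,
    score: Option<f64>, // None for non-vector backends
}

/// Format recalled entries as a "[Memory context]" block. Scored entries
/// below the threshold are dropped; unscored entries are always included.
fn build_memory_context(entries: &[MemoryEntry], min_relevance_score: f64) -> String {
    let lines: Vec<String> = entries
        .iter()
        .filter(|e| e.score.map_or(true, |s| s >= min_relevance_score))
        .map(|e| format!("- {}: {}", e.key, e.content))
        .collect();
    if lines.is_empty() {
        String::new()
    } else {
        format!("[Memory context]\n{}", lines.join("\n"))
    }
}

fn main() {
    let entries = vec![
        MemoryEntry { key: "city".into(), content: "user lives in SF".into(), score: Some(0.9) },
        MemoryEntry { key: "noise".into(), content: "irrelevant".into(), score: Some(0.1) },
        MemoryEntry { key: "pet".into(), content: "has a cat".into(), score: None },
    ];
    let ctx = build_memory_context(&entries, 0.5);
    assert!(ctx.contains("city"));
    assert!(!ctx.contains("noise")); // filtered by min_relevance_score
    assert!(ctx.contains("pet"));    // unscored entries always included
    println!("{ctx}");
}
```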
All channel implementations are registered in src/channels/mod.rs:1-31:
```rust
pub mod cli;
pub mod dingtalk;
pub mod discord;
pub mod email_channel;
pub mod imessage;
pub mod irc;
pub mod lark;
pub mod matrix;
pub mod mattermost;
pub mod qq;
pub mod signal;
pub mod slack;
pub mod telegram;
pub mod traits;
pub mod whatsapp;
```

Channels are instantiated during startup based on configuration and stored in `channels_by_name: Arc<HashMap<String, Arc<dyn Channel>>>` within `ChannelRuntimeContext`.
Sources: src/channels/mod.rs:1-31, src/channels/mod.rs:103
Errors during process_channel_message() are handled at three levels:
- Provider Initialization Error (src/channels/mod.rs:571-586)
  - Sends an error message to the channel
  - Includes sanitized error details
  - Suggests using the `/models` command
- LLM/Tool Execution Error (src/channels/mod.rs:769-787)
  - Logs the error with elapsed time
  - Sends "⚠️ Error: {e}" to the channel
  - Finalizes the draft if streaming
- Timeout (src/channels/mod.rs:789-812)
  - Logs the timeout with elapsed time
  - Sends a "⚠️ Request timed out" message
  - Finalizes the draft if streaming
Processing progress is logged to stdout at src/channels/mod.rs:557-562 and src/channels/mod.rs:745-749:
```text
💬 [telegram] from alice: what's the weather?
⏳ Processing message...
🤖 Reply (3245ms): It's sunny and 72°F in San Francisco
```
Sources: src/channels/mod.rs:557-562, src/channels/mod.rs:745-749, src/channels/mod.rs:769-812