Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion FEATURE_PARITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ This document tracks feature parity between IronClaw (Rust implementation) and O
| REPL (simple) | ✅ | ✅ | - | For testing |
| WASM channels | ❌ | ✅ | - | IronClaw innovation; host resolves owner scope vs sender identity |
| WhatsApp | ✅ | ❌ | P1 | Baileys (Web), same-phone mode with echo detection |
| Telegram | ✅ | ✅ | - | WASM channel(MTProto), DM pairing, caption, /start, bot_username, DM topics, setup-time owner verification, owner-scoped persistence |
| Telegram | ✅ | ✅ | - | WASM channel(MTProto), DM pairing, caption, /start, bot_username, DM topics, setup-time owner auto-verification, owner-scoped persistence |
| Discord | ✅ | ❌ | P2 | discord.js, thread parent binding inheritance |
| Signal | ✅ | ✅ | P2 | signal-cli daemonPC, SSE listener HTTP/JSON-R, user/group allowlists, DM pairing |
| Slack | ✅ | ✅ | - | WASM tool |
Expand Down
138 changes: 111 additions & 27 deletions src/agent/agent_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,65 @@ pub(crate) fn truncate_for_preview(output: &str, max_chars: usize) -> String {
}
}

#[cfg(test)]
fn resolve_routine_notification_user(metadata: &serde_json::Value) -> Option<String> {
metadata
.get("notify_user")
.and_then(|value| value.as_str())
.or_else(|| metadata.get("owner_id").and_then(|value| value.as_str()))
resolve_owner_scope_notification_user(
metadata.get("notify_user").and_then(|value| value.as_str()),
metadata.get("owner_id").and_then(|value| value.as_str()),
)
}

fn trimmed_option(value: Option<&str>) -> Option<String> {
value
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToOwned::to_owned)
}

fn resolve_owner_scope_notification_user(
explicit_user: Option<&str>,
owner_fallback: Option<&str>,
) -> Option<String> {
trimmed_option(explicit_user).or_else(|| trimmed_option(owner_fallback))
}

async fn resolve_channel_notification_user(
extension_manager: Option<&Arc<ExtensionManager>>,
channel: Option<&str>,
explicit_user: Option<&str>,
owner_fallback: Option<&str>,
) -> Option<String> {
if let Some(user) = trimmed_option(explicit_user) {
return Some(user);
}

if let Some(channel_name) = trimmed_option(channel)
&& let Some(extension_manager) = extension_manager
&& let Some(target) = extension_manager
.notification_target_for_channel(&channel_name)
.await
{
return Some(target);
}

resolve_owner_scope_notification_user(explicit_user, owner_fallback)
}

async fn resolve_routine_notification_target(
extension_manager: Option<&Arc<ExtensionManager>>,
metadata: &serde_json::Value,
) -> Option<String> {
resolve_channel_notification_user(
extension_manager,
metadata
.get("notify_channel")
.and_then(|value| value.as_str()),
metadata.get("notify_user").and_then(|value| value.as_str()),
metadata.get("owner_id").and_then(|value| value.as_str()),
)
.await
}

fn should_fallback_routine_notification(error: &ChannelError) -> bool {
!matches!(error, ChannelError::MissingRoutingTarget { .. })
}
Expand Down Expand Up @@ -395,11 +444,13 @@ impl Agent {
.timezone
.clone()
.or_else(|| Some(self.config.default_timezone.clone()));
if let Some(channel) = &hb_config.notify_channel {
let user = hb_config
.notify_user
.clone()
.unwrap_or_else(|| self.owner_id().to_string());
let heartbeat_notify_user = resolve_owner_scope_notification_user(
hb_config.notify_user.as_deref(),
Some(self.owner_id()),
);
if let Some(channel) = &hb_config.notify_channel
&& let Some(user) = heartbeat_notify_user.as_deref()
{
config = config.with_notify(user, channel);
}

Expand All @@ -409,26 +460,32 @@ impl Agent {

// Spawn notification forwarder that routes through channel manager
let notify_channel = hb_config.notify_channel.clone();
let notify_user = hb_config
.notify_user
.clone()
.unwrap_or_else(|| self.owner_id().to_string());
let notify_target = resolve_channel_notification_user(
self.deps.extension_manager.as_ref(),
hb_config.notify_channel.as_deref(),
hb_config.notify_user.as_deref(),
Some(self.owner_id()),
)
.await;
let notify_user = heartbeat_notify_user;
let channels = self.channels.clone();
tokio::spawn(async move {
while let Some(response) = notify_rx.recv().await {
// Try the configured channel first, fall back to
// broadcasting on all channels.
let targeted_ok = if let Some(ref channel) = notify_channel {
let targeted_ok = if let Some(ref channel) = notify_channel
&& let Some(ref user) = notify_target
{
channels
.broadcast(channel, &notify_user, response.clone())
.broadcast(channel, user, response.clone())
.await
.is_ok()
} else {
false
};

if !targeted_ok {
let results = channels.broadcast_all(&notify_user, response).await;
if !targeted_ok && let Some(ref user) = notify_user {
let results = channels.broadcast_all(user, response).await;
for (ch, result) in results {
if let Err(e) = result {
tracing::warn!(
Expand Down Expand Up @@ -496,14 +553,26 @@ impl Agent {

// Spawn notification forwarder (mirrors heartbeat pattern)
let channels = self.channels.clone();
let extension_manager = self.deps.extension_manager.clone();
tokio::spawn(async move {
while let Some(response) = notify_rx.recv().await {
let notify_channel = response
.metadata
.get("notify_channel")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let Some(user) = resolve_routine_notification_user(&response.metadata)
let fallback_user = resolve_owner_scope_notification_user(
response
.metadata
.get("notify_user")
.and_then(|v| v.as_str()),
response.metadata.get("owner_id").and_then(|v| v.as_str()),
);
let Some(user) = resolve_routine_notification_target(
extension_manager.as_ref(),
&response.metadata,
)
.await
else {
tracing::warn!(
notify_channel = ?notify_channel,
Expand Down Expand Up @@ -537,7 +606,7 @@ impl Agent {
false
};

if !targeted_ok {
if !targeted_ok && let Some(user) = fallback_user {
let results = channels.broadcast_all(&user, response).await;
for (ch, result) in results {
if let Err(e) = result {
Expand Down Expand Up @@ -624,6 +693,29 @@ impl Agent {
// Store successfully extracted document text in workspace for indexing
self.store_extracted_documents(&message).await;

// Event-triggered routines consume plain user input before it enters
// the normal chat/tool pipeline. This avoids a duplicate turn where
// the main agent responds and the routine also fires on the same
// inbound message.
if !message.is_internal
&& matches!(
SubmissionParser::parse(&message.content),
Submission::UserInput { .. }
)
&& let Some(ref engine) = routine_engine_for_loop
{
let fired = engine.check_event_triggers(&message).await;
if fired > 0 {
tracing::debug!(
channel = %message.channel,
user = %message.user_id,
fired,
"Consumed inbound user message with matching event-triggered routine(s)"
);
continue;
}
}

match self.handle_message(&message).await {
Ok(Some(response)) if !response.is_empty() => {
// Hook: BeforeOutbound — allow hooks to modify or suppress outbound
Expand Down Expand Up @@ -696,14 +788,6 @@ impl Agent {
}
}
}

// Check event triggers (cheap in-memory regex, fires async if matched)
if let Some(ref engine) = routine_engine_for_loop {
let fired = engine.check_event_triggers(&message).await;
if fired > 0 {
tracing::debug!("Fired {} event-triggered routines", fired);
}
}
}

// Cleanup
Expand Down
95 changes: 87 additions & 8 deletions src/channels/web/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2243,14 +2243,7 @@ async fn extensions_setup_submit_handler(
resp.auth_url = result.auth_url.clone();
resp.verification = result.verification.clone();
resp.instructions = result.verification.as_ref().map(|v| v.instructions.clone());
if result.verification.is_some() {
state.sse.broadcast(SseEvent::AuthRequired {
extension_name: name.clone(),
instructions: resp.instructions.clone(),
auth_url: None,
setup_url: None,
});
} else {
if result.verification.is_none() {
// Broadcast auth_completed so the chat UI can dismiss any in-progress
// auth card or setup modal that was triggered by tool_auth/tool_activate.
state.sse.broadcast(SseEvent::AuthCompleted {
Expand Down Expand Up @@ -2981,6 +2974,92 @@ mod tests {
);
}

#[tokio::test]
async fn test_extensions_setup_submit_telegram_verification_does_not_broadcast_auth_required() {
use axum::body::Body;
use tokio::time::{Duration, timeout};
use tower::ServiceExt;

let secrets = test_secrets_store();
let (ext_mgr, _wasm_tools_dir, wasm_channels_dir) = test_ext_mgr(secrets);

std::fs::write(
wasm_channels_dir.path().join("telegram.wasm"),
b"\0asm fake",
)
.expect("write fake telegram wasm");
let caps = serde_json::json!({
"type": "channel",
"name": "telegram",
"setup": {
"required_secrets": [
{
"name": "telegram_bot_token",
"prompt": "Enter your Telegram Bot API token (from @BotFather)"
}
]
}
});
std::fs::write(
wasm_channels_dir.path().join("telegram.capabilities.json"),
serde_json::to_string(&caps).expect("serialize telegram caps"),
)
.expect("write telegram caps");

ext_mgr
.set_test_telegram_pending_verification("iclaw-7qk2m9", Some("test_hot_bot"))
.await;

let state = test_gateway_state(Some(ext_mgr));
let mut receiver = state.sse.sender().subscribe();
let app = Router::new()
.route(
"/api/extensions/{name}/setup",
post(extensions_setup_submit_handler),
)
.with_state(state);

let req_body = serde_json::json!({
"secrets": {
"telegram_bot_token": "123456789:ABCdefGhI"
}
});
let req = axum::http::Request::builder()
.method("POST")
.uri("/api/extensions/telegram/setup")
.header("content-type", "application/json")
.body(Body::from(req_body.to_string()))
.expect("request");

let resp = ServiceExt::<axum::http::Request<Body>>::oneshot(app, req)
.await
.expect("response");
assert_eq!(resp.status(), StatusCode::OK);

let body = axum::body::to_bytes(resp.into_body(), 1024 * 64)
.await
.expect("body");
let parsed: serde_json::Value = serde_json::from_slice(&body).expect("json response");
assert_eq!(parsed["success"], serde_json::Value::Bool(true));
assert_eq!(parsed["activated"], serde_json::Value::Bool(false));
assert_eq!(parsed["verification"]["code"], "iclaw-7qk2m9");

let deadline = tokio::time::Instant::now() + Duration::from_millis(100);
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
break;
}
match timeout(remaining, receiver.recv()).await {
Ok(Ok(crate::channels::web::types::SseEvent::AuthRequired { .. })) => {
panic!("verification responses should not emit auth_required SSE events")
}
Ok(Ok(_)) => continue,
Ok(Err(_)) | Err(_) => break,
}
}
}

fn expired_flow_created_at() -> Option<std::time::Instant> {
std::time::Instant::now()
.checked_sub(oauth_defaults::OAUTH_FLOW_EXPIRY + std::time::Duration::from_secs(1))
Expand Down
Loading
Loading