diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7946c3535..00488c70f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,10 @@ jobs: matrix: include: - name: all-features - flags: "--all-features" + # Keep product feature coverage broad without pulling in the + # test-only `integration` feature, which is exercised separately + # in the heavy integration job below. + flags: "--no-default-features --features postgres,libsql,html-to-markdown,bedrock,import" - name: default flags: "" - name: libsql-only @@ -39,6 +42,26 @@ jobs: - name: Run Tests run: cargo test ${{ matrix.flags }} -- --nocapture + heavy-integration-tests: + name: Heavy Integration Tests + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v6 + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + with: + targets: wasm32-wasip2 + - uses: Swatinem/rust-cache@v2 + with: + key: heavy-integration + - name: Build Telegram WASM channel + run: cargo build --manifest-path channels-src/telegram/Cargo.toml --target wasm32-wasip2 --release + - name: Run thread scheduling integration tests + run: cargo test --no-default-features --features libsql,integration --test e2e_thread_scheduling -- --nocapture + - name: Run Telegram thread-scope regression test + run: cargo test --features integration --test telegram_auth_integration test_private_messages_use_chat_id_as_thread_scope -- --exact + telegram-tests: name: Telegram Channel Tests if: > @@ -65,7 +88,7 @@ jobs: matrix: include: - name: all-features - flags: "--all-features" + flags: "--no-default-features --features postgres,libsql,html-to-markdown,bedrock,import" - name: default flags: "" - name: libsql-only @@ -149,7 +172,7 @@ jobs: name: Run Tests runs-on: ubuntu-latest if: always() - needs: [tests, telegram-tests, wasm-wit-compat, docker-build, windows-build, version-check, bench-compile] + needs: [tests, heavy-integration-tests, telegram-tests, wasm-wit-compat, docker-build, windows-build, version-check, bench-compile] steps: - run: | # Unit tests must always pass @@ -157,6 +180,10 @@ jobs: echo "Unit tests failed" exit 1 fi + if [[ "${{ needs.heavy-integration-tests.result }}" != "success" ]]; then + echo "Heavy integration tests failed" + exit 1 + fi # Gated jobs: must pass on promotion PRs / push, skipped on developer PRs for job in telegram-tests wasm-wit-compat docker-build windows-build version-check bench-compile; do case "$job" in diff --git a/Cargo.toml b/Cargo.toml index aef4e6879..b396b18d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -222,11 +222,17 @@ postgres = [ "rust_decimal/db-tokio-postgres", ] libsql = ["dep:libsql"] +# Opt-in feature for especially heavy integration-test targets that run in a +# dedicated CI job instead of the default Rust test matrix. integration = [] html-to-markdown = ["dep:html-to-markdown-rs", "dep:readabilityrs"] bedrock = ["dep:aws-config", "dep:aws-sdk-bedrockruntime", "dep:aws-smithy-types"] import = ["dep:json5", "libsql"] +[[test]] +name = "e2e_thread_scheduling" +required-features = ["libsql", "integration"] + [[test]] name = "html_to_markdown" required-features = ["html-to-markdown"] diff --git a/src/agent/submission.rs b/src/agent/submission.rs index 463361330..a3ae2524d 100644 --- a/src/agent/submission.rs +++ b/src/agent/submission.rs @@ -427,6 +427,14 @@ impl SubmissionResult { message: message.into(), } } + + /// Create a non-error status message (e.g., for blocking states like approval waiting). + /// Uses Ok variant to avoid "Error:" prefix in rendering. + pub fn pending(message: impl Into) -> Self { + Self::Ok { + message: Some(message.into()), + } + } } #[cfg(test)] diff --git a/src/agent/thread_ops.rs b/src/agent/thread_ops.rs index e5f2005d2..877a4e277 100644 --- a/src/agent/thread_ops.rs +++ b/src/agent/thread_ops.rs @@ -187,13 +187,18 @@ impl Agent { ); // First check thread state without holding lock during I/O - let thread_state = { + let (thread_state, approval_context) = { let sess = session.lock().await; let thread = sess .threads .get(&thread_id) .ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?; - thread.state + let approval_context = thread.pending_approval.as_ref().map(|a| { + let desc_preview = + crate::agent::agent_loop::truncate_for_preview(&a.description, 80); + (a.tool_name.clone(), desc_preview) + }); + (thread.state, approval_context) }; tracing::debug!( @@ -221,9 +226,13 @@ impl Agent { thread_id = %thread_id, "Thread awaiting approval, rejecting new input" ); - return Ok(SubmissionResult::error( - "Waiting for approval. Use /interrupt to cancel.", - )); + let msg = match approval_context { + Some((tool_name, desc_preview)) => format!( + "Waiting for approval: {tool_name} — {desc_preview}. Use /interrupt to cancel." + ), + None => "Waiting for approval. Use /interrupt to cancel.".to_string(), + }; + return Ok(SubmissionResult::pending(msg)); } ThreadState::Completed => { tracing::warn!( @@ -1917,4 +1926,103 @@ mod tests { created_at: chrono::Utc::now(), } } + + #[tokio::test] + async fn test_awaiting_approval_rejection_includes_tool_context() { + // Test that when a thread is in AwaitingApproval state and receives a new message, + // process_user_input rejects it with a non-error status that includes tool context. + use crate::agent::session::{PendingApproval, Session, Thread, ThreadState}; + use uuid::Uuid; + + let session_id = Uuid::new_v4(); + let thread_id = Uuid::new_v4(); + let mut thread = Thread::with_id(thread_id, session_id); + + // Set thread to AwaitingApproval with a pending tool approval + let pending = PendingApproval { + request_id: Uuid::new_v4(), + tool_name: "shell".to_string(), + parameters: serde_json::json!({"command": "echo hello"}), + display_parameters: serde_json::json!({"command": "[REDACTED]"}), + description: "Execute: echo hello".to_string(), + tool_call_id: "call_0".to_string(), + context_messages: vec![], + deferred_tool_calls: vec![], + user_timezone: None, + }; + thread.await_approval(pending); + + let mut session = Session::new("test-user"); + session.threads.insert(thread_id, thread); + + // Verify thread is in AwaitingApproval state + assert_eq!( + session.threads[&thread_id].state, + ThreadState::AwaitingApproval + ); + + let result = extract_approval_message(&session, thread_id); + + // Verify result is an Ok with a message (not an Error) + match result { + Ok(Some(msg)) => { + // Should NOT start with "Error:" + assert!( + !msg.to_lowercase().starts_with("error:"), + "Approval rejection should not have 'Error:' prefix. Got: {}", + msg + ); + + // Should contain "waiting for approval" + assert!( + msg.to_lowercase().contains("waiting for approval"), + "Should contain 'waiting for approval'. Got: {}", + msg + ); + + // Should contain the tool name + assert!( + msg.contains("shell"), + "Should contain tool name 'shell'. Got: {}", + msg + ); + + // Should contain the description (or truncated version) + assert!( + msg.contains("echo hello"), + "Should contain description 'echo hello'. Got: {}", + msg + ); + } + _ => panic!("Expected approval rejection message"), + } + } + + // Helper function to extract the approval message without needing a full Agent instance + fn extract_approval_message( + session: &crate::agent::session::Session, + thread_id: Uuid, + ) -> Result, crate::error::Error> { + let thread = session.threads.get(&thread_id).ok_or_else(|| { + crate::error::Error::from(crate::error::JobError::NotFound { id: thread_id }) + })?; + + if thread.state == ThreadState::AwaitingApproval { + let approval_context = thread.pending_approval.as_ref().map(|a| { + let desc_preview = + crate::agent::agent_loop::truncate_for_preview(&a.description, 80); + (a.tool_name.clone(), desc_preview) + }); + + let msg = match approval_context { + Some((tool_name, desc_preview)) => format!( + "Waiting for approval: {tool_name} — {desc_preview}. Use /interrupt to cancel." + ), + None => "Waiting for approval. Use /interrupt to cancel.".to_string(), + }; + Ok(Some(msg)) + } else { + Ok(None) + } + } } diff --git a/src/channels/wasm/wrapper.rs b/src/channels/wasm/wrapper.rs index 0be8756b1..6ca798318 100644 --- a/src/channels/wasm/wrapper.rs +++ b/src/channels/wasm/wrapper.rs @@ -860,6 +860,24 @@ impl WasmChannel { self } + /// Attach a message stream for integration tests. + /// + /// This primes any startup-persisted workspace state, but tolerates + /// callback-level startup failures so tests can exercise webhook parsing + /// and message emission without depending on external network access. + #[cfg(feature = "integration")] + #[doc(hidden)] + pub async fn start_message_stream_for_test(&self) -> Result { + self.prime_startup_state_for_test().await?; + + let (tx, rx) = mpsc::channel(256); + *self.message_tx.write().await = Some(tx); + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + *self.shutdown_tx.write().await = Some(shutdown_tx); + + Ok(Box::pin(ReceiverStream::new(rx))) + } + /// Update the channel config before starting. /// /// Merges the provided values into the existing config JSON. @@ -899,6 +917,29 @@ impl WasmChannel { self.credentials.read().await.clone() } + #[cfg(feature = "integration")] + async fn prime_startup_state_for_test(&self) -> Result<(), WasmChannelError> { + if self.prepared.component().is_none() { + return Ok(()); + } + + let (start_result, mut host_state) = self.execute_on_start_with_state().await?; + self.log_on_start_host_state(&mut host_state); + + match start_result { + Ok(_) => Ok(()), + Err(WasmChannelError::CallbackFailed { reason, .. }) => { + tracing::warn!( + channel = %self.name, + reason = %reason, + "Ignoring startup callback failure in test-only message stream bootstrap" + ); + Ok(()) + } + Err(e) => Err(e), + } + } + /// Get the channel name. pub fn channel_name(&self) -> &str { &self.name @@ -1132,28 +1173,25 @@ impl WasmChannel { ) } - /// Execute the on_start callback. - /// - /// Returns the channel configuration for HTTP endpoint registration. - /// Call the WASM module's `on_start` callback. - /// - /// Typically called once during `start()`, but can be called again after - /// credentials are refreshed to re-trigger webhook registration and - /// other one-time setup that depends on credentials. - pub async fn call_on_start(&self) -> Result { - // If no WASM bytes, return default config (for testing) - if self.prepared.component().is_none() { - tracing::info!( - channel = %self.name, - "WASM channel on_start called (no WASM module, returning defaults)" - ); - return Ok(ChannelConfig { - display_name: self.prepared.description.clone(), - http_endpoints: Vec::new(), - poll: None, - }); + fn log_on_start_host_state(&self, host_state: &mut ChannelHostState) { + for entry in host_state.take_logs() { + match entry.level { + crate::tools::wasm::LogLevel::Error => { + tracing::error!(channel = %self.name, "{}", entry.message); + } + crate::tools::wasm::LogLevel::Warn => { + tracing::warn!(channel = %self.name, "{}", entry.message); + } + _ => { + tracing::debug!(channel = %self.name, "{}", entry.message); + } + } } + } + async fn execute_on_start_with_state( + &self, + ) -> Result<(Result, ChannelHostState), WasmChannelError> { let runtime = Arc::clone(&self.runtime); let prepared = Arc::clone(&self.prepared); let capabilities = Self::inject_workspace_reader(&self.capabilities, &self.workspace_store); @@ -1170,8 +1208,7 @@ impl WasmChannel { let pairing_store = self.pairing_store.clone(); let workspace_store = self.workspace_store.clone(); - // Execute in blocking task with timeout - let result = tokio::time::timeout(timeout, async move { + tokio::time::timeout(timeout, async move { tokio::task::spawn_blocking(move || { let mut store = Self::create_store( &runtime, @@ -1183,31 +1220,24 @@ impl WasmChannel { )?; let instance = Self::instantiate_component(&runtime, &prepared, &mut store)?; - // Call on_start using the generated typed interface let channel_iface = instance.near_agent_channel(); - let wasm_result = channel_iface + let config_result = channel_iface .call_on_start(&mut store, &config_json) - .map_err(|e| Self::map_wasm_error(e, &prepared.name, prepared.limits.fuel))?; - - // Convert the result - let config = match wasm_result { - Ok(wit_config) => convert_channel_config(wit_config), - Err(err_msg) => { - return Err(WasmChannelError::CallbackFailed { + .map_err(|e| Self::map_wasm_error(e, &prepared.name, prepared.limits.fuel)) + .and_then(|wasm_result| match wasm_result { + Ok(wit_config) => Ok(convert_channel_config(wit_config)), + Err(err_msg) => Err(WasmChannelError::CallbackFailed { name: prepared.name.clone(), reason: err_msg, - }); - } - }; + }), + }); let mut host_state = Self::extract_host_state(&mut store, &prepared.name, &capabilities); - - // Commit pending workspace writes to the persistent store let pending_writes = host_state.take_pending_writes(); workspace_store.commit_writes(&pending_writes); - Ok((config, host_state)) + Ok::<_, WasmChannelError>((config_result, host_state)) }) .await .map_err(|e| WasmChannelError::ExecutionPanicked { @@ -1215,38 +1245,46 @@ impl WasmChannel { reason: e.to_string(), })? }) - .await; + .await + .map_err(|_| WasmChannelError::Timeout { + name: self.name.clone(), + callback: "on_start".to_string(), + })? + } - match result { - Ok(Ok((config, mut host_state))) => { - // Surface WASM guest logs (errors/warnings from webhook setup, etc.) - for entry in host_state.take_logs() { - match entry.level { - crate::tools::wasm::LogLevel::Error => { - tracing::error!(channel = %self.name, "{}", entry.message); - } - crate::tools::wasm::LogLevel::Warn => { - tracing::warn!(channel = %self.name, "{}", entry.message); - } - _ => { - tracing::debug!(channel = %self.name, "{}", entry.message); - } - } - } - tracing::info!( - channel = %self.name, - display_name = %config.display_name, - endpoints = config.http_endpoints.len(), - "WASM channel on_start completed" - ); - Ok(config) - } - Ok(Err(e)) => Err(e), - Err(_) => Err(WasmChannelError::Timeout { - name: self.name.clone(), - callback: "on_start".to_string(), - }), + /// Execute the on_start callback. + /// + /// Returns the channel configuration for HTTP endpoint registration. + /// Call the WASM module's `on_start` callback. + /// + /// Typically called once during `start()`, but can be called again after + /// credentials are refreshed to re-trigger webhook registration and + /// other one-time setup that depends on credentials. + pub async fn call_on_start(&self) -> Result { + // If no WASM bytes, return default config (for testing) + if self.prepared.component().is_none() { + tracing::info!( + channel = %self.name, + "WASM channel on_start called (no WASM module, returning defaults)" + ); + return Ok(ChannelConfig { + display_name: self.prepared.description.clone(), + http_endpoints: Vec::new(), + poll: None, + }); } + + let (config_result, mut host_state) = self.execute_on_start_with_state().await?; + self.log_on_start_host_state(&mut host_state); + + let config = config_result?; + tracing::info!( + channel = %self.name, + display_name = %config.display_name, + endpoints = config.http_endpoints.len(), + "WASM channel on_start completed" + ); + Ok(config) } /// Execute the on_http_request callback. diff --git a/tests/e2e/scenarios/test_tool_approval.py b/tests/e2e/scenarios/test_tool_approval.py index 77960f5ef..44418e469 100644 --- a/tests/e2e/scenarios/test_tool_approval.py +++ b/tests/e2e/scenarios/test_tool_approval.py @@ -130,3 +130,62 @@ async def test_approval_params_toggle(page): await toggle.click() await page.wait_for_timeout(300) assert await params.is_hidden(), "Parameters should be hidden after second toggle" + + +async def test_waiting_for_approval_message_no_error_prefix(page): + """Verify that input submitted while awaiting approval shows non-error status with tool context. + + Tests the real flow: show approval card, then attempt to send input while approval is pending. + Backend rejects with Pending result (not Error), and message includes tool context. + """ + # First, inject an approval card to simulate the thread being in AwaitingApproval state + await page.evaluate(""" + showApproval({ + request_id: 'test-req-waiting-approval', + thread_id: currentThreadId, + tool_name: 'shell', + description: 'Execute: echo hello', + parameters: '{"command": "echo hello"}' + }) + """) + + # Wait for approval card to be visible (thread is now in AwaitingApproval state) + card = page.locator('.approval-card[data-request-id="test-req-waiting-approval"]') + await card.wait_for(state="visible", timeout=5000) + + # Record initial message count + initial_count = await page.locator(SEL["message_assistant"]).count() + + # Now attempt to send input while approval is pending + # (the backend will reject this and return the "Waiting for approval" status message) + chat_input = page.locator(SEL["chat_input"]) + await chat_input.fill("Test input while awaiting approval") + await chat_input.press("Enter") + + # Wait for the status message from the backend rejection + await page.wait_for_function( + f"() => document.querySelectorAll('{SEL['message_assistant']}').length > {initial_count}", + timeout=10000, + ) + + # Get the new status message + last_msg = page.locator(SEL["message_assistant"]).last + msg_text = await last_msg.text_content() + + # Verify no "Error:" prefix + assert not msg_text.lower().startswith("error:"), ( + f"Approval rejection must NOT have 'Error:' prefix. Got: {msg_text!r}" + ) + + # Verify it contains "waiting for approval" + assert "waiting for approval" in msg_text.lower(), ( + f"Expected 'Waiting for approval' text. Got: {msg_text!r}" + ) + + # Verify it contains the tool name and description + assert "shell" in msg_text.lower(), ( + f"Expected tool name 'shell' in message. Got: {msg_text!r}" + ) + assert "echo hello" in msg_text, ( + f"Expected tool description in message. Got: {msg_text!r}" + ) diff --git a/tests/telegram_auth_integration.rs b/tests/telegram_auth_integration.rs index 0052f8a24..9299962b4 100644 --- a/tests/telegram_auth_integration.rs +++ b/tests/telegram_auth_integration.rs @@ -13,13 +13,16 @@ use std::collections::HashMap; use std::sync::Arc; +#[cfg(feature = "integration")] use futures::StreamExt; +#[cfg(feature = "integration")] use ironclaw::channels::Channel; use ironclaw::channels::wasm::{ ChannelCapabilities, PreparedChannelModule, WasmChannel, WasmChannelRuntime, WasmChannelRuntimeConfig, }; use ironclaw::pairing::PairingStore; +#[cfg(feature = "integration")] use tokio::time::{Duration, timeout}; /// Skip the test if the Telegram WASM module hasn't been built. @@ -305,6 +308,7 @@ async fn test_private_message_with_owner_id_set_uses_guest_pairing_flow() { } #[tokio::test] +#[cfg(feature = "integration")] async fn test_private_messages_use_chat_id_as_thread_scope() { require_telegram_wasm!(); let runtime = create_test_runtime(); @@ -319,7 +323,10 @@ async fn test_private_messages_use_chat_id_as_thread_scope() { .to_string(); let channel = create_telegram_channel(runtime, &config).await; - let mut stream = channel.start().await.expect("Failed to start channel"); + let mut stream = channel + .start_message_stream_for_test() + .await + .expect("Failed to bootstrap test message stream"); for (update_id, message_id, text) in [(6, 105, "first"), (7, 106, "second")] { let update = build_telegram_update(