Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ jobs:
matrix:
include:
- name: all-features
flags: "--all-features"
# Keep product feature coverage broad without pulling in the
# test-only `integration` feature, which is exercised separately
# in the heavy integration job below.
flags: "--no-default-features --features postgres,libsql,html-to-markdown,bedrock,import"
- name: default
flags: ""
- name: libsql-only
Expand All @@ -39,6 +42,26 @@ jobs:
- name: Run Tests
run: cargo test ${{ matrix.flags }} -- --nocapture

heavy-integration-tests:
name: Heavy Integration Tests
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v6
- name: Install Rust
uses: dtolnay/rust-toolchain@stable
with:
targets: wasm32-wasip2
- uses: Swatinem/rust-cache@v2
with:
key: heavy-integration
- name: Build Telegram WASM channel
run: cargo build --manifest-path channels-src/telegram/Cargo.toml --target wasm32-wasip2 --release
- name: Run thread scheduling integration tests
run: cargo test --no-default-features --features libsql,integration --test e2e_thread_scheduling -- --nocapture
- name: Run Telegram thread-scope regression test
run: cargo test --features integration --test telegram_auth_integration test_private_messages_use_chat_id_as_thread_scope -- --exact

telegram-tests:
name: Telegram Channel Tests
if: >
Expand All @@ -65,7 +88,7 @@ jobs:
matrix:
include:
- name: all-features
flags: "--all-features"
flags: "--no-default-features --features postgres,libsql,html-to-markdown,bedrock,import"
- name: default
flags: ""
- name: libsql-only
Expand Down Expand Up @@ -149,14 +172,18 @@ jobs:
name: Run Tests
runs-on: ubuntu-latest
if: always()
needs: [tests, telegram-tests, wasm-wit-compat, docker-build, windows-build, version-check, bench-compile]
needs: [tests, heavy-integration-tests, telegram-tests, wasm-wit-compat, docker-build, windows-build, version-check, bench-compile]
steps:
- run: |
# Unit tests must always pass
if [[ "${{ needs.tests.result }}" != "success" ]]; then
echo "Unit tests failed"
exit 1
fi
if [[ "${{ needs.heavy-integration-tests.result }}" != "success" ]]; then
echo "Heavy integration tests failed"
exit 1
fi
# Gated jobs: must pass on promotion PRs / push, skipped on developer PRs
for job in telegram-tests wasm-wit-compat docker-build windows-build version-check bench-compile; do
case "$job" in
Expand Down
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,17 @@ postgres = [
"rust_decimal/db-tokio-postgres",
]
libsql = ["dep:libsql"]
# Opt-in feature for especially heavy integration-test targets that run in a
# dedicated CI job instead of the default Rust test matrix.
integration = []
html-to-markdown = ["dep:html-to-markdown-rs", "dep:readabilityrs"]
bedrock = ["dep:aws-config", "dep:aws-sdk-bedrockruntime", "dep:aws-smithy-types"]
import = ["dep:json5", "libsql"]

[[test]]
name = "e2e_thread_scheduling"
required-features = ["libsql", "integration"]

[[test]]
name = "html_to_markdown"
required-features = ["html-to-markdown"]
Expand Down
8 changes: 8 additions & 0 deletions src/agent/submission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,14 @@ impl SubmissionResult {
message: message.into(),
}
}

/// Create a non-error status message (e.g., for blocking states like approval waiting).
/// Uses Ok variant to avoid "Error:" prefix in rendering.
pub fn pending(message: impl Into<String>) -> Self {
Self::Ok {
message: Some(message.into()),
}
}
}

#[cfg(test)]
Expand Down
118 changes: 113 additions & 5 deletions src/agent/thread_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,18 @@ impl Agent {
);

// First check thread state without holding lock during I/O
let thread_state = {
let (thread_state, approval_context) = {
let sess = session.lock().await;
let thread = sess
.threads
.get(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
thread.state
let approval_context = thread.pending_approval.as_ref().map(|a| {
let desc_preview =
crate::agent::agent_loop::truncate_for_preview(&a.description, 80);
(a.tool_name.clone(), desc_preview)
});
(thread.state, approval_context)
};

tracing::debug!(
Expand Down Expand Up @@ -221,9 +226,13 @@ impl Agent {
thread_id = %thread_id,
"Thread awaiting approval, rejecting new input"
);
return Ok(SubmissionResult::error(
"Waiting for approval. Use /interrupt to cancel.",
));
let msg = match approval_context {
Some((tool_name, desc_preview)) => format!(
"Waiting for approval: {tool_name} — {desc_preview}. Use /interrupt to cancel."
),
None => "Waiting for approval. Use /interrupt to cancel.".to_string(),
};
return Ok(SubmissionResult::pending(msg));
}
ThreadState::Completed => {
tracing::warn!(
Expand Down Expand Up @@ -1917,4 +1926,103 @@ mod tests {
created_at: chrono::Utc::now(),
}
}

#[tokio::test]
async fn test_awaiting_approval_rejection_includes_tool_context() {
// Test that when a thread is in AwaitingApproval state and receives a new message,
// process_user_input rejects it with a non-error status that includes tool context.
use crate::agent::session::{PendingApproval, Session, Thread, ThreadState};
use uuid::Uuid;

let session_id = Uuid::new_v4();
let thread_id = Uuid::new_v4();
let mut thread = Thread::with_id(thread_id, session_id);

// Set thread to AwaitingApproval with a pending tool approval
let pending = PendingApproval {
request_id: Uuid::new_v4(),
tool_name: "shell".to_string(),
parameters: serde_json::json!({"command": "echo hello"}),
display_parameters: serde_json::json!({"command": "[REDACTED]"}),
description: "Execute: echo hello".to_string(),
tool_call_id: "call_0".to_string(),
context_messages: vec![],
deferred_tool_calls: vec![],
user_timezone: None,
};
thread.await_approval(pending);

let mut session = Session::new("test-user");
session.threads.insert(thread_id, thread);

// Verify thread is in AwaitingApproval state
assert_eq!(
session.threads[&thread_id].state,
ThreadState::AwaitingApproval
);

let result = extract_approval_message(&session, thread_id);

// Verify result is an Ok with a message (not an Error)
match result {
Ok(Some(msg)) => {
// Should NOT start with "Error:"
assert!(
!msg.to_lowercase().starts_with("error:"),
"Approval rejection should not have 'Error:' prefix. Got: {}",
msg
);

// Should contain "waiting for approval"
assert!(
msg.to_lowercase().contains("waiting for approval"),
"Should contain 'waiting for approval'. Got: {}",
msg
);

// Should contain the tool name
assert!(
msg.contains("shell"),
"Should contain tool name 'shell'. Got: {}",
msg
);

// Should contain the description (or truncated version)
assert!(
msg.contains("echo hello"),
"Should contain description 'echo hello'. Got: {}",
msg
);
}
_ => panic!("Expected approval rejection message"),
}
}

// Helper function to extract the approval message without needing a full Agent instance
fn extract_approval_message(
session: &crate::agent::session::Session,
thread_id: Uuid,
) -> Result<Option<String>, crate::error::Error> {
let thread = session.threads.get(&thread_id).ok_or_else(|| {
crate::error::Error::from(crate::error::JobError::NotFound { id: thread_id })
})?;

if thread.state == ThreadState::AwaitingApproval {
let approval_context = thread.pending_approval.as_ref().map(|a| {
let desc_preview =
crate::agent::agent_loop::truncate_for_preview(&a.description, 80);
(a.tool_name.clone(), desc_preview)
});

let msg = match approval_context {
Some((tool_name, desc_preview)) => format!(
"Waiting for approval: {tool_name} — {desc_preview}. Use /interrupt to cancel."
),
None => "Waiting for approval. Use /interrupt to cancel.".to_string(),
};
Ok(Some(msg))
} else {
Ok(None)
}
}
}
Loading
Loading