Skip to content

Commit 753ac4a

Browse files
committed
feat(acp-nats): add prompt handler (#20)
- Add prompt handler with session validation, cancel pre-flight, backpressure - Expand Bridge with CancelledSessions, PendingSessionPromptResponseWaiters - Update cancel to mark sessions cancelled and resolve pending prompt waiters - Add prompt_timeout and max_concurrent_client_tasks to config - Add session_prompt NATS subject Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 263c14b commit 753ac4a

File tree

5 files changed

+770
-69
lines changed

5 files changed

+770
-69
lines changed

rsworkspace/crates/acp-nats/src/agent/cancel.rs

Lines changed: 56 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
use super::Bridge;
2+
use agent_client_protocol::{CancelNotification, PromptResponse, Result, StopReason};
23
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
34
use crate::session_id::AcpSessionId;
4-
use agent_client_protocol::{CancelNotification, Error, ErrorCode, Result};
5+
use agent_client_protocol::{Error, ErrorCode};
56
use tracing::{info, instrument, warn};
67
use trogon_std::time::GetElapsed;
78

8-
/// Publishes the cancel notification to the backend via NATS (fire-and-forget).
9-
/// The publish failure is logged and recorded as a metric but does not propagate
10-
/// to the caller, so the client always receives `Ok(())`.
9+
/// Handles cancel notification requests.
10+
///
11+
/// Marks the session as cancelled, resolves any pending prompt waiters, and publishes
12+
/// the cancellation to the backend. The backend publish is fire-and-forget - if it fails,
13+
/// the error is logged and recorded in metrics, but the method still returns `Ok(())`.
1114
#[instrument(
1215
name = "acp.session.cancel",
1316
skip(bridge, args),
@@ -22,18 +25,26 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
2225
info!(session_id = %args.session_id, "Cancel notification");
2326

2427
AcpSessionId::try_from(&args.session_id).map_err(|e| {
25-
bridge
26-
.metrics
27-
.record_request("cancel", bridge.clock.elapsed(start).as_secs_f64(), false);
28-
bridge
29-
.metrics
30-
.record_error("session_validate", "invalid_session_id");
28+
bridge.metrics.record_request(
29+
"cancel",
30+
bridge.clock.elapsed(start).as_secs_f64(),
31+
false,
32+
);
33+
bridge.metrics.record_error("cancel", "invalid_session_id");
3134
Error::new(
3235
ErrorCode::InvalidParams.into(),
3336
format!("Invalid session ID: {}", e),
3437
)
3538
})?;
3639

40+
bridge
41+
.cancelled_sessions
42+
.mark_cancelled(args.session_id.clone(), &bridge.clock);
43+
44+
bridge
45+
.pending_session_prompt_responses
46+
.resolve_waiter(&args.session_id, Ok(PromptResponse::new(StopReason::Cancelled)));
47+
3748
let subject = agent::session_cancel(bridge.config.acp_prefix(), &args.session_id.to_string());
3849

3950
let publish_result = nats::publish(
@@ -46,20 +57,24 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
4657
)
4758
.await;
4859

49-
if let Err(error) = publish_result {
50-
warn!(
51-
session_id = %args.session_id,
52-
error = %error,
53-
"Failed to publish cancel notification to backend"
54-
);
55-
bridge
56-
.metrics
57-
.record_error("cancel", "cancel_publish_failed");
58-
}
60+
let publish_ok = match publish_result {
61+
Ok(()) => true,
62+
Err(error) => {
63+
warn!(
64+
session_id = %args.session_id,
65+
error = %error,
66+
"Failed to publish cancel notification to backend"
67+
);
68+
bridge.metrics.record_error("cancel", "cancel_publish_failed");
69+
false
70+
}
71+
};
5972

60-
bridge
61-
.metrics
62-
.record_request("cancel", bridge.clock.elapsed(start).as_secs_f64(), true);
73+
bridge.metrics.record_request(
74+
"cancel",
75+
bridge.clock.elapsed(start).as_secs_f64(),
76+
publish_ok,
77+
);
6378

6479
Ok(())
6580
}
@@ -183,6 +198,20 @@ mod tests {
183198
.is_some()
184199
}
185200

201+
#[tokio::test]
202+
async fn cancel_resolves_pending_prompt_waiter_with_cancelled() {
203+
let (_mock, bridge) = mock_bridge();
204+
let rx = bridge
205+
.pending_session_prompt_responses
206+
.register_waiter(agent_client_protocol::SessionId::from("s1"))
207+
.unwrap();
208+
209+
let _ = bridge.cancel(CancelNotification::new("s1")).await;
210+
211+
let result = rx.await.unwrap().unwrap();
212+
assert_eq!(result.stop_reason, agent_client_protocol::StopReason::Cancelled);
213+
}
214+
186215
#[tokio::test]
187216
async fn cancel_publishes_to_correct_subject() {
188217
let (mock, bridge) = mock_bridge();
@@ -223,8 +252,8 @@ mod tests {
223252
"expected acp.request.count with method=cancel, success=false on validation failure"
224253
);
225254
assert!(
226-
has_error_metric(&finished_metrics, "session_validate", "invalid_session_id"),
227-
"expected acp.errors.total with operation=session_validate, reason=invalid_session_id"
255+
has_error_metric(&finished_metrics, "cancel", "invalid_session_id"),
256+
"expected acp.errors.total with operation=cancel, reason=invalid_session_id"
228257
);
229258
provider.shutdown().unwrap();
230259
}
@@ -258,8 +287,8 @@ mod tests {
258287
"expected acp.errors.total with operation=cancel, reason=cancel_publish_failed"
259288
);
260289
assert!(
261-
has_request_metric(&finished_metrics, "cancel", true),
262-
"publish failure is fire-and-forget; caller still gets Ok, so success=true"
290+
has_request_metric(&finished_metrics, "cancel", false),
291+
"request metric records publish outcome; success=false when publish fails"
263292
);
264293
provider.shutdown().unwrap();
265294
}

0 commit comments

Comments
 (0)