Skip to content

Commit 88e4d57

Browse files
committed
feat(acp-nats): add new_session handler with session.ready lifecycle
- Add new_session handler aligned with initialize.rs patterns (handler-specific error mapping, single-line generics, local subject variable) - Add spawn_session_ready to publish ext.session.ready after successful session creation - Add SessionReady extension type for bridge-backend coordination - Add session_new and ext_session_ready NATS subject functions - Add record_session_created() and record_error() to Metrics - Add Bridge task tracking for session_ready publish tasks - Add comprehensive test suite mirroring initialize handler tests Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 2f05030 commit 88e4d57

File tree

6 files changed

+402
-15
lines changed

6 files changed

+402
-15
lines changed

rsworkspace/crates/acp-nats/src/agent/mod.rs

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,30 @@
11
mod initialize;
2+
mod new_session;
23

34
use crate::config::Config;
4-
use crate::nats::{FlushClient, PublishClient, RequestClient};
5+
use crate::nats::{
6+
self, FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient, RetryPolicy,
7+
ExtSessionReady, agent,
8+
};
59
use crate::telemetry::metrics::Metrics;
610
use agent_client_protocol::ErrorCode;
711
use agent_client_protocol::{
812
Agent, AuthenticateRequest, AuthenticateResponse, CancelNotification, Error, ExtNotification,
913
ExtRequest, ExtResponse, InitializeRequest, InitializeResponse, LoadSessionRequest,
1014
LoadSessionResponse, NewSessionRequest, NewSessionResponse, PromptRequest, PromptResponse,
11-
Result, SetSessionModeRequest, SetSessionModeResponse,
15+
Result, SessionId, SetSessionModeRequest, SetSessionModeResponse,
1216
};
1317
use opentelemetry::metrics::Meter;
18+
use std::cell::RefCell;
19+
use tracing::{info, warn};
1420
use trogon_std::time::GetElapsed;
1521

1622
pub struct Bridge<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> {
1723
pub(crate) nats: N,
1824
pub(crate) clock: C,
1925
pub(crate) config: Config,
2026
pub(crate) metrics: Metrics,
27+
pub(crate) session_ready_publish_tasks: RefCell<Vec<tokio::task::JoinHandle<()>>>,
2128
}
2229

2330
impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Bridge<N, C> {
@@ -27,12 +34,50 @@ impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Bridge<N, C>
2734
clock,
2835
config,
2936
metrics: Metrics::new(meter),
37+
session_ready_publish_tasks: RefCell::new(Vec::new()),
3038
}
3139
}
3240

3341
pub(crate) fn nats(&self) -> &N {
3442
&self.nats
3543
}
44+
45+
pub(crate) fn spawn_session_ready(&self, session_id: &SessionId) {
46+
let nats_clone = self.nats.clone();
47+
let prefix = self.config.acp_prefix().to_owned();
48+
let session_id = session_id.clone();
49+
let metrics = self.metrics.clone();
50+
51+
let task = tokio::spawn(async move {
52+
let ready_subject =
53+
agent::ext_session_ready(&prefix, &session_id.to_string());
54+
info!(session_id = %session_id, subject = %ready_subject, "Publishing session.ready");
55+
56+
let ready_message = ExtSessionReady::new(session_id.clone());
57+
58+
let options = PublishOptions::builder()
59+
.publish_retry_policy(RetryPolicy::standard())
60+
.flush_policy(FlushPolicy::standard())
61+
.build();
62+
63+
if let Err(e) =
64+
nats::publish(&nats_clone, &ready_subject, &ready_message, options).await
65+
{
66+
warn!(
67+
error = %e,
68+
session_id = %session_id,
69+
"Failed to publish session.ready"
70+
);
71+
metrics.record_error("session_ready", "session_ready_publish_failed");
72+
} else {
73+
info!(session_id = %session_id, "Published session.ready");
74+
}
75+
});
76+
77+
let mut tasks = self.session_ready_publish_tasks.borrow_mut();
78+
tasks.retain(|t| !t.is_finished());
79+
tasks.push(task);
80+
}
3681
}
3782

3883
#[async_trait::async_trait(?Send)]
@@ -48,11 +93,8 @@ impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Agent for Br
4893
))
4994
}
5095

51-
async fn new_session(&self, _args: NewSessionRequest) -> Result<NewSessionResponse> {
52-
Err(Error::new(
53-
ErrorCode::InternalError.into(),
54-
"not yet implemented",
55-
))
96+
async fn new_session(&self, args: NewSessionRequest) -> Result<NewSessionResponse> {
97+
new_session::handle(self, args).await
5698
}
5799

58100
async fn load_session(&self, _args: LoadSessionRequest) -> Result<LoadSessionResponse> {
@@ -109,7 +151,7 @@ mod tests {
109151
use crate::config::Config;
110152
use agent_client_protocol::{
111153
Agent, AuthenticateRequest, CancelNotification, ExtNotification, ExtRequest,
112-
LoadSessionRequest, NewSessionRequest, PromptRequest, SetSessionModeRequest,
154+
LoadSessionRequest, PromptRequest, SetSessionModeRequest,
113155
};
114156
use trogon_nats::AdvancedMockNatsClient;
115157

@@ -137,12 +179,6 @@ mod tests {
137179
.await
138180
.is_err()
139181
);
140-
assert!(
141-
bridge
142-
.new_session(NewSessionRequest::new("."))
143-
.await
144-
.is_err()
145-
);
146182
assert!(
147183
bridge
148184
.load_session(LoadSessionRequest::new("s1", "."))

0 commit comments

Comments
 (0)