Skip to content

Commit 4b77952

Browse files
committed
Wire ACPX session selection through chat channels
1 parent 3e480f3 commit 4b77952

19 files changed

Lines changed: 353 additions & 57 deletions

BACKLOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@
7979
- [ ] Explore a local web channel for desktop/LAN testing that uses the same
8080
identity, routing, message-envelope, artifact, and proxy policy paths as
8181
Telegram, Matrix, and text channels
82+
- [ ] Prototype a native ACP client adapter using the `agent-client-protocol`
83+
Rust crate, with Zed's `codex-acp` as the first smoke target
84+
- [ ] Evaluate ACP orchestrators such as AgentPool, cagent, and fast-agent as
85+
recipe-backed async work backends
8286
- [ ] Architecture decision records (ADRs)
8387

8488
### Observability

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,6 @@ bash scripts/install-git-hooks.sh
183183
- [Roadmap](docs/roadmap/)
184184
- [Staging test matrix](docs/staging-test-matrix.md)
185185
- [Channel secret-input deprecation note](docs/roadmap/channel-secret-input-deprecation.md)
186-
- [Internal research and planning notes](research/)
187186

188187
## License
189188

crates/calciforge/src/adapters/acpx.rs

Lines changed: 61 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ use tracing::{debug, info};
1414

1515
use crate::adapters::{AdapterError, AgentAdapter, DispatchContext};
1616

17+
/// Shared cwd for ACPX sessions created through Calciforge.
18+
pub const ACPX_SESSION_DIR: &str = "/tmp/acpx-sessions";
19+
1720
/// ACPX adapter — wraps acpx CLI for ACP agent communication
1821
pub struct AcpxAdapter {
1922
agent_name: String,
@@ -36,7 +39,7 @@ impl AcpxAdapter {
3639
_args: args.unwrap_or_default(),
3740
env: env.unwrap_or_default(),
3841
timeout_ms: timeout_ms.unwrap_or(300_000),
39-
session_dir: PathBuf::from("/tmp/acpx-sessions"),
42+
session_dir: PathBuf::from(ACPX_SESSION_DIR),
4043
}
4144
}
4245

@@ -92,7 +95,9 @@ impl AcpxAdapter {
9295
let output = Command::new("acpx")
9396
.arg(&self.agent_name)
9497
.arg("sessions")
95-
.arg("new")
98+
.arg("ensure")
99+
.arg("--name")
100+
.arg(session_name)
96101
.current_dir(&self.session_dir)
97102
.envs(&self.env)
98103
.output()
@@ -142,6 +147,34 @@ impl AcpxAdapter {
142147
.to_string()
143148
}
144149

150+
async fn run_session_prompt(
151+
&self,
152+
message: &str,
153+
session: Option<&str>,
154+
) -> Result<std::process::Output, AdapterError> {
155+
let mut cmd = Command::new("acpx");
156+
cmd.arg("--format").arg("text").arg(&self.agent_name);
157+
if let Some(session) = session {
158+
cmd.arg("--session").arg(session);
159+
}
160+
cmd.arg("prompt")
161+
.arg(message)
162+
.current_dir(&self.session_dir)
163+
.envs(&self.env)
164+
.stdout(Stdio::piped())
165+
.stderr(Stdio::piped());
166+
167+
let timeout = std::time::Duration::from_millis(self.timeout_ms);
168+
let child = cmd
169+
.spawn()
170+
.map_err(|e| AdapterError::Unavailable(format!("Failed to spawn acpx: {}", e)))?;
171+
172+
tokio::time::timeout(timeout, child.wait_with_output())
173+
.await
174+
.map_err(|_| AdapterError::Unavailable("acpx prompt timed out".to_string()))?
175+
.map_err(|e| AdapterError::Unavailable(format!("Failed to run acpx: {}", e)))
176+
}
177+
145178
/// Execute one-shot prompt (no session persistence)
146179
async fn exec_prompt(&self, message: &str) -> Result<String, AdapterError> {
147180
self.ensure_session_dir().await?;
@@ -190,40 +223,38 @@ impl AcpxAdapter {
190223
}
191224

192225
/// Send prompt to persistent session
193-
async fn session_prompt(&self, message: &str) -> Result<String, AdapterError> {
226+
async fn session_prompt(
227+
&self,
228+
message: &str,
229+
session: Option<&str>,
230+
) -> Result<String, AdapterError> {
194231
self.ensure_session_dir().await?;
195232

196233
// Use cwd session (default session name)
197-
info!(agent = %self.agent_name, "Running acpx prompt with session");
198-
199-
let mut cmd = Command::new("acpx");
200-
cmd.arg("--format")
201-
.arg("text")
202-
.arg(&self.agent_name)
203-
.arg("prompt")
204-
.arg(message)
205-
.current_dir(&self.session_dir)
206-
.envs(&self.env)
207-
.stdout(Stdio::piped())
208-
.stderr(Stdio::piped());
209-
210-
let timeout = std::time::Duration::from_millis(self.timeout_ms);
211-
let child = cmd
212-
.spawn()
213-
.map_err(|e| AdapterError::Unavailable(format!("Failed to spawn acpx: {}", e)))?;
214-
215-
let result = tokio::time::timeout(timeout, child.wait_with_output()).await;
234+
info!(
235+
agent = %self.agent_name,
236+
session = ?session,
237+
"Running acpx prompt with session"
238+
);
216239

217-
match result {
218-
Ok(Ok(output)) => {
240+
match self.run_session_prompt(message, session).await {
241+
Ok(output) => {
219242
if !output.status.success() {
220243
let stderr = String::from_utf8_lossy(&output.stderr);
221244
// Session might not exist — try creating it
222245
if stderr.contains("session") || stderr.contains("not found") {
223246
info!("Session not found, creating...");
224-
self.ensure_session("cwd").await?;
225-
// Retry
226-
return self.exec_prompt(message).await;
247+
self.ensure_session(session.unwrap_or("cwd")).await?;
248+
let retry = self.run_session_prompt(message, session).await?;
249+
if retry.status.success() {
250+
let stdout = String::from_utf8_lossy(&retry.stdout);
251+
return Ok(Self::strip_acpx_noise(&stdout));
252+
}
253+
let retry_stderr = String::from_utf8_lossy(&retry.stderr);
254+
return Err(AdapterError::Protocol(format!(
255+
"acpx prompt failed: {}",
256+
retry_stderr
257+
)));
227258
}
228259
return Err(AdapterError::Protocol(format!(
229260
"acpx prompt failed: {}",
@@ -233,13 +264,7 @@ impl AcpxAdapter {
233264
let stdout = String::from_utf8_lossy(&output.stdout);
234265
Ok(Self::strip_acpx_noise(&stdout))
235266
}
236-
Ok(Err(e)) => Err(AdapterError::Unavailable(format!(
237-
"Failed to run acpx: {}",
238-
e
239-
))),
240-
Err(_) => Err(AdapterError::Unavailable(
241-
"acpx prompt timed out".to_string(),
242-
)),
267+
Err(e) => Err(e),
243268
}
244269
}
245270
}
@@ -262,8 +287,9 @@ impl AgentAdapter for AcpxAdapter {
262287
};
263288

264289
// Try session mode first, fall back to exec
265-
match self.session_prompt(&message).await {
290+
match self.session_prompt(&message, ctx.session).await {
266291
Ok(response) => Ok(response),
292+
Err(e @ AdapterError::Protocol(_)) if ctx.session.is_some() => Err(e),
267293
Err(AdapterError::Protocol(_)) => {
268294
// Session error — try one-shot exec
269295
self.exec_prompt(&message).await

crates/calciforge/src/adapters/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ pub struct DispatchContext<'a> {
113113
/// Optional per-request model override (used by alloy routing).
114114
#[allow(dead_code)]
115115
pub model_override: Option<&'a str>,
116+
/// Optional downstream session selected for session-aware adapters.
117+
pub session: Option<&'a str>,
116118
}
117119

118120
impl<'a> DispatchContext<'a> {
@@ -122,6 +124,7 @@ impl<'a> DispatchContext<'a> {
122124
message,
123125
sender: None,
124126
model_override: None,
127+
session: None,
125128
}
126129
}
127130
}

crates/calciforge/src/adapters/openai_compat.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ mod tests {
279279
message: "hello",
280280
sender: Some("brian"),
281281
model_override: Some("override"),
282+
session: None,
282283
})
283284
.await
284285
.unwrap();

crates/calciforge/src/adapters/openclaw_channel.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ mod tests {
493493
message: "hello from calciforge",
494494
sender: Some("brian"),
495495
model_override: None,
496+
session: None,
496497
})
497498
.await
498499
.expect("dispatch should succeed");
@@ -536,6 +537,7 @@ mod tests {
536537
message: "route this",
537538
sender: Some("renee"),
538539
model_override: None,
540+
session: None,
539541
})
540542
.await
541543
.expect("dispatch should return reply callback");
@@ -566,6 +568,7 @@ mod tests {
566568
message: "first",
567569
sender: Some("brian"),
568570
model_override: None,
571+
session: None,
569572
})
570573
.await
571574
.expect("first dispatch should start reply server");
@@ -577,6 +580,7 @@ mod tests {
577580
message: "second",
578581
sender: Some("renee"),
579582
model_override: None,
583+
session: None,
580584
})
581585
.await
582586
.expect("rebuilt adapter should reuse reply server/router");
@@ -603,6 +607,7 @@ mod tests {
603607
message: "will not send",
604608
sender: Some("brian"),
605609
model_override: None,
610+
session: None,
606611
})
607612
.await
608613
.expect_err("conflicting reply auth token should fail");

crates/calciforge/src/adapters/openclaw_native.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,7 @@ mod tests {
463463
message: "hello",
464464
sender: Some("brian"),
465465
model_override: None,
466+
session: None,
466467
};
467468
let result = a.dispatch_with_context(ctx).await;
468469

@@ -597,6 +598,7 @@ mod tests {
597598
message: msg,
598599
sender: Some("brian"),
599600
model_override: None,
601+
session: None,
600602
};
601603
let _ = a.dispatch_with_context(ctx).await;
602604
}
@@ -667,6 +669,7 @@ mod tests {
667669
message: "status check",
668670
sender: Some("renee"),
669671
model_override: None,
672+
session: None,
670673
};
671674
let _ = a.dispatch_with_context(ctx).await;
672675

crates/calciforge/src/adapters/zeroclaw_native.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ impl AgentAdapter for ZeroClawNativeAdapter {
194194
message: &full_message,
195195
sender: ctx.sender,
196196
model_override: ctx.model_override,
197+
session: ctx.session,
197198
};
198199

199200
match self.inner.dispatch_with_context(inner_ctx).await {
@@ -398,6 +399,7 @@ mod tests {
398399
message: "what is 2+2?",
399400
sender: Some("brian"),
400401
model_override: None,
402+
session: None,
401403
})
402404
.await;
403405
assert!(r1.is_ok(), "first dispatch failed: {:?}", r1);
@@ -408,6 +410,7 @@ mod tests {
408410
message: "and 3+3?",
409411
sender: Some("brian"),
410412
model_override: None,
413+
session: None,
411414
})
412415
.await;
413416
assert!(r2.is_ok(), "second dispatch failed: {:?}", r2);
@@ -496,6 +499,7 @@ mod tests {
496499
message: "brian first message",
497500
sender: Some("brian"),
498501
model_override: None,
502+
session: None,
499503
})
500504
.await;
501505

@@ -505,6 +509,7 @@ mod tests {
505509
message: "renee first message",
506510
sender: Some("renee"),
507511
model_override: None,
512+
session: None,
508513
})
509514
.await;
510515

@@ -514,6 +519,7 @@ mod tests {
514519
message: "brian second message",
515520
sender: Some("brian"),
516521
model_override: None,
522+
session: None,
517523
})
518524
.await;
519525

crates/calciforge/src/channels/matrix.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -832,14 +832,16 @@ async fn handle_message(
832832
);
833833
let dispatch_start = std::time::Instant::now();
834834
let model_override = cmd_handler.active_model_for_identity(identity_id);
835+
let selected_session = cmd_handler.active_session_for(identity_id, &agent_id);
835836

836837
match router
837-
.dispatch_message_with_sender_and_model(
838+
.dispatch_message_with_sender_model_and_session(
838839
&augmented,
839840
&agent,
840841
config,
841842
Some(identity_id),
842843
model_override.as_deref(),
844+
selected_session.as_deref(),
843845
)
844846
.await
845847
{

crates/calciforge/src/channels/signal.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,9 @@ impl<C: Channel + ?Sized + 'static> SignalChannel<C> {
363363

364364
let identity_id = identity.id.clone();
365365
let model_override = self.command_handler.active_model_for_identity(&identity_id);
366+
let selected_session = self
367+
.command_handler
368+
.active_session_for(&identity_id, &agent_id);
366369
let preserve_native_commands = crate::adapters::agent_supports_native_commands(&agent);
367370

368371
tokio::spawn(async move {
@@ -379,12 +382,13 @@ impl<C: Channel + ?Sized + 'static> SignalChannel<C> {
379382
let dispatch_start = std::time::Instant::now();
380383
match self
381384
.router
382-
.dispatch_message_with_sender_and_model(
385+
.dispatch_message_with_sender_model_and_session(
383386
&augmented,
384387
&agent,
385388
&self.config,
386389
Some(&identity_id),
387390
model_override.as_deref(),
391+
selected_session.as_deref(),
388392
)
389393
.await
390394
{

0 commit comments

Comments
 (0)