Skip to content

Commit 0a67c6b

Browse files
committed
fix(claude-code): respawn persistent CLI process on broken pipe
Replace OnceCell with Mutex<Option<CliProcess>> so a dead CLI process can be detected and respawned. The persistent stream-json process (introduced in aaif-goose#7029) could die unexpectedly (e.g. after Ctrl+C), leaving the OnceCell permanently poisoned with a stale process whose stdin pipe is closed. Signed-off-by: rabi <ramishra@redhat.com>
1 parent 948cb91 commit 0a67c6b

File tree

1 file changed

+181
-130
lines changed

1 file changed

+181
-130
lines changed

crates/goose/src/providers/claude_code.rs

Lines changed: 181 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,68 @@ struct CliProcess {
2929
child: tokio::process::Child,
3030
stdin: tokio::process::ChildStdin,
3131
reader: BufReader<tokio::process::ChildStdout>,
32-
#[allow(dead_code)]
3332
stderr_handle: tokio::task::JoinHandle<String>,
3433
messages_sent: usize,
3534
}
3635

36+
impl CliProcess {
37+
async fn send_and_read(
38+
&mut self,
39+
content_blocks: &[Value],
40+
) -> Result<Vec<String>, ProviderError> {
41+
let ndjson_line = build_stream_json_input(content_blocks);
42+
self.stdin
43+
.write_all(ndjson_line.as_bytes())
44+
.await
45+
.map_err(|e| {
46+
ProviderError::RequestFailed(format!("Failed to write to stdin: {}", e))
47+
})?;
48+
self.stdin.write_all(b"\n").await.map_err(|e| {
49+
ProviderError::RequestFailed(format!("Failed to write newline to stdin: {}", e))
50+
})?;
51+
52+
let mut lines = Vec::new();
53+
let mut line = String::new();
54+
55+
loop {
56+
line.clear();
57+
match self.reader.read_line(&mut line).await {
58+
Ok(0) => {
59+
return Err(ProviderError::RequestFailed(
60+
"Claude CLI process terminated unexpectedly".to_string(),
61+
));
62+
}
63+
Ok(_) => {
64+
let trimmed = line.trim();
65+
if trimmed.is_empty() {
66+
continue;
67+
}
68+
lines.push(trimmed.to_string());
69+
70+
if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
71+
match parsed.get("type").and_then(|t| t.as_str()) {
72+
Some("result") => break,
73+
Some("error") => break,
74+
_ => {}
75+
}
76+
}
77+
}
78+
Err(e) => {
79+
return Err(ProviderError::RequestFailed(format!(
80+
"Failed to read output: {}",
81+
e
82+
)));
83+
}
84+
}
85+
}
86+
87+
Ok(lines)
88+
}
89+
}
90+
3791
impl Drop for CliProcess {
3892
fn drop(&mut self) {
93+
self.stderr_handle.abort();
3994
let _ = self.child.start_kill();
4095
}
4196
}
@@ -52,7 +107,7 @@ pub struct ClaudeCodeProvider {
52107
#[serde(skip)]
53108
name: String,
54109
#[serde(skip)]
55-
cli_process: tokio::sync::OnceCell<tokio::sync::Mutex<CliProcess>>,
110+
cli_process: tokio::sync::Mutex<Option<CliProcess>>,
56111
}
57112

58113
impl ClaudeCodeProvider {
@@ -65,7 +120,7 @@ impl ClaudeCodeProvider {
65120
command: resolved_command,
66121
model,
67122
name: CLAUDE_CODE_PROVIDER_NAME.to_string(),
68-
cli_process: tokio::sync::OnceCell::new(),
123+
cli_process: tokio::sync::Mutex::new(None),
69124
})
70125
}
71126

@@ -252,6 +307,71 @@ impl ClaudeCodeProvider {
252307
Ok((response_message, usage))
253308
}
254309

310+
fn spawn_process(&self, filtered_system: &str) -> Result<CliProcess, ProviderError> {
311+
let mut cmd = Command::new(&self.command);
312+
configure_command_no_window(&mut cmd);
313+
cmd.arg("--input-format")
314+
.arg("stream-json")
315+
.arg("--output-format")
316+
.arg("stream-json")
317+
.arg("--verbose")
318+
.arg("--system-prompt")
319+
.arg(filtered_system);
320+
321+
if CLAUDE_CODE_KNOWN_MODELS.contains(&self.model.model_name.as_str()) {
322+
cmd.arg("--model").arg(&self.model.model_name);
323+
}
324+
325+
Self::apply_permission_flags(&mut cmd)?;
326+
327+
cmd.stdin(Stdio::piped())
328+
.stdout(Stdio::piped())
329+
.stderr(Stdio::piped());
330+
331+
let mut child = cmd.spawn().map_err(|e| {
332+
ProviderError::RequestFailed(format!(
333+
"Failed to spawn Claude CLI command '{:?}': {}.",
334+
self.command, e
335+
))
336+
})?;
337+
338+
let stdin = child
339+
.stdin
340+
.take()
341+
.ok_or_else(|| ProviderError::RequestFailed("Failed to capture stdin".to_string()))?;
342+
let stdout = child
343+
.stdout
344+
.take()
345+
.ok_or_else(|| ProviderError::RequestFailed("Failed to capture stdout".to_string()))?;
346+
347+
// Drain stderr concurrently to prevent pipe buffer deadlock
348+
let stderr = child.stderr.take();
349+
let stderr_handle = tokio::spawn(async move {
350+
let mut output = String::new();
351+
if let Some(mut stderr) = stderr {
352+
use tokio::io::AsyncReadExt;
353+
let _ = stderr.read_to_string(&mut output).await;
354+
}
355+
output
356+
});
357+
358+
Ok(CliProcess {
359+
child,
360+
stdin,
361+
reader: BufReader::new(stdout),
362+
stderr_handle,
363+
messages_sent: 0,
364+
})
365+
}
366+
367+
fn is_process_alive(process: &mut CliProcess) -> bool {
368+
match process.child.try_wait() {
369+
Ok(None) => true,
370+
Ok(Some(_)) => false,
371+
Err(_) => false,
372+
}
373+
}
374+
255375
async fn execute_command(
256376
&self,
257377
system: &str,
@@ -268,146 +388,59 @@ impl ClaudeCodeProvider {
268388
"Filtered system prompt length: {} chars",
269389
filtered_system.len()
270390
);
271-
println!("Filtered system prompt: {}", filtered_system);
272391
println!("================================");
273392
}
274393

275-
// Spawn lazily on first call (OnceCell ensures exactly once)
276-
let process_mutex = self
277-
.cli_process
278-
.get_or_try_init(|| async {
279-
let mut cmd = Command::new(&self.command);
280-
// NO -p flag — persistent mode
281-
configure_command_no_window(&mut cmd);
282-
cmd.arg("--input-format")
283-
.arg("stream-json")
284-
.arg("--output-format")
285-
.arg("stream-json")
286-
.arg("--verbose")
287-
// System prompt is set once at process start. The provider
288-
// instance is not reused across sessions with different prompts.
289-
.arg("--system-prompt")
290-
.arg(&filtered_system);
291-
292-
// Only pass model parameter if it's in the known models list
293-
if CLAUDE_CODE_KNOWN_MODELS.contains(&self.model.model_name.as_str()) {
294-
cmd.arg("--model").arg(&self.model.model_name);
295-
}
394+
let mut guard = self.cli_process.lock().await;
296395

297-
// Add permission mode based on GOOSE_MODE setting
298-
Self::apply_permission_flags(&mut cmd)?;
299-
300-
cmd.stdin(Stdio::piped())
301-
.stdout(Stdio::piped())
302-
.stderr(Stdio::piped());
303-
304-
let mut child = cmd.spawn().map_err(|e| {
305-
ProviderError::RequestFailed(format!(
306-
"Failed to spawn Claude CLI command '{:?}': {}.",
307-
self.command, e
308-
))
309-
})?;
310-
311-
let stdin = child.stdin.take().ok_or_else(|| {
312-
ProviderError::RequestFailed("Failed to capture stdin".to_string())
313-
})?;
314-
let stdout = child.stdout.take().ok_or_else(|| {
315-
ProviderError::RequestFailed("Failed to capture stdout".to_string())
316-
})?;
317-
318-
// Drain stderr concurrently to prevent pipe buffer deadlock
319-
let stderr = child.stderr.take();
320-
let stderr_handle = tokio::spawn(async move {
321-
let mut output = String::new();
322-
if let Some(mut stderr) = stderr {
323-
use tokio::io::AsyncReadExt;
324-
let _ = stderr.read_to_string(&mut output).await;
325-
}
326-
output
327-
});
328-
329-
Ok::<_, ProviderError>(tokio::sync::Mutex::new(CliProcess {
330-
child,
331-
stdin,
332-
reader: BufReader::new(stdout),
333-
stderr_handle,
334-
messages_sent: 0,
335-
}))
336-
})
337-
.await?;
396+
let needs_spawn = match guard.as_mut() {
397+
None => true,
398+
Some(p) => !Self::is_process_alive(p),
399+
};
400+
if needs_spawn {
401+
*guard = Some(self.spawn_process(&filtered_system)?);
402+
}
338403

339-
let mut process = process_mutex.lock().await;
404+
let new_messages = self.content_blocks_for(guard.as_ref().unwrap(), messages);
405+
let process = guard.as_mut().unwrap();
406+
match process.send_and_read(&new_messages).await {
407+
Ok(lines) => {
408+
process.messages_sent = messages.len();
409+
tracing::debug!("Command executed successfully, got {} lines", lines.len());
410+
Ok(lines)
411+
}
412+
Err(e) if Self::is_recoverable(&e) => {
413+
tracing::debug!("CLI process dead, respawning");
414+
let process = guard.insert(self.spawn_process(&filtered_system)?);
415+
let new_messages = self.content_blocks_for(process, messages);
416+
let lines = process.send_and_read(&new_messages).await?;
417+
process.messages_sent = messages.len();
418+
tracing::debug!(
419+
"Command executed successfully after respawn, got {} lines",
420+
lines.len()
421+
);
422+
Ok(lines)
423+
}
424+
Err(e) => Err(e),
425+
}
426+
}
340427

341-
// Build content from new messages only (skip already-sent ones).
342-
// If messages is shorter than messages_sent, the caller started a fresh
343-
// conversation on the same provider instance — send everything.
428+
fn content_blocks_for(&self, process: &CliProcess, messages: &[Message]) -> Vec<Value> {
344429
let new_messages = if process.messages_sent > 0 && process.messages_sent < messages.len() {
345430
&messages[process.messages_sent..]
346431
} else {
347432
messages
348433
};
349-
let new_blocks = self.messages_to_content_blocks(new_messages);
350-
351-
// Write NDJSON line to stdin
352-
let ndjson_line = build_stream_json_input(&new_blocks);
353-
process
354-
.stdin
355-
.write_all(ndjson_line.as_bytes())
356-
.await
357-
.map_err(|e| {
358-
ProviderError::RequestFailed(format!("Failed to write to stdin: {}", e))
359-
})?;
360-
process.stdin.write_all(b"\n").await.map_err(|e| {
361-
ProviderError::RequestFailed(format!("Failed to write newline to stdin: {}", e))
362-
})?;
363-
364-
// Read lines until we see a "result" event
365-
let mut lines = Vec::new();
366-
let mut line = String::new();
367-
368-
loop {
369-
line.clear();
370-
match process.reader.read_line(&mut line).await {
371-
Ok(0) => {
372-
// EOF means the process died
373-
return Err(ProviderError::RequestFailed(
374-
"Claude CLI process terminated unexpectedly".to_string(),
375-
));
376-
}
377-
Ok(_) => {
378-
let trimmed = line.trim();
379-
if trimmed.is_empty() {
380-
continue;
381-
}
382-
lines.push(trimmed.to_string());
434+
self.messages_to_content_blocks(new_messages)
435+
}
383436

384-
// Check if this is a result event (end of turn)
385-
if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
386-
match parsed.get("type").and_then(|t| t.as_str()) {
387-
Some("result") => break,
388-
Some("error") => break,
389-
_ => {}
390-
}
391-
}
392-
}
393-
Err(e) => {
394-
return Err(ProviderError::RequestFailed(format!(
395-
"Failed to read output: {}",
396-
e
397-
)));
398-
}
437+
fn is_recoverable(err: &ProviderError) -> bool {
438+
match err {
439+
ProviderError::RequestFailed(msg) => {
440+
msg.contains("Broken pipe") || msg.contains("terminated unexpectedly")
399441
}
442+
_ => false,
400443
}
401-
402-
// Update messages_sent for next turn
403-
process.messages_sent = messages.len();
404-
405-
tracing::debug!("Command executed successfully, got {} lines", lines.len());
406-
for (i, line) in lines.iter().enumerate() {
407-
tracing::debug!("Line {}: {}", i, line);
408-
}
409-
410-
Ok(lines)
411444
}
412445

413446
/// Generate a simple session description without calling subprocess
@@ -734,12 +767,30 @@ mod tests {
734767
);
735768
}
736769

770+
#[test]
771+
fn test_is_recoverable() {
772+
assert!(ClaudeCodeProvider::is_recoverable(
773+
&ProviderError::RequestFailed(
774+
"Failed to write to stdin: Broken pipe (os error 32)".into()
775+
)
776+
));
777+
assert!(ClaudeCodeProvider::is_recoverable(
778+
&ProviderError::RequestFailed("Claude CLI process terminated unexpectedly".into())
779+
));
780+
assert!(!ClaudeCodeProvider::is_recoverable(
781+
&ProviderError::RequestFailed("Failed to read output: connection reset".into())
782+
));
783+
assert!(!ClaudeCodeProvider::is_recoverable(
784+
&ProviderError::Authentication("Broken pipe".into()) // wrong variant
785+
));
786+
}
787+
737788
fn make_provider() -> ClaudeCodeProvider {
738789
ClaudeCodeProvider {
739790
command: PathBuf::from("claude"),
740791
model: ModelConfig::new("sonnet").unwrap(),
741792
name: "claude-code".to_string(),
742-
cli_process: tokio::sync::OnceCell::new(),
793+
cli_process: tokio::sync::Mutex::new(None),
743794
}
744795
}
745796
}

0 commit comments

Comments
 (0)