Skip to content

Commit 0d0959e

Browse files
author
超渡法師
committed
fix(cron): atomic write, kill child on timeout, add timeout test
- update_usercron_job: write to .toml.tmp then rename (atomic on POSIX) - check_disable_on_success: use spawn() + wait_with_output() to retain child handle; explicitly kill on timeout to prevent orphan processes - Add disable_on_success_kills_child_on_timeout test (sleep 999 + 1s timeout)
1 parent f8d3d16 commit 0d0959e

1 file changed

Lines changed: 84 additions & 21 deletions

File tree

src/cron.rs

Lines changed: 84 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -751,15 +751,16 @@ async fn check_disable_on_success(
751751
marker: &str,
752752
) -> DisableOnSuccessResult {
753753
let timeout_secs = job.disable_on_success_timeout_secs.max(1);
754-
let mut child = shell_command(command);
754+
let mut cmd = shell_command(command);
755755
if let Some(dir) = non_empty_opt(job.disable_on_success_working_dir.as_deref()) {
756-
child.current_dir(dir);
756+
cmd.current_dir(dir);
757757
}
758+
cmd.stdout(std::process::Stdio::piped());
759+
cmd.stderr(std::process::Stdio::piped());
758760

759-
let output = match timeout(std::time::Duration::from_secs(timeout_secs), child.output()).await
760-
{
761-
Ok(Ok(output)) => output,
762-
Ok(Err(e)) => {
761+
let mut child = match cmd.spawn() {
762+
Ok(child) => child,
763+
Err(e) => {
763764
warn!(
764765
id = job.id.as_deref().unwrap_or(""),
765766
command,
@@ -768,27 +769,74 @@ async fn check_disable_on_success(
768769
);
769770
return DisableOnSuccessResult::NotAchieved("command failed to start");
770771
}
771-
Err(_) => {
772+
};
773+
774+
// Take stdout/stderr handles and drain them concurrently to prevent pipe buffer deadlock.
775+
let stdout_handle = child.stdout.take();
776+
let stderr_handle = child.stderr.take();
777+
778+
let stdout_task = tokio::spawn(async move {
779+
let mut buf = Vec::new();
780+
if let Some(mut out) = stdout_handle {
781+
let _ = tokio::io::AsyncReadExt::read_to_end(&mut out, &mut buf).await;
782+
}
783+
buf
784+
});
785+
let stderr_task = tokio::spawn(async move {
786+
let mut buf = Vec::new();
787+
if let Some(mut err) = stderr_handle {
788+
let _ = tokio::io::AsyncReadExt::read_to_end(&mut err, &mut buf).await;
789+
}
790+
buf
791+
});
792+
793+
let deadline = tokio::time::sleep(std::time::Duration::from_secs(timeout_secs));
794+
tokio::pin!(deadline);
795+
796+
tokio::select! {
797+
status = child.wait() => {
798+
let status = match status {
799+
Ok(s) => s,
800+
Err(e) => {
801+
warn!(
802+
id = job.id.as_deref().unwrap_or(""),
803+
command,
804+
error = %e,
805+
"disable_on_success command wait failed"
806+
);
807+
stdout_task.abort();
808+
stderr_task.abort();
809+
return DisableOnSuccessResult::NotAchieved("command failed to start");
810+
}
811+
};
812+
if !status.success() {
813+
stdout_task.abort();
814+
stderr_task.abort();
815+
return DisableOnSuccessResult::NotAchieved("command exited non-zero");
816+
}
817+
let stdout_buf = stdout_task.await.unwrap_or_default();
818+
let stderr_buf = stderr_task.await.unwrap_or_default();
819+
let stdout = String::from_utf8_lossy(&stdout_buf);
820+
let stderr = String::from_utf8_lossy(&stderr_buf);
821+
if stdout.contains(marker) || stderr.contains(marker) {
822+
DisableOnSuccessResult::Achieved
823+
} else {
824+
DisableOnSuccessResult::NotAchieved("success marker not found")
825+
}
826+
}
827+
_ = &mut deadline => {
828+
// Timeout — kill the child to avoid orphan processes.
829+
let _ = child.kill().await;
830+
stdout_task.abort();
831+
stderr_task.abort();
772832
warn!(
773833
id = job.id.as_deref().unwrap_or(""),
774834
command,
775835
timeout_secs,
776836
"disable_on_success command timed out"
777837
);
778-
return DisableOnSuccessResult::NotAchieved("command timed out");
838+
DisableOnSuccessResult::NotAchieved("command timed out")
779839
}
780-
};
781-
782-
if !output.status.success() {
783-
return DisableOnSuccessResult::NotAchieved("command exited non-zero");
784-
}
785-
786-
let stdout = String::from_utf8_lossy(&output.stdout);
787-
let stderr = String::from_utf8_lossy(&output.stderr);
788-
if stdout.contains(marker) || stderr.contains(marker) {
789-
DisableOnSuccessResult::Achieved
790-
} else {
791-
DisableOnSuccessResult::NotAchieved("success marker not found")
792840
}
793841
}
794842

@@ -839,7 +887,10 @@ fn update_usercron_job(
839887
anyhow::bail!("usercron job id {:?} not found", id);
840888
}
841889

842-
std::fs::write(path, doc.to_string())?;
890+
// Atomic write: write to temp file then rename to avoid corruption on crash.
891+
let tmp = path.with_extension("toml.tmp");
892+
std::fs::write(&tmp, doc.to_string())?;
893+
std::fs::rename(&tmp, path)?;
843894
Ok(())
844895
}
845896

@@ -1451,6 +1502,18 @@ message = "a"
14511502
));
14521503
}
14531504

1505+
#[tokio::test]
1506+
async fn disable_on_success_kills_child_on_timeout() {
1507+
let mut job = test_cron_job();
1508+
job.disable_on_success_timeout_secs = 1;
1509+
1510+
let result = check_disable_on_success(&job, "sleep 999", "SUCCESS").await;
1511+
assert!(matches!(
1512+
result,
1513+
DisableOnSuccessResult::NotAchieved("command timed out")
1514+
));
1515+
}
1516+
14541517
fn test_cron_job() -> CronJobConfig {
14551518
CronJobConfig {
14561519
id: Some("goal".into()),

0 commit comments

Comments
 (0)