Skip to content

Commit dacc692

Browse files
committed
fix: ensure command sender is cleared during graceful shutdown to prevent erroneous state reporting
1 parent e0f3ab4 commit dacc692

1 file changed

Lines changed: 99 additions & 0 deletions

File tree

src/scheduler.rs

Lines changed: 99 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -342,6 +342,11 @@ where
342342
match cmd {
343343
SchedulerCommand::Stop => {
344344
*self.running.write().await = false;
345+
// Clear the sender immediately so callers racing with the
346+
// graceful-shutdown window (draining loop tasks + waiting on
347+
// in-flight workflows) get a deterministic "not running" error
348+
// instead of enqueueing commands into a queue nobody is draining.
349+
*self.command_tx.write().await = None;
345350
break;
346351
}
347352
SchedulerCommand::Trigger(id) => {
@@ -881,6 +886,100 @@ mod tests {
881886
assert!(result.is_ok(), "Test timed out");
882887
}
883888

889+
#[tokio::test(flavor = "multi_thread")]
890+
async fn test_stop_clears_sender_during_graceful_shutdown_window() {
891+
// Regression: while start() drains the command loop and waits on
892+
// in-flight workflows, a concurrent stop()/trigger() must report
893+
// "not running" instead of enqueueing into a queue nobody drains.
//
// Flow: spawn start() -> trigger "slow_task" -> stop() -> probe a second
// stop() and a trigger() while the workflow is still executing -> await
// start() to completion.
// NOTE(review): the phases are ordered by fixed 50ms sleeps rather than
// explicit synchronization, so this can presumably be flaky on a heavily
// loaded machine — confirm CI tolerance.
894+
#[derive(Clone)]
895+
struct SlowNode;
896+
897+
#[async_trait]
898+
impl Node<TestState> for SlowNode {
899+
type PrepResult = ();
900+
type ExecResult = ();
901+
902+
async fn prep(&self, _store: &MemoryStore) -> CanoResult<()> {
903+
Ok(())
904+
}
905+
906+
async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {
907+
// Hold Status::Running long enough to span the shutdown window.
// 400ms comfortably exceeds the 150ms of cumulative sleeps (3 x 50ms)
// the test body performs before it probes stop()/trigger().
908+
sleep(Duration::from_millis(400)).await;
909+
}
910+
911+
async fn post(
912+
&self,
913+
_store: &MemoryStore,
914+
_exec_res: Self::ExecResult,
915+
) -> CanoResult<TestState> {
916+
Ok(TestState::Complete)
917+
}
918+
}
919+
// Outer guard: if start() never returns, the final assertion fails with
// "Test timed out" after 5 seconds instead of hanging the test run.
920+
let timeout = Duration::from_secs(5);
921+
let result = tokio::time::timeout(timeout, async {
922+
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
923+
let slow_workflow = Workflow::new(MemoryStore::new())
924+
.register(TestState::Start, SlowNode)
925+
.add_exit_state(TestState::Complete)
926+
.add_exit_state(TestState::Error);
927+
scheduler
928+
.manual("slow_task", slow_workflow, TestState::Start)
929+
.unwrap();
930+
// start() runs on a clone moved into a spawned task; the original handle
// stays in this scope to issue trigger()/stop().
// NOTE(review): presumably clones share the underlying scheduler state —
// verify against Scheduler's Clone impl.
931+
let mut scheduler_for_start = scheduler.clone();
932+
let scheduler_handle = tokio::spawn(async move { scheduler_for_start.start().await });
933+
934+
// Let start() spin up.
935+
sleep(Duration::from_millis(50)).await;
936+
937+
// Kick off the slow workflow and wait until it is actually Running
938+
// so has_running_flows() will keep start() parked after Stop.
939+
scheduler.trigger("slow_task").await.unwrap();
940+
sleep(Duration::from_millis(50)).await;
941+
assert!(
942+
scheduler.has_running_flows().await,
943+
"slow workflow should be in Running state before we call stop()"
944+
);
945+
946+
// First stop(): succeeds, Stop lands in the channel.
947+
scheduler.stop().await.unwrap();
948+
949+
// Let start() dequeue Stop and clear command_tx. The slow workflow
950+
// is still in exec (~400ms total), so start() is parked inside
951+
// has_running_flows() — the shutdown window we want to probe.
952+
sleep(Duration::from_millis(50)).await;
953+
assert!(
954+
!scheduler_handle.is_finished(),
955+
"start() must still be parked waiting for the slow workflow"
956+
);
957+
assert!(
958+
scheduler.has_running_flows().await,
959+
"slow workflow must still be running during the shutdown window"
960+
);
961+
962+
// During the window, stop()/trigger() must report not-running
963+
// instead of filling a queue that will never be drained.
964+
let err = scheduler.stop().await.unwrap_err();
965+
assert!(
966+
err.to_string().contains("Scheduler not running"),
967+
"expected not-running during shutdown window, got: {err}"
968+
);
969+
let err = scheduler.trigger("slow_task").await.unwrap_err();
970+
assert!(
971+
err.to_string().contains("Scheduler not running"),
972+
"expected not-running during shutdown window, got: {err}"
973+
);
974+
975+
// Finally, start() completes cleanly once the slow workflow finishes.
// The first unwrap checks the spawned task's join result; the second
// unwraps the value returned by start() itself.
976+
scheduler_handle.await.unwrap().unwrap();
977+
})
978+
.await;
979+
980+
assert!(result.is_ok(), "Test timed out");
981+
}
982+
884983
#[tokio::test(flavor = "multi_thread")]
885984
async fn test_failed_workflow() {
886985
let timeout = Duration::from_secs(2);

0 commit comments

Comments
 (0)