Skip to content

Commit 73480a6

Browse files
committed
fix: rename benchmark from cow_performance to store_shared_performance and add shared operation tests in MemoryStore
1 parent fff6c8c commit 73480a6

3 files changed

Lines changed: 114 additions & 7 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,5 +135,5 @@ name = "task_performance"
135135
harness = false
136136

137137
[[bench]]
138-
name = "cow_performance"
138+
name = "store_shared_performance"
139139
harness = false
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use cano::{MemoryStore, store::KeyValueStore};
22
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
33
use std::sync::Arc;
44

5-
fn bench_cow_operations(c: &mut Criterion) {
6-
let mut group = c.benchmark_group("cow_performance");
5+
fn bench_store_shared_operations(c: &mut Criterion) {
6+
let mut group = c.benchmark_group("store_shared_performance");
77

88
// Test with different data sizes
99
// Small: 1KB
@@ -49,5 +49,5 @@ fn bench_cow_operations(c: &mut Criterion) {
4949
group.finish();
5050
}
5151

52-
criterion_group!(benches, bench_cow_operations);
52+
criterion_group!(benches, bench_store_shared_operations);
5353
criterion_main!(benches);

src/scheduler.rs

Lines changed: 110 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,9 @@ where
354354
let handle = tokio::spawn(async move {
355355
execute_flow(workflow, initial_state, info).await;
356356
});
357-
self.scheduler_tasks.write().await.push(handle);
357+
let mut tasks = self.scheduler_tasks.write().await;
358+
tasks.retain(|h| !h.is_finished());
359+
tasks.push(handle);
358360
}
359361
}
360362
}
@@ -371,16 +373,23 @@ where
371373
let timeout = Duration::from_secs(30);
372374
let start_time = tokio::time::Instant::now();
373375

376+
let mut result = Ok(());
374377
while self.has_running_flows().await {
375378
if start_time.elapsed() >= timeout {
376-
return Err(CanoError::Workflow(
379+
result = Err(CanoError::Workflow(
377380
"Timeout waiting for workflows to complete".to_string(),
378381
));
382+
break;
379383
}
380384
sleep(Duration::from_millis(100)).await;
381385
}
382386

383-
Ok(())
387+
// Clear the command sender so subsequent stop()/trigger() calls report
388+
// the documented "Scheduler not running" error instead of a channel-closed
389+
// surprise, and allow clean restarts via a fresh start() call.
390+
*self.command_tx.write().await = None;
391+
392+
result
384393
}
385394

386395
/// Stop the scheduler and wait for all running workflows to complete
@@ -773,6 +782,104 @@ mod tests {
773782
assert!(status.is_none());
774783
}
775784

785+
#[tokio::test(flavor = "multi_thread")]
786+
async fn test_trigger_reaps_finished_handles() {
787+
let timeout = Duration::from_secs(5);
788+
let result = tokio::time::timeout(timeout, async {
789+
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
790+
scheduler
791+
.manual("manual_task", create_test_workflow(), TestState::Start)
792+
.unwrap();
793+
794+
let mut scheduler_for_start = scheduler.clone();
795+
let scheduler_handle = tokio::spawn(async move { scheduler_for_start.start().await });
796+
797+
// Let scheduler boot. Manual schedules push no loop handles.
798+
sleep(Duration::from_millis(50)).await;
799+
assert_eq!(scheduler.scheduler_tasks.read().await.len(), 0);
800+
801+
// Fire many triggers sequentially, letting each complete before the next.
802+
// Without reaping, scheduler_tasks would grow to 20 entries.
803+
for _ in 0..20 {
804+
scheduler.trigger("manual_task").await.unwrap();
805+
sleep(Duration::from_millis(30)).await;
806+
}
807+
808+
// After reaping, only the most recent (finished) handle can still sit in the vec.
809+
let in_flight = scheduler.scheduler_tasks.read().await.len();
810+
assert!(
811+
in_flight <= 1,
812+
"expected reaping to bound in-flight handles to <=1, got {in_flight}"
813+
);
814+
815+
// Sanity: workflow actually ran all 20 times.
816+
let status = scheduler.status("manual_task").await.unwrap();
817+
assert_eq!(status.run_count, 20);
818+
819+
scheduler.stop().await.unwrap();
820+
let _ = tokio::time::timeout(Duration::from_secs(1), scheduler_handle).await;
821+
})
822+
.await;
823+
824+
assert!(result.is_ok(), "Test timed out");
825+
}
826+
827+
#[tokio::test(flavor = "multi_thread")]
828+
async fn test_stop_and_trigger_after_shutdown_report_not_running() {
829+
let timeout = Duration::from_secs(5);
830+
let result = tokio::time::timeout(timeout, async {
831+
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
832+
scheduler
833+
.manual("manual_task", create_test_workflow(), TestState::Start)
834+
.unwrap();
835+
836+
// Before start(), both calls must report "not running".
837+
let err = scheduler.stop().await.unwrap_err();
838+
assert!(
839+
err.to_string().contains("Scheduler not running"),
840+
"expected not-running error before start, got: {err}"
841+
);
842+
let err = scheduler.trigger("manual_task").await.unwrap_err();
843+
assert!(
844+
err.to_string().contains("Scheduler not running"),
845+
"expected not-running error before start, got: {err}"
846+
);
847+
848+
// Start, then stop, then wait for start() to actually return.
849+
let mut scheduler_for_start = scheduler.clone();
850+
let scheduler_handle = tokio::spawn(async move { scheduler_for_start.start().await });
851+
sleep(Duration::from_millis(50)).await;
852+
scheduler.stop().await.unwrap();
853+
scheduler_handle.await.unwrap().unwrap();
854+
855+
// After start() returned, the API contract must hold again:
856+
// subsequent stop()/trigger() must report "not running" (not a
857+
// channel-closed error leak).
858+
let err = scheduler.stop().await.unwrap_err();
859+
assert!(
860+
err.to_string().contains("Scheduler not running"),
861+
"expected not-running error after shutdown, got: {err}"
862+
);
863+
let err = scheduler.trigger("manual_task").await.unwrap_err();
864+
assert!(
865+
err.to_string().contains("Scheduler not running"),
866+
"expected not-running error after shutdown, got: {err}"
867+
);
868+
869+
// And a clean restart must work.
870+
let mut scheduler_for_restart = scheduler.clone();
871+
let restart_handle = tokio::spawn(async move { scheduler_for_restart.start().await });
872+
sleep(Duration::from_millis(50)).await;
873+
scheduler.trigger("manual_task").await.unwrap();
874+
sleep(Duration::from_millis(50)).await;
875+
scheduler.stop().await.unwrap();
876+
restart_handle.await.unwrap().unwrap();
877+
})
878+
.await;
879+
880+
assert!(result.is_ok(), "Test timed out");
881+
}
882+
776883
#[tokio::test(flavor = "multi_thread")]
777884
async fn test_failed_workflow() {
778885
let timeout = Duration::from_secs(2);

0 commit comments

Comments
 (0)