Skip to content

Commit d991ea0

Browse files
authored
Merge pull request #22 from nassor/testing-module
Add a public `testing` module with batteries-included test fixtures
2 parents f3c0355 + 6b75b4e commit d991ea0

18 files changed

Lines changed: 1999 additions & 308 deletions

File tree

cano/Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ metrics = { version = "0.24.5", optional = true }
3131
scheduler = ["dep:chrono", "dep:cron", "tokio/signal"]
3232
tracing = ["dep:tracing"]
3333
recovery = ["dep:redb", "dep:postcard"]
34-
all = ["scheduler", "tracing", "recovery", "metrics"]
3534
metrics = ["dep:metrics"]
35+
testing = []
36+
all = ["scheduler", "tracing", "recovery", "metrics", "testing"]
3637

3738
[dev-dependencies]
3839
axum = "0.8.9"
@@ -263,6 +264,11 @@ path = "examples/store_custom_backend.rs"
263264
name = "resources_advanced"
264265
path = "examples/resources_advanced.rs"
265266

267+
[[example]]
268+
name = "testing_helpers"
269+
path = "examples/testing_helpers.rs"
270+
required-features = ["testing"]
271+
266272
[[bench]]
267273
name = "workflow_performance"
268274
harness = false

cano/examples/testing_helpers.rs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
//! # `cano::testing` — batteries-included test fixtures
2+
//!
3+
//! Run with: `cargo run --example testing_helpers --features testing`
4+
//!
5+
//! Everything here lives behind the `testing` feature and is imported wholesale with
6+
//! `use cano::testing::*;`. This example exercises each helper the way a test would:
7+
//!
8+
//! - [`RecordingObserver`] — capture and assert the path a workflow took.
9+
//! - [`InMemoryCheckpointStore`] — a process-local checkpoint store (no `recovery`
10+
//! feature, no on-disk file) whose `Checkpoint` events the observer records.
11+
//! - [`TestResources`] — build the [`Resources`] a workflow needs in one chain.
12+
//! - [`panic_on_attempt`] — a task that panics; the engine converts it to an error
13+
//! (panic safety) and fails fast.
14+
//! - [`assert_compensation_ran`] — assert the order a saga rolled back in.
15+
16+
use std::sync::{Arc, Mutex};
17+
18+
use cano::prelude::*;
19+
use cano::testing::*;
20+
21+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
22+
enum Step {
23+
Start,
24+
Work,
25+
Finish,
26+
Done,
27+
}
28+
29+
struct StartTask;
30+
#[task(state = Step)]
31+
impl StartTask {
32+
async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
33+
Ok(TaskResult::Single(Step::Work))
34+
}
35+
}
36+
37+
struct WorkTask;
38+
#[task(state = Step)]
39+
impl WorkTask {
40+
async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
41+
let store = res.get::<MemoryStore, _>("store")?;
42+
store.put("processed", 42_u32)?;
43+
Ok(TaskResult::Single(Step::Done))
44+
}
45+
}
46+
47+
// ---- A tiny saga whose compensations record the order they undo in. -------------------
48+
49+
/// Shared, ordered log of which compensations ran. Registered as a resource so every
50+
/// step can reach it; `Clone` shares the inner `Arc`, so a handle kept outside the
51+
/// workflow reads the same `Vec`.
52+
#[derive(Default, Clone)]
53+
struct CompensationLog(Arc<Mutex<Vec<String>>>);
54+
#[resource]
55+
impl Resource for CompensationLog {}
56+
57+
#[derive(Debug, serde::Serialize, serde::Deserialize)]
58+
struct Unit;
59+
60+
struct Reserve;
61+
#[saga::task(state = Step)]
62+
impl Reserve {
63+
type Output = Unit;
64+
async fn run(&self, _res: &Resources) -> Result<(TaskResult<Step>, Unit), CanoError> {
65+
Ok((TaskResult::Single(Step::Work), Unit))
66+
}
67+
async fn compensate(&self, res: &Resources, _out: Unit) -> Result<(), CanoError> {
68+
res.get::<CompensationLog, _>("log")?
69+
.0
70+
.lock()
71+
.unwrap()
72+
.push("reserve".into());
73+
Ok(())
74+
}
75+
}
76+
77+
struct Charge;
78+
#[saga::task(state = Step)]
79+
impl Charge {
80+
type Output = Unit;
81+
async fn run(&self, _res: &Resources) -> Result<(TaskResult<Step>, Unit), CanoError> {
82+
Ok((TaskResult::Single(Step::Finish), Unit))
83+
}
84+
async fn compensate(&self, res: &Resources, _out: Unit) -> Result<(), CanoError> {
85+
res.get::<CompensationLog, _>("log")?
86+
.0
87+
.lock()
88+
.unwrap()
89+
.push("charge".into());
90+
Ok(())
91+
}
92+
}
93+
94+
struct Boom;
95+
#[task(state = Step)]
96+
impl Boom {
97+
fn config(&self) -> TaskConfig {
98+
TaskConfig::minimal()
99+
}
100+
async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
101+
Err(CanoError::task_execution("boom"))
102+
}
103+
}
104+
105+
#[tokio::main]
106+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
107+
// 1. RecordingObserver + InMemoryCheckpointStore + TestResources --------------------
108+
let observer = Arc::new(RecordingObserver::new());
109+
let checkpoints = Arc::new(InMemoryCheckpointStore::new());
110+
let resources = TestResources::new().with_store("store").build();
111+
112+
let workflow = Workflow::new(resources)
113+
.register(Step::Start, StartTask)
114+
.register(Step::Work, WorkTask)
115+
.add_exit_state(Step::Done)
116+
.with_observer(observer.clone())
117+
.with_checkpoint_store(checkpoints.clone())
118+
.with_workflow_id("demo-run");
119+
120+
let final_state = workflow.orchestrate(Step::Start).await?;
121+
assert_eq!(final_state, Step::Done);
122+
123+
// The observer captured the whole path and the checkpoint appends along the way.
124+
observer.assert_path(&["Start", "Work", "Done"]);
125+
observer.assert_completed_with("Done");
126+
let checkpoint_events = observer
127+
.events()
128+
.into_iter()
129+
.filter(|e| matches!(e, RecordedEvent::Checkpoint { .. }))
130+
.count();
131+
println!("observer recorded path {:?}", observer.states_entered());
132+
println!("observer saw {checkpoint_events} checkpoint append(s)");
133+
134+
// 2. panic_on_attempt — panic safety: the panic becomes an error, fails fast. -------
135+
let panicky = Workflow::bare()
136+
.register(Step::Start, panic_on_attempt(1, Step::Done))
137+
.add_exit_state(Step::Done);
138+
match panicky.orchestrate(Step::Start).await {
139+
Ok(_) => unreachable!("the task panics on its first attempt"),
140+
Err(e) => println!("panic_on_attempt surfaced as error: {e}"),
141+
}
142+
143+
// 3. assert_compensation_ran — a saga that rolls back in reverse. --------------------
144+
let log = CompensationLog::default();
145+
let handle = log.clone();
146+
let saga_resources = Resources::new().insert("log", log);
147+
let saga = Workflow::new(saga_resources)
148+
.register_with_compensation(Step::Start, Reserve)
149+
.register_with_compensation(Step::Work, Charge)
150+
.register(Step::Finish, Boom) // fails → drains the compensation stack in reverse
151+
.add_exit_state(Step::Done);
152+
let _ = saga.orchestrate(Step::Start).await; // expected to fail and roll back
153+
154+
let ran = handle.0.lock().unwrap().clone();
155+
// Charge ran last, so it compensates first; then Reserve.
156+
assert_compensation_ran(&ran, &["charge", "reserve"]);
157+
println!("compensation ran in order: {ran:?}");
158+
159+
println!("\nall testing helpers exercised ✔");
160+
Ok(())
161+
}

cano/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,11 @@
217217
//! - [`saga`]: [`CompensatableTask`] — pair a forward step with a compensating action; failures roll back via [`Workflow::register_with_compensation`]
218218
//! - [`rate_limit`]: [`RateLimiter`] (token bucket) and [`WindowedRateLimiter`] (fixed window) throttles, both implementing [`Meter`] so they compose into a [`MultiRateLimiter`] that enforces several weighted tiers (e.g. a 5h + 7d + per-model limit) at once
219219
//! - [`store`]: [`MemoryStore`] — a typed in-memory store that implements [`Resource`]
220+
//! - `testing` (requires `testing` feature): batteries-included test fixtures —
221+
//! a `RecordingObserver`, a public `InMemoryCheckpointStore`, a `TestResources`
222+
//! builder, a `panic_on_attempt` task factory and an `assert_compensation_ran`
223+
//! saga helper. Import explicitly with `use cano::testing::*;` — it is deliberately
224+
//! **not** in the [`prelude`] so production code never picks it up by accident.
220225
//! - [`error`]: [`CanoError`] variants and the [`CanoResult`] alias
221226
//!
222227
//! ## Getting Started
@@ -242,6 +247,9 @@ pub mod metrics;
242247
#[cfg(feature = "scheduler")]
243248
pub mod scheduler;
244249

250+
#[cfg(feature = "testing")]
251+
pub mod testing;
252+
245253
// Core public API - simplified imports
246254
pub use circuit::{CircuitBreaker, CircuitPolicy, CircuitState, Permit as CircuitPermit};
247255
pub use error::{CanoError, CanoResult};

cano/src/observer.rs

Lines changed: 15 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -315,46 +315,11 @@ impl WorkflowObserver for MetricsObserver {
315315
#[cfg(test)]
316316
mod tests {
317317
use crate::prelude::*;
318+
use crate::workflow::test_support::EventLog;
318319
use std::borrow::Cow;
319-
use std::sync::{Arc, Mutex};
320+
use std::sync::Arc;
320321
use std::time::Duration;
321322

322-
/// Test observer that appends a stringified record of every event it receives.
323-
#[derive(Default)]
324-
struct RecordingObserver {
325-
events: Mutex<Vec<String>>,
326-
}
327-
328-
impl RecordingObserver {
329-
fn events(&self) -> Vec<String> {
330-
self.events.lock().unwrap().clone()
331-
}
332-
fn record(&self, event: String) {
333-
self.events.lock().unwrap().push(event);
334-
}
335-
}
336-
337-
impl WorkflowObserver for RecordingObserver {
338-
fn on_state_enter(&self, state: &str) {
339-
self.record(format!("state_enter:{state}"));
340-
}
341-
fn on_task_start(&self, task_id: &str) {
342-
self.record(format!("task_start:{task_id}"));
343-
}
344-
fn on_task_success(&self, task_id: &str) {
345-
self.record(format!("task_success:{task_id}"));
346-
}
347-
fn on_task_failure(&self, task_id: &str, _err: &CanoError) {
348-
self.record(format!("task_failure:{task_id}"));
349-
}
350-
fn on_retry(&self, task_id: &str, attempt: u32) {
351-
self.record(format!("retry:{task_id}:{attempt}"));
352-
}
353-
fn on_circuit_open(&self, task_id: &str) {
354-
self.record(format!("circuit_open:{task_id}"));
355-
}
356-
}
357-
358323
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
359324
enum S {
360325
Start,
@@ -411,15 +376,15 @@ mod tests {
411376

412377
#[tokio::test]
413378
async fn observer_fires_on_success_path() {
414-
let observer = Arc::new(RecordingObserver::default());
379+
let (obs, rec) = EventLog::new();
415380
let workflow = Workflow::bare()
416381
.register(S::Start, OkTask)
417382
.add_exit_state(S::Done)
418-
.with_observer(observer.clone());
383+
.with_observer(Arc::new(obs));
419384

420385
assert_eq!(workflow.orchestrate(S::Start).await.unwrap(), S::Done);
421386

422-
let events = observer.events();
387+
let events = rec.labels();
423388
assert!(
424389
events.contains(&"state_enter:Start".to_string()),
425390
"{events:?}"
@@ -444,15 +409,15 @@ mod tests {
444409

445410
#[tokio::test]
446411
async fn observer_fires_on_retry_and_failure() {
447-
let observer = Arc::new(RecordingObserver::default());
412+
let (obs, rec) = EventLog::new();
448413
let workflow = Workflow::bare()
449414
.register(S::Start, FailTask)
450415
.add_exit_state(S::Done)
451-
.with_observer(observer.clone());
416+
.with_observer(Arc::new(obs));
452417

453418
assert!(workflow.orchestrate(S::Start).await.is_err());
454419

455-
let events = observer.events();
420+
let events = rec.labels();
456421
assert!(
457422
events.contains(&"task_start:FailTask".to_string()),
458423
"{events:?}"
@@ -487,7 +452,7 @@ mod tests {
487452
let permit = breaker.try_acquire().expect("closed breaker admits");
488453
breaker.record_failure(permit);
489454

490-
let observer = Arc::new(RecordingObserver::default());
455+
let (obs, rec) = EventLog::new();
491456
let workflow = Workflow::bare()
492457
.register(
493458
S::Start,
@@ -496,13 +461,13 @@ mod tests {
496461
},
497462
)
498463
.add_exit_state(S::Done)
499-
.with_observer(observer.clone());
464+
.with_observer(Arc::new(obs));
500465

501466
let err = workflow.orchestrate(S::Start).await.unwrap_err();
502467
// The FSM wraps the failure with state context; the inner is CircuitOpen.
503468
assert!(matches!(err.inner(), CanoError::CircuitOpen(_)), "{err}");
504469

505-
let events = observer.events();
470+
let events = rec.labels();
506471
assert!(
507472
events.contains(&"circuit_open:CircuitTask".to_string()),
508473
"{events:?}"
@@ -571,21 +536,12 @@ mod tests {
571536

572537
#[test]
573538
fn on_workflow_timeout_can_be_overridden_to_record_event() {
574-
use std::sync::{Arc, Mutex};
575-
use std::time::Duration;
576-
#[derive(Default)]
577-
struct Recorder(Mutex<Vec<(Duration, Duration)>>);
578-
impl WorkflowObserver for Recorder {
579-
fn on_workflow_timeout(&self, elapsed: Duration, limit: Duration) {
580-
self.0.lock().unwrap().push((elapsed, limit));
581-
}
582-
}
583-
let obs = Arc::new(Recorder::default());
584-
let dyn_obs: Arc<dyn WorkflowObserver> = obs.clone();
539+
// `EventLog` is itself a custom override — proves the hook can be observed.
540+
let (obs, rec) = EventLog::new();
541+
let dyn_obs: Arc<dyn WorkflowObserver> = Arc::new(obs);
585542
dyn_obs.on_workflow_timeout(Duration::from_millis(150), Duration::from_millis(100));
586-
let events = obs.0.lock().unwrap().clone();
587543
assert_eq!(
588-
events,
544+
rec.timeouts(),
589545
vec![(Duration::from_millis(150), Duration::from_millis(100))]
590546
);
591547
}

0 commit comments

Comments
 (0)