Skip to content

Commit 9313dbb

Browse files
committed
Implement GenFsm timeouts and state actions
Features added: - State timeouts: Set via InitResult::ok_timeout(), EventResult::next_state_timeout(), or CallResult::reply_timeout(). Automatically cancelled on state change. - Generic named timeouts: Set via StateAction::GenericTimeout(name, duration), cancelled via StateAction::CancelTimeout(name) - StateAction processing: Reply, StateTimeout, GenericTimeout, CancelTimeout, Postpone - Postponed events: Events marked with StateAction::Postpone are re-queued for processing in the next state - Internal TimeoutManager handles cancellation and event delivery Tests added: - test_fsm_state_actions: Verifies StateAction processing with generic timeouts - test_fsm_enter_exit_callbacks: Verifies enter/exit state hooks are called
1 parent cf1ff54 commit 9313dbb

2 files changed

Lines changed: 588 additions & 106 deletions

File tree

crates/starlang/src/gen_fsm/mod.rs

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,4 +294,251 @@ mod tests {
294294
sleep(Duration::from_millis(50)).await;
295295
assert_eq!(cycle_count.load(Ordering::SeqCst), 1);
296296
}
297+
298+
// =========================================================================
299+
// StateAction Tests
300+
// =========================================================================
301+
302+
/// FSM that tests StateActions including generic timeouts
303+
struct ActionFsm;
304+
305+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
306+
enum ActionState {
307+
Waiting,
308+
Processing,
309+
Complete,
310+
}
311+
312+
#[derive(Debug, Clone, Serialize, Deserialize)]
313+
enum ActionEvent {
314+
Start,
315+
Process,
316+
Done,
317+
}
318+
319+
#[derive(Debug, Clone, Serialize, Deserialize)]
320+
enum ActionCall {
321+
GetState,
322+
GetCounter,
323+
}
324+
325+
#[derive(Debug, Clone, Serialize, Deserialize)]
326+
enum ActionReply {
327+
State(ActionState),
328+
Counter(u32),
329+
}
330+
331+
#[derive(Debug, Default)]
332+
struct ActionData {
333+
counter: u32,
334+
}
335+
336+
#[async_trait]
337+
impl GenFsm for ActionFsm {
338+
type State = ActionState;
339+
type Event = ActionEvent;
340+
type Data = ActionData;
341+
type InitArg = ();
342+
type Call = ActionCall;
343+
type Reply = ActionReply;
344+
345+
async fn init(_arg: ()) -> InitResult<ActionState, ActionData> {
346+
InitResult::ok(ActionState::Waiting, ActionData::default())
347+
}
348+
349+
async fn handle_event(
350+
state: &ActionState,
351+
event: ActionEvent,
352+
data: &mut ActionData,
353+
) -> EventResult<ActionState, ActionReply> {
354+
match (state, event) {
355+
(ActionState::Waiting, ActionEvent::Start) => {
356+
// Transition with a generic timeout action
357+
EventResult::next_state_actions(
358+
ActionState::Processing,
359+
vec![StateAction::GenericTimeout(
360+
"process_timeout".to_string(),
361+
Duration::from_millis(100),
362+
)],
363+
)
364+
}
365+
(ActionState::Processing, ActionEvent::Process) => {
366+
data.counter += 1;
367+
EventResult::keep_state()
368+
}
369+
(ActionState::Processing, ActionEvent::Done) => {
370+
// Cancel the timeout and transition
371+
EventResult::next_state_actions(
372+
ActionState::Complete,
373+
vec![StateAction::CancelTimeout("process_timeout".to_string())],
374+
)
375+
}
376+
_ => EventResult::keep_state(),
377+
}
378+
}
379+
380+
async fn handle_call(
381+
state: &ActionState,
382+
request: ActionCall,
383+
_from: From,
384+
data: &mut ActionData,
385+
) -> CallResult<ActionState, ActionReply> {
386+
match request {
387+
ActionCall::GetState => CallResult::reply(ActionReply::State(*state), *state),
388+
ActionCall::GetCounter => {
389+
CallResult::reply(ActionReply::Counter(data.counter), *state)
390+
}
391+
}
392+
}
393+
}
394+
395+
#[tokio::test]
396+
async fn test_fsm_state_actions() {
397+
crate::process::global::init();
398+
let handle = crate::process::global::handle();
399+
400+
let pid = start::<ActionFsm>(()).await.unwrap();
401+
402+
// Initial state should be Waiting
403+
let state = Arc::new(AtomicU32::new(0)); // 0=Waiting, 1=Processing, 2=Complete
404+
{
405+
let state = state.clone();
406+
handle.spawn(move || async move {
407+
if let Ok(ActionReply::State(s)) =
408+
call::<ActionFsm>(pid, ActionCall::GetState, Duration::from_secs(5)).await
409+
{
410+
let val = match s {
411+
ActionState::Waiting => 0,
412+
ActionState::Processing => 1,
413+
ActionState::Complete => 2,
414+
};
415+
state.store(val, Ordering::SeqCst);
416+
}
417+
});
418+
}
419+
sleep(Duration::from_millis(50)).await;
420+
assert_eq!(state.load(Ordering::SeqCst), 0); // Waiting
421+
422+
// Send Start event to transition to Processing
423+
send_event::<ActionFsm>(pid, ActionEvent::Start).unwrap();
424+
sleep(Duration::from_millis(50)).await;
425+
426+
{
427+
let state = state.clone();
428+
handle.spawn(move || async move {
429+
if let Ok(ActionReply::State(s)) =
430+
call::<ActionFsm>(pid, ActionCall::GetState, Duration::from_secs(5)).await
431+
{
432+
let val = match s {
433+
ActionState::Waiting => 0,
434+
ActionState::Processing => 1,
435+
ActionState::Complete => 2,
436+
};
437+
state.store(val, Ordering::SeqCst);
438+
}
439+
});
440+
}
441+
sleep(Duration::from_millis(50)).await;
442+
assert_eq!(state.load(Ordering::SeqCst), 1); // Processing
443+
444+
// Send Done to complete
445+
send_event::<ActionFsm>(pid, ActionEvent::Done).unwrap();
446+
sleep(Duration::from_millis(50)).await;
447+
448+
{
449+
let state = state.clone();
450+
handle.spawn(move || async move {
451+
if let Ok(ActionReply::State(s)) =
452+
call::<ActionFsm>(pid, ActionCall::GetState, Duration::from_secs(5)).await
453+
{
454+
let val = match s {
455+
ActionState::Waiting => 0,
456+
ActionState::Processing => 1,
457+
ActionState::Complete => 2,
458+
};
459+
state.store(val, Ordering::SeqCst);
460+
}
461+
});
462+
}
463+
sleep(Duration::from_millis(50)).await;
464+
assert_eq!(state.load(Ordering::SeqCst), 2); // Complete
465+
}
466+
467+
#[tokio::test]
468+
async fn test_fsm_enter_exit_callbacks() {
469+
use std::sync::atomic::AtomicBool;
470+
471+
static ENTERED_RED: AtomicBool = AtomicBool::new(false);
472+
static EXITED_RED: AtomicBool = AtomicBool::new(false);
473+
static ENTERED_GREEN: AtomicBool = AtomicBool::new(false);
474+
475+
struct CallbackFsm;
476+
477+
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
478+
enum CbState {
479+
Red,
480+
Green,
481+
}
482+
483+
#[derive(Debug, Clone, Serialize, Deserialize)]
484+
enum CbEvent {
485+
Switch,
486+
}
487+
488+
#[async_trait]
489+
impl GenFsm for CallbackFsm {
490+
type State = CbState;
491+
type Event = CbEvent;
492+
type Data = ();
493+
type InitArg = ();
494+
type Call = ();
495+
type Reply = ();
496+
497+
async fn init(_: ()) -> InitResult<CbState, ()> {
498+
InitResult::ok(CbState::Red, ())
499+
}
500+
501+
async fn handle_event(
502+
state: &CbState,
503+
_event: CbEvent,
504+
_data: &mut (),
505+
) -> EventResult<CbState, ()> {
506+
match state {
507+
CbState::Red => EventResult::next_state(CbState::Green),
508+
CbState::Green => EventResult::next_state(CbState::Red),
509+
}
510+
}
511+
512+
async fn enter_state(state: &CbState, _data: &mut ()) {
513+
match state {
514+
CbState::Red => ENTERED_RED.store(true, Ordering::SeqCst),
515+
CbState::Green => ENTERED_GREEN.store(true, Ordering::SeqCst),
516+
}
517+
}
518+
519+
async fn exit_state(state: &CbState, _data: &mut ()) {
520+
match state {
521+
CbState::Red => EXITED_RED.store(true, Ordering::SeqCst),
522+
CbState::Green => {}
523+
}
524+
}
525+
}
526+
527+
crate::process::global::init();
528+
529+
let pid = start::<CallbackFsm>(()).await.unwrap();
530+
531+
// enter_state should be called for initial state
532+
sleep(Duration::from_millis(50)).await;
533+
assert!(ENTERED_RED.load(Ordering::SeqCst));
534+
assert!(!EXITED_RED.load(Ordering::SeqCst));
535+
assert!(!ENTERED_GREEN.load(Ordering::SeqCst));
536+
537+
// Transition Red -> Green
538+
send_event::<CallbackFsm>(pid, CbEvent::Switch).unwrap();
539+
sleep(Duration::from_millis(50)).await;
540+
541+
assert!(EXITED_RED.load(Ordering::SeqCst));
542+
assert!(ENTERED_GREEN.load(Ordering::SeqCst));
543+
}
297544
}

0 commit comments

Comments
 (0)