Skip to content

Commit c9b2592

Browse files
committed
Support for message timeouts
1 parent 67c5be0 commit c9b2592

5 files changed

+67
-18
lines changed

act/statemachine.go

+55-6
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,10 @@ type StateMachine[D any] struct {
7878
stateEnterCallback StateEnterCallback[D]
7979

8080
// Pointer to the most recently configured state timeout.
81-
activeStateTimeout *ActiveStateTimeout
81+
stateTimeout *ActiveStateTimeout
82+
83+
// Pointer to the most recently configured message timeout.
84+
messageTimeout *ActiveMessageTimeout
8285

8386
// genericTimeouts maps the name of a generic timeout to the timeout
8487
genericTimeouts map[gen.Atom]*ActiveGenericTimeout
@@ -116,6 +119,19 @@ type ActiveGenericTimeout struct {
116119
cancel context.CancelFunc
117120
}
118121

122+
type MessageTimeout struct {
123+
Duration time.Duration
124+
Message any
125+
}
126+
127+
func (MessageTimeout) isAction() {}
128+
129+
type ActiveMessageTimeout struct {
130+
timeout MessageTimeout
131+
ctx context.Context
132+
cancel context.CancelFunc
133+
}
134+
119135
// Type alias for MessageHandler callbacks.
120136
// D is the type of the data associated with the StateMachine.
121137
// M is the type of the message this handler accepts.
@@ -159,6 +175,7 @@ func NewStateMachineSpec[D any](initialState gen.Atom, options ...Option[D]) Sta
159175
}
160176
return spec
161177
}
178+
162179
func WithData[D any](data D) Option[D] {
163180
return func(s *StateMachineSpec[D]) {
164181
s.data = data
@@ -211,9 +228,9 @@ func (s *StateMachine[D]) SetCurrentState(state gen.Atom) {
211228
// just registered this timeout in `ProcessActions` and we should not
212229
// touch it. Otherwise we should cancel the active state timeout if there
213230
// is one.
214-
if s.hasActiveStateTimeout() && s.activeStateTimeout.state != state {
231+
if s.hasActiveStateTimeout() && s.stateTimeout.state != state {
215232
s.Log().Info("StateMachine: canceling state timeout for state %s", state)
216-
s.activeStateTimeout.cancel()
233+
s.stateTimeout.cancel()
217234
}
218235
// Execute state enter callback until no new transition is triggered.
219236
if s.stateEnterCallback != nil {
@@ -236,7 +253,11 @@ func (s *StateMachine[D]) SetData(data D) {
236253
}
237254

238255
func (s *StateMachine[D]) hasActiveStateTimeout() bool {
239-
return s.activeStateTimeout != nil && s.activeStateTimeout.ctx.Err() == nil
256+
return s.stateTimeout != nil && s.stateTimeout.ctx.Err() == nil
257+
}
258+
259+
func (s *StateMachine[D]) hasActiveMessageTimeout() bool {
260+
return s.messageTimeout != nil && s.messageTimeout.ctx.Err() == nil
240261
}
241262

242263
func (s *StateMachine[D]) hasActiveGenericTimeout(name gen.Atom) bool {
@@ -353,6 +374,11 @@ func (s *StateMachine[D]) ProcessRun() (rr error) {
353374
return nil
354375
}
355376

377+
// Any message should cancel the active message timeout
378+
if s.hasActiveMessageTimeout() {
379+
s.messageTimeout.cancel()
380+
}
381+
356382
switch message.Type {
357383
case gen.MailboxMessageTypeRegular:
358384
switch message.Message.(type) {
@@ -471,10 +497,10 @@ func (s *StateMachine[D]) ProcessActions(actions []Action, state gen.Atom) {
471497
switch action := action.(type) {
472498
case StateTimeout:
473499
if s.hasActiveStateTimeout() {
474-
s.activeStateTimeout.cancel()
500+
s.stateTimeout.cancel()
475501
}
476502
ctx, cancel := context.WithTimeout(context.Background(), action.Duration)
477-
s.activeStateTimeout = &ActiveStateTimeout{
503+
s.stateTimeout = &ActiveStateTimeout{
478504
state: state,
479505
timeout: action,
480506
ctx: ctx,
@@ -492,6 +518,14 @@ func (s *StateMachine[D]) ProcessActions(actions []Action, state gen.Atom) {
492518
cancel: cancel,
493519
}
494520
go startGenericTimeout(ctx, action.Name, action.Message, s)
521+
case MessageTimeout:
522+
ctx, cancel := context.WithTimeout(context.Background(), action.Duration)
523+
s.messageTimeout = &ActiveMessageTimeout{
524+
timeout: action,
525+
ctx: ctx,
526+
cancel: cancel,
527+
}
528+
go startMessageTimeout(ctx, action.Message, s)
495529
default:
496530
panic("unsupported action")
497531
}
@@ -528,6 +562,21 @@ func startGenericTimeout(ctx context.Context, name gen.Atom, message any, proc g
528562
}
529563
}
530564

565+
func startMessageTimeout(ctx context.Context, message any, proc gen.Process) {
566+
select {
567+
case <-ctx.Done():
568+
switch ctx.Err() {
569+
case context.DeadlineExceeded:
570+
proc.Log().Info("StateMachine: message timeout timed out")
571+
proc.Send(proc.PID(), message)
572+
return
573+
case context.Canceled:
574+
proc.Log().Info("StateMachine: message timeout canceled")
575+
return
576+
}
577+
}
578+
}
579+
531580
func (s *StateMachine[D]) lookupMessageHandler(messageType string) (any, bool) {
532581
if stateMessageHandlers, exists := s.stateMessageHandlers[s.currentState]; exists == true {
533582
if callback, exists := stateMessageHandlers[messageType]; exists == true {

tests/001_local/t018_statemachine_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type t18_state_transitions_data struct {
5555
type t18_state2 struct {
5656
}
5757

58-
type t18_get_transitions struct {
58+
type t18_get_state_transitions struct {
5959
}
6060

6161
func (sm *t18_state_transitions) Init(args ...any) (act.StateMachineSpec[t18_state_transitions_data], error) {
@@ -64,21 +64,21 @@ func (sm *t18_state_transitions) Init(args ...any) (act.StateMachineSpec[t18_sta
6464
act.WithData(t18_state_transitions_data{}),
6565

6666
// set up a message handler for the transition [state1] -> [state2]
67-
act.WithStateMessageHandler(gen.Atom("state1"), t18_move_to_state2),
67+
act.WithStateMessageHandler(gen.Atom("state1"), t18_handle_state2),
6868

6969
// set up a call handler to query the number of state transitions
70-
act.WithStateCallHandler(gen.Atom("state2"), t18_total_transitions),
70+
act.WithStateCallHandler(gen.Atom("state2"), t18_handle_get_state_transitions),
7171
)
7272

7373
return spec, nil
7474
}
7575

76-
func t18_move_to_state2(state gen.Atom, data t18_state_transitions_data, message t18_state2, proc gen.Process) (gen.Atom, t18_state_transitions_data, []act.Action, error) {
76+
func t18_handle_state2(state gen.Atom, data t18_state_transitions_data, message t18_state2, proc gen.Process) (gen.Atom, t18_state_transitions_data, []act.Action, error) {
7777
data.transitions++
7878
return gen.Atom("state2"), data, nil, nil
7979
}
8080

81-
func t18_total_transitions(state gen.Atom, data t18_state_transitions_data, message t18_get_transitions, proc gen.Process) (gen.Atom, t18_state_transitions_data, int, []act.Action, error) {
81+
func t18_handle_get_state_transitions(state gen.Atom, data t18_state_transitions_data, message t18_get_state_transitions, proc gen.Process) (gen.Atom, t18_state_transitions_data, int, []act.Action, error) {
8282
return state, data, data.transitions, nil, nil
8383
}
8484

@@ -103,9 +103,9 @@ func (t *t18) TestStateMachine(input any) {
103103
}
104104

105105
// Query the data from the state machine (and test StateCallHandler behavior)
106-
result, err := t.Call(pid, t18_get_transitions{})
106+
result, err := t.Call(pid, t18_get_state_transitions{})
107107
if err != nil {
108-
t.Log().Error("call 't18_get_transitions' failed: %s", err)
108+
t.Log().Error("call 't18_get_state_transitions' failed: %s", err)
109109
t.testcase.err <- err
110110
return
111111
}

tests/001_local/t019_statemachine_events_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,13 @@ func (sm *t19_events) Init(args ...any) (act.StateMachineSpec[t19_events_data],
7474
act.WithEventHandler(gen.Event{Name: "testEvent", Node: "t19node@localhost"}, t19_handle_test_event),
7575

7676
// register call handler
77-
act.WithStateCallHandler(gen.Atom("state1"), t19_events_received),
77+
act.WithStateCallHandler(gen.Atom("state1"), t19_handle_get_events_received),
7878
)
7979

8080
return spec, nil
8181
}
8282

83-
func t19_events_received(state gen.Atom, data t19_events_data, event t19_get_events_received, proc gen.Process) (gen.Atom, t19_events_data, int, []act.Action, error) {
83+
func t19_handle_get_events_received(state gen.Atom, data t19_events_data, event t19_get_events_received, proc gen.Process) (gen.Atom, t19_events_data, int, []act.Action, error) {
8484
return state, data, data.eventsReceived, nil, nil
8585
}
8686

tests/001_local/t020_statemachine_state_enter_callback_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (sm *t20_state_enter_callback) Init(args ...any) (act.StateMachineSpec[t20_
6565
act.WithData(t20_state_enter_callback_data{currentState: gen.Atom("state1")}),
6666

6767
// with message handler for [state1] -> [state2]
68-
act.WithStateMessageHandler(gen.Atom("state1"), t20_move_to_state2),
68+
act.WithStateMessageHandler(gen.Atom("state1"), t20_handle_state2),
6969

7070
// with call handler to query the data
7171
act.WithStateCallHandler(gen.Atom("state1"), t20_state_and_callback_count),
@@ -78,7 +78,7 @@ func (sm *t20_state_enter_callback) Init(args ...any) (act.StateMachineSpec[t20_
7878
return spec, nil
7979
}
8080

81-
func t20_move_to_state2(state gen.Atom, data t20_state_enter_callback_data, message t20_state2, proc gen.Process) (gen.Atom, t20_state_enter_callback_data, []act.Action, error) {
81+
func t20_handle_state2(state gen.Atom, data t20_state_enter_callback_data, message t20_state2, proc gen.Process) (gen.Atom, t20_state_enter_callback_data, []act.Action, error) {
8282
return gen.Atom("state2"), data, nil, nil
8383
}
8484

tests/001_local/t022_statemachine_generic_timeout_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func (t *t22) TestGenericTimeoutTimingOut(input any) {
150150
// We expect 1 timeout.
151151
timeouts = result.(int)
152152
if timeouts != 1 {
153-
t.testcase.err <- fmt.Errorf(">> expected 1 timeout, got %d", timeouts)
153+
t.testcase.err <- fmt.Errorf("expected 1 timeout, got %d", timeouts)
154154
return
155155
}
156156

0 commit comments

Comments
 (0)