Skip to content

Commit 78844bf

Browse files
authored
feat(spec): add init handler field (#1455)
* **New Features** * Added an Init handler that runs (after preconditions) before workflow steps; its status is surfaced and failure prevents steps from running. * Added a canonical Abort handler and retained Cancel as deprecated. * Runtime and status now include OnInit nodes; option to skip inheriting base handlers for sub-DAG runs. * **Documentation** * Schema updated to document init and abort hooks and lifecycle ordering. * **Tests** * New integration tests for Init, abort/cancel behavior, skipping base handlers, and lifecycle paths.
1 parent 722028d commit 78844bf

13 files changed

Lines changed: 533 additions & 13 deletions

File tree

internal/cmd/enqueue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ func runEnqueue(ctx *Context, args []string) error {
5454
return fmt.Errorf("failed to get queue override: %w", err)
5555
}
5656

57-
// Load parameters and DAG
58-
dag, _, err := loadDAGWithParams(ctx, args)
57+
// Load parameters and DAG (enqueue is always for root DAGs, not sub-DAGs)
58+
dag, _, err := loadDAGWithParams(ctx, args, false)
5959
if err != nil {
6060
return err
6161
}

internal/cmd/start.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func runStart(ctx *Context, args []string) error {
132132
}
133133
} else {
134134
// Load parameters and DAG
135-
dag, params, err = loadDAGWithParams(ctx, args)
135+
dag, params, err = loadDAGWithParams(ctx, args, isSubDAGRun)
136136
if err != nil {
137137
return err
138138
}
@@ -302,8 +302,9 @@ func getDAGRunInfo(ctx *Context) (dagRunID, rootDAGRun, parentDAGRun string, isS
302302
return dagRunID, rootDAGRun, parentDAGRun, isSubDAGRun, nil
303303
}
304304

305-
// loadDAGWithParams loads the DAG and its parameters from command arguments
306-
func loadDAGWithParams(ctx *Context, args []string) (*core.DAG, string, error) {
305+
// loadDAGWithParams loads the DAG and its parameters from command arguments.
306+
// When isSubDAGRun is true, handlers from base config are skipped to prevent inheritance.
307+
func loadDAGWithParams(ctx *Context, args []string, isSubDAGRun bool) (*core.DAG, string, error) {
307308
var dagPath string
308309
var interactiveParams string
309310

@@ -350,6 +351,12 @@ func loadDAGWithParams(ctx *Context, args []string) (*core.DAG, string, error) {
350351
spec.WithDAGsDir(ctx.Config.Paths.DAGsDir),
351352
}
352353

354+
// Skip base handlers for sub-DAG runs to prevent handler inheritance.
355+
// Sub-DAGs should define their own handlers explicitly if needed.
356+
if isSubDAGRun {
357+
loadOpts = append(loadOpts, spec.WithSkipBaseHandlers())
358+
}
359+
353360
// Get name override from flags if provided
354361
nameOverride, err := ctx.StringParam("name")
355362
if err != nil {

internal/core/dag.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,7 @@ func (s *Schedule) UnmarshalJSON(data []byte) error {
474474

475475
// HandlerOn contains the steps to be executed on different events in the DAG.
476476
type HandlerOn struct {
477+
Init *Step `json:"init,omitempty"`
477478
Failure *Step `json:"failure,omitempty"`
478479
Success *Step `json:"success,omitempty"`
479480
Cancel *Step `json:"cancel,omitempty"`
@@ -516,6 +517,7 @@ type OTelConfig struct {
516517
type HandlerType string
517518

518519
const (
520+
HandlerOnInit HandlerType = "onInit"
519521
HandlerOnSuccess HandlerType = "onSuccess"
520522
HandlerOnFailure HandlerType = "onFailure"
521523
HandlerOnCancel HandlerType = "onCancel"

internal/core/execution/runstatus.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ func InitialStatus(dag *core.DAG) DAGRunStatus {
1717
Status: core.NotStarted,
1818
PID: PID(0),
1919
Nodes: NewNodesFromSteps(dag.Steps),
20+
OnInit: NewNodeOrNil(dag.HandlerOn.Init),
2021
OnExit: NewNodeOrNil(dag.HandlerOn.Exit),
2122
OnSuccess: NewNodeOrNil(dag.HandlerOn.Success),
2223
OnFailure: NewNodeOrNil(dag.HandlerOn.Failure),
@@ -40,6 +41,7 @@ type DAGRunStatus struct {
4041
Status core.Status `json:"status"`
4142
PID PID `json:"pid,omitempty"`
4243
Nodes []*Node `json:"nodes,omitempty"`
44+
OnInit *Node `json:"onInit,omitempty"`
4345
OnExit *Node `json:"onExit,omitempty"`
4446
OnSuccess *Node `json:"onSuccess,omitempty"`
4547
OnFailure *Node `json:"onFailure,omitempty"`
@@ -67,6 +69,9 @@ func (st *DAGRunStatus) Errors() []error {
6769
errs = append(errs, fmt.Errorf("node %s: %s", node.Step.Name, node.Error))
6870
}
6971
}
72+
if st.OnInit != nil && st.OnInit.Error != "" {
73+
errs = append(errs, fmt.Errorf("onInit: %s", st.OnInit.Error))
74+
}
7075
if st.OnExit != nil && st.OnExit.Error != "" {
7176
errs = append(errs, fmt.Errorf("onExit: %s", st.OnExit.Error))
7277
}
@@ -89,6 +94,9 @@ func (st *DAGRunStatus) NodeByName(name string) (*Node, error) {
8994
return node, nil
9095
}
9196
}
97+
if st.OnInit != nil && st.OnInit.Step.Name == name {
98+
return st.OnInit, nil
99+
}
92100
if st.OnExit != nil && st.OnExit.Step.Name == name {
93101
return st.OnExit, nil
94102
}

internal/core/spec/builder.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ const (
6262
BuildFlagOnlyMetadata
6363
BuildFlagAllowBuildErrors
6464
BuildFlagSkipSchemaValidation
65+
BuildFlagSkipBaseHandlers // Skip merging handlerOn from base config (for sub-DAG runs)
6566
)
6667

6768
// BuildOpts is used to control the behavior of the builder.
@@ -727,6 +728,13 @@ func buildLogDir(_ BuildContext, spec *definition, dag *core.DAG) (err error) {
727728
func buildHandlers(ctx BuildContext, spec *definition, dag *core.DAG) (err error) {
728729
buildCtx := StepBuildContext{BuildContext: ctx, dag: dag}
729730

731+
if spec.HandlerOn.Init != nil {
732+
spec.HandlerOn.Init.Name = core.HandlerOnInit.String()
733+
if dag.HandlerOn.Init, err = buildStep(buildCtx, *spec.HandlerOn.Init); err != nil {
734+
return err
735+
}
736+
}
737+
730738
if spec.HandlerOn.Exit != nil {
731739
spec.HandlerOn.Exit.Name = core.HandlerOnExit.String()
732740
if dag.HandlerOn.Exit, err = buildStep(buildCtx, *spec.HandlerOn.Exit); err != nil {
@@ -748,9 +756,21 @@ func buildHandlers(ctx BuildContext, spec *definition, dag *core.DAG) (err error
748756
}
749757
}
750758

751-
if spec.HandlerOn.Cancel != nil {
752-
spec.HandlerOn.Cancel.Name = core.HandlerOnCancel.String()
753-
if dag.HandlerOn.Cancel, err = buildStep(buildCtx, *spec.HandlerOn.Cancel); err != nil {
759+
// Handle Abort (canonical) and Cancel (deprecated, for backward compatibility)
760+
// Error if both are specified
761+
if spec.HandlerOn.Abort != nil && spec.HandlerOn.Cancel != nil {
762+
return fmt.Errorf("cannot specify both 'abort' and 'cancel' in handlerOn; use 'abort' (cancel is deprecated)")
763+
}
764+
var abortDef *stepDef
765+
switch {
766+
case spec.HandlerOn.Abort != nil:
767+
abortDef = spec.HandlerOn.Abort
768+
case spec.HandlerOn.Cancel != nil:
769+
abortDef = spec.HandlerOn.Cancel
770+
}
771+
if abortDef != nil {
772+
abortDef.Name = core.HandlerOnCancel.String()
773+
if dag.HandlerOn.Cancel, err = buildStep(buildCtx, *abortDef); err != nil {
754774
return
755775
}
756776
}

internal/core/spec/definition.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,11 @@ type definition struct {
9292

9393
// handlerOnDef defines the steps to be executed on different events.
9494
type handlerOnDef struct {
95+
Init *stepDef // Step to execute before steps (after preconditions pass)
9596
Failure *stepDef // Step to execute on failure
9697
Success *stepDef // Step to execute on success
97-
Cancel *stepDef // Step to execute on cancel
98+
Abort *stepDef // Step to execute on abort (canonical field)
99+
Cancel *stepDef // Step to execute on cancel (deprecated: use Abort instead)
98100
Exit *stepDef // Step to execute on exit
99101
}
100102

internal/core/spec/loader.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,15 @@ func SkipSchemaValidation() LoadOption {
109109
}
110110
}
111111

112+
// WithSkipBaseHandlers skips merging handlerOn from base config.
113+
// This is used for sub-DAG runs to prevent handler inheritance from base config.
114+
// Sub-DAGs should have their own handlers defined explicitly if needed.
115+
func WithSkipBaseHandlers() LoadOption {
116+
return func(o *LoadOptions) {
117+
o.flags |= BuildFlagSkipBaseHandlers
118+
}
119+
}
120+
112121
// Load loads a Directed Acyclic Graph (core.DAG) from a file path or name with the given options.
113122
//
114123
// The function handles different input formats:
@@ -343,6 +352,13 @@ func loadDAGsFromFile(ctx BuildContext, filePath string, baseDef *definition) ([
343352
if err != nil {
344353
return nil, fmt.Errorf("failed to build base core.DAG for document %d: %w", docIndex, err)
345354
}
355+
356+
// Skip handlers from base config for sub-DAG runs to prevent inheritance.
357+
// Sub-DAGs should define their own handlers explicitly if needed.
358+
if ctx.opts.Has(BuildFlagSkipBaseHandlers) {
359+
baseDAG.HandlerOn = core.HandlerOn{}
360+
}
361+
346362
dest = baseDAG
347363
} else {
348364
dest = new(core.DAG)

internal/integration/base_dag_test.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"os"
55
"path/filepath"
66
"testing"
7+
"time"
78

89
"github.com/dagu-org/dagu/internal/core"
910
"github.com/dagu-org/dagu/internal/core/execution"
@@ -94,3 +95,162 @@ func TestBaseDAGSpecialEnvVarsInHandler(t *testing.T) {
9495
require.NotContains(t, outputStr, "DAG_RUN_ID=\n", "DAG_RUN_ID should not be empty")
9596
require.NotContains(t, outputStr, "DAG_RUN_LOG_FILE=\n", "DAG_RUN_LOG_FILE should not be empty")
9697
}
98+
99+
func TestSkipBaseHandlers_SubDAGDoesNotInheritHandlers(t *testing.T) {
100+
t.Parallel()
101+
102+
// Create a temp directory to store base config
103+
tmpDir := t.TempDir()
104+
baseConfigPath := filepath.Join(tmpDir, "base.yaml")
105+
markerFile := filepath.Join(tmpDir, "marker.txt")
106+
107+
// Create base DAG with handlerOn: failure that writes a marker file
108+
baseConfig := `handlerOn:
109+
failure:
110+
command: echo "BASE_FAILURE_HANDLER_RAN" >> ` + markerFile + `
111+
`
112+
require.NoError(t, os.WriteFile(baseConfigPath, []byte(baseConfig), 0600))
113+
114+
// Load a DAG WITHOUT skip base handlers - should have handler
115+
th := test.Setup(t)
116+
dagContent := `steps:
117+
- name: failing-step
118+
command: exit 1
119+
`
120+
dagFile := th.CreateDAGFile(t, th.Config.Paths.DAGsDir, "test-no-skip", []byte(dagContent))
121+
122+
// Load without skip - should have failure handler from base config
123+
dagWithHandler, err := spec.Load(th.Context, dagFile, spec.WithBaseConfig(baseConfigPath))
124+
require.NoError(t, err)
125+
require.NotNil(t, dagWithHandler.HandlerOn.Failure, "failure handler from base config should be set")
126+
127+
// Load WITH skip base handlers - should NOT have handler
128+
dagWithoutHandler, err := spec.Load(th.Context, dagFile, spec.WithBaseConfig(baseConfigPath), spec.WithSkipBaseHandlers())
129+
require.NoError(t, err)
130+
require.Nil(t, dagWithoutHandler.HandlerOn.Failure, "failure handler should NOT be inherited when skip flag is set")
131+
}
132+
133+
func TestSkipBaseHandlers_ExplicitHandlersStillWork(t *testing.T) {
134+
t.Parallel()
135+
136+
// Create a temp directory to store base config
137+
tmpDir := t.TempDir()
138+
baseConfigPath := filepath.Join(tmpDir, "base.yaml")
139+
baseMarkerFile := filepath.Join(tmpDir, "base_marker.txt")
140+
dagMarkerFile := filepath.Join(tmpDir, "dag_marker.txt")
141+
142+
// Create base DAG with handlerOn: failure
143+
baseConfig := `handlerOn:
144+
failure:
145+
command: echo "BASE" >> ` + baseMarkerFile + `
146+
`
147+
require.NoError(t, os.WriteFile(baseConfigPath, []byte(baseConfig), 0600))
148+
149+
// Setup test helper
150+
th := test.Setup(t)
151+
152+
// Create a DAG file with its own failure handler
153+
dagContent := `handlerOn:
154+
failure:
155+
command: echo "DAG" >> ` + dagMarkerFile + `
156+
157+
steps:
158+
- name: failing-step
159+
command: exit 1
160+
`
161+
dagFile := th.CreateDAGFile(t, th.Config.Paths.DAGsDir, "test-explicit-handler", []byte(dagContent))
162+
163+
// Load WITH skip base handlers - should have DAG's own handler
164+
dag, err := spec.Load(th.Context, dagFile, spec.WithBaseConfig(baseConfigPath), spec.WithSkipBaseHandlers())
165+
require.NoError(t, err)
166+
require.NotNil(t, dag.HandlerOn.Failure, "DAG's own failure handler should be present")
167+
168+
// Run the DAG
169+
dagRunID := uuid.New().String()
170+
logDir := th.Config.Paths.LogDir
171+
logFile := filepath.Join(logDir, dagRunID+".log")
172+
root := execution.NewDAGRunRef(dag.Name, dagRunID)
173+
174+
drm := runtimepkg.NewManager(th.DAGRunStore, th.ProcStore, th.Config)
175+
176+
a := agent.New(
177+
dagRunID,
178+
dag,
179+
logDir,
180+
logFile,
181+
drm,
182+
th.DAGStore,
183+
th.DAGRunStore,
184+
th.ServiceRegistry,
185+
root,
186+
th.Config.Global.Peer,
187+
agent.Options{},
188+
)
189+
190+
// Run the agent - expect failure
191+
err = a.Run(th.Context)
192+
require.Error(t, err)
193+
194+
// Wait a bit for the handler file to be written
195+
time.Sleep(100 * time.Millisecond)
196+
197+
// Verify DAG's own handler ran
198+
dagOutput, err := os.ReadFile(dagMarkerFile)
199+
require.NoError(t, err, "DAG's failure handler should have written marker file")
200+
require.Contains(t, string(dagOutput), "DAG", "DAG's own failure handler should have run")
201+
202+
// Verify base handler did NOT run
203+
_, err = os.ReadFile(baseMarkerFile)
204+
require.True(t, os.IsNotExist(err), "base failure handler should NOT have run")
205+
}
206+
207+
func TestSkipBaseHandlers_AllHandlerTypesSkipped(t *testing.T) {
208+
t.Parallel()
209+
210+
// Create a temp directory to store base config
211+
tmpDir := t.TempDir()
212+
baseConfigPath := filepath.Join(tmpDir, "base.yaml")
213+
214+
// Create base DAG with all handler types
215+
baseConfig := `handlerOn:
216+
init:
217+
command: "true"
218+
success:
219+
command: "true"
220+
failure:
221+
command: "true"
222+
abort:
223+
command: "true"
224+
exit:
225+
command: "true"
226+
`
227+
require.NoError(t, os.WriteFile(baseConfigPath, []byte(baseConfig), 0600))
228+
229+
// Setup test helper
230+
th := test.Setup(t)
231+
232+
// Create a DAG file
233+
dagContent := `steps:
234+
- name: step1
235+
command: "true"
236+
`
237+
dagFile := th.CreateDAGFile(t, th.Config.Paths.DAGsDir, "test-all-handlers", []byte(dagContent))
238+
239+
// Load without skip - all handlers should be set
240+
dagWithHandlers, err := spec.Load(th.Context, dagFile, spec.WithBaseConfig(baseConfigPath))
241+
require.NoError(t, err)
242+
require.NotNil(t, dagWithHandlers.HandlerOn.Init, "init handler should be set")
243+
require.NotNil(t, dagWithHandlers.HandlerOn.Success, "success handler should be set")
244+
require.NotNil(t, dagWithHandlers.HandlerOn.Failure, "failure handler should be set")
245+
require.NotNil(t, dagWithHandlers.HandlerOn.Cancel, "abort/cancel handler should be set")
246+
require.NotNil(t, dagWithHandlers.HandlerOn.Exit, "exit handler should be set")
247+
248+
// Load WITH skip - no handlers should be set
249+
dagWithoutHandlers, err := spec.Load(th.Context, dagFile, spec.WithBaseConfig(baseConfigPath), spec.WithSkipBaseHandlers())
250+
require.NoError(t, err)
251+
require.Nil(t, dagWithoutHandlers.HandlerOn.Init, "init handler should NOT be set")
252+
require.Nil(t, dagWithoutHandlers.HandlerOn.Success, "success handler should NOT be set")
253+
require.Nil(t, dagWithoutHandlers.HandlerOn.Failure, "failure handler should NOT be set")
254+
require.Nil(t, dagWithoutHandlers.HandlerOn.Cancel, "abort/cancel handler should NOT be set")
255+
require.Nil(t, dagWithoutHandlers.HandlerOn.Exit, "exit handler should NOT be set")
256+
}

0 commit comments

Comments
 (0)