Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/cmd/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func runEnqueue(ctx *Context, args []string) error {
return fmt.Errorf("failed to get queue override: %w", err)
}

// Load parameters and DAG
dag, _, err := loadDAGWithParams(ctx, args)
// Load parameters and DAG (enqueue is always for root DAGs, not sub-DAGs)
dag, _, err := loadDAGWithParams(ctx, args, false)
if err != nil {
return err
}
Expand Down
13 changes: 10 additions & 3 deletions internal/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func runStart(ctx *Context, args []string) error {
}
} else {
// Load parameters and DAG
dag, params, err = loadDAGWithParams(ctx, args)
dag, params, err = loadDAGWithParams(ctx, args, isSubDAGRun)
if err != nil {
return err
}
Expand Down Expand Up @@ -302,8 +302,9 @@ func getDAGRunInfo(ctx *Context) (dagRunID, rootDAGRun, parentDAGRun string, isS
return dagRunID, rootDAGRun, parentDAGRun, isSubDAGRun, nil
}

// loadDAGWithParams loads the DAG and its parameters from command arguments
func loadDAGWithParams(ctx *Context, args []string) (*core.DAG, string, error) {
// loadDAGWithParams loads the DAG and its parameters from command arguments.
// When isSubDAGRun is true, handlers from base config are skipped to prevent inheritance.
func loadDAGWithParams(ctx *Context, args []string, isSubDAGRun bool) (*core.DAG, string, error) {
var dagPath string
var interactiveParams string

Expand Down Expand Up @@ -350,6 +351,12 @@ func loadDAGWithParams(ctx *Context, args []string) (*core.DAG, string, error) {
spec.WithDAGsDir(ctx.Config.Paths.DAGsDir),
}

// Skip base handlers for sub-DAG runs to prevent handler inheritance.
// Sub-DAGs should define their own handlers explicitly if needed.
if isSubDAGRun {
loadOpts = append(loadOpts, spec.WithSkipBaseHandlers())
}

// Get name override from flags if provided
nameOverride, err := ctx.StringParam("name")
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions internal/core/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ func (s *Schedule) UnmarshalJSON(data []byte) error {

// HandlerOn contains the steps to be executed on different events in the DAG.
type HandlerOn struct {
Init *Step `json:"init,omitempty"`
Failure *Step `json:"failure,omitempty"`
Success *Step `json:"success,omitempty"`
Cancel *Step `json:"cancel,omitempty"`
Expand Down Expand Up @@ -516,6 +517,7 @@ type OTelConfig struct {
type HandlerType string

const (
HandlerOnInit HandlerType = "onInit"
HandlerOnSuccess HandlerType = "onSuccess"
HandlerOnFailure HandlerType = "onFailure"
HandlerOnCancel HandlerType = "onCancel"
Expand Down
8 changes: 8 additions & 0 deletions internal/core/execution/runstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func InitialStatus(dag *core.DAG) DAGRunStatus {
Status: core.NotStarted,
PID: PID(0),
Nodes: NewNodesFromSteps(dag.Steps),
OnInit: NewNodeOrNil(dag.HandlerOn.Init),
OnExit: NewNodeOrNil(dag.HandlerOn.Exit),
OnSuccess: NewNodeOrNil(dag.HandlerOn.Success),
OnFailure: NewNodeOrNil(dag.HandlerOn.Failure),
Expand All @@ -40,6 +41,7 @@ type DAGRunStatus struct {
Status core.Status `json:"status"`
PID PID `json:"pid,omitempty"`
Nodes []*Node `json:"nodes,omitempty"`
OnInit *Node `json:"onInit,omitempty"`
OnExit *Node `json:"onExit,omitempty"`
OnSuccess *Node `json:"onSuccess,omitempty"`
OnFailure *Node `json:"onFailure,omitempty"`
Expand Down Expand Up @@ -67,6 +69,9 @@ func (st *DAGRunStatus) Errors() []error {
errs = append(errs, fmt.Errorf("node %s: %s", node.Step.Name, node.Error))
}
}
if st.OnInit != nil && st.OnInit.Error != "" {
errs = append(errs, fmt.Errorf("onInit: %s", st.OnInit.Error))
}
if st.OnExit != nil && st.OnExit.Error != "" {
errs = append(errs, fmt.Errorf("onExit: %s", st.OnExit.Error))
}
Expand All @@ -89,6 +94,9 @@ func (st *DAGRunStatus) NodeByName(name string) (*Node, error) {
return node, nil
}
}
if st.OnInit != nil && st.OnInit.Step.Name == name {
return st.OnInit, nil
}
if st.OnExit != nil && st.OnExit.Step.Name == name {
return st.OnExit, nil
}
Expand Down
26 changes: 23 additions & 3 deletions internal/core/spec/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (
BuildFlagOnlyMetadata
BuildFlagAllowBuildErrors
BuildFlagSkipSchemaValidation
BuildFlagSkipBaseHandlers // Skip merging handlerOn from base config (for sub-DAG runs)
)

// BuildOpts is used to control the behavior of the builder.
Expand Down Expand Up @@ -727,6 +728,13 @@ func buildLogDir(_ BuildContext, spec *definition, dag *core.DAG) (err error) {
func buildHandlers(ctx BuildContext, spec *definition, dag *core.DAG) (err error) {
buildCtx := StepBuildContext{BuildContext: ctx, dag: dag}

if spec.HandlerOn.Init != nil {
spec.HandlerOn.Init.Name = core.HandlerOnInit.String()
if dag.HandlerOn.Init, err = buildStep(buildCtx, *spec.HandlerOn.Init); err != nil {
return err
}
}

if spec.HandlerOn.Exit != nil {
spec.HandlerOn.Exit.Name = core.HandlerOnExit.String()
if dag.HandlerOn.Exit, err = buildStep(buildCtx, *spec.HandlerOn.Exit); err != nil {
Expand All @@ -748,9 +756,21 @@ func buildHandlers(ctx BuildContext, spec *definition, dag *core.DAG) (err error
}
}

if spec.HandlerOn.Cancel != nil {
spec.HandlerOn.Cancel.Name = core.HandlerOnCancel.String()
if dag.HandlerOn.Cancel, err = buildStep(buildCtx, *spec.HandlerOn.Cancel); err != nil {
// Handle Abort (canonical) and Cancel (deprecated, for backward compatibility)
// Error if both are specified
if spec.HandlerOn.Abort != nil && spec.HandlerOn.Cancel != nil {
return fmt.Errorf("cannot specify both 'abort' and 'cancel' in handlerOn; use 'abort' (cancel is deprecated)")
}
var abortDef *stepDef
switch {
case spec.HandlerOn.Abort != nil:
abortDef = spec.HandlerOn.Abort
case spec.HandlerOn.Cancel != nil:
abortDef = spec.HandlerOn.Cancel
}
if abortDef != nil {
abortDef.Name = core.HandlerOnCancel.String()
if dag.HandlerOn.Cancel, err = buildStep(buildCtx, *abortDef); err != nil {
return
}
}
Expand Down
4 changes: 3 additions & 1 deletion internal/core/spec/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,11 @@ type definition struct {

// handlerOnDef defines the steps to be executed on different events.
type handlerOnDef struct {
Init *stepDef // Step to execute before steps (after preconditions pass)
Failure *stepDef // Step to execute on failure
Success *stepDef // Step to execute on success
Cancel *stepDef // Step to execute on cancel
Abort *stepDef // Step to execute on abort (canonical field)
Cancel *stepDef // Step to execute on cancel (deprecated: use Abort instead)
Exit *stepDef // Step to execute on exit
}

Expand Down
16 changes: 16 additions & 0 deletions internal/core/spec/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ func SkipSchemaValidation() LoadOption {
}
}

// WithSkipBaseHandlers skips merging handlerOn from base config.
// This is used for sub-DAG runs to prevent handler inheritance from base config.
// Sub-DAGs should have their own handlers defined explicitly if needed.
func WithSkipBaseHandlers() LoadOption {
return func(o *LoadOptions) {
o.flags |= BuildFlagSkipBaseHandlers
}
}

// Load loads a Directed Acyclic Graph (core.DAG) from a file path or name with the given options.
//
// The function handles different input formats:
Expand Down Expand Up @@ -343,6 +352,13 @@ func loadDAGsFromFile(ctx BuildContext, filePath string, baseDef *definition) ([
if err != nil {
return nil, fmt.Errorf("failed to build base core.DAG for document %d: %w", docIndex, err)
}

// Skip handlers from base config for sub-DAG runs to prevent inheritance.
// Sub-DAGs should define their own handlers explicitly if needed.
if ctx.opts.Has(BuildFlagSkipBaseHandlers) {
baseDAG.HandlerOn = core.HandlerOn{}
}

dest = baseDAG
} else {
dest = new(core.DAG)
Expand Down
160 changes: 160 additions & 0 deletions internal/integration/base_dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/dagu-org/dagu/internal/core"
"github.com/dagu-org/dagu/internal/core/execution"
Expand Down Expand Up @@ -94,3 +95,162 @@ func TestBaseDAGSpecialEnvVarsInHandler(t *testing.T) {
require.NotContains(t, outputStr, "DAG_RUN_ID=\n", "DAG_RUN_ID should not be empty")
require.NotContains(t, outputStr, "DAG_RUN_LOG_FILE=\n", "DAG_RUN_LOG_FILE should not be empty")
}

func TestSkipBaseHandlers_SubDAGDoesNotInheritHandlers(t *testing.T) {
t.Parallel()

// Create a temp directory to store base config
tmpDir := t.TempDir()
baseConfigPath := filepath.Join(tmpDir, "base.yaml")
markerFile := filepath.Join(tmpDir, "marker.txt")

// Create base DAG with handlerOn: failure that writes a marker file
baseConfig := `handlerOn:
failure:
command: echo "BASE_FAILURE_HANDLER_RAN" >> ` + markerFile + `
`
require.NoError(t, os.WriteFile(baseConfigPath, []byte(baseConfig), 0600))

// Load a DAG WITHOUT skip base handlers - should have handler
th := test.Setup(t)
dagContent := `steps:
- name: failing-step
command: exit 1
`
dagFile := th.CreateDAGFile(t, th.Config.Paths.DAGsDir, "test-no-skip", []byte(dagContent))

// Load without skip - should have failure handler from base config
dagWithHandler, err := spec.Load(th.Context, dagFile, spec.WithBaseConfig(baseConfigPath))
require.NoError(t, err)
require.NotNil(t, dagWithHandler.HandlerOn.Failure, "failure handler from base config should be set")

// Load WITH skip base handlers - should NOT have handler
dagWithoutHandler, err := spec.Load(th.Context, dagFile, spec.WithBaseConfig(baseConfigPath), spec.WithSkipBaseHandlers())
require.NoError(t, err)
require.Nil(t, dagWithoutHandler.HandlerOn.Failure, "failure handler should NOT be inherited when skip flag is set")
}

func TestSkipBaseHandlers_ExplicitHandlersStillWork(t *testing.T) {
t.Parallel()

// Create a temp directory to store base config
tmpDir := t.TempDir()
baseConfigPath := filepath.Join(tmpDir, "base.yaml")
baseMarkerFile := filepath.Join(tmpDir, "base_marker.txt")
dagMarkerFile := filepath.Join(tmpDir, "dag_marker.txt")

// Create base DAG with handlerOn: failure
baseConfig := `handlerOn:
failure:
command: echo "BASE" >> ` + baseMarkerFile + `
`
require.NoError(t, os.WriteFile(baseConfigPath, []byte(baseConfig), 0600))

// Setup test helper
th := test.Setup(t)

// Create a DAG file with its own failure handler
dagContent := `handlerOn:
failure:
command: echo "DAG" >> ` + dagMarkerFile + `

steps:
- name: failing-step
command: exit 1
`
dagFile := th.CreateDAGFile(t, th.Config.Paths.DAGsDir, "test-explicit-handler", []byte(dagContent))

// Load WITH skip base handlers - should have DAG's own handler
dag, err := spec.Load(th.Context, dagFile, spec.WithBaseConfig(baseConfigPath), spec.WithSkipBaseHandlers())
require.NoError(t, err)
require.NotNil(t, dag.HandlerOn.Failure, "DAG's own failure handler should be present")

// Run the DAG
dagRunID := uuid.New().String()
logDir := th.Config.Paths.LogDir
logFile := filepath.Join(logDir, dagRunID+".log")
root := execution.NewDAGRunRef(dag.Name, dagRunID)

drm := runtimepkg.NewManager(th.DAGRunStore, th.ProcStore, th.Config)

a := agent.New(
dagRunID,
dag,
logDir,
logFile,
drm,
th.DAGStore,
th.DAGRunStore,
th.ServiceRegistry,
root,
th.Config.Global.Peer,
agent.Options{},
)

// Run the agent - expect failure
err = a.Run(th.Context)
require.Error(t, err)

// Wait a bit for the handler file to be written
time.Sleep(100 * time.Millisecond)

// Verify DAG's own handler ran
dagOutput, err := os.ReadFile(dagMarkerFile)
require.NoError(t, err, "DAG's failure handler should have written marker file")
require.Contains(t, string(dagOutput), "DAG", "DAG's own failure handler should have run")

// Verify base handler did NOT run
_, err = os.ReadFile(baseMarkerFile)
require.True(t, os.IsNotExist(err), "base failure handler should NOT have run")
}

func TestSkipBaseHandlers_AllHandlerTypesSkipped(t *testing.T) {
t.Parallel()

// Create a temp directory to store base config
tmpDir := t.TempDir()
baseConfigPath := filepath.Join(tmpDir, "base.yaml")

// Create base DAG with all handler types
baseConfig := `handlerOn:
init:
command: "true"
success:
command: "true"
failure:
command: "true"
abort:
command: "true"
exit:
command: "true"
`
require.NoError(t, os.WriteFile(baseConfigPath, []byte(baseConfig), 0600))

// Setup test helper
th := test.Setup(t)

// Create a DAG file
dagContent := `steps:
- name: step1
command: "true"
`
dagFile := th.CreateDAGFile(t, th.Config.Paths.DAGsDir, "test-all-handlers", []byte(dagContent))

// Load without skip - all handlers should be set
dagWithHandlers, err := spec.Load(th.Context, dagFile, spec.WithBaseConfig(baseConfigPath))
require.NoError(t, err)
require.NotNil(t, dagWithHandlers.HandlerOn.Init, "init handler should be set")
require.NotNil(t, dagWithHandlers.HandlerOn.Success, "success handler should be set")
require.NotNil(t, dagWithHandlers.HandlerOn.Failure, "failure handler should be set")
require.NotNil(t, dagWithHandlers.HandlerOn.Cancel, "abort/cancel handler should be set")
require.NotNil(t, dagWithHandlers.HandlerOn.Exit, "exit handler should be set")

// Load WITH skip - no handlers should be set
dagWithoutHandlers, err := spec.Load(th.Context, dagFile, spec.WithBaseConfig(baseConfigPath), spec.WithSkipBaseHandlers())
require.NoError(t, err)
require.Nil(t, dagWithoutHandlers.HandlerOn.Init, "init handler should NOT be set")
require.Nil(t, dagWithoutHandlers.HandlerOn.Success, "success handler should NOT be set")
require.Nil(t, dagWithoutHandlers.HandlerOn.Failure, "failure handler should NOT be set")
require.Nil(t, dagWithoutHandlers.HandlerOn.Cancel, "abort/cancel handler should NOT be set")
require.Nil(t, dagWithoutHandlers.HandlerOn.Exit, "exit handler should NOT be set")
}
Loading
Loading