diff --git a/charts/dagu/README.md b/charts/dagu/README.md index f108d13bc3..da02415241 100644 --- a/charts/dagu/README.md +++ b/charts/dagu/README.md @@ -34,7 +34,7 @@ The chart deploys four components: - **Coordinator**: gRPC server for distributed task execution (port 50055) - **Scheduler**: Manages DAG execution schedules (port 8090 for health) -- **Worker**: Executes DAG steps (2 replicas by default) +- **Worker**: Executes DAG steps (configurable pools with independent replicas) - **UI**: Web interface for managing DAGs (port 8080) All components share a single PersistentVolumeClaim with `ReadWriteMany` access mode. @@ -56,7 +56,57 @@ For local single-node clusters that don't support RWX: helm install dagu charts/dagu \ --set persistence.accessMode=ReadWriteOnce \ --set persistence.skipValidation=true \ - --set worker.replicas=1 + --set workerPools.general.replicas=1 +``` + +### Worker Pools + +Workers are organized into pools. Each pool creates a separate Kubernetes Deployment with its own replicas, labels, resources, and scheduling constraints. DAGs select workers via `workerSelector` labels that match a pool's labels. + +```yaml +workerPools: + general: + replicas: 2 + labels: {} + resources: + requests: + memory: "128Mi" + cpu: "100m" + limits: + memory: "256Mi" + cpu: "200m" + nodeSelector: {} + tolerations: [] + affinity: {} + + gpu: + replicas: 1 + labels: + gpu: "true" + resources: + requests: + memory: "512Mi" + cpu: "500m" + nvidia.com/gpu: "1" + limits: + memory: "1Gi" + cpu: "1000m" + nvidia.com/gpu: "1" + nodeSelector: + nvidia.com/gpu.present: "true" + tolerations: + - key: nvidia.com/gpu + operator: Exists + effect: NoSchedule + affinity: {} +``` + +A pool with `labels: {}` (like `general` above) matches any DAG that has no `workerSelector`. 
To route a DAG to a specific pool, set `workerSelector` in the DAG definition to match the pool's labels: + +```yaml +# In your DAG file +workerSelector: + gpu: "true" ``` ### Authentication @@ -101,12 +151,14 @@ scheduler: memory: "256Mi" cpu: "250m" -worker: - replicas: 2 - resources: - requests: - memory: "128Mi" - cpu: "100m" +workerPools: + general: + replicas: 2 + labels: {} + resources: + requests: + memory: "128Mi" + cpu: "100m" ui: replicas: 1 diff --git a/charts/dagu/templates/NOTES.txt b/charts/dagu/templates/NOTES.txt index c504609594..2d2ddc1a45 100644 --- a/charts/dagu/templates/NOTES.txt +++ b/charts/dagu/templates/NOTES.txt @@ -5,7 +5,11 @@ Make sure your storage class supports RWX access mode (e.g., NFS, EFS, CephFS). Components deployed: Scheduler: {{ .Values.scheduler.replicas }} replica(s) - Worker: {{ .Values.worker.replicas }} replica(s) + Worker Pools: + {{- range $poolName, $pool := .Values.workerPools }} + - {{ $poolName }}: {{ $pool.replicas }} replica(s) + {{- if $pool.labels }} (labels: {{ include "dagu.workerLabels" $pool.labels }}){{- end }} + {{- end }} UI: {{ .Values.ui.replicas }} replica(s) Access the UI: diff --git a/charts/dagu/templates/_helpers.tpl b/charts/dagu/templates/_helpers.tpl index d07974b325..9e966bcd60 100644 --- a/charts/dagu/templates/_helpers.tpl +++ b/charts/dagu/templates/_helpers.tpl @@ -22,3 +22,15 @@ app.kubernetes.io/managed-by: {{ .Release.Service }} helm.sh/chart: {{ .Chart.Name }}-{{ .Chart.Version | replace "+" "_" }} {{- end }} +{{- define "dagu.selectorLabels" -}} +app.kubernetes.io/name: {{ include "dagu.name" . }} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end }} + +{{- define "dagu.workerLabels" -}} +{{- $pairs := list -}} +{{- range $key, $value := . 
-}} +{{- $pairs = append $pairs (printf "%s=%v" $key $value) -}} +{{- end -}} +{{- join "," $pairs -}} +{{- end }} diff --git a/charts/dagu/templates/configmap.yaml b/charts/dagu/templates/configmap.yaml index 4c44b5affa..779bfc29ea 100644 --- a/charts/dagu/templates/configmap.yaml +++ b/charts/dagu/templates/configmap.yaml @@ -15,6 +15,7 @@ data: host: "0.0.0.0" port: 8080 apiBasePath: "/api/v1" + defaultExecutionMode: "distributed" # Coordinator (distributed execution) coordinator: diff --git a/charts/dagu/templates/worker-deployment.yaml b/charts/dagu/templates/worker-deployment.yaml index fdf431a29e..ad3e779af7 100644 --- a/charts/dagu/templates/worker-deployment.yaml +++ b/charts/dagu/templates/worker-deployment.yaml @@ -1,34 +1,46 @@ +{{- range $poolName, $pool := .Values.workerPools }} +{{- if not (regexMatch "^[a-z][a-z0-9-]*$" $poolName) }} +{{- fail (printf "invalid workerPool name %q: must match ^[a-z][a-z0-9-]*$" $poolName) }} +{{- end }} +--- apiVersion: apps/v1 kind: Deployment metadata: - name: {{ include "dagu.fullname" . }}-worker + name: {{ include "dagu.fullname" $ }}-worker-{{ $poolName }} labels: - {{- include "dagu.labels" . | nindent 4 }} + {{- include "dagu.labels" $ | nindent 4 }} app.kubernetes.io/component: worker + dagu.io/worker-pool: {{ $poolName }} spec: - replicas: {{ .Values.worker.replicas }} + replicas: {{ $pool.replicas }} selector: matchLabels: - {{- include "dagu.labels" . | nindent 6 }} + {{- include "dagu.selectorLabels" $ | nindent 6 }} app.kubernetes.io/component: worker + dagu.io/worker-pool: {{ $poolName }} template: metadata: labels: - {{- include "dagu.labels" . 
| nindent 8 }} + {{- include "dagu.labels" $ | nindent 8 }} app.kubernetes.io/component: worker + dagu.io/worker-pool: {{ $poolName }} spec: # Disable Kubernetes Service env var injection to avoid overriding # dagu config values (e.g., scheduler.port) with Service URLs enableServiceLinks: false containers: - name: worker - image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" - imagePullPolicy: {{ .Values.image.pullPolicy }} + image: "{{ $.Values.image.repository }}:{{ $.Values.image.tag }}" + imagePullPolicy: {{ $.Values.image.pullPolicy }} command: - dagu - worker - --config - /etc/dagu/dagu.yaml + {{- if $pool.labels }} + - --worker.labels + - {{ include "dagu.workerLabels" $pool.labels | quote }} + {{- end }} env: - name: WORKER_ID valueFrom: @@ -40,11 +52,24 @@ spec: - name: config mountPath: /etc/dagu resources: - {{- toYaml .Values.worker.resources | nindent 12 }} + {{- toYaml $pool.resources | nindent 12 }} + {{- with $pool.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with $pool.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with $pool.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} volumes: - name: data persistentVolumeClaim: - claimName: {{ include "dagu.fullname" . }}-data + claimName: {{ include "dagu.fullname" $ }}-data - name: config configMap: - name: {{ include "dagu.fullname" . }}-config + name: {{ include "dagu.fullname" $ }}-config +{{- end }} diff --git a/charts/dagu/values.yaml b/charts/dagu/values.yaml index 49b06690d0..1e9b0d33c4 100644 --- a/charts/dagu/values.yaml +++ b/charts/dagu/values.yaml @@ -29,16 +29,23 @@ coordinator: memory: "256Mi" cpu: "200m" -# Worker configuration -worker: - replicas: 2 - resources: - requests: - memory: "128Mi" - cpu: "100m" - limits: - memory: "256Mi" - cpu: "200m" +# Worker pool configuration +# Each pool creates a separate Kubernetes Deployment. 
+# Pool names become part of the Deployment name: -dagu-worker- +workerPools: + general: + replicas: 2 + labels: {} + resources: + requests: + memory: "128Mi" + cpu: "100m" + limits: + memory: "256Mi" + cpu: "200m" + nodeSelector: {} + tolerations: [] + affinity: {} # UI configuration ui: diff --git a/internal/cmd/context.go b/internal/cmd/context.go index 31fedc6777..9de625f449 100644 --- a/internal/cmd/context.go +++ b/internal/cmd/context.go @@ -335,7 +335,7 @@ func (c *Context) NewScheduler() (*scheduler.Scheduler, error) { } coordinatorCli := c.NewCoordinatorClient() - de := scheduler.NewDAGExecutor(coordinatorCli, runtime.NewSubCmdBuilder(c.Config)) + de := scheduler.NewDAGExecutor(coordinatorCli, runtime.NewSubCmdBuilder(c.Config), c.Config.DefaultExecMode) m := scheduler.NewEntryReader(c.Config.Paths.DAGsDir, dr, c.DAGRunMgr, de, c.Config.Paths.Executable) return scheduler.New(c.Config, m, c.DAGRunMgr, c.DAGRunStore, c.QueueStore, c.ProcStore, c.ServiceRegistry, coordinatorCli) } diff --git a/internal/cmd/dry.go b/internal/cmd/dry.go index 01abe9639b..5016068740 100644 --- a/internal/cmd/dry.go +++ b/internal/cmd/dry.go @@ -81,6 +81,7 @@ func runDry(ctx *Context, args []string) error { ServiceRegistry: ctx.ServiceRegistry, RootDAGRun: exec.NewDAGRunRef(dag.Name, dagRunID), PeerConfig: ctx.Config.Core.Peer, + DefaultExecMode: ctx.Config.DefaultExecMode, }, ) diff --git a/internal/cmd/restart.go b/internal/cmd/restart.go index 0afb1f8b63..723b3d4f00 100644 --- a/internal/cmd/restart.go +++ b/internal/cmd/restart.go @@ -161,6 +161,7 @@ func executeDAGWithRunID(ctx *Context, cli runtime.Manager, dag *core.DAG, dagRu ServiceRegistry: ctx.ServiceRegistry, RootDAGRun: exec.NewDAGRunRef(dag.Name, dagRunID), PeerConfig: ctx.Config.Core.Peer, + DefaultExecMode: ctx.Config.DefaultExecMode, }) listenSignals(ctx, agentInstance) diff --git a/internal/cmd/retry.go b/internal/cmd/retry.go index d66365a5bb..db35c77be1 100644 --- a/internal/cmd/retry.go +++ 
b/internal/cmd/retry.go @@ -142,6 +142,7 @@ func executeRetry(ctx *Context, dag *core.DAG, status *exec.DAGRunStatus, rootRu RootDAGRun: rootRun, PeerConfig: ctx.Config.Core.Peer, TriggerType: core.TriggerTypeRetry, + DefaultExecMode: ctx.Config.DefaultExecMode, }, ) diff --git a/internal/cmd/start.go b/internal/cmd/start.go index 5633e72158..fcbba97ec3 100644 --- a/internal/cmd/start.go +++ b/internal/cmd/start.go @@ -200,14 +200,13 @@ var errProcAcquisitionFailed = errors.New("failed to acquire process handle") // tryExecuteDAG acquires a process handle and executes the DAG. func tryExecuteDAG(ctx *Context, dag *core.DAG, dagRunID string, root exec.DAGRunRef, workerID string, triggerType core.TriggerType) error { - // Check for workerSelector - dispatch to coordinator for distributed execution - // Skip if already running on a worker (workerID is set via --worker-id flag to a value other than "local") - if len(dag.WorkerSelector) > 0 && workerID == "local" { + // Check for dispatch to coordinator for distributed execution. + // Skip if already running on a worker (workerID != "local"). 
+ if workerID == "local" { coordinatorCli := ctx.NewCoordinatorClient() - if coordinatorCli == nil { - return fmt.Errorf("coordinator required for DAG with workerSelector; configure peer settings") + if core.ShouldDispatchToCoordinator(dag, coordinatorCli != nil, ctx.Config.DefaultExecMode) { + return dispatchToCoordinatorAndWait(ctx, dag, dagRunID, coordinatorCli) } - return dispatchToCoordinatorAndWait(ctx, dag, dagRunID, coordinatorCli) } if err := ctx.ProcStore.Lock(ctx, dag.ProcGroup()); err != nil { @@ -400,6 +399,7 @@ func executeDAGRun(ctx *Context, d *core.DAG, parent exec.DAGRunRef, dagRunID st RootDAGRun: root, PeerConfig: ctx.Config.Core.Peer, TriggerType: triggerType, + DefaultExecMode: ctx.Config.DefaultExecMode, }, ) diff --git a/internal/cmn/config/config.go b/internal/cmn/config/config.go index 64f034a771..5d26af5df6 100644 --- a/internal/cmn/config/config.go +++ b/internal/cmn/config/config.go @@ -8,19 +8,20 @@ import ( // Config holds the overall configuration for the application. type Config struct { - Core Core - Server Server - Paths PathsConfig - UI UI - Queues Queues - Coordinator Coordinator - Worker Worker - Scheduler Scheduler - Monitoring MonitoringConfig - Cache CacheMode - GitSync GitSyncConfig - Tunnel TunnelConfig - Warnings []string + Core Core + Server Server + Paths PathsConfig + UI UI + Queues Queues + Coordinator Coordinator + Worker Worker + Scheduler Scheduler + Monitoring MonitoringConfig + DefaultExecMode ExecutionMode + Cache CacheMode + GitSync GitSyncConfig + Tunnel TunnelConfig + Warnings []string } // GitSyncConfig holds the configuration for Git sync functionality. @@ -84,6 +85,14 @@ type TunnelRateLimitConfig struct { const TunnelProviderTailscale = "tailscale" +// ExecutionMode represents the default execution mode for DAGs. 
+type ExecutionMode string + +const ( + ExecutionModeLocal ExecutionMode = "local" + ExecutionModeDistributed ExecutionMode = "distributed" +) + // MonitoringConfig holds the configuration for system monitoring. // Memory usage: ~4 metrics * (retention / interval) * 16 bytes per point. type MonitoringConfig struct { @@ -354,6 +363,9 @@ func (c *Config) Validate() error { if err := c.validateBuiltinAuth(); err != nil { return err } + if err := c.validateExecutionMode(); err != nil { + return err + } if err := c.validateGitSync(); err != nil { return err } @@ -438,6 +450,16 @@ func (c *Config) validateOIDCForBuiltin() error { return nil } +// validateExecutionMode validates the default execution mode. +func (c *Config) validateExecutionMode() error { + switch c.DefaultExecMode { + case ExecutionModeLocal, ExecutionModeDistributed: + return nil + default: + return fmt.Errorf("invalid defaultExecutionMode: %q (must be one of: local, distributed)", c.DefaultExecMode) + } +} + // validateGitSync validates the Git sync configuration. func (c *Config) validateGitSync() error { if !c.GitSync.Enabled { diff --git a/internal/cmn/config/config_test.go b/internal/cmn/config/config_test.go index 5c2fb91699..5683e9f340 100644 --- a/internal/cmn/config/config_test.go +++ b/internal/cmn/config/config_test.go @@ -7,35 +7,33 @@ import ( "github.com/stretchr/testify/require" ) +// validBaseConfig returns a minimal valid Config for use in tests. +// Callers can mutate the returned config to set up specific test scenarios. 
+func validBaseConfig() *Config { + return &Config{ + DefaultExecMode: ExecutionModeLocal, + Server: Server{ + Port: 8080, + Auth: Auth{Mode: AuthModeNone}, + }, + UI: UI{MaxDashboardPageLimit: 100}, + } +} + func TestConfig_Validate(t *testing.T) { t.Parallel() t.Run("ValidConfig", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Auth: Auth{ - Mode: AuthModeNone, - }, - Port: 8080, - }, - UI: UI{ - MaxDashboardPageLimit: 100, - }, - } + cfg := validBaseConfig() err := cfg.Validate() require.NoError(t, err) }) t.Run("InvalidPort_Negative", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: -1, - }, - UI: UI{ - MaxDashboardPageLimit: 100, - }, - } + cfg := validBaseConfig() + cfg.Server.Port = -1 + cfg.Server.Auth = Auth{} err := cfg.Validate() require.Error(t, err) assert.Contains(t, err.Error(), "invalid port number") @@ -43,14 +41,9 @@ func TestConfig_Validate(t *testing.T) { t.Run("InvalidPort_TooLarge", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 99999, - }, - UI: UI{ - MaxDashboardPageLimit: 100, - }, - } + cfg := validBaseConfig() + cfg.Server.Port = 99999 + cfg.Server.Auth = Auth{} err := cfg.Validate() require.Error(t, err) assert.Contains(t, err.Error(), "invalid port number") @@ -58,14 +51,9 @@ func TestConfig_Validate(t *testing.T) { t.Run("InvalidPort_MaxValue", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 65536, - }, - UI: UI{ - MaxDashboardPageLimit: 100, - }, - } + cfg := validBaseConfig() + cfg.Server.Port = 65536 + cfg.Server.Auth = Auth{} err := cfg.Validate() require.Error(t, err) assert.Contains(t, err.Error(), "invalid port number") @@ -73,53 +61,25 @@ func TestConfig_Validate(t *testing.T) { t.Run("ValidPort_MinValue", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Auth: Auth{ - Mode: AuthModeNone, - }, - Port: 0, - }, - UI: UI{ - MaxDashboardPageLimit: 100, - }, - } + cfg := validBaseConfig() + 
cfg.Server.Port = 0 err := cfg.Validate() require.NoError(t, err) }) t.Run("ValidPort_MaxValue", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Auth: Auth{ - Mode: AuthModeNone, - }, - Port: 65535, - }, - UI: UI{ - MaxDashboardPageLimit: 100, - }, - } + cfg := validBaseConfig() + cfg.Server.Port = 65535 err := cfg.Validate() require.NoError(t, err) }) t.Run("IncompleteTLS_MissingCert", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Auth: Auth{ - Mode: AuthModeNone, - }, - Port: 8080, - TLS: &TLSConfig{ - KeyFile: "/path/to/key.pem", - }, - }, - UI: UI{ - MaxDashboardPageLimit: 100, - }, + cfg := validBaseConfig() + cfg.Server.TLS = &TLSConfig{ + KeyFile: "/path/to/key.pem", } err := cfg.Validate() require.Error(t, err) @@ -128,19 +88,9 @@ func TestConfig_Validate(t *testing.T) { t.Run("IncompleteTLS_MissingKey", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Auth: Auth{ - Mode: AuthModeNone, - }, - Port: 8080, - TLS: &TLSConfig{ - CertFile: "/path/to/cert.pem", - }, - }, - UI: UI{ - MaxDashboardPageLimit: 100, - }, + cfg := validBaseConfig() + cfg.Server.TLS = &TLSConfig{ + CertFile: "/path/to/cert.pem", } err := cfg.Validate() require.Error(t, err) @@ -149,20 +99,10 @@ func TestConfig_Validate(t *testing.T) { t.Run("CompleteTLS", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - TLS: &TLSConfig{ - CertFile: "/path/to/cert.pem", - KeyFile: "/path/to/key.pem", - }, - Auth: Auth{ - Mode: AuthModeNone, - }, - }, - UI: UI{ - MaxDashboardPageLimit: 100, - }, + cfg := validBaseConfig() + cfg.Server.TLS = &TLSConfig{ + CertFile: "/path/to/cert.pem", + KeyFile: "/path/to/key.pem", } err := cfg.Validate() require.NoError(t, err) @@ -170,17 +110,8 @@ func TestConfig_Validate(t *testing.T) { t.Run("InvalidMaxDashboardPageLimit_Zero", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeNone, - }, - }, - UI: 
UI{ - MaxDashboardPageLimit: 0, - }, - } + cfg := validBaseConfig() + cfg.UI.MaxDashboardPageLimit = 0 err := cfg.Validate() require.Error(t, err) assert.Contains(t, err.Error(), "invalid max dashboard page limit") @@ -188,17 +119,8 @@ func TestConfig_Validate(t *testing.T) { t.Run("InvalidMaxDashboardPageLimit_Negative", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeNone, - }, - }, - UI: UI{ - MaxDashboardPageLimit: -1, - }, - } + cfg := validBaseConfig() + cfg.UI.MaxDashboardPageLimit = -1 err := cfg.Validate() require.Error(t, err) assert.Contains(t, err.Error(), "invalid max dashboard page limit") @@ -206,41 +128,24 @@ func TestConfig_Validate(t *testing.T) { t.Run("ValidMaxDashboardPageLimit_MinValue", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeNone, - }, - }, - UI: UI{ - MaxDashboardPageLimit: 1, - }, - } + cfg := validBaseConfig() + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.NoError(t, err) }) t.Run("BuiltinAuth_MissingUsersDir", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeBuiltin, - Builtin: AuthBuiltin{ - Admin: AdminConfig{Username: "admin"}, - Token: TokenConfig{Secret: "secret", TTL: 1}, - }, - }, - }, - Paths: PathsConfig{ - UsersDir: "", - }, - UI: UI{ - MaxDashboardPageLimit: 1, + cfg := validBaseConfig() + cfg.Server.Auth = Auth{ + Mode: AuthModeBuiltin, + Builtin: AuthBuiltin{ + Admin: AdminConfig{Username: "admin"}, + Token: TokenConfig{Secret: "secret", TTL: 1}, }, } + cfg.Paths.UsersDir = "" + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.Error(t, err) assert.Contains(t, err.Error(), "usersDir") @@ -248,24 +153,16 @@ func TestConfig_Validate(t *testing.T) { t.Run("BuiltinAuth_MissingTokenSecret", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: 
AuthModeBuiltin, - Builtin: AuthBuiltin{ - Admin: AdminConfig{Username: "admin"}, - Token: TokenConfig{Secret: "", TTL: 1}, - }, - }, - }, - Paths: PathsConfig{ - UsersDir: "/tmp/users", - }, - UI: UI{ - MaxDashboardPageLimit: 1, + cfg := validBaseConfig() + cfg.Server.Auth = Auth{ + Mode: AuthModeBuiltin, + Builtin: AuthBuiltin{ + Admin: AdminConfig{Username: "admin"}, + Token: TokenConfig{Secret: "", TTL: 1}, }, } + cfg.Paths.UsersDir = "/tmp/users" + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.Error(t, err) assert.Contains(t, err.Error(), "token secret") @@ -273,24 +170,16 @@ func TestConfig_Validate(t *testing.T) { t.Run("BuiltinAuth_MissingAdminUsername", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeBuiltin, - Builtin: AuthBuiltin{ - Admin: AdminConfig{Username: ""}, - Token: TokenConfig{Secret: "secret", TTL: 1}, - }, - }, - }, - Paths: PathsConfig{ - UsersDir: "/tmp/users", - }, - UI: UI{ - MaxDashboardPageLimit: 1, + cfg := validBaseConfig() + cfg.Server.Auth = Auth{ + Mode: AuthModeBuiltin, + Builtin: AuthBuiltin{ + Admin: AdminConfig{Username: ""}, + Token: TokenConfig{Secret: "secret", TTL: 1}, }, } + cfg.Paths.UsersDir = "/tmp/users" + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.Error(t, err) assert.Contains(t, err.Error(), "admin username") @@ -298,59 +187,33 @@ func TestConfig_Validate(t *testing.T) { t.Run("BuiltinAuth_ValidConfig", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeBuiltin, - Builtin: AuthBuiltin{ - Admin: AdminConfig{Username: "admin"}, - Token: TokenConfig{Secret: "secret", TTL: 1}, - }, - }, - }, - Paths: PathsConfig{ - UsersDir: "/tmp/users", - }, - UI: UI{ - MaxDashboardPageLimit: 1, + cfg := validBaseConfig() + cfg.Server.Auth = Auth{ + Mode: AuthModeBuiltin, + Builtin: AuthBuiltin{ + Admin: AdminConfig{Username: "admin"}, + Token: TokenConfig{Secret: 
"secret", TTL: 1}, }, } + cfg.Paths.UsersDir = "/tmp/users" + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.NoError(t, err) }) t.Run("BuiltinAuth_SkippedForOtherModes", func(t *testing.T) { t.Parallel() - // When auth mode is not builtin, validation should pass even without builtin config - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeNone, - }, - }, - UI: UI{ - MaxDashboardPageLimit: 1, - }, - } + cfg := validBaseConfig() + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.NoError(t, err) }) t.Run("InvalidAuthMode", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: "invalid", - }, - }, - UI: UI{ - MaxDashboardPageLimit: 1, - }, - } + cfg := validBaseConfig() + cfg.Server.Auth.Mode = "invalid" + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.Error(t, err) assert.Contains(t, err.Error(), "invalid auth mode") @@ -358,24 +221,16 @@ func TestConfig_Validate(t *testing.T) { t.Run("BuiltinAuth_InvalidTokenTTL", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeBuiltin, - Builtin: AuthBuiltin{ - Admin: AdminConfig{Username: "admin"}, - Token: TokenConfig{Secret: "secret", TTL: 0}, - }, - }, - }, - Paths: PathsConfig{ - UsersDir: "/tmp/users", - }, - UI: UI{ - MaxDashboardPageLimit: 1, + cfg := validBaseConfig() + cfg.Server.Auth = Auth{ + Mode: AuthModeBuiltin, + Builtin: AuthBuiltin{ + Admin: AdminConfig{Username: "admin"}, + Token: TokenConfig{Secret: "secret", TTL: 0}, }, } + cfg.Paths.UsersDir = "/tmp/users" + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.Error(t, err) assert.Contains(t, err.Error(), "positive token TTL") @@ -386,31 +241,23 @@ func TestConfig_Validate(t *testing.T) { t.Run("BuiltinAuth_OIDC_IncompleteConfig_NoError", func(t *testing.T) { t.Parallel() // Missing clientId - OIDC is not enabled, no error - cfg := &Config{ - Server: Server{ - 
Port: 8080, - Auth: Auth{ - Mode: AuthModeBuiltin, - Builtin: AuthBuiltin{ - Admin: AdminConfig{Username: "admin"}, - Token: TokenConfig{Secret: "secret", TTL: 1}, - }, - OIDC: AuthOIDC{ - ClientID: "", // Missing - ClientSecret: "secret", - ClientURL: "https://example.com", - Issuer: "https://issuer.com", - RoleMapping: OIDCRoleMapping{DefaultRole: "viewer"}, - }, - }, - }, - Paths: PathsConfig{ - UsersDir: "/tmp/users", - }, - UI: UI{ - MaxDashboardPageLimit: 1, + cfg := validBaseConfig() + cfg.Server.Auth = Auth{ + Mode: AuthModeBuiltin, + Builtin: AuthBuiltin{ + Admin: AdminConfig{Username: "admin"}, + Token: TokenConfig{Secret: "secret", TTL: 1}, + }, + OIDC: AuthOIDC{ + ClientID: "", // Missing + ClientSecret: "secret", + ClientURL: "https://example.com", + Issuer: "https://issuer.com", + RoleMapping: OIDCRoleMapping{DefaultRole: "viewer"}, }, } + cfg.Paths.UsersDir = "/tmp/users" + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.NoError(t, err, "incomplete OIDC config should not error - OIDC is simply not enabled") assert.False(t, cfg.Server.Auth.OIDC.IsConfigured(), "OIDC should not be configured") @@ -418,31 +265,23 @@ func TestConfig_Validate(t *testing.T) { t.Run("BuiltinAuth_OIDC_InvalidDefaultRole", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeBuiltin, - Builtin: AuthBuiltin{ - Admin: AdminConfig{Username: "admin"}, - Token: TokenConfig{Secret: "secret", TTL: 1}, - }, - OIDC: AuthOIDC{ - ClientID: "client-id", - ClientSecret: "secret", - ClientURL: "https://example.com", - Issuer: "https://issuer.com", - RoleMapping: OIDCRoleMapping{DefaultRole: "invalid-role"}, - }, - }, - }, - Paths: PathsConfig{ - UsersDir: "/tmp/users", - }, - UI: UI{ - MaxDashboardPageLimit: 1, + cfg := validBaseConfig() + cfg.Server.Auth = Auth{ + Mode: AuthModeBuiltin, + Builtin: AuthBuiltin{ + Admin: AdminConfig{Username: "admin"}, + Token: TokenConfig{Secret: "secret", TTL: 1}, + }, + OIDC: 
AuthOIDC{ + ClientID: "client-id", + ClientSecret: "secret", + ClientURL: "https://example.com", + Issuer: "https://issuer.com", + RoleMapping: OIDCRoleMapping{DefaultRole: "invalid-role"}, }, } + cfg.Paths.UsersDir = "/tmp/users" + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.Error(t, err) assert.Contains(t, err.Error(), "defaultRole") @@ -450,64 +289,48 @@ func TestConfig_Validate(t *testing.T) { t.Run("BuiltinAuth_OIDC_ValidConfig", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeBuiltin, - Builtin: AuthBuiltin{ - Admin: AdminConfig{Username: "admin"}, - Token: TokenConfig{Secret: "secret", TTL: 1}, - }, - OIDC: AuthOIDC{ - ClientID: "client-id", - ClientSecret: "secret", - ClientURL: "https://example.com", - Issuer: "https://issuer.com", - Scopes: []string{"openid", "profile", "email"}, - RoleMapping: OIDCRoleMapping{DefaultRole: "viewer"}, - }, - }, - }, - Paths: PathsConfig{ - UsersDir: "/tmp/users", - }, - UI: UI{ - MaxDashboardPageLimit: 1, + cfg := validBaseConfig() + cfg.Server.Auth = Auth{ + Mode: AuthModeBuiltin, + Builtin: AuthBuiltin{ + Admin: AdminConfig{Username: "admin"}, + Token: TokenConfig{Secret: "secret", TTL: 1}, + }, + OIDC: AuthOIDC{ + ClientID: "client-id", + ClientSecret: "secret", + ClientURL: "https://example.com", + Issuer: "https://issuer.com", + Scopes: []string{"openid", "profile", "email"}, + RoleMapping: OIDCRoleMapping{DefaultRole: "viewer"}, }, } + cfg.Paths.UsersDir = "/tmp/users" + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.NoError(t, err) }) t.Run("BuiltinAuth_OIDC_MissingEmailScope_AddsWarning", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeBuiltin, - Builtin: AuthBuiltin{ - Admin: AdminConfig{Username: "admin"}, - Token: TokenConfig{Secret: "secret", TTL: 1}, - }, - OIDC: AuthOIDC{ - ClientID: "client-id", - ClientSecret: "secret", - ClientURL: 
"https://example.com", - Issuer: "https://issuer.com", - Scopes: []string{"openid", "profile"}, // No email scope - RoleMapping: OIDCRoleMapping{DefaultRole: "viewer"}, - }, - }, - }, - Paths: PathsConfig{ - UsersDir: "/tmp/users", - }, - UI: UI{ - MaxDashboardPageLimit: 1, + cfg := validBaseConfig() + cfg.Server.Auth = Auth{ + Mode: AuthModeBuiltin, + Builtin: AuthBuiltin{ + Admin: AdminConfig{Username: "admin"}, + Token: TokenConfig{Secret: "secret", TTL: 1}, + }, + OIDC: AuthOIDC{ + ClientID: "client-id", + ClientSecret: "secret", + ClientURL: "https://example.com", + Issuer: "https://issuer.com", + Scopes: []string{"openid", "profile"}, // No email scope + RoleMapping: OIDCRoleMapping{DefaultRole: "viewer"}, }, } + cfg.Paths.UsersDir = "/tmp/users" + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.NoError(t, err) assert.Len(t, cfg.Warnings, 1) @@ -516,33 +339,25 @@ func TestConfig_Validate(t *testing.T) { t.Run("BuiltinAuth_OIDC_MissingEmailScope_WithWhitelist_ReturnsError", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeBuiltin, - Builtin: AuthBuiltin{ - Admin: AdminConfig{Username: "admin"}, - Token: TokenConfig{Secret: "secret", TTL: 1}, - }, - OIDC: AuthOIDC{ - ClientID: "client-id", - ClientSecret: "secret", - ClientURL: "https://example.com", - Issuer: "https://issuer.com", - Scopes: []string{"openid", "profile"}, // No email scope - Whitelist: []string{"user@example.com"}, // But whitelist is set - RoleMapping: OIDCRoleMapping{DefaultRole: "viewer"}, - }, - }, - }, - Paths: PathsConfig{ - UsersDir: "/tmp/users", - }, - UI: UI{ - MaxDashboardPageLimit: 1, + cfg := validBaseConfig() + cfg.Server.Auth = Auth{ + Mode: AuthModeBuiltin, + Builtin: AuthBuiltin{ + Admin: AdminConfig{Username: "admin"}, + Token: TokenConfig{Secret: "secret", TTL: 1}, + }, + OIDC: AuthOIDC{ + ClientID: "client-id", + ClientSecret: "secret", + ClientURL: "https://example.com", + Issuer: 
"https://issuer.com", + Scopes: []string{"openid", "profile"}, // No email scope + Whitelist: []string{"user@example.com"}, // But whitelist is set + RoleMapping: OIDCRoleMapping{DefaultRole: "viewer"}, }, } + cfg.Paths.UsersDir = "/tmp/users" + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.Error(t, err) assert.Contains(t, err.Error(), "email") @@ -552,32 +367,24 @@ func TestConfig_Validate(t *testing.T) { t.Parallel() validRoles := []string{"admin", "manager", "operator", "viewer"} for _, role := range validRoles { - cfg := &Config{ - Server: Server{ - Port: 8080, - Auth: Auth{ - Mode: AuthModeBuiltin, - Builtin: AuthBuiltin{ - Admin: AdminConfig{Username: "admin"}, - Token: TokenConfig{Secret: "secret", TTL: 1}, - }, - OIDC: AuthOIDC{ - ClientID: "client-id", - ClientSecret: "secret", - ClientURL: "https://example.com", - Issuer: "https://issuer.com", - Scopes: []string{"openid", "email"}, - RoleMapping: OIDCRoleMapping{DefaultRole: role}, - }, - }, - }, - Paths: PathsConfig{ - UsersDir: "/tmp/users", - }, - UI: UI{ - MaxDashboardPageLimit: 1, + cfg := validBaseConfig() + cfg.Server.Auth = Auth{ + Mode: AuthModeBuiltin, + Builtin: AuthBuiltin{ + Admin: AdminConfig{Username: "admin"}, + Token: TokenConfig{Secret: "secret", TTL: 1}, + }, + OIDC: AuthOIDC{ + ClientID: "client-id", + ClientSecret: "secret", + ClientURL: "https://example.com", + Issuer: "https://issuer.com", + Scopes: []string{"openid", "email"}, + RoleMapping: OIDCRoleMapping{DefaultRole: role}, }, } + cfg.Paths.UsersDir = "/tmp/users" + cfg.UI.MaxDashboardPageLimit = 1 err := cfg.Validate() require.NoError(t, err, "role %s should be valid", role) } @@ -586,17 +393,11 @@ func TestConfig_Validate(t *testing.T) { // Tunnel validation tests t.Run("TunnelPublicRequiresAuth", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Auth: Auth{Mode: AuthModeNone}, - Port: 8080, - }, - UI: UI{MaxDashboardPageLimit: 100}, - Tunnel: TunnelConfig{ - Enabled: true, - Tailscale: 
TailscaleTunnelConfig{ - Funnel: true, // Public access - }, + cfg := validBaseConfig() + cfg.Tunnel = TunnelConfig{ + Enabled: true, + Tailscale: TailscaleTunnelConfig{ + Funnel: true, // Public access }, } err := cfg.Validate() @@ -606,25 +407,20 @@ func TestConfig_Validate(t *testing.T) { t.Run("TunnelPublicWithAuthOK", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Auth: Auth{ - Mode: AuthModeBuiltin, - Basic: AuthBasic{Username: "user", Password: "pass"}, - Builtin: AuthBuiltin{ - Admin: AdminConfig{Username: "admin"}, - Token: TokenConfig{Secret: "secret", TTL: 1}, - }, - }, - Port: 8080, + cfg := validBaseConfig() + cfg.Server.Auth = Auth{ + Mode: AuthModeBuiltin, + Basic: AuthBasic{Username: "user", Password: "pass"}, + Builtin: AuthBuiltin{ + Admin: AdminConfig{Username: "admin"}, + Token: TokenConfig{Secret: "secret", TTL: 1}, }, - Paths: PathsConfig{UsersDir: "/tmp/users"}, - UI: UI{MaxDashboardPageLimit: 100}, - Tunnel: TunnelConfig{ - Enabled: true, - Tailscale: TailscaleTunnelConfig{ - Funnel: true, // Public access - }, + } + cfg.Paths.UsersDir = "/tmp/users" + cfg.Tunnel = TunnelConfig{ + Enabled: true, + Tailscale: TailscaleTunnelConfig{ + Funnel: true, // Public access }, } err := cfg.Validate() @@ -633,17 +429,11 @@ func TestConfig_Validate(t *testing.T) { t.Run("TunnelPrivateNoAuthOK", func(t *testing.T) { t.Parallel() - cfg := &Config{ - Server: Server{ - Auth: Auth{Mode: AuthModeNone}, - Port: 8080, - }, - UI: UI{MaxDashboardPageLimit: 100}, - Tunnel: TunnelConfig{ - Enabled: true, - Tailscale: TailscaleTunnelConfig{ - Funnel: false, // Private (tailnet only) - }, + cfg := validBaseConfig() + cfg.Tunnel = TunnelConfig{ + Enabled: true, + Tailscale: TailscaleTunnelConfig{ + Funnel: false, // Private (tailnet only) }, } err := cfg.Validate() @@ -653,21 +443,14 @@ func TestConfig_Validate(t *testing.T) { t.Run("TunnelDisabledNoValidation", func(t *testing.T) { t.Parallel() // When tunnel is disabled, validation should 
pass regardless of settings - cfg := &Config{ - Server: Server{ - Auth: Auth{Mode: AuthModeNone}, - Port: 8080, - }, - UI: UI{MaxDashboardPageLimit: 100}, - Tunnel: TunnelConfig{ - Enabled: false, - Tailscale: TailscaleTunnelConfig{ - Funnel: true, // Would require auth if enabled - }, + cfg := validBaseConfig() + cfg.Tunnel = TunnelConfig{ + Enabled: false, + Tailscale: TailscaleTunnelConfig{ + Funnel: true, // Would require auth if enabled }, } err := cfg.Validate() require.NoError(t, err) }) - } diff --git a/internal/cmn/config/definition.go b/internal/cmn/config/definition.go index 21ce41a355..87efc52dd6 100644 --- a/internal/cmn/config/definition.go +++ b/internal/cmn/config/definition.go @@ -66,6 +66,9 @@ type Definition struct { Scheduler *SchedulerDef `mapstructure:"scheduler"` Queues *QueueConfigDef `mapstructure:"queues"` + // Execution + DefaultExecutionMode string `mapstructure:"defaultExecutionMode"` + // Features Monitoring *MonitoringDef `mapstructure:"monitoring"` Metrics *string `mapstructure:"metrics"` // "public" or "private" diff --git a/internal/cmn/config/loader.go b/internal/cmn/config/loader.go index 277fca224c..b23640a339 100644 --- a/internal/cmn/config/loader.go +++ b/internal/cmn/config/loader.go @@ -252,6 +252,7 @@ func (l *ConfigLoader) buildConfig(def Definition) (*Config, error) { } l.loadCacheConfig(&cfg, def) + l.loadExecutionModeConfig(&cfg, def) if err := l.LoadLegacyFields(&cfg, def); err != nil { return nil, err @@ -910,6 +911,14 @@ func setDefaultIfNotPositive(target *int, defaultValue int) { } } +func (l *ConfigLoader) loadExecutionModeConfig(cfg *Config, _ Definition) { + mode := ExecutionMode(l.v.GetString("defaultExecutionMode")) + if mode == "" { + mode = ExecutionModeLocal + } + cfg.DefaultExecMode = mode +} + func (l *ConfigLoader) loadCacheConfig(cfg *Config, def Definition) { cfg.Cache = CacheModeNormal if def.Cache == nil { @@ -1134,6 +1143,9 @@ func (l *ConfigLoader) setViperDefaultValues(paths Paths) { 
l.v.SetDefault("ui.dags.sortField", "name") l.v.SetDefault("ui.dags.sortOrder", "asc") + // Execution + l.v.SetDefault("defaultExecutionMode", string(ExecutionModeLocal)) + // Queues l.v.SetDefault("queues.enabled", true) @@ -1250,6 +1262,9 @@ var envBindings = []envBinding{ {key: "paths.serviceRegistryDir", env: "SERVICE_REGISTRY_DIR", isPath: true}, {key: "paths.usersDir", env: "USERS_DIR", isPath: true}, + // Execution + {key: "defaultExecutionMode", env: "DEFAULT_EXECUTION_MODE"}, + // Queues {key: "queues.enabled", env: "QUEUE_ENABLED"}, diff --git a/internal/cmn/config/loader_test.go b/internal/cmn/config/loader_test.go index f5142af77b..bc4780fb27 100644 --- a/internal/cmn/config/loader_test.go +++ b/internal/cmn/config/loader_test.go @@ -235,8 +235,9 @@ func TestLoad_Env(t *testing.T) { BlockDurationSeconds: 1800, }, }, - Warnings: []string{"Auth mode auto-detected as 'oidc' based on OIDC configuration (issuer: https://auth.example.com)"}, - Cache: CacheModeNormal, + DefaultExecMode: ExecutionModeLocal, + Warnings: []string{"Auth mode auto-detected as 'oidc' based on OIDC configuration (issuer: https://auth.example.com)"}, + Cache: CacheModeNormal, } assert.Equal(t, expected, cfg) @@ -485,8 +486,9 @@ scheduler: Retention: 24 * time.Hour, Interval: 5 * time.Second, }, - Warnings: []string{"Auth mode auto-detected as 'oidc' based on OIDC configuration (issuer: https://accounts.example.com)"}, - Cache: CacheModeNormal, + DefaultExecMode: ExecutionModeLocal, + Warnings: []string{"Auth mode auto-detected as 'oidc' based on OIDC configuration (issuer: https://accounts.example.com)"}, + Cache: CacheModeNormal, } assert.Equal(t, expected, cfg) @@ -1347,3 +1349,39 @@ tunnel: assert.Empty(t, cfg.Tunnel.Tailscale.Hostname) }) } + +func TestLoad_DefaultExecutionMode(t *testing.T) { + t.Run("DefaultIsLocal", func(t *testing.T) { + cfg := loadFromYAML(t, "# empty") + assert.Equal(t, ExecutionModeLocal, cfg.DefaultExecMode) + }) + + t.Run("SetToDistributed", func(t 
*testing.T) { + cfg := loadFromYAML(t, ` +defaultExecutionMode: distributed +`) + assert.Equal(t, ExecutionModeDistributed, cfg.DefaultExecMode) + }) + + t.Run("SetToLocal", func(t *testing.T) { + cfg := loadFromYAML(t, ` +defaultExecutionMode: local +`) + assert.Equal(t, ExecutionModeLocal, cfg.DefaultExecMode) + }) + + t.Run("FromEnv", func(t *testing.T) { + cfg := loadWithEnv(t, "# empty", map[string]string{ + "DAGU_DEFAULT_EXECUTION_MODE": "distributed", + }) + assert.Equal(t, ExecutionModeDistributed, cfg.DefaultExecMode) + }) + + t.Run("InvalidValue", func(t *testing.T) { + err := loadWithErrorFromYAML(t, ` +defaultExecutionMode: invalid +`) + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid defaultExecutionMode") + }) +} diff --git a/internal/cmn/schema/config.schema.json b/internal/cmn/schema/config.schema.json index 12376aeda0..e4a528b34a 100644 --- a/internal/cmn/schema/config.schema.json +++ b/internal/cmn/schema/config.schema.json @@ -188,6 +188,12 @@ "gitSync": { "$ref": "#/definitions/GitSyncDef" }, + "defaultExecutionMode": { + "type": "string", + "description": "Default execution mode for DAGs. 'local' runs DAGs on the server process, 'distributed' dispatches to workers.", + "enum": ["local", "distributed"], + "default": "local" + }, "tunnel": { "$ref": "#/definitions/TunnelDef" } diff --git a/internal/cmn/schema/dag.schema.json b/internal/cmn/schema/dag.schema.json index 8c8f409a6c..636cd34b06 100644 --- a/internal/cmn/schema/dag.schema.json +++ b/internal/cmn/schema/dag.schema.json @@ -478,11 +478,25 @@ "description": "Configuration for controlling user interactions when starting DAG runs. Allows restricting parameter editing and custom run ID input." }, "workerSelector": { - "type": "object", - "additionalProperties": { - "type": "string" - }, - "description": "Key-value pairs specifying worker label requirements for executing this DAG. The DAG will only run on workers that have all specified labels with matching values. 
For example: {\"gpu\": \"true\", \"memory\": \"64G\"} requires a worker with both gpu=true and memory=64G labels. This setting applies to the entire DAG; individual steps can override this with their own workerSelector." + "oneOf": [ + { + "type": "object", + "additionalProperties": { + "oneOf": [ + { "type": "string" }, + { "type": "number" }, + { "type": "boolean" } + ] + }, + "description": "Key-value pairs specifying worker label requirements. Values can be strings, numbers, or booleans (non-string values are coerced to strings). For example: {\"gpu\": true, \"memory\": \"64G\"} requires a worker with gpu=true and memory=64G labels." + }, + { + "type": "string", + "enum": ["local"], + "description": "Set to \"local\" to force this DAG to run locally even when the server default execution mode is distributed." + } + ], + "description": "Worker selection for this DAG. Use a map of label key-value pairs to target specific workers, or the string \"local\" to force local execution. This setting applies to the entire DAG; individual steps can override this with their own workerSelector." }, "shell": { "oneOf": [ @@ -998,11 +1012,25 @@ "description": "Configuration for parallel execution of sub DAGs. Only applicable when 'run' is specified. Allows processing multiple items concurrently using the same workflow definition." }, "workerSelector": { - "type": "object", - "additionalProperties": { - "type": "string" - }, - "description": "Key-value pairs specifying worker label requirements for executing this step. The step will only run on workers that have all specified labels with matching values. For example: {\"gpu\": \"true\", \"memory\": \"64G\"} requires a worker with both gpu=true and memory=64G labels." + "oneOf": [ + { + "type": "object", + "additionalProperties": { + "oneOf": [ + { "type": "string" }, + { "type": "number" }, + { "type": "boolean" } + ] + }, + "description": "Key-value pairs specifying worker label requirements. 
Values can be strings, numbers, or booleans (non-string values are coerced to strings). For example: {\"gpu\": true, \"memory\": \"64G\"} requires a worker with gpu=true and memory=64G labels." + }, + { + "type": "string", + "enum": ["local"], + "description": "Set to \"local\" to force this step to run locally even when the server default execution mode is distributed." + } + ], + "description": "Worker selection for this step. Use a map of label key-value pairs to target specific workers, or the string \"local\" to force local execution." }, "env": { "oneOf": [ diff --git a/internal/core/dag.go b/internal/core/dag.go index cf96f40f6d..0f9fc7566e 100644 --- a/internal/core/dag.go +++ b/internal/core/dag.go @@ -158,6 +158,9 @@ type DAG struct { // WorkerSelector defines labels required for worker selection in distributed execution. // If specified, the DAG will only run on workers with matching tag. WorkerSelector map[string]string `json:"workerSelector,omitempty"` + // ForceLocal forces the DAG to run locally even when the server default is distributed. + // Set by workerSelector: local in the DAG spec. + ForceLocal bool `json:"forceLocal,omitempty"` // MaxOutputSize is the maximum size of step output to capture in bytes. // Default is 1MB. Output exceeding this will return an error. MaxOutputSize int `json:"maxOutputSize,omitempty"` diff --git a/internal/core/dispatch.go b/internal/core/dispatch.go new file mode 100644 index 0000000000..4990b38da9 --- /dev/null +++ b/internal/core/dispatch.go @@ -0,0 +1,21 @@ +package core + +import "github.com/dagu-org/dagu/internal/cmn/config" + +// ShouldDispatchToCoordinator decides whether a DAG should be dispatched +// to the coordinator for distributed execution. 
+func ShouldDispatchToCoordinator(dag *DAG, hasCoordinator bool, defaultMode config.ExecutionMode) bool { + if dag.ForceLocal { + return false + } + if !hasCoordinator { + return false + } + if len(dag.WorkerSelector) > 0 { + return true + } + if defaultMode == config.ExecutionModeDistributed { + return true + } + return false +} diff --git a/internal/core/dispatch_test.go b/internal/core/dispatch_test.go new file mode 100644 index 0000000000..21001c22b7 --- /dev/null +++ b/internal/core/dispatch_test.go @@ -0,0 +1,61 @@ +package core + +import ( + "testing" + + "github.com/dagu-org/dagu/internal/cmn/config" + "github.com/stretchr/testify/assert" +) + +func TestShouldDispatchToCoordinator(t *testing.T) { + tests := []struct { + name string + dag *DAG + hasCoordinator bool + defaultMode config.ExecutionMode + want bool + }{ + { + name: "ForceLocal is true — always local", + dag: &DAG{ForceLocal: true, WorkerSelector: map[string]string{"gpu": "true"}}, + hasCoordinator: true, + defaultMode: config.ExecutionModeDistributed, + want: false, + }, + { + name: "no coordinator — always local", + dag: &DAG{WorkerSelector: map[string]string{"gpu": "true"}}, + hasCoordinator: false, + defaultMode: config.ExecutionModeDistributed, + want: false, + }, + { + name: "workerSelector present — dispatch", + dag: &DAG{WorkerSelector: map[string]string{"gpu": "true"}}, + hasCoordinator: true, + defaultMode: config.ExecutionModeLocal, + want: true, + }, + { + name: "defaultMode distributed — dispatch", + dag: &DAG{}, + hasCoordinator: true, + defaultMode: config.ExecutionModeDistributed, + want: true, + }, + { + name: "defaultMode local, no workerSelector — local", + dag: &DAG{}, + hasCoordinator: true, + defaultMode: config.ExecutionModeLocal, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ShouldDispatchToCoordinator(tt.dag, tt.hasCoordinator, tt.defaultMode) + assert.Equal(t, tt.want, got) + }) + } +} diff --git 
a/internal/core/exec/context.go b/internal/core/exec/context.go index 65bfd9e876..e5f92abd7c 100644 --- a/internal/core/exec/context.go +++ b/internal/core/exec/context.go @@ -22,9 +22,10 @@ type Context struct { BaseEnv *config.BaseEnv EnvScope *eval.EnvScope // Unified environment scope - THE single source for all env vars CoordinatorCli Dispatcher - Shell string // Default shell for this DAG (from DAG.Shell) - LogEncodingCharset string // Character encoding for log files (e.g., "utf-8", "shift_jis", "euc-jp") - LogWriterFactory LogWriterFactory // For remote log streaming (nil = use local files) + Shell string // Default shell for this DAG (from DAG.Shell) + LogEncodingCharset string // Character encoding for log files (e.g., "utf-8", "shift_jis", "euc-jp") + LogWriterFactory LogWriterFactory // For remote log streaming (nil = use local files) + DefaultExecMode config.ExecutionMode // Server-level default execution mode (local or distributed) } // LogWriterFactory creates log writers for step stdout/stderr. @@ -141,6 +142,7 @@ type contextOptions struct { secretEnvs []string logEncodingCharset string logWriterFactory LogWriterFactory + defaultExecMode config.ExecutionMode } // ContextOption configures optional parameters for NewContext. @@ -196,6 +198,13 @@ func WithLogWriterFactory(factory LogWriterFactory) ContextOption { } } +// WithDefaultExecMode sets the server-level default execution mode. +func WithDefaultExecMode(mode config.ExecutionMode) ContextOption { + return func(o *contextOptions) { + o.defaultExecMode = mode + } +} + // NewContext creates a new context with DAG execution metadata. // Required: ctx, dag, dagRunID, logFile // Optional: use ContextOption functions (WithDatabase, WithParams, etc.) 
@@ -246,6 +255,7 @@ func NewContext( Shell: dag.Shell, LogEncodingCharset: options.logEncodingCharset, LogWriterFactory: options.logWriterFactory, + DefaultExecMode: options.defaultExecMode, }) } diff --git a/internal/core/spec/dag.go b/internal/core/spec/dag.go index 24f96f7a5e..3908394f36 100644 --- a/internal/core/spec/dag.go +++ b/internal/core/spec/dag.go @@ -97,7 +97,8 @@ type dag struct { // OTel is the OpenTelemetry configuration. OTel any // WorkerSelector specifies required worker labels for execution. - WorkerSelector map[string]string + // Can be a map of label key-value pairs or the string "local" to force local execution. + WorkerSelector any // Container is the container definition for the DAG. // Can be a string (existing container name to exec into) or an object (container configuration). Container any @@ -390,7 +391,7 @@ var metadataTransformers = []transform{ {"params", newTransformer("Params", buildParams)}, {"defaultParams", newTransformer("DefaultParams", buildDefaultParams)}, {"paramsJSON", newTransformer("ParamsJSON", buildParamsJSON)}, - {"workerSelector", newTransformer("WorkerSelector", buildWorkerSelector)}, + {"workerSelector", &workerSelectorTransformer{}}, {"timeout", newTransformer("Timeout", buildTimeout)}, {"delay", newTransformer("Delay", buildDelay)}, {"restartWait", newTransformer("RestartWait", buildRestartWait)}, @@ -886,16 +887,82 @@ func parseParamsInternal(ctx BuildContext, d *dag) (*paramsResult, error) { }, nil } -func buildWorkerSelector(_ BuildContext, d *dag) (map[string]string, error) { - if len(d.WorkerSelector) == 0 { - return nil, nil +// workerSelectorTransformer is a custom transformer that sets both WorkerSelector and ForceLocal fields. 
+type workerSelectorTransformer struct{} + +func (t *workerSelectorTransformer) Transform(ctx BuildContext, in *dag, out reflect.Value) error { + ws, forceLocal, err := buildWorkerSelector(ctx, in) + if err != nil { + return err + } + + if ws != nil { + wsField := out.FieldByName("WorkerSelector") + if wsField.IsValid() && wsField.CanSet() { + wsField.Set(reflect.ValueOf(ws)) + } + } + + if forceLocal { + flField := out.FieldByName("ForceLocal") + if flField.IsValid() && flField.CanSet() { + flField.SetBool(true) + } + } + + return nil +} + +func buildWorkerSelector(_ BuildContext, d *dag) (map[string]string, bool, error) { + if d.WorkerSelector == nil { + return nil, false, nil } - ret := make(map[string]string) - for key, val := range d.WorkerSelector { - ret[strings.TrimSpace(key)] = strings.TrimSpace(val) + switch v := d.WorkerSelector.(type) { + case string: + trimmed := strings.TrimSpace(v) + if strings.EqualFold(trimmed, "local") { + return nil, true, nil + } + return nil, false, fmt.Errorf("unsupported workerSelector string value %q; the only allowed string value is \"local\"", trimmed) + + case map[string]string: + if len(v) == 0 { + return nil, false, nil + } + ret := make(map[string]string) + for key, val := range v { + ret[strings.TrimSpace(key)] = strings.TrimSpace(val) + } + return ret, false, nil + + case map[string]any: + if len(v) == 0 { + return nil, false, nil + } + ret := make(map[string]string) + for key, val := range v { + ret[strings.TrimSpace(key)] = strings.TrimSpace(fmt.Sprint(val)) + } + return ret, false, nil + + case map[any]any: + if len(v) == 0 { + return nil, false, nil + } + ret := make(map[string]string) + for key, val := range v { + strKey, ok := key.(string) + if !ok { + return nil, false, fmt.Errorf("workerSelector keys must be strings, got %T", key) + } + ret[strings.TrimSpace(strKey)] = strings.TrimSpace(fmt.Sprint(val)) + } + return ret, false, nil + + default: + return nil, false, fmt.Errorf("workerSelector must be a map or 
\"local\", got %T", d.WorkerSelector) } - return ret, nil } // shellResult holds both shell and args for internal use diff --git a/internal/core/spec/dag_test.go b/internal/core/spec/dag_test.go index 56695bf224..bc78df9b35 100644 --- a/internal/core/spec/dag_test.go +++ b/internal/core/spec/dag_test.go @@ -593,9 +593,12 @@ func TestBuildWorkerSelector(t *testing.T) { t.Parallel() tests := []struct { - name string - input map[string]string - expected map[string]string + name string + input any + expected map[string]string + forceLocal bool + expectErr bool + errContains string }{ { name: "Nil", @@ -603,7 +606,7 @@ func TestBuildWorkerSelector(t *testing.T) { expected: nil, }, { - name: "Empty", + name: "EmptyMap", input: map[string]string{}, expected: nil, }, @@ -617,14 +620,61 @@ func TestBuildWorkerSelector(t *testing.T) { input: map[string]string{" key ": " value "}, expected: map[string]string{"key": "value"}, }, + { + name: "StringLocal", + input: "local", + forceLocal: true, + }, + { + name: "StringLocalUpperCase", + input: "LOCAL", + forceLocal: true, + }, + { + name: "StringLocalMixedCase", + input: "Local", + forceLocal: true, + }, + { + name: "StringLocalWithSpaces", + input: " local ", + forceLocal: true, + }, + { + name: "UnsupportedString", + input: "remote", + expectErr: true, + errContains: "the only allowed string value is \"local\"", + }, + { + name: "MapStringAny", + input: map[string]any{"env": "prod"}, + expected: map[string]string{"env": "prod"}, + }, + { + name: "MapStringAnyBoolValue", + input: map[string]any{"gpu": true}, + expected: map[string]string{"gpu": "true"}, + }, + { + name: "MapStringAnyIntValue", + input: map[string]any{"memory": 64}, + expected: map[string]string{"memory": "64"}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { d := &dag{WorkerSelector: tt.input} - result, err := buildWorkerSelector(testBuildContext(), d) + result, forceLocal, err := buildWorkerSelector(testBuildContext(), d) + if 
tt.expectErr { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errContains) + return + } require.NoError(t, err) assert.Equal(t, tt.expected, result) + assert.Equal(t, tt.forceLocal, forceLocal) }) } } diff --git a/internal/intg/distr/fixtures_test.go b/internal/intg/distr/fixtures_test.go index d4b2199c82..deda4c66b9 100644 --- a/internal/intg/distr/fixtures_test.go +++ b/internal/intg/distr/fixtures_test.go @@ -187,7 +187,7 @@ func (f *testFixture) waitForWorkerRegistration(workerID string, timeout time.Du func (f *testFixture) startScheduler(timeout time.Duration) { f.t.Helper() - de := scheduler.NewDAGExecutor(f.coordinatorClient, runtime.NewSubCmdBuilder(f.coord.Config)) + de := scheduler.NewDAGExecutor(f.coordinatorClient, runtime.NewSubCmdBuilder(f.coord.Config), f.coord.Config.DefaultExecMode) em := scheduler.NewEntryReader(f.coord.Config.Paths.DAGsDir, f.coord.DAGStore, f.coord.DAGRunMgr, de, "") schedulerInst, err := scheduler.New( diff --git a/internal/runtime/agent/agent.go b/internal/runtime/agent/agent.go index 4ed88236ae..9d99e2f1b5 100644 --- a/internal/runtime/agent/agent.go +++ b/internal/runtime/agent/agent.go @@ -138,6 +138,9 @@ type Agent struct { // triggerType indicates how this DAG run was initiated. triggerType core.TriggerType + // defaultExecMode is the server-level default execution mode. + defaultExecMode config.ExecutionMode + // tracer is the OpenTelemetry tracer for the agent. tracer *telemetry.Tracer @@ -218,6 +221,8 @@ type Options struct { PeerConfig config.Peer // TriggerType indicates how this DAG run was initiated. TriggerType core.TriggerType + // DefaultExecMode is the server-level default execution mode. + DefaultExecMode config.ExecutionMode } // New creates a new Agent. 
@@ -251,6 +256,7 @@ func New( queuedRun: opts.QueuedRun, attemptID: opts.AttemptID, triggerType: opts.TriggerType, + defaultExecMode: opts.DefaultExecMode, } // Initialize progress display if enabled @@ -386,6 +392,7 @@ func (a *Agent) Run(ctx context.Context) error { runtime.WithParams(a.dag.Params), runtime.WithCoordinator(coordinatorCli), runtime.WithSecrets(secretEnvs), + runtime.WithDefaultExecMode(a.defaultExecMode), } if a.logWriterFactory != nil { diff --git a/internal/runtime/builtin/chat/tools.go b/internal/runtime/builtin/chat/tools.go index 59cff92dc5..b798cc9319 100644 --- a/internal/runtime/builtin/chat/tools.go +++ b/internal/runtime/builtin/chat/tools.go @@ -10,7 +10,7 @@ import ( "strings" "github.com/dagu-org/dagu/internal/core" - exec1 "github.com/dagu-org/dagu/internal/core/exec" + "github.com/dagu-org/dagu/internal/core/exec" llmpkg "github.com/dagu-org/dagu/internal/llm" ) @@ -46,7 +46,7 @@ func NewToolRegistry(ctx context.Context, dagNames []string) (*ToolRegistry, err return nil, nil } - rCtx := exec1.GetContext(ctx) + rCtx := exec.GetContext(ctx) registry := &ToolRegistry{ tools: make(map[string]*toolInfo), diff --git a/internal/runtime/context.go b/internal/runtime/context.go index 28314e2db1..ef349871cf 100644 --- a/internal/runtime/context.go +++ b/internal/runtime/context.go @@ -40,6 +40,8 @@ var ( WithLogEncoding = exec.WithLogEncoding // WithLogWriterFactory sets the log writer factory for remote log streaming. WithLogWriterFactory = exec.WithLogWriterFactory + // WithDefaultExecMode sets the server-level default execution mode. 
+ WithDefaultExecMode = exec.WithDefaultExecMode ) // LogWriterFactory is re-exported from execution package diff --git a/internal/runtime/executor/dag_runner.go b/internal/runtime/executor/dag_runner.go index 3e7acae272..a83df00af2 100644 --- a/internal/runtime/executor/dag_runner.go +++ b/internal/runtime/executor/dag_runner.go @@ -7,7 +7,7 @@ import ( "io" "log/slog" "os" - "os/exec" + osexec "os/exec" "strings" "sync" "time" @@ -19,7 +19,7 @@ import ( "github.com/dagu-org/dagu/internal/cmn/logger/tag" "github.com/dagu-org/dagu/internal/cmn/telemetry" "github.com/dagu-org/dagu/internal/core" - exec1 "github.com/dagu-org/dagu/internal/core/exec" + "github.com/dagu-org/dagu/internal/core/exec" "github.com/dagu-org/dagu/internal/proto/convert" coordinatorv1 "github.com/dagu-org/dagu/proto/coordinator/v1" ) @@ -42,13 +42,13 @@ type SubDAGExecutor struct { tempFile string // coordinatorCli is used for distributed execution - coordinatorCli exec1.Dispatcher + coordinatorCli exec.Dispatcher // Process tracking for ALL executions mu sync.Mutex - cmds map[string]*exec.Cmd // runID -> cmd for local processes - distributedRuns map[string]bool // runID -> true for distributed runs - dagCtx exec1.Context // for DB access when cancelling distributed runs + cmds map[string]*osexec.Cmd // runID -> cmd for local processes + distributedRuns map[string]bool // runID -> true for distributed runs + dagCtx exec.Context // for DB access when cancelling distributed runs // killed should be closed when Kill is called killed chan struct{} @@ -59,7 +59,7 @@ type SubDAGExecutor struct { // It handles the logic for finding the DAG - either from the database // or from local DAGs defined in the parent. 
func NewSubDAGExecutor(ctx context.Context, childName string) (*SubDAGExecutor, error) { - rCtx := exec1.GetContext(ctx) + rCtx := exec.GetContext(ctx) // First, check if it's a local DAG in the parent if rCtx.DAG != nil && rCtx.DAG.LocalDAGs != nil { @@ -86,7 +86,7 @@ func NewSubDAGExecutor(ctx context.Context, childName string) (*SubDAGExecutor, DAG: dag, tempFile: tempFile, coordinatorCli: rCtx.CoordinatorCli, - cmds: make(map[string]*exec.Cmd), + cmds: make(map[string]*osexec.Cmd), distributedRuns: make(map[string]bool), dagCtx: rCtx, killed: make(chan struct{}), @@ -103,7 +103,7 @@ func NewSubDAGExecutor(ctx context.Context, childName string) (*SubDAGExecutor, return &SubDAGExecutor{ DAG: dag, coordinatorCli: rCtx.CoordinatorCli, - cmds: make(map[string]*exec.Cmd), + cmds: make(map[string]*osexec.Cmd), distributedRuns: make(map[string]bool), dagCtx: rCtx, killed: make(chan struct{}), @@ -111,7 +111,7 @@ func NewSubDAGExecutor(ctx context.Context, childName string) (*SubDAGExecutor, } // buildCommand builds the command to execute the sub DAG. -func (e *SubDAGExecutor) buildCommand(ctx context.Context, runParams RunParams, workDir string) (*exec.Cmd, error) { +func (e *SubDAGExecutor) buildCommand(ctx context.Context, runParams RunParams, workDir string) (*osexec.Cmd, error) { executable, err := executablePath() if err != nil { return nil, fmt.Errorf("failed to find executable path: %w", err) @@ -121,7 +121,7 @@ func (e *SubDAGExecutor) buildCommand(ctx context.Context, runParams RunParams, return nil, errDAGRunIDNotSet } - rCtx := exec1.GetContext(ctx) + rCtx := exec.GetContext(ctx) if rCtx.RootDAGRun.Zero() { return nil, errRootDAGRunNotSet } @@ -145,7 +145,7 @@ func (e *SubDAGExecutor) buildCommand(ctx context.Context, runParams RunParams, args = append(args, "--", runParams.Params) } - cmd := exec.CommandContext(ctx, executable, args...) // nolint:gosec + cmd := osexec.CommandContext(ctx, executable, args...) 
// nolint:gosec cmd.Dir = workDir cmd.Env = os.Environ() cmd.Env = append(cmd.Env, rCtx.AllEnvs()...) @@ -168,7 +168,7 @@ func (e *SubDAGExecutor) buildCommand(ctx context.Context, runParams RunParams, // BuildCoordinatorTask creates a coordinator task for distributed execution func (e *SubDAGExecutor) BuildCoordinatorTask(ctx context.Context, runParams RunParams) (*coordinatorv1.Task, error) { - rCtx := exec1.GetContext(ctx) + rCtx := exec.GetContext(ctx) if runParams.RunID == "" { return nil, errDAGRunIDNotSet @@ -185,7 +185,7 @@ func (e *SubDAGExecutor) BuildCoordinatorTask(ctx context.Context, runParams Run coordinatorv1.Operation_OPERATION_START, runParams.RunID, WithRootDagRun(rCtx.RootDAGRun), - WithParentDagRun(exec1.DAGRunRef{ + WithParentDagRun(exec.DAGRunRef{ Name: rCtx.DAG.Name, ID: rCtx.DAGRunID, }), @@ -224,10 +224,11 @@ func (e *SubDAGExecutor) Cleanup(ctx context.Context) error { // Execute executes the sub DAG and returns the result. // This is useful for parallel execution where results need to be collected. 
-func (e *SubDAGExecutor) Execute(ctx context.Context, runParams RunParams, workDir string) (*exec1.RunStatus, error) { +func (e *SubDAGExecutor) Execute(ctx context.Context, runParams RunParams, workDir string) (*exec.RunStatus, error) { ctx = logger.WithValues(ctx, tag.SubDAG(e.DAG.Name), tag.SubRunID(runParams.RunID)) - if len(e.DAG.WorkerSelector) > 0 { + rCtx := exec.GetContext(ctx) + if core.ShouldDispatchToCoordinator(e.DAG, e.coordinatorCli != nil, rCtx.DefaultExecMode) { // Handle distributed execution logger.Info(ctx, "Executing sub DAG via distributed execution") @@ -282,7 +283,7 @@ func (e *SubDAGExecutor) Execute(ctx context.Context, runParams RunParams, workD default: } - rCtx := exec1.GetContext(ctx) + rCtx = exec.GetContext(ctx) result, resultErr := rCtx.DB.GetSubDAGRunStatus(ctx, runParams.RunID, rCtx.RootDAGRun) if resultErr != nil { errMsg := fmt.Sprintf("sub dag-run %q failed and wrote no status", runParams.RunID) @@ -304,7 +305,7 @@ func (e *SubDAGExecutor) Execute(ctx context.Context, runParams RunParams, workD } // dispatch runs the sub DAG via coordinator and returns the result. 
-func (e *SubDAGExecutor) dispatch(ctx context.Context, runParams RunParams) (*exec1.RunStatus, error) { +func (e *SubDAGExecutor) dispatch(ctx context.Context, runParams RunParams) (*exec.RunStatus, error) { dispatchCtx := logger.WithValues(ctx, tag.RunID(runParams.RunID), tag.DAG(e.DAG.Name), @@ -345,7 +346,7 @@ func (e *SubDAGExecutor) dispatchToCoordinator(ctx context.Context, runParams Ru return nil } -func (e *SubDAGExecutor) waitCompletion(ctx context.Context, dagRunID string) (*exec1.RunStatus, error) { +func (e *SubDAGExecutor) waitCompletion(ctx context.Context, dagRunID string) (*exec.RunStatus, error) { waitCtx := logger.WithValues(ctx, tag.RunID(dagRunID), tag.DAG(e.DAG.Name), @@ -404,7 +405,7 @@ func (e *SubDAGExecutor) waitCompletion(ctx context.Context, dagRunID string) (* } // waitForCancellation waits for a distributed sub DAG run to terminate after cancellation is requested. -func (e *SubDAGExecutor) waitForCancellation(ctx context.Context, dagRunID string, startTime time.Time) (*exec1.RunStatus, error) { +func (e *SubDAGExecutor) waitForCancellation(ctx context.Context, dagRunID string, startTime time.Time) (*exec.RunStatus, error) { waitCtx := logger.WithValues(ctx, tag.RunID(dagRunID), tag.DAG(e.DAG.Name), @@ -428,7 +429,7 @@ func (e *SubDAGExecutor) waitForCancellation(ctx context.Context, dagRunID strin // Use a fresh context for status queries since the original context may be canceled cancelWaitCtx := context.WithoutCancel(ctx) - var lastStatus *exec1.RunStatus + var lastStatus *exec.RunStatus for { status, err := e.getSubDAGRunStatus(cancelWaitCtx, dagRunID) @@ -465,8 +466,8 @@ func (e *SubDAGExecutor) waitForCancellation(ctx context.Context, dagRunID strin // getSubDAGRunStatus retrieves the status of a sub-DAG run. // For distributed runs, it queries the coordinator; otherwise, it uses the local DB. 
-func (e *SubDAGExecutor) getSubDAGRunStatus(ctx context.Context, dagRunID string) (*exec1.RunStatus, error) { - rCtx := exec1.GetContext(ctx) +func (e *SubDAGExecutor) getSubDAGRunStatus(ctx context.Context, dagRunID string) (*exec.RunStatus, error) { + rCtx := exec.GetContext(ctx) if e.coordinatorCli != nil { return e.getStatusFromCoordinator(ctx, dagRunID, rCtx.RootDAGRun) @@ -480,7 +481,7 @@ func (e *SubDAGExecutor) getSubDAGRunStatus(ctx context.Context, dagRunID string } // getStatusFromCoordinator queries the coordinator for sub-DAG run status. -func (e *SubDAGExecutor) getStatusFromCoordinator(ctx context.Context, dagRunID string, rootDAGRun exec1.DAGRunRef) (*exec1.RunStatus, error) { +func (e *SubDAGExecutor) getStatusFromCoordinator(ctx context.Context, dagRunID string, rootDAGRun exec.DAGRunRef) (*exec.RunStatus, error) { rootRef := &rootDAGRun resp, err := e.coordinatorCli.GetDAGRunStatus(ctx, e.DAG.Name, dagRunID, rootRef) if err != nil { @@ -499,7 +500,7 @@ func (e *SubDAGExecutor) getStatusFromCoordinator(ctx context.Context, dagRunID } outputs := extractOutputsFromNodes(dagRunStatus.Nodes) - return &exec1.RunStatus{ + return &exec.RunStatus{ Name: dagRunStatus.Name, DAGRunID: dagRunID, Params: dagRunStatus.Params, @@ -513,7 +514,7 @@ func (e *SubDAGExecutor) getStatusFromCoordinator(ctx context.Context, dagRunID // 1. Local format: key="VAR", value="VAR=actual_value" (KeyValue string) // 2. Proto format: key="VAR", value="actual_value" (direct value) // This function normalizes both formats to extract the actual value. 
-func extractOutputsFromNodes(nodes []*exec1.Node) map[string]string { +func extractOutputsFromNodes(nodes []*exec.Node) map[string]string { outputs := make(map[string]string) for _, node := range nodes { if node.OutputVariables == nil { @@ -579,7 +580,7 @@ func (e *SubDAGExecutor) Kill(sig os.Signal) error { // Fallback: try local DB (for non-shared-nothing mode with local dagRunStore) if e.dagCtx.DB != nil { if err := e.dagCtx.DB.RequestChildCancel(ctx, runID, e.dagCtx.RootDAGRun); err != nil { - if errors.Is(err, exec1.ErrDAGRunIDNotFound) { + if errors.Is(err, exec.ErrDAGRunIDNotFound) { continue } errs = append(errs, err) diff --git a/internal/service/frontend/api/v1/api.go b/internal/service/frontend/api/v1/api.go index e64627a5ec..4e7d6118d8 100644 --- a/internal/service/frontend/api/v1/api.go +++ b/internal/service/frontend/api/v1/api.go @@ -54,6 +54,7 @@ type API struct { auditService *audit.Service syncService SyncService tunnelService *tunnel.Service + defaultExecMode config.ExecutionMode dagWritesDisabled bool // True when git sync read-only mode is active agentConfigStore agent.ConfigStore } @@ -166,6 +167,7 @@ func New( serviceRegistry: sr, metricsRegistry: mr, resourceService: rs, + defaultExecMode: cfg.DefaultExecMode, } for _, opt := range opts { diff --git a/internal/service/frontend/api/v1/dagruns.go b/internal/service/frontend/api/v1/dagruns.go index 7c2c39cfcf..b1312a9545 100644 --- a/internal/service/frontend/api/v1/dagruns.go +++ b/internal/service/frontend/api/v1/dagruns.go @@ -1295,16 +1295,8 @@ func (a *API) RetryDAGRun(ctx context.Context, request api.RetryDAGRunRequestObj stepName := valueOf(request.Body.StepName) - // Check for workerSelector - dispatch to coordinator for distributed execution - if len(dag.WorkerSelector) > 0 { - if a.coordinatorCli == nil { - return nil, &Error{ - HTTPStatus: http.StatusServiceUnavailable, - Code: api.ErrorCodeInternalError, - Message: "coordinator not configured for distributed DAG retry", - } - } - + 
// Check if this DAG should be dispatched to the coordinator for distributed execution + if core.ShouldDispatchToCoordinator(dag, a.coordinatorCli != nil, a.defaultExecMode) { // Get previous status for retry context prevStatus, err := attempt.ReadStatus(ctx) if err != nil { diff --git a/internal/service/frontend/api/v1/dags.go b/internal/service/frontend/api/v1/dags.go index caebc2c69b..2d852555b6 100644 --- a/internal/service/frontend/api/v1/dags.go +++ b/internal/service/frontend/api/v1/dags.go @@ -8,7 +8,7 @@ import ( "log/slog" "net/http" "net/url" - "runtime" + osrt "runtime" "sort" "strconv" "strings" @@ -22,8 +22,10 @@ import ( "github.com/dagu-org/dagu/internal/core" "github.com/dagu-org/dagu/internal/core/exec" "github.com/dagu-org/dagu/internal/core/spec" - runtime1 "github.com/dagu-org/dagu/internal/runtime" + "github.com/dagu-org/dagu/internal/runtime" + "github.com/dagu-org/dagu/internal/runtime/executor" "github.com/dagu-org/dagu/internal/service/audit" + coordinatorv1 "github.com/dagu-org/dagu/proto/coordinator/v1" ) const defaultHistoryLimit = 30 @@ -883,13 +885,53 @@ func (a *API) waitForDAGStatusChange(ctx context.Context, dag *core.DAG, dagRunI return false } +// dispatchStartToCoordinator dispatches a DAG start operation to the coordinator +// and waits for the DAG status to change from NotStarted within the given timeout. 
+func (a *API) dispatchStartToCoordinator(ctx context.Context, dag *core.DAG, dagRunID string, timeout time.Duration) error { + var taskOpts []executor.TaskOption + if len(dag.WorkerSelector) > 0 { + taskOpts = append(taskOpts, executor.WithWorkerSelector(dag.WorkerSelector)) + } + + task := executor.CreateTask( + dag.Name, + string(dag.YamlData), + coordinatorv1.Operation_OPERATION_START, + dagRunID, + taskOpts..., + ) + + if err := a.coordinatorCli.Dispatch(ctx, task); err != nil { + return fmt.Errorf("error dispatching to coordinator: %w", err) + } + + if !a.waitForDAGStatusChange(ctx, dag, dagRunID, timeout) { + return &Error{ + HTTPStatus: http.StatusInternalServerError, + Code: api.ErrorCodeInternalError, + Message: "DAG did not start after coordinator dispatch", + } + } + + return nil +} + func (a *API) startDAGRunWithOptions(ctx context.Context, dag *core.DAG, opts startDAGRunOptions) error { + // Check if this DAG should be dispatched to the coordinator for distributed execution + if core.ShouldDispatchToCoordinator(dag, a.coordinatorCli != nil, a.defaultExecMode) { + timeout := 5 * time.Second + if osrt.GOOS == "windows" { + timeout = 10 * time.Second + } + return a.dispatchStartToCoordinator(ctx, dag, opts.dagRunID, timeout) + } + // Only pass trigger type if it's a known value (not TriggerTypeUnknown) triggerTypeStr := "" if opts.triggerType != core.TriggerTypeUnknown { triggerTypeStr = opts.triggerType.String() } - spec := a.subCmdBuilder.Start(dag, runtime1.StartOptions{ + spec := a.subCmdBuilder.Start(dag, runtime.StartOptions{ Params: opts.params, DAGRunID: opts.dagRunID, Quiet: true, @@ -899,12 +941,12 @@ func (a *API) startDAGRunWithOptions(ctx context.Context, dag *core.DAG, opts st TriggerType: triggerTypeStr, }) - if err := runtime1.Start(ctx, spec); err != nil { + if err := runtime.Start(ctx, spec); err != nil { return fmt.Errorf("error starting DAG: %w", err) } timeout := 5 * time.Second - if runtime.GOOS == "windows" { + if osrt.GOOS == 
"windows" { timeout = 10 * time.Second } @@ -1002,7 +1044,7 @@ func (a *API) enqueueDAGRun(ctx context.Context, dag *core.DAG, params, dagRunID if triggerType != core.TriggerTypeUnknown { triggerTypeStr = triggerType.String() } - opts := runtime1.EnqueueOptions{ + opts := runtime.EnqueueOptions{ Params: params, DAGRunID: dagRunID, NameOverride: nameOverride, @@ -1013,7 +1055,7 @@ func (a *API) enqueueDAGRun(ctx context.Context, dag *core.DAG, params, dagRunID } spec := a.subCmdBuilder.Enqueue(dag, opts) - if err := runtime1.Run(ctx, spec); err != nil { + if err := runtime.Run(ctx, spec); err != nil { return fmt.Errorf("error enqueuing DAG: %w", err) } diff --git a/internal/service/scheduler/dag_executor.go b/internal/service/scheduler/dag_executor.go index 5f93ea9e99..61240fd69b 100644 --- a/internal/service/scheduler/dag_executor.go +++ b/internal/service/scheduler/dag_executor.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" + "github.com/dagu-org/dagu/internal/cmn/config" "github.com/dagu-org/dagu/internal/cmn/logger" "github.com/dagu-org/dagu/internal/cmn/logger/tag" "github.com/dagu-org/dagu/internal/core" @@ -47,18 +48,21 @@ import ( // - HandleJob(): Entry point for new scheduled jobs (handles persistence) // - ExecuteDAG(): Executes/dispatches already-persisted jobs (no persistence) type DAGExecutor struct { - coordinatorCli exec.Dispatcher - subCmdBuilder *runtime.SubCmdBuilder + coordinatorCli exec.Dispatcher + subCmdBuilder *runtime.SubCmdBuilder + defaultExecMode config.ExecutionMode } // NewDAGExecutor creates a new DAGExecutor instance. 
func NewDAGExecutor( coordinatorCli exec.Dispatcher, subCmdBuilder *runtime.SubCmdBuilder, + defaultExecMode config.ExecutionMode, ) *DAGExecutor { return &DAGExecutor{ - coordinatorCli: coordinatorCli, - subCmdBuilder: subCmdBuilder, + coordinatorCli: coordinatorCli, + subCmdBuilder: subCmdBuilder, + defaultExecMode: defaultExecMode, } } @@ -159,14 +163,10 @@ func (e *DAGExecutor) ExecuteDAG( } // shouldUseDistributedExecution checks if distributed execution should be used. -// Returns true only if: -// 1. A coordinator client is configured (coordinator is available) -// 2. The DAG has workerSelector labels (DAG explicitly requests distributed execution) -// -// This ensures backward compatibility - DAGs without workerSelector continue -// to run locally even when a coordinator is configured. +// Delegates to core.ShouldDispatchToCoordinator for consistent dispatch logic +// across all execution paths (API, CLI, scheduler, sub-DAG). func (e *DAGExecutor) shouldUseDistributedExecution(dag *core.DAG) bool { - return e.coordinatorCli != nil && len(dag.WorkerSelector) > 0 + return core.ShouldDispatchToCoordinator(dag, e.coordinatorCli != nil, e.defaultExecMode) } // dispatchToCoordinator dispatches a task to the coordinator for distributed execution. 
diff --git a/internal/service/scheduler/dag_executor_test.go b/internal/service/scheduler/dag_executor_test.go index cf9aa1636e..4a3216abf3 100644 --- a/internal/service/scheduler/dag_executor_test.go +++ b/internal/service/scheduler/dag_executor_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/dagu-org/dagu/internal/cmn/config" "github.com/dagu-org/dagu/internal/core" "github.com/dagu-org/dagu/internal/core/spec" "github.com/dagu-org/dagu/internal/service/coordinator" @@ -23,7 +24,7 @@ steps: `) coordinatorCli := coordinator.New(th.ServiceRegistry, coordinator.DefaultConfig()) - dagExecutor := scheduler.NewDAGExecutor(coordinatorCli, th.SubCmdBuilder) + dagExecutor := scheduler.NewDAGExecutor(coordinatorCli, th.SubCmdBuilder, config.ExecutionModeLocal) t.Cleanup(func() { dagExecutor.Close(th.Context) }) @@ -67,7 +68,7 @@ steps: }) t.Run("HandleJob_Local_ExecutesDirectly", func(t *testing.T) { - localExecutor := scheduler.NewDAGExecutor(nil, th.SubCmdBuilder) + localExecutor := scheduler.NewDAGExecutor(nil, th.SubCmdBuilder, config.ExecutionModeLocal) dag, err := spec.Load(context.Background(), testDAG.Location) require.NoError(t, err) diff --git a/internal/service/scheduler/queue_processor_test.go b/internal/service/scheduler/queue_processor_test.go index adcaa7f769..604a6f32be 100644 --- a/internal/service/scheduler/queue_processor_test.go +++ b/internal/service/scheduler/queue_processor_test.go @@ -93,7 +93,7 @@ func (f *queueFixture) enqueueRuns(n int) *queueFixture { func (f *queueFixture) withProcessor(cfg config.Queues) *queueFixture { f.processor = NewQueueProcessor(f.queueStore, f.dagRunStore, f.procStore, - NewDAGExecutor(nil, runtime.NewSubCmdBuilder(&config.Config{Paths: config.PathsConfig{Executable: "/usr/bin/dagu"}})), + NewDAGExecutor(nil, runtime.NewSubCmdBuilder(&config.Config{Paths: config.PathsConfig{Executable: "/usr/bin/dagu"}}), config.ExecutionModeLocal), cfg, WithBackoffConfig(BackoffConfig{InitialInterval: 10 * 
time.Millisecond, MaxInterval: 50 * time.Millisecond, MaxRetries: 2}), ) return f diff --git a/internal/service/scheduler/scheduler.go b/internal/service/scheduler/scheduler.go index 6d84169dcc..3132af8e2d 100644 --- a/internal/service/scheduler/scheduler.go +++ b/internal/service/scheduler/scheduler.go @@ -91,7 +91,7 @@ func New( lockDir := filepath.Join(cfg.Paths.DataDir, "scheduler", "locks") dirLock := dirlock.New(lockDir, lockOpts) subCmdBuilder := runtime.NewSubCmdBuilder(cfg) - dagExecutor := NewDAGExecutor(coordinatorCli, subCmdBuilder) + dagExecutor := NewDAGExecutor(coordinatorCli, subCmdBuilder, cfg.DefaultExecMode) healthServer := NewHealthServer(cfg.Scheduler.Port) processor := NewQueueProcessor( queueStore, diff --git a/internal/service/worker/handler_test.go b/internal/service/worker/handler_test.go index ef125784f0..482216923e 100644 --- a/internal/service/worker/handler_test.go +++ b/internal/service/worker/handler_test.go @@ -4,7 +4,7 @@ import ( "context" "os" "path/filepath" - "runtime" + osrt "runtime" "strings" "testing" "time" @@ -12,7 +12,7 @@ import ( "github.com/dagu-org/dagu/internal/cmn/config" "github.com/dagu-org/dagu/internal/cmn/fileutil" "github.com/dagu-org/dagu/internal/core" - runtime1 "github.com/dagu-org/dagu/internal/runtime" + "github.com/dagu-org/dagu/internal/runtime" "github.com/dagu-org/dagu/internal/test" coordinatorv1 "github.com/dagu-org/dagu/proto/coordinator/v1" "github.com/google/uuid" @@ -36,8 +36,8 @@ func TestTaskHandler(t *testing.T) { // First, create an initial dag-run (simulating what coordinator does during enqueue) // This creates the status record that retry will use - spec := th.SubCmdBuilder.Start(dag.DAG, runtime1.StartOptions{}) - err := runtime1.Start(th.Context, spec) + spec := th.SubCmdBuilder.Start(dag.DAG, runtime.StartOptions{}) + err := runtime.Start(th.Context, spec) require.NoError(t, err) // Wait for the initial run to complete @@ -84,8 +84,8 @@ func TestTaskHandler(t *testing.T) { cli := 
th.DAGRunMgr // First, start a DAG run - spec := th.SubCmdBuilder.Start(dag.DAG, runtime1.StartOptions{}) - err := runtime1.Start(th.Context, spec) + spec := th.SubCmdBuilder.Start(dag.DAG, runtime.StartOptions{}) + err := runtime.Start(th.Context, spec) require.NoError(t, err) // Wait for the DAG to finish @@ -198,7 +198,7 @@ func TestCreateTempDAGFile(t *testing.T) { } func TestTaskHandlerStartWithDefinition(t *testing.T) { - if runtime.GOOS == "windows" { + if osrt.GOOS == "windows" { t.Skip("POSIX shell required for fake executable script") } diff --git a/internal/service/worker/remote_handler.go b/internal/service/worker/remote_handler.go index 334fb554b5..409ad6a2e6 100644 --- a/internal/service/worker/remote_handler.go +++ b/internal/service/worker/remote_handler.go @@ -290,6 +290,7 @@ func (h *remoteTaskHandler) executeDAGRun( ServiceRegistry: h.serviceRegistry, RootDAGRun: root, PeerConfig: h.peerConfig, + DefaultExecMode: h.config.DefaultExecMode, } // Add retry configuration if present diff --git a/internal/test/helper.go b/internal/test/helper.go index 0974cc5467..e8f70ae91d 100644 --- a/internal/test/helper.go +++ b/internal/test/helper.go @@ -614,6 +614,7 @@ func (d *DAG) Agent(opts ...AgentOption) *Agent { helper.opts.ServiceRegistry = d.ServiceRegistry helper.opts.RootDAGRun = root helper.opts.PeerConfig = d.Config.Core.Peer + helper.opts.DefaultExecMode = d.Config.DefaultExecMode helper.Agent = agent.New( dagRunID, diff --git a/internal/test/scheduler.go b/internal/test/scheduler.go index 87ac1ac151..cad161b668 100644 --- a/internal/test/scheduler.go +++ b/internal/test/scheduler.go @@ -67,7 +67,7 @@ func SetupScheduler(t *testing.T, opts ...HelperOption) *Scheduler { // Create entry reader coordinatorCli := coordinator.New(helper.ServiceRegistry, coordinator.DefaultConfig()) - de := scheduler.NewDAGExecutor(coordinatorCli, runtime.NewSubCmdBuilder(helper.Config)) + de := scheduler.NewDAGExecutor(coordinatorCli, 
runtime.NewSubCmdBuilder(helper.Config), helper.Config.DefaultExecMode) em := scheduler.NewEntryReader(helper.Config.Paths.DAGsDir, ds, drm, de, "") // Update helper with scheduler-specific stores diff --git a/rfcs/010-immediate-execution-worker-selector.md b/rfcs/010-immediate-execution-worker-selector.md new file mode 100644 index 0000000000..74a63cb861 --- /dev/null +++ b/rfcs/010-immediate-execution-worker-selector.md @@ -0,0 +1,455 @@ +--- +id: "010" +title: "Unified Execution Dispatch: workerSelector, Default Mode, and Local Escape Hatch" +status: draft +--- + +# RFC 010: Unified Execution Dispatch + +## Summary + +Introduce a unified dispatch model that fixes three gaps: + +1. **Bug** — Immediate execution from the UI/API ignores `workerSelector`, running DAGs locally on the wrong node. +2. **New capability** — When a coordinator is configured, DAGs *without* `workerSelector` should optionally be dispatched to any available worker (server-level `defaultExecutionMode: distributed`). +3. **Escape hatch** — A way to force local execution per-DAG (`workerSelector: local`) and per-server (`defaultExecutionMode: local`, the default). + +## Motivation + +### Bug: UI ignores workerSelector + +When a DAG defines `workerSelector` labels it expects to run on a matching worker. This works for scheduled runs, CLI starts, and retries, but the two central API functions — `startDAGRunWithOptions()` and `enqueueDAGRun()` — unconditionally spawn a local subprocess, bypassing the coordinator entirely. + +### Missing capability: distribute all DAGs + +Users who deploy a coordinator + workers want *all* DAGs dispatched to workers — not just those with explicit `workerSelector` labels. Today there is no server-level knob to opt into this behaviour; every DAG must carry a `workerSelector` block. + +### Missing escape hatch + +Once the server default is `distributed`, users need a way to pin specific DAGs to the coordinator/scheduler node itself (e.g., lightweight admin scripts). 
There is no mechanism for this today. + +## Proposal + +### 1. Server-level config: `defaultExecutionMode` + +Add a new field to `config.Config`: + +```go +// config.go + +type Config struct { + // ... existing fields ... + DefaultExecutionMode ExecutionMode // "local" (default) | "distributed" +} + +type ExecutionMode string + +const ( + ExecutionModeLocal ExecutionMode = "local" + ExecutionModeDistributed ExecutionMode = "distributed" +) +``` + +Environment variable: + +```sh +DAGU_DEFAULT_EXECUTION_MODE=distributed +``` + +This follows the existing env binding pattern in `internal/cmn/config/loader.go`. Add to `envBindings`: + +```go +{key: "defaultExecutionMode", env: "DEFAULT_EXECUTION_MODE"}, +``` + +YAML example: + +```yaml +# server config +defaultExecutionMode: distributed # dispatch all DAGs to workers +coordinator: + host: 0.0.0.0 + port: 5890 +``` + +Default is `local` — existing deployments are unaffected. + +### 2. DAG-level: `workerSelector: local` escape hatch + +Today `WorkerSelector` is typed `map[string]string` in both the spec and `core.DAG`. To support the special value `"local"`, change the **spec** field to `any`: + +```go +// internal/core/spec/dag.go + +type dag struct { + // ... + WorkerSelector any `yaml:"workerSelector"` // map[string]string | "local" +} +``` + +The builder parses this into two fields on `core.DAG`: + +```go +// internal/core/dag.go + +type DAG struct { + // ... 
+ WorkerSelector map[string]string `json:"workerSelector,omitempty"` + ForceLocal bool `json:"forceLocal,omitempty"` +} +``` + +Builder logic: + +```go +// internal/core/spec/dag.go + +func buildWorkerSelector(_ BuildContext, d *dag) (map[string]string, bool, error) { + if d.WorkerSelector == nil { + return nil, false, nil + } + + // String "local" → force local execution + if s, ok := d.WorkerSelector.(string); ok { + if strings.EqualFold(strings.TrimSpace(s), "local") { + return nil, true, nil // ForceLocal = true + } + return nil, false, fmt.Errorf( + "workerSelector: unsupported string value %q (only \"local\" is allowed)", s, + ) + } + + // map[string]string → label selector (existing behaviour) + raw, ok := d.WorkerSelector.(map[string]any) + if !ok { + return nil, false, fmt.Errorf( + "workerSelector: expected map or \"local\", got %T", d.WorkerSelector, + ) + } + ret := make(map[string]string, len(raw)) + for k, v := range raw { + ret[strings.TrimSpace(k)] = strings.TrimSpace(fmt.Sprint(v)) + } + return ret, false, nil +} +``` + +DAG YAML examples: + +```yaml +# Dispatch to a worker with matching labels +workerSelector: + gpu: "true" + region: us-east-1 + +# Force local execution (even when server default is distributed) +workerSelector: local +``` + +### 3. Unified dispatch logic + +Every code path that runs a DAG must use the same decision function: + +```go +// internal/core/dispatch.go (new file, ~20 lines) + +// ShouldDispatchToCoordinator decides whether a DAG should be dispatched +// to a coordinator for distributed execution. 
+// +// Decision matrix: +// dag.ForceLocal → false (always local) +// coordinatorCli == nil → false (no coordinator) +// len(dag.WorkerSelector) > 0 → true (explicit labels) +// defaultMode == ExecutionModeDistributed → true (server opted in) +// otherwise → false (local) +func ShouldDispatchToCoordinator( + dag *DAG, + hasCoordinator bool, + defaultMode config.ExecutionMode, +) bool { + if dag.ForceLocal { + return false + } + if !hasCoordinator { + return false + } + if len(dag.WorkerSelector) > 0 { + return true + } + return defaultMode == config.ExecutionModeDistributed +} +``` + +### 4. Fix immediate execution paths + +#### `startDAGRunWithOptions` (`internal/service/frontend/api/v1/dags.go`) + +Before spawning a local subprocess, check dispatch: + +```go +func (a *API) startDAGRunWithOptions(ctx context.Context, dag *core.DAG, opts startDAGRunOptions) error { + if core.ShouldDispatchToCoordinator(dag, a.coordinatorCli != nil, a.defaultExecMode) { + if a.coordinatorCli == nil { + return &Error{ + HTTPStatus: http.StatusServiceUnavailable, + Code: api.ErrorCodeInternalError, + Message: "coordinator not configured for distributed DAG execution", + } + } + + taskOpts := []executor.TaskOption{} + if len(dag.WorkerSelector) > 0 { + taskOpts = append(taskOpts, executor.WithWorkerSelector(dag.WorkerSelector)) + } + + task := executor.CreateTask( + dag.Name, + string(dag.YamlData), + coordinatorv1.Operation_OPERATION_START, + opts.dagRunID, + taskOpts..., + ) + if err := a.coordinatorCli.Dispatch(ctx, task); err != nil { + return fmt.Errorf("error dispatching DAG to coordinator: %w", err) + } + + // Wait for the DAG to start on the remote worker (same timeout as local path) + ... + return nil + } + + // Local execution (existing code, unchanged) + ... 
+} +``` + +This single change covers all callers: + +| API Endpoint | Handler | +|---|---| +| Execute saved DAG | `ExecuteDAG` | +| Execute DAG and wait | `ExecuteDAGSync` | +| Execute inline spec | `ExecuteDAGRunFromSpec` | +| Reschedule DAG run | `RescheduleDAGRun` | + +#### `enqueueDAGRun` (`internal/service/frontend/api/v1/dags.go`) + +Same pattern: + +```go +func (a *API) enqueueDAGRun(ctx context.Context, dag *core.DAG, params, dagRunID, nameOverride string, triggerType core.TriggerType) error { + if core.ShouldDispatchToCoordinator(dag, a.coordinatorCli != nil, a.defaultExecMode) { + if a.coordinatorCli == nil { + return &Error{...} + } + + taskOpts := []executor.TaskOption{} + if len(dag.WorkerSelector) > 0 { + taskOpts = append(taskOpts, executor.WithWorkerSelector(dag.WorkerSelector)) + } + + task := executor.CreateTask( + dag.Name, + string(dag.YamlData), + coordinatorv1.Operation_OPERATION_START, + dagRunID, + taskOpts..., + ) + if err := a.coordinatorCli.Dispatch(ctx, task); err != nil { + return fmt.Errorf("error dispatching DAG to coordinator: %w", err) + } + + // Wait for status change + ... + return nil + } + + // Local enqueue (existing code, unchanged) + ... +} +``` + +Covers: + +| API Endpoint | Handler | +|---|---| +| Enqueue saved DAG | `EnqueueDAGDAGRun` | +| Enqueue inline spec | `EnqueueDAGRunFromSpec` | + +#### `RetryDAGRun` (`internal/service/frontend/api/v1/dagruns.go`) + +Update the existing `len(dag.WorkerSelector) > 0` guard to use `ShouldDispatchToCoordinator`: + +```go +if core.ShouldDispatchToCoordinator(dag, a.coordinatorCli != nil, a.defaultExecMode) { + // ... existing coordinator dispatch logic (unchanged) ... +} +``` + +### 5. 
Fix all other paths for consistency + +#### Scheduler (`internal/service/scheduler/dag_executor.go`) + +Replace `shouldUseDistributedExecution`: + +```go +func (e *DAGExecutor) shouldUseDistributedExecution(dag *core.DAG) bool { + return core.ShouldDispatchToCoordinator(dag, e.coordinatorCli != nil, e.defaultExecMode) +} +``` + +The `DAGExecutor` struct gains a `defaultExecMode config.ExecutionMode` field, injected at construction. + +#### CLI (`internal/cmd/start.go`) + +Update `tryExecuteDAG`: + +```go +func tryExecuteDAG(ctx *Context, dag *core.DAG, dagRunID string, root exec.DAGRunRef, workerID string, triggerType core.TriggerType) error { + // Already running on a worker — never re-dispatch + if workerID != "local" { + // ... existing local execution ... + } + + if core.ShouldDispatchToCoordinator(dag, ctx.NewCoordinatorClient() != nil, ctx.Config.DefaultExecutionMode) { + coordinatorCli := ctx.NewCoordinatorClient() + if coordinatorCli == nil { + return fmt.Errorf("coordinator required for distributed execution; configure peer settings") + } + return dispatchToCoordinatorAndWait(ctx, dag, dagRunID, coordinatorCli) + } + + // ... existing local execution ... +} +``` + +#### Sub-DAG executor (`internal/runtime/executor/dag_runner.go`) + +Update `SubDAGExecutor.Execute`: + +```go +func (e *SubDAGExecutor) Execute(ctx context.Context, runParams RunParams, workDir string) (*exec1.RunStatus, error) { + rCtx := exec.GetContext(ctx) + if core.ShouldDispatchToCoordinator(e.DAG, e.coordinatorCli != nil, rCtx.DefaultExecMode) { + // ... existing distributed dispatch ... + } + + // ... existing local execution ... +} +``` + +### 6. 
Passing `defaultExecMode` through the system + +The `config.DefaultExecutionMode` value must reach every dispatch site: + +| Component | How it receives the value | +|---|---| +| `API` struct | New field, set during server init from `config.DefaultExecutionMode` | +| `DAGExecutor` | New constructor param from scheduler setup | +| CLI `tryExecuteDAG` | Read from `ctx.Config.DefaultExecutionMode` | +| `SubDAGExecutor` | Via `exec.Context.DefaultExecMode` (new field on runtime context) | + +### 7. DAG JSON schema update + +Update `internal/cmn/schema/dag.schema.json` to accept both map and string for `workerSelector` (appears twice — DAG-level and step-level): + +```json +"workerSelector": { + "oneOf": [ + { + "type": "object", + "additionalProperties": { + "type": "string" + }, + "description": "Key-value pairs specifying worker label requirements." + }, + { + "type": "string", + "enum": ["local"], + "description": "The string \"local\" forces execution on the scheduler/coordinator node." + } + ], + "description": "Worker label requirements for distributed execution, or the string \"local\" to force local execution even when defaultExecutionMode is distributed." +} +``` + +### 8. 
Documentation updates + +The following docs reference `workerSelector` and must be updated to document the `"local"` string value and `defaultExecutionMode`: + +| File | What to add | +|---|---| +| `docs/features/distributed-execution.md` | Add `defaultExecutionMode` server config section; document `workerSelector: local` escape hatch | +| `docs/features/worker-labels.md` | Add `workerSelector: local` to label matching rules; note it bypasses distributed dispatch | +| `docs/reference/yaml.md` | Update `workerSelector` field type from `object` to `object \| "local"`; add example | +| `docs/configurations/base-config.md` | Add `workerSelector: local` example alongside existing label example | +| `docs/overview/architecture.md` | Mention `defaultExecutionMode` in task routing description | +| `docs/writing-workflows/examples.md` | Add example showing `workerSelector: local` in a distributed deployment | + +## Files Changed + +| File | Change | +|---|---| +| `internal/cmn/config/config.go` | Add `DefaultExecutionMode` field and `ExecutionMode` type | +| `internal/cmn/config/loader.go` | Add `DAGU_DEFAULT_EXECUTION_MODE` env binding | +| `internal/cmn/schema/dag.schema.json` | Update `workerSelector` schema to `oneOf: [object, "local"]` (DAG-level and step-level) | +| `internal/core/dag.go` | Add `ForceLocal bool` field to `DAG` struct | +| `internal/core/dispatch.go` | **New file** — `ShouldDispatchToCoordinator()` | +| `internal/core/spec/dag.go` | Change `WorkerSelector` spec type to `any`; update `buildWorkerSelector` to parse `"local"` | +| `internal/service/frontend/api/v1/dags.go` | Add coordinator dispatch to `startDAGRunWithOptions` and `enqueueDAGRun` | +| `internal/service/frontend/api/v1/dagruns.go` | Update `RetryDAGRun` to use `ShouldDispatchToCoordinator` | +| `internal/service/scheduler/dag_executor.go` | Update `shouldUseDistributedExecution` to delegate to `ShouldDispatchToCoordinator`; add `defaultExecMode` field | +| `internal/cmd/start.go` | Update 
`tryExecuteDAG` to use `ShouldDispatchToCoordinator` | +| `internal/runtime/executor/dag_runner.go` | Update `SubDAGExecutor.Execute` to use `ShouldDispatchToCoordinator` | +| `docs/features/distributed-execution.md` | Document `defaultExecutionMode` and `workerSelector: local` | +| `docs/features/worker-labels.md` | Add `workerSelector: local` to matching rules | +| `docs/reference/yaml.md` | Update `workerSelector` type and add example | +| `docs/configurations/base-config.md` | Add `workerSelector: local` example | +| `docs/overview/architecture.md` | Mention `defaultExecutionMode` in task routing | +| `docs/writing-workflows/examples.md` | Add `workerSelector: local` example | + +## Consistency + +After this change, every execution path uses `ShouldDispatchToCoordinator`: + +| Code Path | Uses shared logic | Respects ForceLocal | Respects defaultMode | +|---|---|---|---| +| Scheduler | Yes | Yes | Yes | +| CLI | Yes | Yes | Yes | +| API — RetryDAGRun | Yes | Yes | Yes | +| API — startDAGRunWithOptions | Yes | Yes | Yes | +| API — enqueueDAGRun | Yes | Yes | Yes | +| Sub-DAG executor | Yes | Yes | Yes | + +## Decision Matrix + +| `dag.ForceLocal` | `coordinatorCli` | `defaultExecutionMode` | `len(WorkerSelector)` | Result | +|---|---|---|---|---| +| `true` | any | any | any | **local** | +| `false` | `nil` | any | any | **local** | +| `false` | present | any | `> 0` | **coordinator** | +| `false` | present | `distributed` | `0` | **coordinator** | +| `false` | present | `local` | `0` | **local** | + +## Design Decisions + +1. **Single decision function** — `ShouldDispatchToCoordinator` is the only place dispatch logic lives. Every call site delegates to it, eliminating inconsistencies between paths. + +2. **`defaultExecutionMode` defaults to `local`** — Existing deployments (with or without a coordinator) are completely unaffected. Users must explicitly opt in to `distributed`. + +3. 
**`workerSelector: local` as escape hatch** — Reuses the existing YAML field rather than adding a new top-level key. The string `"local"` is unambiguous and won't collide with label maps.
+
+4. **`ForceLocal` on `core.DAG`, not on the spec** — The spec field stays flexible (`any`), while the runtime struct has a clean boolean. Parsing happens once in the builder.
+
+5. **Surfacing a missing coordinator** — `ShouldDispatchToCoordinator` itself returns `false` whenever no coordinator client is configured, so the shared function alone cannot distinguish "local by design" from "coordinator missing". Call sites that must surface misconfiguration (e.g., a DAG with explicit `workerSelector` labels but no coordinator) therefore check for a nil coordinator client *before* consulting the shared function and return `503 Service Unavailable` rather than silently falling back to local execution, which would mask the misconfiguration.
+
+6. **Tasks without WorkerSelector get no label filter** — When `defaultExecutionMode: distributed` dispatches a DAG that has no `workerSelector` labels, the task is created without `WithWorkerSelector`. The coordinator assigns it to any available worker.
+
+## Migration
+
+| Scenario | Before | After |
+|---|---|---|
+| No coordinator configured | All DAGs run locally | No change (default mode is `local`) |
+| Coordinator + DAGs with `workerSelector` | Scheduled/CLI/retry dispatch; **UI runs locally (bug)** | All paths dispatch correctly |
+| Coordinator + want all DAGs distributed | Must add `workerSelector` to every DAG | Set `defaultExecutionMode: distributed` once |
+| Coordinator + distributed default + one DAG must stay local | Not possible | Add `workerSelector: local` to that DAG |