Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cmd/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (c *Context) NewScheduler() (*scheduler.Scheduler, error) {
}

coordinatorCli := c.NewCoordinatorClient()
de := scheduler.NewDAGExecutor(coordinatorCli, runtime.NewSubCmdBuilder(c.Config))
de := scheduler.NewDAGExecutor(coordinatorCli, runtime.NewSubCmdBuilder(c.Config), c.Config.DefaultExecMode)
m := scheduler.NewEntryReader(c.Config.Paths.DAGsDir, dr, c.DAGRunMgr, de, c.Config.Paths.Executable)
return scheduler.New(c.Config, m, c.DAGRunMgr, c.DAGRunStore, c.QueueStore, c.ProcStore, c.ServiceRegistry, coordinatorCli)
}
Expand Down
1 change: 1 addition & 0 deletions internal/cmd/dry.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func runDry(ctx *Context, args []string) error {
ServiceRegistry: ctx.ServiceRegistry,
RootDAGRun: exec.NewDAGRunRef(dag.Name, dagRunID),
PeerConfig: ctx.Config.Core.Peer,
DefaultExecMode: ctx.Config.DefaultExecMode,
},
)

Expand Down
1 change: 1 addition & 0 deletions internal/cmd/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func executeDAGWithRunID(ctx *Context, cli runtime.Manager, dag *core.DAG, dagRu
ServiceRegistry: ctx.ServiceRegistry,
RootDAGRun: exec.NewDAGRunRef(dag.Name, dagRunID),
PeerConfig: ctx.Config.Core.Peer,
DefaultExecMode: ctx.Config.DefaultExecMode,
})

listenSignals(ctx, agentInstance)
Expand Down
1 change: 1 addition & 0 deletions internal/cmd/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func executeRetry(ctx *Context, dag *core.DAG, status *exec.DAGRunStatus, rootRu
RootDAGRun: rootRun,
PeerConfig: ctx.Config.Core.Peer,
TriggerType: core.TriggerTypeRetry,
DefaultExecMode: ctx.Config.DefaultExecMode,
},
)

Expand Down
12 changes: 6 additions & 6 deletions internal/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,13 @@ var errProcAcquisitionFailed = errors.New("failed to acquire process handle")

// tryExecuteDAG acquires a process handle and executes the DAG.
func tryExecuteDAG(ctx *Context, dag *core.DAG, dagRunID string, root exec.DAGRunRef, workerID string, triggerType core.TriggerType) error {
// Check for workerSelector - dispatch to coordinator for distributed execution
// Skip if already running on a worker (workerID is set via --worker-id flag to a value other than "local")
if len(dag.WorkerSelector) > 0 && workerID == "local" {
// Check for dispatch to coordinator for distributed execution.
// Skip if already running on a worker (workerID != "local").
if workerID == "local" {
coordinatorCli := ctx.NewCoordinatorClient()
if coordinatorCli == nil {
return fmt.Errorf("coordinator required for DAG with workerSelector; configure peer settings")
if core.ShouldDispatchToCoordinator(dag, coordinatorCli != nil, ctx.Config.DefaultExecMode) {
return dispatchToCoordinatorAndWait(ctx, dag, dagRunID, coordinatorCli)
}
return dispatchToCoordinatorAndWait(ctx, dag, dagRunID, coordinatorCli)
}

if err := ctx.ProcStore.Lock(ctx, dag.ProcGroup()); err != nil {
Expand Down Expand Up @@ -400,6 +399,7 @@ func executeDAGRun(ctx *Context, d *core.DAG, parent exec.DAGRunRef, dagRunID st
RootDAGRun: root,
PeerConfig: ctx.Config.Core.Peer,
TriggerType: triggerType,
DefaultExecMode: ctx.Config.DefaultExecMode,
},
)

Expand Down
48 changes: 35 additions & 13 deletions internal/cmn/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ import (

// Config holds the overall configuration for the application.
type Config struct {
Core Core
Server Server
Paths PathsConfig
UI UI
Queues Queues
Coordinator Coordinator
Worker Worker
Scheduler Scheduler
Monitoring MonitoringConfig
Cache CacheMode
GitSync GitSyncConfig
Tunnel TunnelConfig
Warnings []string
Core Core
Server Server
Paths PathsConfig
UI UI
Queues Queues
Coordinator Coordinator
Worker Worker
Scheduler Scheduler
Monitoring MonitoringConfig
DefaultExecMode ExecutionMode
Cache CacheMode
GitSync GitSyncConfig
Tunnel TunnelConfig
Warnings []string
}

// GitSyncConfig holds the configuration for Git sync functionality.
Expand Down Expand Up @@ -84,6 +85,14 @@ type TunnelRateLimitConfig struct {

const TunnelProviderTailscale = "tailscale"

// ExecutionMode represents the default execution mode for DAGs.
type ExecutionMode string

const (
ExecutionModeLocal ExecutionMode = "local"
ExecutionModeDistributed ExecutionMode = "distributed"
)

// MonitoringConfig holds the configuration for system monitoring.
// Memory usage: ~4 metrics * (retention / interval) * 16 bytes per point.
type MonitoringConfig struct {
Expand Down Expand Up @@ -353,6 +362,9 @@ func (c *Config) Validate() error {
if err := c.validateBuiltinAuth(); err != nil {
return err
}
if err := c.validateExecutionMode(); err != nil {
return err
}
if err := c.validateGitSync(); err != nil {
return err
}
Expand Down Expand Up @@ -437,6 +449,16 @@ func (c *Config) validateOIDCForBuiltin() error {
return nil
}

// validateExecutionMode validates the default execution mode.
func (c *Config) validateExecutionMode() error {
switch c.DefaultExecMode {
case ExecutionModeLocal, ExecutionModeDistributed:
return nil
default:
return fmt.Errorf("invalid defaultExecutionMode: %q (must be one of: local, distributed)", c.DefaultExecMode)
}
}

// validateGitSync validates the Git sync configuration.
func (c *Config) validateGitSync() error {
if !c.GitSync.Enabled {
Expand Down
Loading
Loading