Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ This configuration is used for worker instances that execute DAGs. See the [Dist

| Environment Variable | Default | Description |
|---------------------|---------|-------------|
| `DAGU_COORDINATOR_ENABLED` | `true` | Enable coordinator service |
| `DAGU_COORDINATOR_HOST` | `127.0.0.1` | Coordinator gRPC server bind address |
| `DAGU_COORDINATOR_ADVERTISE` | (auto) | Address to advertise in service registry (default: hostname) |
| `DAGU_COORDINATOR_PORT` | `50055` | Coordinator gRPC server port |
Expand Down
5 changes: 5 additions & 0 deletions internal/cmd/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,12 @@ func (c *Context) NewServer(rs *resource.Service, opts ...frontend.ServerOption)
}

// NewCoordinatorClient creates a new coordinator client using the global peer configuration.
// Returns nil when the coordinator is disabled via configuration.
func (c *Context) NewCoordinatorClient() coordinator.Client {
if !c.Config.Coordinator.Enabled {
return nil
}

coordinatorCliCfg := coordinator.DefaultConfig()
coordinatorCliCfg.CAFile = c.Config.Core.Peer.ClientCaFile
coordinatorCliCfg.CertFile = c.Config.Core.Peer.CertFile
Expand Down
62 changes: 38 additions & 24 deletions internal/cmd/startall.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/dagu-org/dagu/internal/cmn/logger"
"github.com/dagu-org/dagu/internal/cmn/logger/tag"
"github.com/dagu-org/dagu/internal/service/coordinator"
"github.com/dagu-org/dagu/internal/service/resource"
"github.com/spf13/cobra"
)
Expand All @@ -26,9 +27,8 @@ and optionally 'dagu coordinator' into a single process. The web UI provides the
interface, the scheduler handles automated DAG-run execution based on defined schedules,
and the coordinator (when enabled) manages distributed task execution across workers.

By default, start-all runs in single instance mode without the coordinator. The coordinator
is only started when --coordinator.host is set to a non-localhost address (not 127.0.0.1
or localhost), enabling distributed execution mode.
The coordinator is enabled by default and can be disabled by setting coordinator.enabled=false
in the config file or DAGU_COORDINATOR_ENABLED=false as an environment variable.

Flags:
--host string Host address to bind the web server to (default: 127.0.0.1)
Expand All @@ -44,10 +44,13 @@ Flags:
--peer.skip-tls-verify Skip TLS certificate verification (insecure)

Example:
# Single instance mode (coordinator disabled)
# Default mode (coordinator enabled)
dagu start-all

# Distributed mode (coordinator enabled)
# Disable coordinator
DAGU_COORDINATOR_ENABLED=false dagu start-all

# Distributed mode with coordinator on all interfaces
dagu start-all --coordinator.host=0.0.0.0 --coordinator.port=50055

# Production with both web and coordinator on all interfaces
Expand Down Expand Up @@ -109,10 +112,17 @@ func runStartAll(ctx *Context, _ []string) error {
return fmt.Errorf("failed to initialize server: %w", err)
}

// Only start coordinator if not bound to localhost
coordinator, coordHandler, err := newCoordinator(ctx, ctx.Config, ctx.ServiceRegistry, ctx.DAGRunStore)
if err != nil {
return fmt.Errorf("failed to initialize coordinator: %w", err)
// Initialize coordinator if enabled
var coord *coordinator.Service
var coordHandler *coordinator.Handler
if ctx.Config.Coordinator.Enabled {
var err error
coord, coordHandler, err = newCoordinator(ctx, ctx.Config, ctx.ServiceRegistry, ctx.DAGRunStore)
if err != nil {
return fmt.Errorf("failed to initialize coordinator: %w", err)
}
} else {
logger.Info(serviceCtx, "Coordinator disabled via configuration")
}

// Start resource monitoring service (starts its own goroutine internally)
Expand All @@ -122,7 +132,7 @@ func runStartAll(ctx *Context, _ []string) error {

// WaitGroup to track all services
var wg sync.WaitGroup
serviceCount := 3 // scheduler + server + coordinator
serviceCount := 3 // scheduler + server + coordinator (max)
errCh := make(chan error, serviceCount)

// Start scheduler
Expand All @@ -138,14 +148,16 @@ func runStartAll(ctx *Context, _ []string) error {
}
})

wg.Go(func() {
if err := coordinator.Start(serviceCtx); err != nil {
select {
case errCh <- fmt.Errorf("coordinator failed: %w", err):
default:
if coord != nil {
wg.Go(func() {
if err := coord.Start(serviceCtx); err != nil {
select {
case errCh <- fmt.Errorf("coordinator failed: %w", err):
default:
}
}
}
})
})
}

// Start server
wg.Go(func() {
Expand Down Expand Up @@ -178,14 +190,16 @@ func runStartAll(ctx *Context, _ []string) error {
logger.Info(ctx, "Stopping all services")

// Stop coordinator first to unregister from service registry
if err := coordinator.Stop(ctx); err != nil {
logger.Error(ctx, "Failed to stop coordinator",
tag.Error(err),
)
if coord != nil {
if err := coord.Stop(ctx); err != nil {
logger.Error(ctx, "Failed to stop coordinator",
tag.Error(err),
)
}
// Clean up coordinator handler resources
coordHandler.WaitZombieDetector()
coordHandler.Close(ctx)
}
// Clean up coordinator handler resources
coordHandler.WaitZombieDetector()
coordHandler.Close(ctx)

// Stop resource service
if err := resourceService.Stop(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/startall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

func TestStartAllCommand(t *testing.T) {
t.Run("StartAll", func(t *testing.T) {
th := test.SetupCommand(t)
th := test.SetupCommand(t, test.WithCoordinatorEnabled())
go func() {
time.Sleep(time.Millisecond * 500)
th.Cancel()
Expand Down
1 change: 1 addition & 0 deletions internal/cmn/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ type QueueConfig struct {

// Coordinator represents the coordinator service configuration.
type Coordinator struct {
Enabled bool // Default: true
ID string // Default: hostname@port
Host string // gRPC bind address
Advertise string // Registry address (auto-detected if empty)
Expand Down
1 change: 1 addition & 0 deletions internal/cmn/config/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ type RemoteNodeDef struct {

// CoordinatorDef configures the coordinator service.
type CoordinatorDef struct {
Enabled *bool `mapstructure:"enabled"` // Default: true
Host string `mapstructure:"host"`
Advertise string `mapstructure:"advertise"` // Auto-detected if empty
Port int `mapstructure:"port"`
Expand Down
13 changes: 13 additions & 0 deletions internal/cmn/config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ func (l *ConfigLoader) loadQueuesConfig(cfg *Config, def Definition) {
}

func (l *ConfigLoader) loadCoordinatorConfig(cfg *Config, def Definition) {
cfg.Coordinator.Enabled = l.resolveCoordinatorEnabled(def)

if def.Coordinator == nil {
return
}
Expand All @@ -622,6 +624,16 @@ func (l *ConfigLoader) loadCoordinatorConfig(cfg *Config, def Definition) {
cfg.Coordinator.Port = def.Coordinator.Port
}

func (l *ConfigLoader) resolveCoordinatorEnabled(def Definition) bool {
if l.v.IsSet("coordinator.enabled") {
return l.v.GetBool("coordinator.enabled")
}
if def.Coordinator != nil && def.Coordinator.Enabled != nil {
return *def.Coordinator.Enabled
}
return true // Default: enabled
}

func (l *ConfigLoader) loadWorkerConfig(cfg *Config, def Definition) {
if def.Worker != nil {
cfg.Worker.ID = def.Worker.ID
Expand Down Expand Up @@ -1269,6 +1281,7 @@ var envBindings = []envBinding{
{key: "queues.enabled", env: "QUEUE_ENABLED"},

// Coordinator
{key: "coordinator.enabled", env: "COORDINATOR_ENABLED"},
{key: "coordinator.host", env: "COORDINATOR_HOST"},
{key: "coordinator.advertise", env: "COORDINATOR_ADVERTISE"},
{key: "coordinator.port", env: "COORDINATOR_PORT"},
Expand Down
38 changes: 36 additions & 2 deletions internal/cmn/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func TestLoad_Env(t *testing.T) {
},
Queues: Queues{Enabled: false},
Coordinator: Coordinator{
Enabled: true,
Host: "0.0.0.0",
Advertise: "dagu-coordinator",
Port: 50099,
Expand Down Expand Up @@ -459,8 +460,9 @@ scheduler:
},
},
Coordinator: Coordinator{
Host: "coordinator.example.com",
Port: 8081,
Enabled: true,
Host: "coordinator.example.com",
Port: 8081,
},
Worker: Worker{
ID: "worker-1",
Expand Down Expand Up @@ -1235,6 +1237,38 @@ audit:
})
}

func TestLoad_Coordinator(t *testing.T) {
t.Run("CoordinatorDefault", func(t *testing.T) {
cfg := loadFromYAML(t, "# empty")
assert.True(t, cfg.Coordinator.Enabled)
})

t.Run("CoordinatorDisabled", func(t *testing.T) {
cfg := loadFromYAML(t, `
coordinator:
enabled: false
`)
assert.False(t, cfg.Coordinator.Enabled)
})

t.Run("CoordinatorDisabledFromEnv", func(t *testing.T) {
cfg := loadWithEnv(t, "# empty", map[string]string{
"DAGU_COORDINATOR_ENABLED": "false",
})
assert.False(t, cfg.Coordinator.Enabled)
})

t.Run("CoordinatorEnvOverridesYAML", func(t *testing.T) {
cfg := loadWithEnv(t, `
coordinator:
enabled: true
`, map[string]string{
"DAGU_COORDINATOR_ENABLED": "false",
})
assert.False(t, cfg.Coordinator.Enabled)
})
}

func TestLoad_TunnelConfig(t *testing.T) {
t.Run("TunnelDefault", func(t *testing.T) {
cfg := loadFromYAML(t, "# empty")
Expand Down
4 changes: 4 additions & 0 deletions internal/cmn/schema/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,10 @@
"description": "Coordinator service configuration for distributed execution.",
"additionalProperties": false,
"properties": {
"enabled": {
"type": "boolean",
"description": "Enable the coordinator service for distributed execution. Default: true."
},
"host": {
"type": "string",
"description": "Coordinator bind address."
Expand Down
27 changes: 21 additions & 6 deletions internal/test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ func WithServerConfig(cfg *config.Server) HelperOption {
}
}

// WithCoordinatorEnabled re-enables the coordinator in test configuration.
// By default, tests disable the coordinator since no coordinator is running.
func WithCoordinatorEnabled() HelperOption {
return WithConfigMutator(func(cfg *config.Config) {
cfg.Coordinator.Enabled = true
})
}

// WithConfigMutator applies mutations to the loaded configuration after defaults are set.
func WithConfigMutator(mutator func(*config.Config)) HelperOption {
return func(opts *Options) {
Expand Down Expand Up @@ -165,6 +173,7 @@ func Setup(t *testing.T, opts ...HelperOption) Helper {
cfg.Paths.UsersDir = filepath.Join(dataDir, "users")
cfg.Paths.SuspendFlagsDir = filepath.Join(tmpDir, "suspend-flags")
cfg.Paths.AdminLogsDir = filepath.Join(tmpDir, "admin-logs")
cfg.Coordinator.Enabled = false
if options.DAGsDir != "" {
cfg.Paths.DAGsDir = options.DAGsDir
}
Expand Down Expand Up @@ -312,13 +321,19 @@ func writeHelperConfigFile(t *testing.T, cfg *config.Config, configPath string)
configData["scheduler"] = scheduler
}

if cfg.Coordinator.Host != "" || cfg.Coordinator.Advertise != "" || cfg.Coordinator.Port != 0 {
configData["coordinator"] = map[string]any{
"host": cfg.Coordinator.Host,
"advertise": cfg.Coordinator.Advertise,
"port": cfg.Coordinator.Port,
}
coordData := map[string]any{
"enabled": cfg.Coordinator.Enabled,
}
if cfg.Coordinator.Host != "" {
coordData["host"] = cfg.Coordinator.Host
}
if cfg.Coordinator.Advertise != "" {
coordData["advertise"] = cfg.Coordinator.Advertise
}
if cfg.Coordinator.Port != 0 {
coordData["port"] = cfg.Coordinator.Port
}
configData["coordinator"] = coordData

if cfg.Worker.ID != "" || cfg.Worker.MaxActiveRuns != 0 || len(cfg.Worker.Labels) > 0 {
configData["worker"] = map[string]any{
Expand Down
Loading