Skip to content

Commit 8a67644

Browse files
authored
feat: default execution mode (#1642)
1 parent d3150e7 commit 8a67644

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

41 files changed

+1293
-578
lines changed

charts/dagu/README.md

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ The chart deploys four components:
3434

3535
- **Coordinator**: gRPC server for distributed task execution (port 50055)
3636
- **Scheduler**: Manages DAG execution schedules (port 8090 for health)
37-
- **Worker**: Executes DAG steps (2 replicas by default)
37+
- **Worker**: Executes DAG steps (configurable pools with independent replicas)
3838
- **UI**: Web interface for managing DAGs (port 8080)
3939

4040
All components share a single PersistentVolumeClaim with `ReadWriteMany` access mode.
@@ -56,7 +56,57 @@ For local single-node clusters that don't support RWX:
5656
helm install dagu charts/dagu \
5757
--set persistence.accessMode=ReadWriteOnce \
5858
--set persistence.skipValidation=true \
59-
--set worker.replicas=1
59+
--set workerPools.general.replicas=1
60+
```
61+
62+
### Worker Pools
63+
64+
Workers are organized into pools. Each pool creates a separate Kubernetes Deployment with its own replicas, labels, resources, and scheduling constraints. DAGs select workers via `workerSelector` labels that match a pool's labels.
65+
66+
```yaml
67+
workerPools:
68+
general:
69+
replicas: 2
70+
labels: {}
71+
resources:
72+
requests:
73+
memory: "128Mi"
74+
cpu: "100m"
75+
limits:
76+
memory: "256Mi"
77+
cpu: "200m"
78+
nodeSelector: {}
79+
tolerations: []
80+
affinity: {}
81+
82+
gpu:
83+
replicas: 1
84+
labels:
85+
gpu: "true"
86+
resources:
87+
requests:
88+
memory: "512Mi"
89+
cpu: "500m"
90+
nvidia.com/gpu: "1"
91+
limits:
92+
memory: "1Gi"
93+
cpu: "1000m"
94+
nvidia.com/gpu: "1"
95+
nodeSelector:
96+
nvidia.com/gpu.present: "true"
97+
tolerations:
98+
- key: nvidia.com/gpu
99+
operator: Exists
100+
effect: NoSchedule
101+
affinity: {}
102+
```
103+
104+
A pool with `labels: {}` (like `general` above) matches any DAG that has no `workerSelector`. To route a DAG to a specific pool, set `workerSelector` in the DAG definition to match the pool's labels:
105+
106+
```yaml
107+
# In your DAG file
108+
workerSelector:
109+
gpu: "true"
60110
```
61111

62112
### Authentication
@@ -101,12 +151,14 @@ scheduler:
101151
memory: "256Mi"
102152
cpu: "250m"
103153
104-
worker:
105-
replicas: 2
106-
resources:
107-
requests:
108-
memory: "128Mi"
109-
cpu: "100m"
154+
workerPools:
155+
general:
156+
replicas: 2
157+
labels: {}
158+
resources:
159+
requests:
160+
memory: "128Mi"
161+
cpu: "100m"
110162
111163
ui:
112164
replicas: 1

charts/dagu/templates/NOTES.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@ Make sure your storage class supports RWX access mode (e.g., NFS, EFS, CephFS).
55

66
Components deployed:
77
Scheduler: {{ .Values.scheduler.replicas }} replica(s)
8-
Worker: {{ .Values.worker.replicas }} replica(s)
8+
Worker Pools:
9+
{{- range $poolName, $pool := .Values.workerPools }}
10+
- {{ $poolName }}: {{ $pool.replicas }} replica(s)
11+
{{- if $pool.labels }} (labels: {{ include "dagu.workerLabels" $pool.labels }}){{- end }}
12+
{{- end }}
913
UI: {{ .Values.ui.replicas }} replica(s)
1014

1115
Access the UI:

charts/dagu/templates/_helpers.tpl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,15 @@ app.kubernetes.io/managed-by: {{ .Release.Service }}
2222
helm.sh/chart: {{ .Chart.Name }}-{{ .Chart.Version | replace "+" "_" }}
2323
{{- end }}
2424

25+
{{- define "dagu.selectorLabels" -}}
26+
app.kubernetes.io/name: {{ include "dagu.name" . }}
27+
app.kubernetes.io/instance: {{ .Release.Name }}
28+
{{- end }}
29+
30+
{{- define "dagu.workerLabels" -}}
31+
{{- $pairs := list -}}
32+
{{- range $key, $value := . -}}
33+
{{- $pairs = append $pairs (printf "%s=%v" $key $value) -}}
34+
{{- end -}}
35+
{{- join "," $pairs -}}
36+
{{- end }}

charts/dagu/templates/configmap.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ data:
1515
host: "0.0.0.0"
1616
port: 8080
1717
apiBasePath: "/api/v1"
18+
defaultExecutionMode: "distributed"
1819
1920
# Coordinator (distributed execution)
2021
coordinator:
Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,46 @@
1+
{{- range $poolName, $pool := .Values.workerPools }}
2+
{{- if not (regexMatch "^[a-z][a-z0-9-]*$" $poolName) }}
3+
{{- fail (printf "invalid workerPool name %q: must match ^[a-z][a-z0-9-]*$" $poolName) }}
4+
{{- end }}
5+
---
16
apiVersion: apps/v1
27
kind: Deployment
38
metadata:
4-
name: {{ include "dagu.fullname" . }}-worker
9+
name: {{ include "dagu.fullname" $ }}-worker-{{ $poolName }}
510
labels:
6-
{{- include "dagu.labels" . | nindent 4 }}
11+
{{- include "dagu.labels" $ | nindent 4 }}
712
app.kubernetes.io/component: worker
13+
dagu.io/worker-pool: {{ $poolName }}
814
spec:
9-
replicas: {{ .Values.worker.replicas }}
15+
replicas: {{ $pool.replicas }}
1016
selector:
1117
matchLabels:
12-
{{- include "dagu.labels" . | nindent 6 }}
18+
{{- include "dagu.selectorLabels" $ | nindent 6 }}
1319
app.kubernetes.io/component: worker
20+
dagu.io/worker-pool: {{ $poolName }}
1421
template:
1522
metadata:
1623
labels:
17-
{{- include "dagu.labels" . | nindent 8 }}
24+
{{- include "dagu.labels" $ | nindent 8 }}
1825
app.kubernetes.io/component: worker
26+
dagu.io/worker-pool: {{ $poolName }}
1927
spec:
2028
# Disable Kubernetes Service env var injection to avoid overriding
2129
# dagu config values (e.g., scheduler.port) with Service URLs
2230
enableServiceLinks: false
2331
containers:
2432
- name: worker
25-
image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
26-
imagePullPolicy: {{ .Values.image.pullPolicy }}
33+
image: "{{ $.Values.image.repository }}:{{ $.Values.image.tag }}"
34+
imagePullPolicy: {{ $.Values.image.pullPolicy }}
2735
command:
2836
- dagu
2937
- worker
3038
- --config
3139
- /etc/dagu/dagu.yaml
40+
{{- if $pool.labels }}
41+
- --worker.labels
42+
- {{ include "dagu.workerLabels" $pool.labels | quote }}
43+
{{- end }}
3244
env:
3345
- name: WORKER_ID
3446
valueFrom:
@@ -40,11 +52,24 @@ spec:
4052
- name: config
4153
mountPath: /etc/dagu
4254
resources:
43-
{{- toYaml .Values.worker.resources | nindent 12 }}
55+
{{- toYaml $pool.resources | nindent 12 }}
56+
{{- with $pool.nodeSelector }}
57+
nodeSelector:
58+
{{- toYaml . | nindent 8 }}
59+
{{- end }}
60+
{{- with $pool.tolerations }}
61+
tolerations:
62+
{{- toYaml . | nindent 8 }}
63+
{{- end }}
64+
{{- with $pool.affinity }}
65+
affinity:
66+
{{- toYaml . | nindent 8 }}
67+
{{- end }}
4468
volumes:
4569
- name: data
4670
persistentVolumeClaim:
47-
claimName: {{ include "dagu.fullname" . }}-data
71+
claimName: {{ include "dagu.fullname" $ }}-data
4872
- name: config
4973
configMap:
50-
name: {{ include "dagu.fullname" . }}-config
74+
name: {{ include "dagu.fullname" $ }}-config
75+
{{- end }}

charts/dagu/values.yaml

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,23 @@ coordinator:
2929
memory: "256Mi"
3030
cpu: "200m"
3131

32-
# Worker configuration
33-
worker:
34-
replicas: 2
35-
resources:
36-
requests:
37-
memory: "128Mi"
38-
cpu: "100m"
39-
limits:
40-
memory: "256Mi"
41-
cpu: "200m"
32+
# Worker pool configuration
33+
# Each pool creates a separate Kubernetes Deployment.
34+
# Pool names become part of the Deployment name: <release>-dagu-worker-<poolName>
35+
workerPools:
36+
general:
37+
replicas: 2
38+
labels: {}
39+
resources:
40+
requests:
41+
memory: "128Mi"
42+
cpu: "100m"
43+
limits:
44+
memory: "256Mi"
45+
cpu: "200m"
46+
nodeSelector: {}
47+
tolerations: []
48+
affinity: {}
4249

4350
# UI configuration
4451
ui:

internal/cmd/context.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ func (c *Context) NewScheduler() (*scheduler.Scheduler, error) {
335335
}
336336

337337
coordinatorCli := c.NewCoordinatorClient()
338-
de := scheduler.NewDAGExecutor(coordinatorCli, runtime.NewSubCmdBuilder(c.Config))
338+
de := scheduler.NewDAGExecutor(coordinatorCli, runtime.NewSubCmdBuilder(c.Config), c.Config.DefaultExecMode)
339339
m := scheduler.NewEntryReader(c.Config.Paths.DAGsDir, dr, c.DAGRunMgr, de, c.Config.Paths.Executable)
340340
return scheduler.New(c.Config, m, c.DAGRunMgr, c.DAGRunStore, c.QueueStore, c.ProcStore, c.ServiceRegistry, coordinatorCli)
341341
}

internal/cmd/dry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ func runDry(ctx *Context, args []string) error {
8181
ServiceRegistry: ctx.ServiceRegistry,
8282
RootDAGRun: exec.NewDAGRunRef(dag.Name, dagRunID),
8383
PeerConfig: ctx.Config.Core.Peer,
84+
DefaultExecMode: ctx.Config.DefaultExecMode,
8485
},
8586
)
8687

internal/cmd/restart.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ func executeDAGWithRunID(ctx *Context, cli runtime.Manager, dag *core.DAG, dagRu
161161
ServiceRegistry: ctx.ServiceRegistry,
162162
RootDAGRun: exec.NewDAGRunRef(dag.Name, dagRunID),
163163
PeerConfig: ctx.Config.Core.Peer,
164+
DefaultExecMode: ctx.Config.DefaultExecMode,
164165
})
165166

166167
listenSignals(ctx, agentInstance)

internal/cmd/retry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ func executeRetry(ctx *Context, dag *core.DAG, status *exec.DAGRunStatus, rootRu
142142
RootDAGRun: rootRun,
143143
PeerConfig: ctx.Config.Core.Peer,
144144
TriggerType: core.TriggerTypeRetry,
145+
DefaultExecMode: ctx.Config.DefaultExecMode,
145146
},
146147
)
147148

0 commit comments

Comments
 (0)