Skip to content

Commit 6404d0b

Browse files
authored
feat: add DAG run resource limits (#2179)
1 parent 6e8e6c7 commit 6404d0b

23 files changed

Lines changed: 2046 additions & 750 deletions

File tree

api/v1/api.gen.go

Lines changed: 772 additions & 748 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v1/api.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11065,6 +11065,8 @@ components:
1106511065
x-deprecated-reason: "For concurrency control, configure global queues in config.yaml instead"
1106611066
runConfig:
1106711067
$ref: "#/components/schemas/RunConfig"
11068+
resources:
11069+
$ref: "#/components/schemas/DAGResources"
1106811070
required:
1106911071
- name
1107011072

@@ -11396,6 +11398,8 @@ components:
1139611398
type: string
1139711399
runConfig:
1139811400
$ref: "#/components/schemas/RunConfig"
11401+
resources:
11402+
$ref: "#/components/schemas/DAGResources"
1139911403
required:
1140011404
- name
1140111405

@@ -11530,6 +11534,26 @@ components:
1153011534
- disableParamEdit
1153111535
- disableRunIdEdit
1153211536

11537+
DAGResources:
11538+
type: object
11539+
additionalProperties: false
11540+
description: "Resource limits requested for a DAG run"
11541+
properties:
11542+
limits:
11543+
$ref: "#/components/schemas/DAGResourceLimits"
11544+
11545+
DAGResourceLimits:
11546+
type: object
11547+
additionalProperties: false
11548+
description: "CPU and memory limits requested for a DAG run"
11549+
properties:
11550+
cpu:
11551+
type: string
11552+
description: "CPU limit as cores (for example, \"2\" or \"0.5\") or millicores (for example, \"500m\")"
11553+
memory:
11554+
type: string
11555+
description: "Memory limit in bytes or with a unit suffix (for example, \"512Mi\", \"1Gi\", or \"2G\")"
11556+
1153311557
DAGArtifactsConfig:
1153411558
type: object
1153511559
description: "Configuration for DAG run artifact storage"

internal/cmn/schema/dag.schema.json

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,10 @@
758758
"$ref": "#/definitions/container",
759759
"description": "Default container configuration for all steps in the DAG. Steps can override this configuration with their own container settings."
760760
},
761+
"resources": {
762+
"$ref": "#/definitions/resources",
763+
"description": "CPU and memory limits requested for this DAG run. Dagu attempts to enforce these limits where supported and warns while continuing when enforcement is unavailable."
764+
},
761765
"registry_auths": {
762766
"oneOf": [
763767
{
@@ -926,6 +930,44 @@
926930
}
927931
},
928932
"definitions": {
933+
"resources": {
934+
"type": "object",
935+
"additionalProperties": false,
936+
"properties": {
937+
"limits": {
938+
"$ref": "#/definitions/resourceLimits"
939+
}
940+
},
941+
"description": "Resource limits requested for a DAG run."
942+
},
943+
"resourceLimits": {
944+
"type": "object",
945+
"additionalProperties": false,
946+
"properties": {
947+
"cpu": {
948+
"oneOf": [
949+
{
950+
"type": "string",
951+
"pattern": "^0*[1-9][0-9]*m$"
952+
},
953+
{
954+
"type": "string",
955+
"pattern": "^[0-9]+(\\.[0-9]{1,3})?$",
956+
"not": {
957+
"pattern": "^0+(\\.0{1,3})?$"
958+
}
959+
}
960+
],
961+
"description": "CPU limit as cores (e.g., \"2\", \"0.5\") or millicores (e.g., \"500m\")."
962+
},
963+
"memory": {
964+
"type": "string",
965+
"pattern": "^([1-9][0-9]*(\\.[0-9]+)?|0\\.[0-9]*[1-9][0-9]*)([KkMmGgTtPpEe]i?[Bb]?|[Bb])?$",
966+
"description": "Memory limit in bytes or with a unit suffix (e.g., \"512Mi\", \"1Gi\", \"2G\")."
967+
}
968+
},
969+
"description": "CPU and memory limits requested for a DAG run."
970+
},
929971
"stepRetryPolicy": {
930972
"type": "object",
931973
"additionalProperties": false,

internal/cmn/schema/dag_schema_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,6 +1084,106 @@ steps:
10841084
require.Contains(t, err.Error(), "retry_policy")
10851085
}
10861086

1087+
func TestDAGSchemaResources(t *testing.T) {
1088+
t.Parallel()
1089+
1090+
resolved := mustResolveDAGSchema(t)
1091+
1092+
tests := []struct {
1093+
name string
1094+
spec string
1095+
wantErr string
1096+
}{
1097+
{
1098+
name: "ValidLimits",
1099+
spec: `
1100+
name: limited-dag
1101+
resources:
1102+
limits:
1103+
cpu: "500m"
1104+
memory: "512Mi"
1105+
steps:
1106+
- run: echo hi
1107+
`,
1108+
},
1109+
{
1110+
name: "RejectsInvalidCPU",
1111+
spec: `
1112+
name: limited-dag
1113+
resources:
1114+
limits:
1115+
cpu: nope
1116+
steps:
1117+
- run: echo hi
1118+
`,
1119+
wantErr: "resources",
1120+
},
1121+
{
1122+
name: "RejectsSubMilliCPU",
1123+
spec: `
1124+
name: limited-dag
1125+
resources:
1126+
limits:
1127+
cpu: "0.0005"
1128+
steps:
1129+
- run: echo hi
1130+
`,
1131+
wantErr: "resources",
1132+
},
1133+
{
1134+
name: "RejectsFractionalMillicores",
1135+
spec: `
1136+
name: limited-dag
1137+
resources:
1138+
limits:
1139+
cpu: "0.5m"
1140+
steps:
1141+
- run: echo hi
1142+
`,
1143+
wantErr: "resources",
1144+
},
1145+
{
1146+
name: "RejectsInvalidMemory",
1147+
spec: `
1148+
name: limited-dag
1149+
resources:
1150+
limits:
1151+
memory: nope
1152+
steps:
1153+
- run: echo hi
1154+
`,
1155+
wantErr: "resources",
1156+
},
1157+
{
1158+
name: "RejectsUnknownLimitField",
1159+
spec: `
1160+
name: limited-dag
1161+
resources:
1162+
limits:
1163+
gpu: "1"
1164+
steps:
1165+
- run: echo hi
1166+
`,
1167+
wantErr: "resources",
1168+
},
1169+
}
1170+
1171+
for _, tt := range tests {
1172+
t.Run(tt.name, func(t *testing.T) {
1173+
t.Parallel()
1174+
1175+
doc := mustParseYAMLDocument(t, tt.spec)
1176+
err := resolved.Validate(doc)
1177+
if tt.wantErr == "" {
1178+
require.NoError(t, err)
1179+
return
1180+
}
1181+
require.Error(t, err)
1182+
require.Contains(t, err.Error(), tt.wantErr)
1183+
})
1184+
}
1185+
}
1186+
10871187
func TestDAGSchemaStepRetryPolicyRejectsUnknownField(t *testing.T) {
10881188
t.Parallel()
10891189

internal/core/dag.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ type DAG struct {
218218
Container *Container `json:"container,omitempty"`
219219
// RunConfig contains configuration for controlling user interactions during DAG runs.
220220
RunConfig *RunConfig `json:"runConfig,omitempty"`
221+
// Resources contains CPU and memory limits requested for this DAG run.
222+
Resources *Resources `json:"resources,omitempty"`
221223
// Webhook contains DAG-level webhook trigger behavior configuration.
222224
Webhook *WebhookConfig `json:"webhook,omitempty"`
223225
// RegistryAuths maps registry hostnames to authentication configs.
@@ -375,6 +377,9 @@ func (d *DAG) Clone() *DAG {
375377
artifactsCopy := *d.Artifacts
376378
clone.Artifacts = &artifactsCopy
377379
}
380+
if d.Resources != nil {
381+
clone.Resources = d.Resources.Clone()
382+
}
378383
if d.Webhook != nil {
379384
webhookCopy := *d.Webhook
380385
webhookCopy.ForwardHeaders = append([]string(nil), d.Webhook.ForwardHeaders...)

0 commit comments

Comments
 (0)