Skip to content

Commit c19a003

Browse files
authored
fix: correct tag-base search for dag-runs impl (#1586)
* **New Features** * Added tags support for DAG runs—view and filter DAG runs by tags using AND logic matching. * Introduced dedicated log download endpoints for DAG runs, steps, and sub-DAG runs. * **Bug Fixes** * Improved command parsing validation with clearer error messages for invalid command array formats.
1 parent 621444e commit c19a003

File tree

15 files changed

+678
-315
lines changed

15 files changed

+678
-315
lines changed

api/v2/api.gen.go

Lines changed: 221 additions & 215 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v2/api.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4138,6 +4138,11 @@ components:
41384138
workerId:
41394139
type: string
41404140
description: "ID of the worker that executed this DAG-run ('local' for local execution)"
4141+
tags:
4142+
type: array
4143+
items:
4144+
type: string
4145+
description: "List of tags for categorizing and filtering DAG runs"
41414146
required:
41424147
- rootDAGRunName
41434148
- rootDAGRunId

internal/cmd/migrator.go

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -254,19 +254,29 @@ func (m *historyMigrator) convertStatus(legacy *legacymodel.Status, dag *core.DA
254254
createdAt = startedAt.UnixMilli()
255255
}
256256

257+
// Set metadata from DAG if available
258+
var tags []string
259+
var preconditions []*core.Condition
260+
if dag != nil {
261+
tags = dag.Tags
262+
preconditions = dag.Preconditions
263+
}
264+
257265
status := &exec.DAGRunStatus{
258-
Name: legacy.Name,
259-
DAGRunID: legacy.RequestID,
260-
Status: legacy.Status,
261-
PID: exec.PID(legacy.PID),
262-
Log: legacy.Log,
263-
Nodes: make([]*exec.Node, 0),
264-
Params: legacy.Params,
265-
ParamsList: legacy.ParamsList,
266-
CreatedAt: createdAt,
267-
StartedAt: formatTime(startedAt),
268-
FinishedAt: formatTime(finishedAt),
269-
QueuedAt: formatTime(startedAt), // Use StartedAt as QueuedAt for migration
266+
Name: legacy.Name,
267+
DAGRunID: legacy.RequestID,
268+
Status: legacy.Status,
269+
PID: exec.PID(legacy.PID),
270+
Log: legacy.Log,
271+
Nodes: make([]*exec.Node, 0),
272+
Params: legacy.Params,
273+
ParamsList: legacy.ParamsList,
274+
CreatedAt: createdAt,
275+
StartedAt: formatTime(startedAt),
276+
FinishedAt: formatTime(finishedAt),
277+
QueuedAt: formatTime(startedAt), // Use StartedAt as QueuedAt for migration
278+
Tags: tags,
279+
Preconditions: preconditions,
270280
}
271281

272282
// Convert nodes
@@ -288,11 +298,6 @@ func (m *historyMigrator) convertStatus(legacy *legacymodel.Status, dag *core.DA
288298
status.OnCancel = m.convertNode(legacy.OnCancel)
289299
}
290300

291-
// Set preconditions from DAG if available
292-
if dag != nil {
293-
status.Preconditions = dag.Preconditions
294-
}
295-
296301
return status
297302
}
298303

internal/core/exec/runstatus.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ func InitialStatus(dag *core.DAG) DAGRunStatus {
2929
StartedAt: stringutil.FormatTime(time.Time{}),
3030
FinishedAt: stringutil.FormatTime(time.Time{}),
3131
Preconditions: dag.Preconditions,
32+
Tags: dag.Tags,
3233
}
3334
}
3435

@@ -59,6 +60,7 @@ type DAGRunStatus struct {
5960
Params string `json:"params,omitempty"`
6061
ParamsList []string `json:"paramsList,omitempty"`
6162
Preconditions []*core.Condition `json:"preconditions,omitempty"`
63+
Tags []string `json:"tags,omitempty"`
6264
}
6365

6466
// DAGRun returns a reference to the dag-run associated with this status

internal/core/spec/step.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,28 @@ func buildMultipleCommands(val []any, result *core.Step) error {
852852
strVal = tv
853853
case int, int64, uint64, float64, bool:
854854
strVal = fmt.Sprintf("%v", tv)
855+
case map[string]any:
856+
if len(tv) == 1 {
857+
for k, val := range tv {
858+
switch v2 := val.(type) {
859+
case string, int, int64, uint64, float64, bool:
860+
strVal = fmt.Sprintf("%s: %v", k, v2)
861+
default:
862+
// Nested maps or arrays are too complex, fall through to error
863+
return core.NewValidationError(
864+
fmt.Sprintf("command[%d]", i),
865+
v,
866+
fmt.Errorf("command array elements must be strings. If this contains a colon, wrap it in quotes. Got nested %T", v2),
867+
)
868+
}
869+
}
870+
} else {
871+
return core.NewValidationError(
872+
fmt.Sprintf("command[%d]", i),
873+
v,
874+
fmt.Errorf("command array elements must be strings. If this contains a colon, wrap it in quotes"),
875+
)
876+
}
855877
default:
856878
return core.NewValidationError(
857879
fmt.Sprintf("command[%d]", i),

internal/core/spec/step_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1167,8 +1167,16 @@ func TestBuildMultipleCommands(t *testing.T) {
11671167
},
11681168
},
11691169
{
1170-
name: "RejectsMapType",
1170+
name: "AcceptsSingleKeyMap",
11711171
commands: []any{"echo hello", map[string]any{"key": "value"}},
1172+
expectedCommands: []core.CommandEntry{
1173+
{Command: "echo", Args: []string{"hello"}, CmdWithArgs: "echo hello"},
1174+
{Command: "key:", Args: []string{"value"}, CmdWithArgs: "key: value"},
1175+
},
1176+
},
1177+
{
1178+
name: "RejectsMultiKeyMap",
1179+
commands: []any{"echo hello", map[string]any{"key1": "val1", "key2": "val2"}},
11721180
wantErr: true,
11731181
},
11741182
{

internal/intg/mltcmd_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,24 @@ steps:
185185
"STEP2_OUT": "from step2\ndone",
186186
})
187187
})
188+
189+
t.Run("CommandWithColonInArray", func(t *testing.T) {
190+
t.Parallel()
191+
192+
dag := th.DAG(t, `
193+
steps:
194+
- name: colon-test
195+
command:
196+
- echo SATID: 123
197+
output: OUT
198+
`)
199+
agent := dag.Agent()
200+
agent.RunSuccess(t)
201+
dag.AssertLatestStatus(t, core.Succeeded)
202+
dag.AssertOutputs(t, map[string]any{
203+
"OUT": "SATID: 123",
204+
})
205+
})
188206
}
189207

190208
func TestMultipleCommands_Docker(t *testing.T) {

internal/persis/filedagrun/setup_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@ func (th StoreTest) CreateAttempt(t *testing.T, ts time.Time, dagRunID string, s
3939
t.Helper()
4040

4141
dag := th.DAG("test_DAG")
42-
attempt, err := th.Store.CreateAttempt(th.Context, dag.DAG, ts, dagRunID, exec.NewDAGRunAttemptOptions{})
42+
return th.CreateAttemptWithDAG(t, ts, dagRunID, s, dag.DAG)
43+
}
44+
45+
func (th StoreTest) CreateAttemptWithDAG(t *testing.T, ts time.Time, dagRunID string, s core.Status, dag *core.DAG) *Attempt {
46+
t.Helper()
47+
48+
attempt, err := th.Store.CreateAttempt(th.Context, dag, ts, dagRunID, exec.NewDAGRunAttemptOptions{})
4349
require.NoError(t, err)
4450

4551
err = attempt.Open(th.Context)
@@ -49,7 +55,7 @@ func (th StoreTest) CreateAttempt(t *testing.T, ts time.Time, dagRunID string, s
4955
_ = attempt.Close(th.Context)
5056
}()
5157

52-
dagRunStatus := exec.InitialStatus(dag.DAG)
58+
dagRunStatus := exec.InitialStatus(dag)
5359
dagRunStatus.DAGRunID = dagRunID
5460
dagRunStatus.Status = s
5561

internal/persis/filedagrun/store.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,28 @@ func (store *Store) collectStatusesFromRoots(
203203
tag.Error(err))
204204
continue
205205
}
206+
207+
// Filter by tags (AND logic)
208+
if len(opts.Tags) > 0 {
209+
match := true
210+
for _, t := range opts.Tags {
211+
found := false
212+
for _, st := range status.Tags {
213+
if st == t {
214+
found = true
215+
break
216+
}
217+
}
218+
if !found {
219+
match = false
220+
break
221+
}
222+
}
223+
if !match {
224+
continue
225+
}
226+
}
227+
206228
if !hasStatusFilter {
207229
statuses = append(statuses, status)
208230
continue

internal/persis/filedagrun/store_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,59 @@ func TestListStatuses(t *testing.T) {
522522
assert.Equal(t, "dagrun-id-2", statuses[1].DAGRunID)
523523
assert.Equal(t, "dagrun-id-1", statuses[2].DAGRunID)
524524
})
525+
526+
t.Run("FilterByTags", func(t *testing.T) {
527+
th := setupTestStore(t)
528+
529+
ts := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
530+
531+
// Create runs with different tags
532+
run1 := th.DAG("dag1")
533+
run1.Tags = []string{"prod", "batch"}
534+
th.CreateAttemptWithDAG(t, ts, "run-1", core.Succeeded, run1.DAG)
535+
536+
run2 := th.DAG("dag2")
537+
run2.Tags = []string{"prod", "api"}
538+
th.CreateAttemptWithDAG(t, ts, "run-2", core.Succeeded, run2.DAG)
539+
540+
run3 := th.DAG("dag3")
541+
run3.Tags = []string{"dev"}
542+
th.CreateAttemptWithDAG(t, ts, "run-3", core.Succeeded, run3.DAG)
543+
544+
// Filter by tag "prod" (should match run-1 and run-2)
545+
statuses, err := th.Store.ListStatuses(th.Context,
546+
exec.WithTags([]string{"prod"}),
547+
exec.WithFrom(exec.NewUTC(ts)),
548+
)
549+
require.NoError(t, err)
550+
assert.Len(t, statuses, 2)
551+
552+
// Filter by tags "prod" AND "batch" (should match only run-1)
553+
statuses, err = th.Store.ListStatuses(th.Context,
554+
exec.WithTags([]string{"prod", "batch"}),
555+
exec.WithFrom(exec.NewUTC(ts)),
556+
)
557+
require.NoError(t, err)
558+
assert.Len(t, statuses, 1)
559+
assert.Equal(t, "run-1", statuses[0].DAGRunID)
560+
561+
// Filter by tag "dev" (should match only run-3)
562+
statuses, err = th.Store.ListStatuses(th.Context,
563+
exec.WithTags([]string{"dev"}),
564+
exec.WithFrom(exec.NewUTC(ts)),
565+
)
566+
require.NoError(t, err)
567+
assert.Len(t, statuses, 1)
568+
assert.Equal(t, "run-3", statuses[0].DAGRunID)
569+
570+
// Filter by tag "nonexistent" (should match nothing)
571+
statuses, err = th.Store.ListStatuses(th.Context,
572+
exec.WithTags([]string{"nonexistent"}),
573+
exec.WithFrom(exec.NewUTC(ts)),
574+
)
575+
require.NoError(t, err)
576+
assert.Empty(t, statuses)
577+
})
525578
}
526579

527580
func TestLatestStatusTimezone(t *testing.T) {

0 commit comments

Comments
 (0)