Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
436 changes: 221 additions & 215 deletions api/v2/api.gen.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions api/v2/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4138,6 +4138,11 @@ components:
workerId:
type: string
description: "ID of the worker that executed this DAG-run ('local' for local execution)"
tags:
type: array
items:
type: string
description: "List of tags for categorizing and filtering DAG runs"
required:
- rootDAGRunName
- rootDAGRunId
Expand Down
39 changes: 22 additions & 17 deletions internal/cmd/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,19 +254,29 @@ func (m *historyMigrator) convertStatus(legacy *legacymodel.Status, dag *core.DA
createdAt = startedAt.UnixMilli()
}

// Set metadata from DAG if available
var tags []string
var preconditions []*core.Condition
if dag != nil {
tags = dag.Tags
preconditions = dag.Preconditions
}

status := &exec.DAGRunStatus{
Name: legacy.Name,
DAGRunID: legacy.RequestID,
Status: legacy.Status,
PID: exec.PID(legacy.PID),
Log: legacy.Log,
Nodes: make([]*exec.Node, 0),
Params: legacy.Params,
ParamsList: legacy.ParamsList,
CreatedAt: createdAt,
StartedAt: formatTime(startedAt),
FinishedAt: formatTime(finishedAt),
QueuedAt: formatTime(startedAt), // Use StartedAt as QueuedAt for migration
Name: legacy.Name,
DAGRunID: legacy.RequestID,
Status: legacy.Status,
PID: exec.PID(legacy.PID),
Log: legacy.Log,
Nodes: make([]*exec.Node, 0),
Params: legacy.Params,
ParamsList: legacy.ParamsList,
CreatedAt: createdAt,
StartedAt: formatTime(startedAt),
FinishedAt: formatTime(finishedAt),
QueuedAt: formatTime(startedAt), // Use StartedAt as QueuedAt for migration
Tags: tags,
Preconditions: preconditions,
}

// Convert nodes
Expand All @@ -288,11 +298,6 @@ func (m *historyMigrator) convertStatus(legacy *legacymodel.Status, dag *core.DA
status.OnCancel = m.convertNode(legacy.OnCancel)
}

// Set preconditions from DAG if available
if dag != nil {
status.Preconditions = dag.Preconditions
}

return status
}

Expand Down
2 changes: 2 additions & 0 deletions internal/core/exec/runstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func InitialStatus(dag *core.DAG) DAGRunStatus {
StartedAt: stringutil.FormatTime(time.Time{}),
FinishedAt: stringutil.FormatTime(time.Time{}),
Preconditions: dag.Preconditions,
Tags: dag.Tags,
}
}

Expand Down Expand Up @@ -59,6 +60,7 @@ type DAGRunStatus struct {
Params string `json:"params,omitempty"`
ParamsList []string `json:"paramsList,omitempty"`
Preconditions []*core.Condition `json:"preconditions,omitempty"`
Tags []string `json:"tags,omitempty"`
}

// DAGRun returns a reference to the dag-run associated with this status
Expand Down
22 changes: 22 additions & 0 deletions internal/core/spec/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,28 @@ func buildMultipleCommands(val []any, result *core.Step) error {
strVal = tv
case int, int64, uint64, float64, bool:
strVal = fmt.Sprintf("%v", tv)
case map[string]any:
if len(tv) == 1 {
for k, val := range tv {
switch v2 := val.(type) {
case string, int, int64, uint64, float64, bool:
strVal = fmt.Sprintf("%s: %v", k, v2)
default:
// Nested maps or arrays are too complex, fall through to error
return core.NewValidationError(
fmt.Sprintf("command[%d]", i),
v,
fmt.Errorf("command array elements must be strings. If this contains a colon, wrap it in quotes. Got nested %T", v2),
)
}
}
} else {
return core.NewValidationError(
fmt.Sprintf("command[%d]", i),
v,
fmt.Errorf("command array elements must be strings. If this contains a colon, wrap it in quotes"),
)
}
default:
return core.NewValidationError(
fmt.Sprintf("command[%d]", i),
Expand Down
10 changes: 9 additions & 1 deletion internal/core/spec/step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,8 +1167,16 @@ func TestBuildMultipleCommands(t *testing.T) {
},
},
{
name: "RejectsMapType",
name: "AcceptsSingleKeyMap",
commands: []any{"echo hello", map[string]any{"key": "value"}},
expectedCommands: []core.CommandEntry{
{Command: "echo", Args: []string{"hello"}, CmdWithArgs: "echo hello"},
{Command: "key:", Args: []string{"value"}, CmdWithArgs: "key: value"},
},
},
{
name: "RejectsMultiKeyMap",
commands: []any{"echo hello", map[string]any{"key1": "val1", "key2": "val2"}},
wantErr: true,
},
{
Expand Down
18 changes: 18 additions & 0 deletions internal/intg/mltcmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,24 @@ steps:
"STEP2_OUT": "from step2\ndone",
})
})

t.Run("CommandWithColonInArray", func(t *testing.T) {
t.Parallel()

dag := th.DAG(t, `
steps:
- name: colon-test
command:
- echo SATID: 123
output: OUT
`)
agent := dag.Agent()
agent.RunSuccess(t)
dag.AssertLatestStatus(t, core.Succeeded)
dag.AssertOutputs(t, map[string]any{
"OUT": "SATID: 123",
})
})
}

func TestMultipleCommands_Docker(t *testing.T) {
Expand Down
10 changes: 8 additions & 2 deletions internal/persis/filedagrun/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ func (th StoreTest) CreateAttempt(t *testing.T, ts time.Time, dagRunID string, s
t.Helper()

dag := th.DAG("test_DAG")
attempt, err := th.Store.CreateAttempt(th.Context, dag.DAG, ts, dagRunID, exec.NewDAGRunAttemptOptions{})
return th.CreateAttemptWithDAG(t, ts, dagRunID, s, dag.DAG)
}

func (th StoreTest) CreateAttemptWithDAG(t *testing.T, ts time.Time, dagRunID string, s core.Status, dag *core.DAG) *Attempt {
t.Helper()

attempt, err := th.Store.CreateAttempt(th.Context, dag, ts, dagRunID, exec.NewDAGRunAttemptOptions{})
require.NoError(t, err)

err = attempt.Open(th.Context)
Expand All @@ -49,7 +55,7 @@ func (th StoreTest) CreateAttempt(t *testing.T, ts time.Time, dagRunID string, s
_ = attempt.Close(th.Context)
}()

dagRunStatus := exec.InitialStatus(dag.DAG)
dagRunStatus := exec.InitialStatus(dag)
dagRunStatus.DAGRunID = dagRunID
dagRunStatus.Status = s

Expand Down
22 changes: 22 additions & 0 deletions internal/persis/filedagrun/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,28 @@ func (store *Store) collectStatusesFromRoots(
tag.Error(err))
continue
}

// Filter by tags (AND logic)
if len(opts.Tags) > 0 {
match := true
for _, t := range opts.Tags {
found := false
for _, st := range status.Tags {
if st == t {
found = true
break
}
}
if !found {
match = false
break
}
}
if !match {
continue
}
}

if !hasStatusFilter {
statuses = append(statuses, status)
continue
Expand Down
53 changes: 53 additions & 0 deletions internal/persis/filedagrun/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,59 @@ func TestListStatuses(t *testing.T) {
assert.Equal(t, "dagrun-id-2", statuses[1].DAGRunID)
assert.Equal(t, "dagrun-id-1", statuses[2].DAGRunID)
})

t.Run("FilterByTags", func(t *testing.T) {
th := setupTestStore(t)

ts := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)

// Create runs with different tags
run1 := th.DAG("dag1")
run1.Tags = []string{"prod", "batch"}
th.CreateAttemptWithDAG(t, ts, "run-1", core.Succeeded, run1.DAG)

run2 := th.DAG("dag2")
run2.Tags = []string{"prod", "api"}
th.CreateAttemptWithDAG(t, ts, "run-2", core.Succeeded, run2.DAG)

run3 := th.DAG("dag3")
run3.Tags = []string{"dev"}
th.CreateAttemptWithDAG(t, ts, "run-3", core.Succeeded, run3.DAG)

// Filter by tag "prod" (should match run-1 and run-2)
statuses, err := th.Store.ListStatuses(th.Context,
exec.WithTags([]string{"prod"}),
exec.WithFrom(exec.NewUTC(ts)),
)
require.NoError(t, err)
assert.Len(t, statuses, 2)

// Filter by tags "prod" AND "batch" (should match only run-1)
statuses, err = th.Store.ListStatuses(th.Context,
exec.WithTags([]string{"prod", "batch"}),
exec.WithFrom(exec.NewUTC(ts)),
)
require.NoError(t, err)
assert.Len(t, statuses, 1)
assert.Equal(t, "run-1", statuses[0].DAGRunID)

// Filter by tag "dev" (should match only run-3)
statuses, err = th.Store.ListStatuses(th.Context,
exec.WithTags([]string{"dev"}),
exec.WithFrom(exec.NewUTC(ts)),
)
require.NoError(t, err)
assert.Len(t, statuses, 1)
assert.Equal(t, "run-3", statuses[0].DAGRunID)

// Filter by tag "nonexistent" (should match nothing)
statuses, err = th.Store.ListStatuses(th.Context,
exec.WithTags([]string{"nonexistent"}),
exec.WithFrom(exec.NewUTC(ts)),
)
require.NoError(t, err)
assert.Empty(t, statuses)
})
}

func TestLatestStatusTimezone(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions internal/service/coordinator/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (h *Handler) createAttemptForTask(ctx context.Context, task *coordinatorv1.
return fmt.Errorf("failed to open attempt: %w", err)
}

h.writeInitialStatus(ctx, attempt, dag.Name, task.DagRunId, task.AttemptKey, exec.DAGRunRef{})
h.writeInitialStatus(ctx, attempt, dag.Name, task.DagRunId, task.AttemptKey, exec.DAGRunRef{}, dag.Tags)

h.attemptsMu.Lock()
h.openAttempts[task.DagRunId] = attempt
Expand Down Expand Up @@ -367,7 +367,7 @@ func (h *Handler) createSubAttemptForTask(ctx context.Context, task *coordinator
return fmt.Errorf("failed to open sub-attempt: %w", err)
}

h.writeInitialStatus(ctx, attempt, task.Target, task.DagRunId, task.AttemptKey, rootRef)
h.writeInitialStatus(ctx, attempt, task.Target, task.DagRunId, task.AttemptKey, rootRef, dag.Tags)

h.attemptsMu.Lock()
h.openAttempts[task.DagRunId] = attempt
Expand All @@ -385,7 +385,7 @@ func (h *Handler) createSubAttemptForTask(ctx context.Context, task *coordinator

// writeInitialStatus writes an initial NotStarted status to the attempt.
// This ensures the status file is not empty when read before the worker reports its first status.
func (h *Handler) writeInitialStatus(ctx context.Context, attempt exec.DAGRunAttempt, dagName, dagRunID, attemptKey string, root exec.DAGRunRef) {
func (h *Handler) writeInitialStatus(ctx context.Context, attempt exec.DAGRunAttempt, dagName, dagRunID, attemptKey string, root exec.DAGRunRef, tags []string) {
initialStatus := exec.DAGRunStatus{
Name: dagName,
DAGRunID: dagRunID,
Expand All @@ -394,6 +394,7 @@ func (h *Handler) writeInitialStatus(ctx context.Context, attempt exec.DAGRunAtt
Status: core.NotStarted,
StartedAt: time.Now().UTC().Format(time.RFC3339),
Root: root,
Tags: tags,
}
if err := attempt.Write(ctx, initialStatus); err != nil {
logger.Warn(ctx, "Failed to write initial status", tag.Error(err), tag.RunID(dagRunID))
Expand Down
Loading
Loading