Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
724 changes: 441 additions & 283 deletions api/v2/api.gen.go

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions api/v2/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2327,6 +2327,49 @@ paths:
schema:
$ref: "#/components/schemas/Error"

/dag-runs/{name}/{dagRunId}/sub-dag-runs/{subDAGRunId}/spec:
get:
summary: "Get Sub-DAG Run Specification"
description: "Returns the YAML specification used for a specific sub-DAG run"
operationId: "getSubDAGRunSpec"
tags:
- "dag-runs"
parameters:
- $ref: "#/components/parameters/RemoteNode"
- $ref: "#/components/parameters/DAGName"
- $ref: "#/components/parameters/DAGRunId"
- name: "subDAGRunId"
in: "path"
required: true
schema:
type: "string"
description: "ID of the sub DAG-run to retrieve the spec for"
responses:
"200":
description: "Sub-DAG specification retrieved successfully"
content:
application/json:
schema:
type: object
required:
- spec
properties:
spec:
type: string
description: "YAML specification of the sub-DAG"
"404":
description: "Sub-DAG run not found"
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
default:
description: "Generic error response"
content:
application/json:
schema:
$ref: "#/components/schemas/Error"

/dag-runs/{name}/{dagRunId}/sub-dag-runs/{subDAGRunId}/log:
get:
summary: "Retrieve log for a specific sub DAG-run"
Expand Down
74 changes: 51 additions & 23 deletions internal/service/frontend/api/v2/dagruns.go
Original file line number Diff line number Diff line change
Expand Up @@ -1048,46 +1048,37 @@ func (a *API) GetDAGRunSpec(ctx context.Context, request api.GetDAGRunSpecReques
dagName := request.Name
dagRunId := request.DagRunId

var attempt exec.DAGRunAttempt
var err error
var notFoundMsg string

// Handle "latest" by getting the most recent attempt
if dagRunId == "latest" {
attempt, err := a.dagRunStore.LatestAttempt(ctx, dagName)
if err != nil {
return &api.GetDAGRunSpec404JSONResponse{
Code: api.ErrorCodeNotFound,
Message: fmt.Sprintf("no dag-runs found for DAG %s", dagName),
}, nil
}
dag, err := attempt.ReadDAG(ctx)
if err != nil || dag == nil || len(dag.YamlData) == 0 {
return &api.GetDAGRunSpec404JSONResponse{
Code: api.ErrorCodeNotFound,
Message: fmt.Sprintf("DAG spec not found for %s", dagName),
}, nil
}
return &api.GetDAGRunSpec200JSONResponse{
Spec: string(dag.YamlData),
}, nil
attempt, err = a.dagRunStore.LatestAttempt(ctx, dagName)
notFoundMsg = fmt.Sprintf("no dag-runs found for DAG %s", dagName)
} else {
// Get spec from the specific DAG-run attempt
attempt, err = a.dagRunStore.FindAttempt(ctx, exec.NewDAGRunRef(dagName, dagRunId))
notFoundMsg = fmt.Sprintf("dag-run ID %s not found for DAG %s", dagRunId, dagName)
}

// Get spec from the specific DAG-run attempt
attempt, err := a.dagRunStore.FindAttempt(ctx, exec.NewDAGRunRef(dagName, dagRunId))
if err != nil {
return &api.GetDAGRunSpec404JSONResponse{
Code: api.ErrorCodeNotFound,
Message: fmt.Sprintf("dag-run ID %s not found for DAG %s", dagRunId, dagName),
Message: notFoundMsg,
}, nil
}

dag, err := attempt.ReadDAG(ctx)
if err != nil || dag == nil || len(dag.YamlData) == 0 {
spec, err := a.getSpecFromAttempt(ctx, attempt)
if err != nil {
return &api.GetDAGRunSpec404JSONResponse{
Code: api.ErrorCodeNotFound,
Message: fmt.Sprintf("DAG spec not found for dag-run %s", dagRunId),
}, nil
}

return &api.GetDAGRunSpec200JSONResponse{
Spec: string(dag.YamlData),
Spec: spec,
}, nil
}

Expand All @@ -1106,6 +1097,43 @@ func (a *API) GetSubDAGRunDetails(ctx context.Context, request api.GetSubDAGRunD
}, nil
}

// GetSubDAGRunSpec implements api.StrictServerInterface.
// This endpoint returns the YAML spec that was used for a specific sub-DAG run.
func (a *API) GetSubDAGRunSpec(ctx context.Context, request api.GetSubDAGRunSpecRequestObject) (api.GetSubDAGRunSpecResponseObject, error) {
root := exec.NewDAGRunRef(request.Name, request.DagRunId)

// Find the sub-DAG run attempt using the existing store method
attempt, err := a.dagRunStore.FindSubAttempt(ctx, root, request.SubDAGRunId)
if err != nil {
return &api.GetSubDAGRunSpec404JSONResponse{
Code: api.ErrorCodeNotFound,
Message: fmt.Sprintf("sub dag-run ID %s not found for DAG %s", request.SubDAGRunId, request.Name),
}, nil
}

spec, err := a.getSpecFromAttempt(ctx, attempt)
if err != nil {
return &api.GetSubDAGRunSpec404JSONResponse{
Code: api.ErrorCodeNotFound,
Message: fmt.Sprintf("DAG spec not found for sub dag-run %s", request.SubDAGRunId),
}, nil
}

return &api.GetSubDAGRunSpec200JSONResponse{
Spec: spec,
}, nil
}

// getSpecFromAttempt reads YAML spec from DAG run attempt.
// Returns spec string and nil error on success, or empty string and error on failure.
func (a *API) getSpecFromAttempt(ctx context.Context, attempt exec.DAGRunAttempt) (string, error) {
dag, err := attempt.ReadDAG(ctx)
if err != nil || dag == nil || len(dag.YamlData) == 0 {
return "", fmt.Errorf("DAG spec not found")
}
return string(dag.YamlData), nil
}

// GetSubDAGRunLog implements api.StrictServerInterface.
func (a *API) GetSubDAGRunLog(ctx context.Context, request api.GetSubDAGRunLogRequestObject) (api.GetSubDAGRunLogResponseObject, error) {
root := exec.NewDAGRunRef(request.Name, request.DagRunId)
Expand Down
93 changes: 93 additions & 0 deletions internal/service/frontend/api/v2/dagruns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,99 @@ func TestGetDAGRunSpecInline(t *testing.T) {
require.Contains(t, specBody.Spec, "echo inline_dag_test")
}

func TestGetSubDAGRunSpec(t *testing.T) {
server := test.SetupServer(t)

// Create a parent DAG with an inline sub-DAG definition
dagSpec := `steps:
- name: call_child
call: child_dag
params: "MSG=hello"
---
name: child_dag
params:
- MSG
steps:
- name: echo_message
command: "echo ${MSG}_from_child"`

// Create the parent DAG
_ = server.Client().Post("/api/v2/dags", api.CreateNewDAGJSONRequestBody{
Name: "parent_dag_for_subdag_spec",
Spec: &dagSpec,
}).ExpectStatus(http.StatusCreated).Send(t)

// Start the parent DAG
startResp := server.Client().Post("/api/v2/dags/parent_dag_for_subdag_spec/start", api.ExecuteDAGJSONRequestBody{}).
ExpectStatus(http.StatusOK).Send(t)

var startBody api.ExecuteDAG200JSONResponse
startResp.Unmarshal(t, &startBody)
require.NotEmpty(t, startBody.DagRunId)

// Wait for the parent DAG to complete
require.Eventually(t, func() bool {
url := fmt.Sprintf("/api/v2/dags/parent_dag_for_subdag_spec/dag-runs/%s", startBody.DagRunId)
statusResp := server.Client().Get(url).Send(t)
if statusResp.Response.StatusCode() != http.StatusOK {
return false
}

var dagRunStatus api.GetDAGDAGRunDetails200JSONResponse
statusResp.Unmarshal(t, &dagRunStatus)
return dagRunStatus.DagRun.Status == api.Status(core.Succeeded)
}, 10*time.Second, 200*time.Millisecond)

// Get the parent DAG run details to extract sub-DAG run ID
detailsResp := server.Client().Get(
fmt.Sprintf("/api/v2/dags/parent_dag_for_subdag_spec/dag-runs/%s", startBody.DagRunId),
).ExpectStatus(http.StatusOK).Send(t)

var detailsBody api.GetDAGDAGRunDetails200JSONResponse
detailsResp.Unmarshal(t, &detailsBody)
require.Len(t, detailsBody.DagRun.Nodes, 1, "Expected 1 node (the call_child step)")

// Extract the sub-DAG run ID from the call step
callNode := detailsBody.DagRun.Nodes[0]
require.Equal(t, "call_child", callNode.Step.Name)
require.NotNil(t, callNode.SubRuns, "Expected SubRuns to be present")
require.Len(t, *callNode.SubRuns, 1, "Expected exactly one sub-DAG run")
subDAGRunID := (*callNode.SubRuns)[0].DagRunId

// Test 1: Fetch the sub-DAG spec successfully
subSpecResp := server.Client().Get(
fmt.Sprintf("/api/v2/dag-runs/parent_dag_for_subdag_spec/%s/sub-dag-runs/%s/spec",
startBody.DagRunId, subDAGRunID),
).ExpectStatus(http.StatusOK).Send(t)

var subSpecBody api.GetSubDAGRunSpec200JSONResponse
subSpecResp.Unmarshal(t, &subSpecBody)
require.NotEmpty(t, subSpecBody.Spec, "Sub-DAG spec should not be empty")
require.Contains(t, subSpecBody.Spec, "child_dag", "Spec should contain child_dag name")
require.Contains(t, subSpecBody.Spec, "echo_message", "Spec should contain echo_message step")
require.Contains(t, subSpecBody.Spec, "echo ${MSG}_from_child", "Spec should contain the command")

// Test 2: 404 for non-existent sub-DAG run ID
_ = server.Client().Get(
fmt.Sprintf("/api/v2/dag-runs/parent_dag_for_subdag_spec/%s/sub-dag-runs/%s/spec",
startBody.DagRunId, "non_existent_sub_dag_id"),
).ExpectStatus(http.StatusNotFound).Send(t)

// Test 3: 404 for non-existent parent DAG
_ = server.Client().Get(
fmt.Sprintf("/api/v2/dag-runs/non_existent_dag/%s/sub-dag-runs/%s/spec",
startBody.DagRunId, subDAGRunID),
).ExpectStatus(http.StatusNotFound).Send(t)

// Test 4: 404 for non-existent parent DAG run ID
_ = server.Client().Get(
fmt.Sprintf("/api/v2/dag-runs/parent_dag_for_subdag_spec/%s/sub-dag-runs/%s/spec",
"non_existent_run_id", subDAGRunID),
).ExpectStatus(http.StatusNotFound).Send(t)
}

func TestApproveDAGRunStep(t *testing.T) {
server := test.SetupServer(t)

Expand Down
8 changes: 7 additions & 1 deletion ui/index.html
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
<!doctype html>
<html lang="en" translate="no">
<html lang="en" translate="no" class="dark" style="background-color: #020617">
<head>
<meta charset="UTF-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="icon" href="/assets/favicon.ico" />
<link rel="preconnect" href="https://fonts.googleapis.com" />
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin />
<link
href="https://fonts.googleapis.com/css2?family=Inter:ital,opsz,wght@0,14..32,100..900;1,14..32,100..900&family=Plus+Jakarta+Sans:ital,wght@0,200..800;1,200..800&display=swap"
rel="stylesheet"
/>
<title>Dagu</title>
<script>
function getConfig() {
Expand Down
Loading
Loading