Skip to content

Validate doesn't detect issues on for_each_task #3977

@idelafuente-oc

Description

@idelafuente-oc

Describe the issue

While configuring a job, I mistakenly set max_retries on the parent task instead of on the nested for_each_task task. Running validate returned no issues; the error only surfaced on deploy.

Configuration

- task_key: test_parent_ingestion
  max_retries: 3
  for_each_task:
    inputs: ${var.test_objects}
    task:
      task_key: test_loop_ingestion
      notebook_task:
        notebook_path: ../src/01_test.py
        base_parameters:
          test_object: "{{input}}"

Steps to reproduce the behavior

  1. Use a job config similar to the one posted under Configuration.
  2. Run databricks bundle validate --target xyz
  3. The bundle is validated without any problems being reported

Expected Behavior

Validation should emit a warning or an error indicating that max_retries (or any other retry setting) must be defined on the nested task inside the loop instead of on the parent task.

Actual Behavior

Validation OK! message

OS and CLI version

Databricks CLI v0.278.0

Is this a regression?

No

I have played around with some code to address this issue:

package validate

import (
	"context"
	"fmt"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/libs/diag"
	"github.com/databricks/cli/libs/dyn"
	"github.com/databricks/databricks-sdk-go/service/jobs"
)

// ForEachTask returns a read-only mutator that validates job tasks using
// for_each_task. It reports retry settings (max_retries,
// min_retry_interval_millis, retry_on_timeout) that are set on the parent
// task rather than on the nested for_each_task.task.
func ForEachTask() bundle.ReadOnlyMutator {
	return new(forEachTask)
}

// forEachTask is the mutator returned by ForEachTask. It has no fields of its
// own and only embeds bundle.RO.
// NOTE(review): bundle.RO is presumably the marker that makes this a
// read-only mutator — confirm against the bundle package.
type forEachTask struct {
	bundle.RO
}

// Name returns the name of this mutator, "validate:for_each_task".
func (v *forEachTask) Name() string {
	const name = "validate:for_each_task"
	return name
}

// Apply walks every task of every job in the bundle and validates the tasks
// that use for_each_task, returning all diagnostics that were collected.
//
// Each diagnostic is anchored at resources.jobs.<name>.tasks[<index>]. Jobs
// are visited in map iteration order, so the relative order of diagnostics
// belonging to different jobs is not fixed.
func (v *forEachTask) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
	diags := diag.Diagnostics{}

	jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"))

	for resourceName, job := range b.Config.Resources.Jobs {
		// The map values are pointers; skip nil entries instead of
		// dereferencing them, which would panic when reading job.Tasks.
		if job == nil {
			continue
		}

		resourcePath := jobsPath.Append(dyn.Key(resourceName))

		for taskIndex, task := range job.Tasks {
			// Only tasks that actually loop are subject to these checks.
			if task.ForEachTask == nil {
				continue
			}

			taskPath := resourcePath.Append(dyn.Key("tasks"), dyn.Index(taskIndex))
			diags = diags.Extend(validateForEachTask(b, task, taskPath))
		}
	}

	return diags
}

// validateForEachTask inspects a parent task that uses for_each_task and
// returns one diagnostic per retry setting configured directly on it:
// max_retries (error), min_retry_interval_millis (warning) and
// retry_on_timeout (warning). A setting counts as configured when it differs
// from its zero value. taskPath is the configuration path of the parent task
// and is attached to every diagnostic.
func validateForEachTask(b *bundle.Bundle, task jobs.Task, taskPath dyn.Path) diag.Diagnostics {
	// Checks are listed in the order in which their diagnostics are emitted.
	checks := []struct {
		misplaced bool
		field     string
		severity  diag.Severity
	}{
		{task.MaxRetries != 0, "max_retries", diag.Error},
		{task.MinRetryIntervalMillis != 0, "min_retry_interval_millis", diag.Warning},
		{task.RetryOnTimeout, "retry_on_timeout", diag.Warning},
	}

	found := diag.Diagnostics{}
	for _, c := range checks {
		if !c.misplaced {
			continue
		}
		found = found.Append(invalidRetryFieldDiag(b, task, taskPath, c.field, c.severity))
	}

	return found
}

// invalidRetryFieldDiag builds the diagnostic reported when the retry setting
// fieldName is configured on a parent task that uses for_each_task.
//
// The diagnostic carries the given severity, names the task by its task key,
// and points at the parent task: both its locations (looked up from the bundle
// configuration by taskPath) and its path refer to the task, not to the
// individual field.
func invalidRetryFieldDiag(b *bundle.Bundle, task jobs.Task, taskPath dyn.Path, fieldName string, severity diag.Severity) diag.Diagnostic {
	result := diag.Diagnostic{
		Severity:  severity,
		Locations: b.Config.GetLocations(taskPath.String()),
		Paths:     []dyn.Path{taskPath},
	}

	result.Summary = fmt.Sprintf("Invalid %s configuration for for_each_task", fieldName)
	result.Detail = fmt.Sprintf(
		"Task %q has %s defined at the parent level, but it uses for_each_task.\n"+
			"When using for_each_task, %s must be defined on the nested task (for_each_task.task.%s), not on the parent task.",
		task.TaskKey, fieldName, fieldName, fieldName,
	)

	return result
}

That would help detect the issue. The test would be:

package validate

import (
	"context"
	"testing"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/bundle/config"
	"github.com/databricks/cli/bundle/config/resources"
	"github.com/databricks/cli/bundle/internal/bundletest"
	"github.com/databricks/cli/libs/diag"
	"github.com/databricks/cli/libs/dyn"
	"github.com/databricks/databricks-sdk-go/service/jobs"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// createBundleWithForEachTask returns a bundle holding a single job, "job1",
// whose only task is parentTask. When parentTask has no ForEachTask set, a
// default one is filled in that loops over "[1, 2, 3]" and runs a notebook
// child task. A source location is registered for the task so that
// diagnostics can resolve resources.jobs.job1.tasks[0].
func createBundleWithForEachTask(parentTask jobs.Task) *bundle.Bundle {
	if parentTask.ForEachTask == nil {
		defaultChild := jobs.Task{
			TaskKey: "child_task",
			NotebookTask: &jobs.NotebookTask{
				NotebookPath: "test.py",
			},
		}
		parentTask.ForEachTask = &jobs.ForEachTask{
			Inputs: "[1, 2, 3]",
			Task:   defaultChild,
		}
	}

	job := &resources.Job{
		JobSettings: jobs.JobSettings{
			Name:  "My Job",
			Tasks: []jobs.Task{parentTask},
		},
	}

	b := &bundle.Bundle{}
	b.Config.Resources.Jobs = map[string]*resources.Job{"job1": job}

	taskLocation := dyn.Location{File: "job.yml", Line: 1, Column: 1}
	bundletest.SetLocation(b, "resources.jobs.job1.tasks[0]", []dyn.Location{taskLocation})

	return b
}

// TestForEachTask_InvalidRetryFields checks that each retry setting placed on
// the parent of a for_each_task yields exactly one diagnostic with the
// expected severity, summary and detail text.
func TestForEachTask_InvalidRetryFields(t *testing.T) {
	type testCase struct {
		name         string
		parent       jobs.Task
		wantSeverity diag.Severity
		wantSummary  string
		wantDetail   string
	}

	cases := []testCase{
		{
			name:         "max_retries on parent",
			parent:       jobs.Task{TaskKey: "parent_task", MaxRetries: 3},
			wantSeverity: diag.Error,
			wantSummary:  "Invalid max_retries configuration for for_each_task",
			wantDetail:   "max_retries must be defined on the nested task",
		},
		{
			name:         "min_retry_interval_millis on parent",
			parent:       jobs.Task{TaskKey: "parent_task", MinRetryIntervalMillis: 1000},
			wantSeverity: diag.Warning,
			wantSummary:  "Invalid min_retry_interval_millis configuration for for_each_task",
			wantDetail:   "min_retry_interval_millis must be defined on the nested task",
		},
		{
			name:         "retry_on_timeout on parent",
			parent:       jobs.Task{TaskKey: "parent_task", RetryOnTimeout: true},
			wantSeverity: diag.Warning,
			wantSummary:  "Invalid retry_on_timeout configuration for for_each_task",
			wantDetail:   "retry_on_timeout must be defined on the nested task",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			b := createBundleWithForEachTask(tc.parent)

			got := ForEachTask().Apply(context.Background(), b)

			// Exactly one setting is misplaced per case, so exactly one
			// diagnostic is expected.
			require.Len(t, got, 1)
			only := got[0]
			assert.Equal(t, tc.wantSeverity, only.Severity)
			assert.Equal(t, tc.wantSummary, only.Summary)
			assert.Contains(t, only.Detail, tc.wantDetail)
		})
	}
}

func TestForEachTask_MultipleRetryFieldsOnParent(t *testing.T) {
	ctx := context.Background()
	b := createBundleWithForEachTask(jobs.Task{
		TaskKey:                "parent_task",
		MaxRetries:             3,
		MinRetryIntervalMillis: 1000,
		RetryOnTimeout:         true,
	})

	diags := ForEachTask().Apply(ctx, b)
	require.Len(t, diags, 3)

	errorCount := 0
	warningCount := 0
	for _, d := range diags {
		if d.Severity == diag.Error {
			errorCount++
		} else if d.Severity == diag.Warning {
			warningCount++
		}
	}
	assert.Equal(t, 1, errorCount)
	assert.Equal(t, 2, warningCount)
}

func TestForEachTask_ValidConfigurationOnChild(t *testing.T) {
	ctx := context.Background()
	b := createBundleWithForEachTask(jobs.Task{
		TaskKey: "parent_task",
		ForEachTask: &jobs.ForEachTask{
			Inputs: "[1, 2, 3]",
			Task: jobs.Task{
				TaskKey:    "child_task",
				MaxRetries: 3,
				NotebookTask: &jobs.NotebookTask{
					NotebookPath: "test.py",
				},
			},
		},
	})

	diags := ForEachTask().Apply(ctx, b)
	assert.Empty(t, diags)
}

func TestForEachTask_NoForEachTask(t *testing.T) {
	ctx := context.Background()
	b := &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"job1": {
						JobSettings: jobs.JobSettings{
							Name: "My Job",
							Tasks: []jobs.Task{
								{
									TaskKey:    "simple_task",
									MaxRetries: 3,
									NotebookTask: &jobs.NotebookTask{
										NotebookPath: "test.py",
									},
								},
							},
						},
					},
				},
			},
		},
	}

	bundletest.SetLocation(b, "resources.jobs.job1.tasks[0]", []dyn.Location{{File: "job.yml", Line: 1, Column: 1}})

	diags := ForEachTask().Apply(ctx, b)
	assert.Empty(t, diags)
}

func TestForEachTask_RetryOnTimeoutFalse(t *testing.T) {
	ctx := context.Background()
	b := createBundleWithForEachTask(jobs.Task{
		TaskKey:        "parent_task",
		RetryOnTimeout: false,
	})

	diags := ForEachTask().Apply(ctx, b)
	assert.Empty(t, diags)
}

With that in place, and after adding ForEachTask() to fast_validate.go:

// Apply passes the fast validators, including ForEachTask, to
// bundle.ApplyParallel together with the bundle, and always returns nil.
// NOTE(review): the validators' diagnostics are presumably reported by
// ApplyParallel itself rather than through this return value — confirm.
func (f *fastValidate) Apply(ctx context.Context, rb *bundle.Bundle) diag.Diagnostics {
	bundle.ApplyParallel(ctx, rb,
		// Fast mutators with only in-memory checks
		JobClusterKeyDefined(),
		JobTaskClusterSpec(),
		ForEachTask(),

		// Blocking mutators. Deployments will fail if these checks fail.
		ValidateArtifactPath(),
	)

	return nil
}

It worked in my local testing.

Image

I relied on Copilot because I'm not good at Go, so this is probably not the best implementation, but I did it to learn about the CLI and wanted to avoid the code going to waste. If it's discarded after posting it here, that's fine.

Thank you.

Metadata

Metadata

Assignees

No one assigned

    Labels

    DABsDABs related issues

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions