chore(<backend>): Refactor Driver and Launcher: Make All Parameters Required #12725

@hbelmiro

Chore description

Currently, the driver and launcher receive several parameters from the API server (and the launcher also receives some from the driver). Most of these parameters are optional from the driver/launcher perspective, even though they should not be. Treating them as optional is:

  • Error-prone: Silent failures when the API server doesn't pass required values
  • Hard to catch in tests: Tests may pass when they shouldn't because default values mask developer mistakes where a value should have been explicitly provided
  • Difficult to develop: New developers can't easily tell what's truly required vs. optional
  • Hard to debug: When something fails, it's unclear if a value was intentionally empty or missing
  • Hard to make changes: Implicit contracts between components make refactoring risky since dependencies on defaults are not visible

We should not rely on defaults for driver and launcher parameters.

Current State

Driver Flags

The driver defines ~30 flags, most with default values ("", 0, false, "{}"):

// backend/src/v2/cmd/driver/main.go (lines 55-94)
driverType        = flag.String(driverTypeArg, "", "task driver type, one of ROOT_DAG, DAG, CONTAINER")
pipelineName      = flag.String("pipeline_name", "", "pipeline context name")
runID             = flag.String("run_id", "", "pipeline run uid")
runName           = flag.String("run_name", "", "pipeline run name (Kubernetes object name)")
componentSpecJson = flag.String("component", "{}", "component spec")
taskSpecJson      = flag.String("task", "", "task spec")
// ... and many more
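A minimal sketch of the ambiguity these defaults create. It uses a standalone flag.FlagSet (an assumption for testability; the driver itself uses the global flag set) and only the run_name flag from the snippet above. Omitting the flag and passing it explicitly empty both leave the value at "".

```go
package main

import (
	"flag"
	"fmt"
)

// parseRunName parses args into a fresh FlagSet that declares only run_name,
// mirroring the driver's flag.String("run_name", "", ...) declaration.
func parseRunName(args []string) (string, error) {
	fs := flag.NewFlagSet("driver", flag.ContinueOnError)
	runName := fs.String("run_name", "", "pipeline run name (Kubernetes object name)")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	return *runName, nil
}

func main() {
	omitted, _ := parseRunName([]string{})
	explicitEmpty, _ := parseRunName([]string{"--run_name="})
	// Both values are "": the value alone cannot tell the two cases apart.
	fmt.Printf("omitted=%q explicitEmpty=%q\n", omitted, explicitEmpty)
}
```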

Driver Validation

The current validate() function only checks 4 flags out of ~30:

// backend/src/v2/cmd/driver/main.go (lines 121-136)
func validate() error {
    if *driverType == "" {
        return fmt.Errorf("argument --%s must be specified", driverTypeArg)
    }
    if *httpProxy == unsetProxyArgValue {
        return fmt.Errorf("argument --%s is required but can be an empty value", httpProxyArg)
    }
    // ... only proxy flags validated
    // validation responsibility lives in driver itself, so we do not validate all other args
    return nil
}
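The proxy checks work because those flags default to a sentinel (unsetProxyArgValue) rather than to "". The sketch below imitates that pattern for http_proxy; the sentinel value "unset" is a hypothetical stand-in, since the real constant is not shown here. It also hints at why the pattern does not extend to the remaining flags: it needs a value callers will never legitimately send, and a bool flag has no such spare value.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
)

// unsetSentinel is a hypothetical stand-in for the driver's unsetProxyArgValue;
// the real constant's value is not shown in this issue.
const unsetSentinel = "unset"

// parseHTTPProxy declares http_proxy with a sentinel default, so an omitted
// flag (value == sentinel) is distinguishable from an explicit empty value.
func parseHTTPProxy(args []string) (string, error) {
	fs := flag.NewFlagSet("driver", flag.ContinueOnError)
	httpProxy := fs.String("http_proxy", unsetSentinel, "HTTP proxy")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	if *httpProxy == unsetSentinel {
		return "", errors.New("argument --http_proxy is required but can be an empty value")
	}
	return *httpProxy, nil
}

func main() {
	for _, args := range [][]string{
		{},                                        // omitted: rejected
		{"--http_proxy="},                         // explicitly empty: accepted
		{"--http_proxy=http://proxy.example:3128"}, // explicit value: accepted
	} {
		value, err := parseHTTPProxy(args)
		fmt.Printf("args=%q value=%q err=%v\n", args, value, err)
	}
}
```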

Monolithic Options Struct

The driver.Options struct has comments saying "required" or "optional, required only by..." but these are documentation-only with no enforcement:

// backend/src/v2/driver/driver.go (lines 36-94)
type Options struct {
    // required, pipeline context name
    PipelineName string
    // required, KFP run ID
    RunID string
    // required, Component spec
    Component *pipelinespec.ComponentSpec
    // optional, iteration index. -1 means not an iteration.
    IterationIndex int

    // optional, required only by root DAG driver
    RuntimeConfig *pipelinespec.PipelineJob_RuntimeConfig
    Namespace     string

    // optional, required by non-root drivers
    Task           *pipelinespec.PipelineTaskSpec
    DAGExecutionID int64

    // optional, required only by container driver
    Container *pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec
    // ... more fields
}

Launcher Flags

The launcher has similar issues with ~20 flags and no validation:

// backend/src/v2/cmd/launcher-v2/main.go (lines 31-52)
pipelineName            = flag.String("pipeline_name", "", "pipeline context name")
runID                   = flag.String("run_id", "", "pipeline run uid")
parentDagID             = flag.Int64("parent_dag_id", 0, "parent DAG execution ID")
executorType            = flag.String("executor_type", "container", "The type of the ExecutorSpec")
executionID             = flag.Int64("execution_id", 0, "Execution ID of this task.")
// ... no validation function exists

Proposed Solution

1. Use flag.Visit() to Detect Missing Flags

Go's flag.Visit() function tells us which flags were actually provided on the command line, regardless of their value. This distinguishes between "not provided" (uses default) and "provided as zero/empty" (explicit value).

flag.Parse()

// Collect which flags were actually passed
providedFlags := make(map[string]bool)
flag.Visit(func(f *flag.Flag) {
    providedFlags[f.Name] = true
})
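A self-contained version of the snippet above, sketched with a flag.FlagSet so it can be exercised in isolation (flag.Visit is the same operation on the global flag.CommandLine set). The three flags are an illustrative subset, and newDriverFlagSet and providedFlags are hypothetical helper names.

```go
package main

import (
	"flag"
	"fmt"
)

// providedFlags parses args and returns the set of flag names that appeared
// on the command line. fs.Visit only walks flags that were set during Parse,
// so flags left at their defaults never show up here.
func providedFlags(fs *flag.FlagSet, args []string) (map[string]bool, error) {
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	provided := make(map[string]bool)
	fs.Visit(func(f *flag.Flag) {
		provided[f.Name] = true
	})
	return provided, nil
}

// newDriverFlagSet declares a small, illustrative subset of the driver flags.
func newDriverFlagSet() *flag.FlagSet {
	fs := flag.NewFlagSet("driver", flag.ContinueOnError)
	fs.String("run_id", "", "pipeline run uid")
	fs.String("run_name", "", "pipeline run name (Kubernetes object name)")
	fs.Int64("dag_execution_id", 0, "DAG execution ID")
	return fs
}

func main() {
	provided, err := providedFlags(newDriverFlagSet(), []string{"--run_id=abc", "--run_name="})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	// prints: true true false
	fmt.Println(provided["run_id"], provided["run_name"], provided["dag_execution_id"])
}
```

An explicitly empty --run_name= counts as provided, while the omitted dag_execution_id does not, even though both end up at their zero values.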

2. Add Validation for Required Flags

Validate that all required flags for the driver type were provided:

func validateRequiredFlags(provided map[string]bool, driverType string) error {
    // Common flags required by all driver types
    common := []string{"type", "pipeline_name", "run_id", "component", "namespace",
        "ml_pipeline_server_address", "ml_pipeline_server_port",
        "mlmd_server_address", "mlmd_server_port",
        "http_proxy", "https_proxy", "no_proxy",
        "publish_logs", "cache_disabled", "log_level"}
    
    // Driver-type specific required flags
    var specific []string
    switch driverType {
    case "ROOT_DAG":
        specific = []string{"runtime_config", "run_name", "run_display_name"}
    case "DAG":
        specific = []string{"task", "dag_execution_id", "iteration_index", "task_name"}
    case "CONTAINER":
        specific = []string{"task", "dag_execution_id", "container", "kubernetes_config",
            "iteration_index", "task_name", "run_name", "run_display_name"}
    default:
        return fmt.Errorf("unknown driver type %q", driverType)
    }
    
    required := append(common, specific...)
    for _, name := range required {
        if !provided[name] {
            return fmt.Errorf("--%s is required for %s but was not provided", name, driverType)
        }
    }
    return nil
}
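Putting steps 1 and 2 together, here is a sketch of how parsing, flag.Visit(), and validation could be wired, under these assumptions: the per-type lists are trimmed to a few flags from validateRequiredFlags, run and validate are hypothetical names, and missing flags are collected with errors.Join (Go 1.20+) so one failed run reports every omission instead of only the first. A missing or unrecognized --type is rejected outright rather than falling through to the common list.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
)

// requiredByType is an illustrative, trimmed-down version of the per-type
// lists in validateRequiredFlags; the real lists are longer.
var requiredByType = map[string][]string{
	"ROOT_DAG":  {"runtime_config", "run_name"},
	"DAG":       {"task", "dag_execution_id"},
	"CONTAINER": {"task", "dag_execution_id", "container"},
}

var commonRequired = []string{"pipeline_name", "run_id"}

// validate returns one error per missing flag, joined with errors.Join.
func validate(provided map[string]bool, driverType string) error {
	specific, ok := requiredByType[driverType]
	if !ok {
		return fmt.Errorf("--type must be one of ROOT_DAG, DAG, CONTAINER; got %q", driverType)
	}
	// Copy commonRequired first so append never writes into its backing array.
	required := append(append([]string{}, commonRequired...), specific...)
	var errs []error
	for _, name := range required {
		if !provided[name] {
			errs = append(errs, fmt.Errorf("--%s is required for %s but was not provided", name, driverType))
		}
	}
	return errors.Join(errs...) // nil when errs is empty
}

// run declares the flags, parses args, records which flags were passed, and validates.
func run(args []string) error {
	fs := flag.NewFlagSet("driver", flag.ContinueOnError)
	driverType := fs.String("type", "", "task driver type, one of ROOT_DAG, DAG, CONTAINER")
	for _, name := range []string{"pipeline_name", "run_id", "run_name", "runtime_config", "task", "container"} {
		fs.String(name, "", name)
	}
	fs.Int64("dag_execution_id", 0, "DAG execution ID")
	if err := fs.Parse(args); err != nil {
		return err
	}
	provided := make(map[string]bool)
	fs.Visit(func(f *flag.Flag) { provided[f.Name] = true })
	return validate(provided, *driverType)
}

func main() {
	// Missing --dag_execution_id and --container: both are reported.
	if err := run([]string{"--type=CONTAINER", "--pipeline_name=p", "--run_id=abc", "--task={}"}); err != nil {
		fmt.Println(err)
	}
}
```

Note that --dag_execution_id=0 would be accepted: the check is about presence, not value, which is what the proposal asks for.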

3. Compiler Always Passes All Flags

The compiler (argocompiler) must always pass ALL flags for the driver type being invoked. There is no conditional inclusion: every flag for that driver type is always passed, even when its value is empty.

// backend/src/v2/compiler/argocompiler/container.go
// Always include ALL flags for CONTAINER driver - no conditionals
args := []string{
    "--type", "CONTAINER",
    "--pipeline_name", c.spec.GetPipelineInfo().GetName(),
    "--run_id", runID(),
    "--run_name", runResourceName(),
    "--run_display_name", c.job.DisplayName,
    "--dag_execution_id", inputValue(paramParentDagID),
    "--task", inputValue(paramTask),
    "--component", inputValue(paramComponent),
    "--container", inputValue(paramContainer),
    "--kubernetes_config", inputValue(paramKubernetesConfig),
    "--iteration_index", inputValue(paramIterationIndex),
    "--task_name", inputValue(paramTaskName),
    // ... ALL other required flags
}
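Passing every flag unconditionally interacts with how Go's flag package consumes separate-argument values. Assuming each element of args reaches the driver as its own argv entry (no shell in between), a string flag followed by an empty element is set to "" and counts as provided. Bool flags behave differently: they never consume the next argument, so "--flag", "false" sets the flag to true and stops parsing at "false". Whether any driver flag is declared with flag.Bool is not shown in this issue; the sketch only demonstrates the standard-library behavior, with flag names borrowed from the validation lists above.

```go
package main

import (
	"flag"
	"fmt"
)

// parsed holds what the flag package made of one argument list.
type parsed struct {
	RunDisplayName string
	CacheDisabled  bool
	RunID          string
	Positional     []string // arguments left over after flag parsing stopped
}

func parse(args []string) (parsed, error) {
	fs := flag.NewFlagSet("driver", flag.ContinueOnError)
	name := fs.String("run_display_name", "", "run display name")
	cache := fs.Bool("cache_disabled", false, "disable cache")
	id := fs.String("run_id", "", "pipeline run uid")
	if err := fs.Parse(args); err != nil {
		return parsed{}, err
	}
	return parsed{*name, *cache, *id, fs.Args()}, nil
}

func main() {
	for _, args := range [][]string{
		// String flag + separate "" element: set to the empty string.
		{"--run_display_name", "", "--run_id", "abc"},
		// Bool flag does not consume "false": cache_disabled becomes true,
		// "false" is positional, and parsing stops before --run_id.
		{"--cache_disabled", "false", "--run_id", "abc"},
		// The --name=value form sets a bool flag as intended.
		{"--cache_disabled=false", "--run_id", "abc"},
	} {
		p, err := parse(args)
		fmt.Printf("%q -> %+v err=%v\n", args, p, err)
	}
}
```

If any required flag is a bool, the compiler would need to emit it in the --name=value form for the no-conditionals rule to hold.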

4. Apply Same Pattern to Launcher

The launcher validates its required flags using the same flag.Visit() approach:

func validateLauncherFlags(provided map[string]bool, executorType string) error {
    common := []string{"pipeline_name", "run_id", "pod_name", "pod_uid",
        "ml_pipeline_server_address", "ml_pipeline_server_port",
        "mlmd_server_address", "mlmd_server_port", "executor_type"}
    
    var specific []string
    switch executorType {
    case "container":
        specific = []string{"execution_id", "executor_input", "component_spec"}
    case "importer":
        specific = []string{"component_spec", "importer_spec", "task_spec", "parent_dag_id"}
    default:
        return fmt.Errorf("unknown executor type %q", executorType)
    }
    
    required := append(common, specific...)
    for _, name := range required {
        if !provided[name] {
            return fmt.Errorf("--%s is required for %s executor but was not provided", name, executorType)
        }
    }
    return nil
}

Benefits

  • Fail-fast: Missing parameters are caught immediately with clear error messages
  • Explicit contracts: The list of required flags per driver/executor type is explicit in code
  • Testability: Validation logic can be unit tested independently
  • Developer experience: Clear errors tell you exactly which flag is missing
  • Minimal structural changes: Keep existing structs and functions, just add validation

Risks

  • Existing failures may be uncovered: Making all parameters required may reveal places where the code was silently relying on defaults when a value should have been explicitly provided. This could expose latent bugs in the compiler or API server that were previously masked by default values.
  • Requires fixes in upstream code: If validation failures are discovered, the compiler (argocompiler) may need to be updated to explicitly pass values that were previously omitted.

Affected Files

Driver

  • backend/src/v2/cmd/driver/main.go - Add flag.Visit() tracking, add validateRequiredFlags()

Launcher

  • backend/src/v2/cmd/launcher-v2/main.go - Add flag.Visit() tracking, add validateLauncherFlags()

Compiler (must pass all flags)

  • backend/src/v2/compiler/argocompiler/dag.go - Ensure all ROOT_DAG and DAG flags are passed
  • backend/src/v2/compiler/argocompiler/container.go - Ensure all CONTAINER flags are passed
  • backend/src/v2/compiler/argocompiler/importer.go - Ensure all importer launcher flags are passed

Future Improvements

Once validation is in place, further improvements could include:

  • Split the monolithic Options struct into driver-specific config structs (adds compile-time type safety)
  • Use constructor functions with private fields to make invalid states unrepresentable
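A sketch of the second idea, assuming a hypothetical ContainerDriverConfig built from only a few of the driver.Options fields. The validation rules (non-empty IDs, positive DAG execution ID, iteration index >= -1) are illustrative assumptions, not rules stated in this issue. Because the fields are unexported, packages that import this one can only set them through NewContainerDriverConfig.

```go
package main

import (
	"errors"
	"fmt"
)

// ContainerDriverConfig is a hypothetical CONTAINER-driver config. Its fields
// are unexported, so other packages cannot set them directly.
type ContainerDriverConfig struct {
	runID          string
	taskName       string
	dagExecutionID int64
	iterationIndex int // -1 means "not an iteration", as documented on driver.Options
}

// NewContainerDriverConfig validates every field up front and reports all
// problems at once (errors.Join, Go 1.20+). The rules are illustrative assumptions.
func NewContainerDriverConfig(runID, taskName string, dagExecutionID int64, iterationIndex int) (*ContainerDriverConfig, error) {
	var errs []error
	if runID == "" {
		errs = append(errs, errors.New("runID must not be empty"))
	}
	if taskName == "" {
		errs = append(errs, errors.New("taskName must not be empty"))
	}
	if dagExecutionID <= 0 {
		errs = append(errs, fmt.Errorf("dagExecutionID must be positive, got %d", dagExecutionID))
	}
	if iterationIndex < -1 {
		errs = append(errs, fmt.Errorf("iterationIndex must be >= -1, got %d", iterationIndex))
	}
	if err := errors.Join(errs...); err != nil {
		return nil, err
	}
	return &ContainerDriverConfig{
		runID:          runID,
		taskName:       taskName,
		dagExecutionID: dagExecutionID,
		iterationIndex: iterationIndex,
	}, nil
}

// Read-only accessors: other packages can inspect but not mutate the config.
func (c *ContainerDriverConfig) RunID() string         { return c.runID }
func (c *ContainerDriverConfig) TaskName() string      { return c.taskName }
func (c *ContainerDriverConfig) DAGExecutionID() int64 { return c.dagExecutionID }
func (c *ContainerDriverConfig) IterationIndex() int   { return c.iterationIndex }

func main() {
	if _, err := NewContainerDriverConfig("", "train", 0, -1); err != nil {
		fmt.Println("rejected:", err)
	}
	cfg, err := NewContainerDriverConfig("run-1", "train", 42, -1)
	if err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println(cfg.RunID(), cfg.TaskName(), cfg.DAGExecutionID(), cfg.IterationIndex())
}
```

Go still lets other packages write the zero value ContainerDriverConfig{}, so this narrows the space of invalid states rather than eliminating it entirely.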

Acceptance Criteria

  • Driver validates all required flags were provided using flag.Visit()
  • Launcher validates all required flags were provided using flag.Visit()
  • Missing required parameters result in clear, actionable error messages (e.g., "--dag_execution_id is required for CONTAINER but was not provided")
  • Compiler always passes ALL flags for each driver type (no conditional inclusion)
  • Required flags are documented per driver type (ROOT_DAG, DAG, CONTAINER)
  • Required flags are documented per executor type (container, importer)
  • Unit tests cover validation for each driver and executor type
  • Existing integration and E2E tests pass

Labels

/area backend

