
[sdk] dsl.ParallelFor dict-item loop variables are incorrectly scoped across control flow group boundaries (dsl.If, dsl.ExitHandler) #13273

@tom-pavz

Description

Environment

  • KFP backend version: 2.14.3
  • KFP SDK version: 2.14.3
  • All dependencies version:
kfp                                2.14.3
kfp-kubernetes                     2.14.3
kfp-pipeline-spec                  2.14.0
kfp-server-api                     2.16.0

Summary

When using dsl.ParallelFor with dict items, the compiler incorrectly handles loop variable scoping across control flow group boundaries. This manifests in at least two ways:

  1. ParallelFor containing dsl.If: Loop variables passed to tasks inside a dsl.If block produce incorrect parameter wiring in the compiled IR YAML, causing runtime failures.
  2. ParallelFor inside dsl.ExitHandler: Dict-item sub-variables (e.g. loop-item-param-2-subvar-name) leak upward through the ExitHandler into the parent DAG's inputDefinitions. At runtime, the driver fails because the parent DAG doesn't have the leaked parameters.

Both cases share the same root cause: the compiler's PipelineChannel resolution doesn't correctly scope ParallelFor loop variables across nested GroupContext boundaries.


Case 1: Loop variables inside dsl.If

Steps to reproduce

from kfp import compiler, dsl

@dsl.component
def my_component(msg: str) -> str:
    return msg

@dsl.pipeline
def my_pipeline(should_run: bool = True):
    with dsl.ParallelFor(
        items=[
            {"name": "alice", "greeting": "hello alice"},
            {"name": "bob", "greeting": "hello bob"},
        ]
    ) as item:
        with dsl.If(should_run == True):
            # This task receives a ParallelFor loop variable inside a dsl.If block
            my_component(msg=item.greeting)

compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path="pipeline.yaml",
)

Expected result

Loop variables should be usable inside dsl.If/dsl.Condition blocks nested within the same ParallelFor. The compiler should correctly propagate the parameter reference across the condition group boundary.
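For illustration, a hypothetical sketch of what correct wiring would look like in the IR, assuming KFP's usual pipelinechannel naming (the component and parameter names here are illustrative, not taken from an actual compile): the condition group declares the loop sub-variable as its own input, and the enclosing for-loop DAG passes it down explicitly.

```yaml
# Hypothetical sketch (names illustrative): the condition component declares
# the loop sub-variable as its own input...
comp-condition-3:
  inputDefinitions:
    parameters:
      pipelinechannel--loop-item-param-1-subvar-greeting:
        parameterType: STRING
# ...and the for-loop DAG's task for the condition group passes it down:
condition-3:
  inputs:
    parameters:
      pipelinechannel--loop-item-param-1-subvar-greeting:
        componentInputParameter: pipelinechannel--loop-item-param-1-subvar-greeting
```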

Actual result

The compiled IR YAML has incorrect parameter wiring for the loop variable inside the condition group, causing runtime failures when the backend cannot resolve the reference.


Case 2: ParallelFor with dict items inside dsl.ExitHandler

Steps to reproduce

from kfp import compiler, dsl

@dsl.component
def my_component(msg: str) -> str:
    return msg

@dsl.component
def on_exit():
    print("done")

@dsl.pipeline
def my_pipeline():
    exit_task = on_exit()
    with dsl.ExitHandler(exit_task):
        with dsl.ParallelFor(
            items=[
                {"name": "alice", "greeting": "hello alice"},
                {"name": "bob", "greeting": "hello bob"},
            ]
        ) as item:
            my_component(msg=item.greeting)

compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path="pipeline.yaml",
)

Expected result

The ExitHandler component's inputDefinitions should only contain parameters explicitly passed to it. The ParallelFor loop variables should be scoped to the loop body and not propagate upward.

Actual result

The pipeline compiles without error, but the generated IR YAML leaks loop-item-param sub-variables into the ExitHandler component's inputDefinitions:

# In the compiled YAML, comp-exit-handler-1 incorrectly includes:
inputDefinitions:
  parameters:
    pipelinechannel--loop-item-param-2-subvar-greeting:
      parameterType: STRING
    pipelinechannel--loop-item-param-2-subvar-name:
      parameterType: STRING

The root DAG then tries to resolve loop-item-param-2 to provide these values, but it doesn't have that parameter. At runtime, the KFP driver fails with:

KFP driver: driver.DAG(...) failed: failed to resolve inputs:
  resolving input parameter with spec component_input_parameter:"loop-item-param-2"
  parameter_expression_selector:"parseJson(string_value)[\"name\"]":
  parent DAG does not have input parameter loop-item-param-2
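One way to detect this leak (e.g. in a compile-time regression test) is to load the compiled YAML and scan exit-handler components for loop-item parameters. A minimal sketch in plain Python, assuming the spec has already been parsed into a dict (e.g. via yaml.safe_load); find_leaked_loop_params is a hypothetical helper, not a KFP API:

```python
# Hypothetical helper (not part of the KFP SDK): scans a parsed IR spec for
# ParallelFor loop-item parameters that leaked into exit-handler components.
def find_leaked_loop_params(spec: dict) -> list[str]:
    """Return inputDefinitions parameter names on comp-exit-handler-* components
    that reference a ParallelFor loop item -- a symptom of the scoping bug."""
    leaked = []
    for name, comp in spec.get("components", {}).items():
        if not name.startswith("comp-exit-handler"):
            continue
        params = comp.get("inputDefinitions", {}).get("parameters", {})
        leaked.extend(p for p in params if "loop-item-param" in p)
    return leaked

# The buggy structure shown above, as a parsed dict:
buggy_spec = {
    "components": {
        "comp-exit-handler-1": {
            "inputDefinitions": {
                "parameters": {
                    "pipelinechannel--loop-item-param-2-subvar-greeting": {"parameterType": "STRING"},
                    "pipelinechannel--loop-item-param-2-subvar-name": {"parameterType": "STRING"},
                }
            }
        }
    }
}

print(find_leaked_loop_params(buggy_spec))
# -> ['pipelinechannel--loop-item-param-2-subvar-greeting',
#     'pipelinechannel--loop-item-param-2-subvar-name']
```

An empty result after applying the workaround indicates the loop variables stayed scoped to the loop body.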

Workaround (applies to both cases)

Wrap the problematic control flow boundary in a sub-pipeline (@dsl.pipeline). The sub-pipeline boundary forces the compiler to resolve loop variables as explicit input parameters, which scopes them correctly:

from kfp import compiler, dsl

@dsl.component
def my_component(msg: str) -> str:
    return msg

@dsl.component
def on_exit():
    print("done")

# Workaround for Case 1: wrap dsl.If in a sub-pipeline
@dsl.pipeline
def conditional_component(should_run: bool, msg: str):
    with dsl.If(should_run == True):
        my_component(msg=msg)

# Workaround for Case 2: wrap ParallelFor in a sub-pipeline
@dsl.pipeline
def run_all(should_run: bool = True):
    with dsl.ParallelFor(
        items=[
            {"name": "alice", "greeting": "hello alice"},
            {"name": "bob", "greeting": "hello bob"},
        ]
    ) as item:
        conditional_component(should_run=should_run, msg=item.greeting)

@dsl.pipeline
def my_pipeline():
    exit_task = on_exit()
    with dsl.ExitHandler(exit_task):
        run_all()

compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path="pipeline.yaml",
)

Materials and Reference

Likely related to how compiler.py resolves PipelineChannel references across nested GroupContext boundaries — the ParallelFor group creates a parameter scope that other control flow groups (Condition, ExitHandler) don't correctly respect during IR compilation.


Impacted by this bug? Give it a 👍.
