Skip to content

feat(sdk/backend): support PipelineChannel in set_env_variable#13034

Open
JerT33 wants to merge 1 commit intokubeflow:masterfrom
JerT33:feat/pipelinechannel_env_vars
Open

feat(sdk/backend): support PipelineChannel in set_env_variable#13034
JerT33 wants to merge 1 commit intokubeflow:masterfrom
JerT33:feat/pipelinechannel_env_vars

Conversation

@JerT33
Copy link
Contributor

@JerT33 JerT33 commented Mar 14, 2026

Description of your changes:

  • SDK: accept PipelineChannel in set_env_variable and register channel inputs
  • Compiler: inject env var channels into task inputs and convert placeholders
  • Backend: resolve env var placeholders at runtime via
    resolvePodSpecInputRuntimeParameter

Live Cluster Testing Evidence:

SDK

Before:

Traceback (most recent call last):
  File "/Users/jer/Projects/kfp/pipelines/issue_11035/test_simple_dynamic_env_var.py", line 19, in <module>
    @dsl.pipeline(name="test-dynamic-env-var")
     ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jer/Projects/kfp/pipelines/venv/lib/python3.14/site-packages/kfp/dsl/pipeline_context.py", line 71, in pipeline
    return component_factory.create_graph_component_from_func(
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^
        func,
        ^^^^^
    ...<3 lines>...
        pipeline_config=pipeline_config,
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/Users/jer/Projects/kfp/pipelines/venv/lib/python3.14/site-packages/kfp/dsl/component_factory.py", line 915, in create_graph_component_from_func
    return graph_component.GraphComponent(
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^
        component_spec=component_spec,
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<2 lines>...
        pipeline_config=pipeline_config,
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/Users/jer/Projects/kfp/pipelines/venv/lib/python3.14/site-packages/kfp/dsl/graph_component.py", line 77, in __init__
    pipeline_spec, platform_spec = builder.create_pipeline_spec(
                                   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~^
        pipeline=dsl_pipeline,
        ^^^^^^^^^^^^^^^^^^^^^^
    ...<2 lines>...
        pipeline_config=pipeline_config,
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/Users/jer/Projects/kfp/pipelines/venv/lib/python3.14/site-packages/kfp/compiler/pipeline_spec_builder.py", line 2048, in create_pipeline_spec
    build_spec_by_group(
    ~~~~~~~~~~~~~~~~~~~^
        pipeline_spec=pipeline_spec,
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<10 lines>...
        is_compiled_component=False,
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/Users/jer/Projects/kfp/pipelines/venv/lib/python3.14/site-packages/kfp/compiler/pipeline_spec_builder.py", line 1409, in build_spec_by_group
    subgroup_container_spec = build_container_spec_for_task(
        task=subgroup)
  File "/Users/jer/Projects/kfp/pipelines/venv/lib/python3.14/site-packages/kfp/compiler/pipeline_spec_builder.py", line 751, in build_container_spec_for_task
    .EnvVar(name=name, value=value)
     ~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: bad argument type for built-in operation

After:

# PIPELINE DEFINITION
# Name: test-dynamic-env-var
# Inputs:
#    name: str [Default: 'test-value']
components:
  comp-produce-value:
    executorLabel: exec-produce-value
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-read-env-var:
    executorLabel: exec-read-env-var
    inputDefinitions:
      parameters:
        expected:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-read-env-var-2:
    executorLabel: exec-read-env-var-2
    inputDefinitions:
      parameters:
        expected:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
deploymentSpec:
  executors:
    exec-produce-value:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - produce_value
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef produce_value() -> str:\n    return \"from-upstream-task\"\n\n"
        image: python:3.11
    exec-read-env-var:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - read_env_var
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef read_env_var(expected: str) -> str:\n    \"\"\"Read MY_VAR from\
          \ env and verify it matches expected value.\"\"\"\n    import os\n    value\
          \ = os.environ.get('MY_VAR', '')\n    print(f\"MY_VAR={value}\")\n    if\
          \ value != expected:\n        raise ValueError(f\"Expected MY_VAR='{expected}',\
          \ got '{value}'\")\n    return value\n\n"
        env:
        - name: MY_VAR
          value: '{{$.inputs.parameters[''pipelinechannel--name'']}}'
        image: python:3.11
    exec-read-env-var-2:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - read_env_var
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef read_env_var(expected: str) -> str:\n    \"\"\"Read MY_VAR from\
          \ env and verify it matches expected value.\"\"\"\n    import os\n    value\
          \ = os.environ.get('MY_VAR', '')\n    print(f\"MY_VAR={value}\")\n    if\
          \ value != expected:\n        raise ValueError(f\"Expected MY_VAR='{expected}',\
          \ got '{value}'\")\n    return value\n\n"
        env:
        - name: MY_VAR
          value: '{{$.inputs.parameters[''pipelinechannel--produce-value-Output'']}}'
        image: python:3.11
pipelineInfo:
  name: test-dynamic-env-var
root:
  dag:
    tasks:
      produce-value:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-produce-value
        taskInfo:
          name: produce-value
      read-env-var:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-read-env-var
        inputs:
          parameters:
            MY_VAR:
              runtimeValue:
                constant: '{{$.inputs.parameters[''pipelinechannel--name'']}}'
            expected:
              componentInputParameter: name
            pipelinechannel--name:
              componentInputParameter: name
        taskInfo:
          name: read-env-var
      read-env-var-2:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-read-env-var-2
        dependentTasks:
        - produce-value
        inputs:
          parameters:
            MY_VAR:
              runtimeValue:
                constant: '{{$.inputs.parameters[''pipelinechannel--produce-value-Output'']}}'
            expected:
              taskOutputParameter:
                outputParameterKey: Output
                producerTask: produce-value
            pipelinechannel--produce-value-Output:
              taskOutputParameter:
                outputParameterKey: Output
                producerTask: produce-value
        taskInfo:
          name: read-env-var-2
  inputDefinitions:
    parameters:
      name:
        defaultValue: test-value
        isOptional: true
        parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.15.2

Driver

Before:

Screenshot 2026-03-13 at 11 18 10 PM
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/usr/local/lib/python3.11/site-packages/kfp/dsl/executor_main.py", line 109, in <module>
    executor_main()
  File "/usr/local/lib/python3.11/site-packages/kfp/dsl/executor_main.py", line 101, in executor_main
    output_file = executor.execute()
                  ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kfp/dsl/executor.py", line 417, in execute
    result = self.func(**func_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/tmp.5gkGg93xUl/ephemeral_component.py", line 13, in read_env_var
    raise ValueError(f"Expected MY_VAR='{expected}', got '{value}'")
ValueError: Expected MY_VAR='from-upstream-task', got '{{channel:task=produce-value;name=Output;type=String;}}'
MY_VAR={{channel:task=produce-value;name=Output;type=String;}}
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/usr/local/lib/python3.11/site-packages/kfp/dsl/executor_main.py", line 109, in <module>
    executor_main()
  File "/usr/local/lib/python3.11/site-packages/kfp/dsl/executor_main.py", line 101, in executor_main
    output_file = executor.execute()
                  ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kfp/dsl/executor.py", line 417, in execute
    result = self.func(**func_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/tmp.m8B9XomrYf/ephemeral_component.py", line 13, in read_env_var
    raise ValueError(f"Expected MY_VAR='{expected}', got '{value}'")
ValueError: Expected MY_VAR='test_manual_input_value', got '{{channel:task=;name=name;type=String;}}'
MY_VAR={{channel:task=;name=name;type=String;}}

After:

Screenshot 2026-03-14 at 2 05 50 PM Screenshot 2026-03-14 at 2 06 11 PM Screenshot 2026-03-14 at 2 08 15 PM Screenshot 2026-03-14 at 2 06 55 PM Screenshot 2026-03-14 at 2 04 53 PM

Checklist:

@google-oss-prow
Copy link

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@google-oss-prow
Copy link

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign chensun for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@JerT33 JerT33 force-pushed the feat/pipelinechannel_env_vars branch from 3a98889 to b048202 Compare March 14, 2026 18:18
@JerT33 JerT33 marked this pull request as ready for review March 14, 2026 18:18
Copilot AI review requested due to automatic review settings March 14, 2026 18:18
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds end-to-end support for using PipelineChannel values as task environment variables by wiring them through the compiled IR and resolving the resulting {{$.inputs.parameters[...]}} placeholders at runtime in the v2 driver. This fits into KFP’s existing “compiler-injected input + placeholder substitution” mechanism used for other dynamic container fields.

Changes:

  • SDK: Allow PipelineChannel in PipelineTask.set_env_variable() and track channel dependencies.
  • Compiler: Inject env-var channel references into task inputs and convert env values to {{$.inputs.parameters[...]}} placeholders.
  • Backend: Resolve env-var placeholders during pod spec patch generation.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

File Description
sdk/python/kfp/dsl/pipeline_task.py Expands set_env_variable to accept PipelineChannel and register channel inputs.
sdk/python/kfp/compiler/pipeline_spec_builder.py Adds env-var channel injection into task inputs and converts env values via placeholder conversion.
backend/src/v2/driver/driver.go Resolves env var values at runtime using executor input before creating the pod spec patch.

You can also share your feedback on Copilot code review. Take the survey.

@JerT33 JerT33 force-pushed the feat/pipelinechannel_env_vars branch from b048202 to b8255d2 Compare March 14, 2026 19:43
@JerT33 JerT33 requested a review from Copilot March 14, 2026 19:44
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds end-to-end support for using PipelineChannel values in PipelineTask.set_env_variable, so dynamic env vars compile correctly and are resolved at runtime by the v2 driver.

Changes:

  • SDK: broaden set_env_variable to accept PipelineChannel and track channel inputs.
  • Compiler: inject env-var channel references into task inputs and convert env values to IR placeholders.
  • Backend driver: resolve {{$.inputs.parameters[...]}} placeholders in container env var values at runtime.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
sdk/python/kfp/dsl/pipeline_task_test.py Adds SDK-level unit test for PipelineChannel env var support.
sdk/python/kfp/dsl/pipeline_task.py Updates set_env_variable API to accept PipelineChannel and register channel inputs.
sdk/python/kfp/compiler/pipeline_spec_builder.py Injects env var channels into task inputs and converts env values to placeholders.
sdk/python/kfp/compiler/compiler_test.py Adds compiler regression test ensuring env var compiles to pipelinechannel--... placeholder wiring.
backend/src/v2/driver/driver.go Resolves env var input-parameter placeholders in initPodSpecPatch.

You can also share your feedback on Copilot code review. Take the survey.

@JerT33 JerT33 force-pushed the feat/pipelinechannel_env_vars branch from b8255d2 to 03b7bb9 Compare March 14, 2026 20:06
@google-oss-prow google-oss-prow bot added size/L and removed size/M labels Mar 14, 2026
@JerT33 JerT33 requested a review from Copilot March 14, 2026 20:07
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds end-to-end support for using dynamic PipelineChannel values in task environment variables, spanning DSL task configuration, compiler placeholder conversion/injection, and backend runtime resolution so env vars evaluate to concrete values inside the running pod.

Changes:

  • SDK (DSL): PipelineTask.set_env_variable now accepts PipelineChannel values and registers any referenced channels as task channel inputs.
  • Compiler: detects pipeline channels embedded in env var values, injects the required task inputs, and converts env var channel placeholders to IR input placeholders.
  • Backend (driver): resolves {{$.inputs.parameters[...]}} placeholders in container env var values at runtime before constructing the pod spec.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.

Show a summary per file
File Description
sdk/python/kfp/dsl/pipeline_task.py Accepts PipelineChannel in set_env_variable and tracks referenced channels for compilation/topology.
sdk/python/kfp/dsl/pipeline_task_test.py Adds unit coverage for set_env_variable with a PipelineParameterChannel.
sdk/python/kfp/compiler/pipeline_spec_builder.py Injects env var channel references into task inputs and converts env var values to IR placeholders.
sdk/python/kfp/compiler/compiler_test.py Adds compiler tests asserting env var placeholder conversion + injected inputs in the compiled YAML.
backend/src/v2/driver/driver.go Resolves input-parameter placeholders in env var values when building the pod spec patch.

You can also share your feedback on Copilot code review. Take the survey.

Signed-off-by: JerT33 <trestjeremiah@gmail.com>

compiler updates

Signed-off-by: JerT33 <trestjeremiah@gmail.com>

address copilot feedback

Signed-off-by: JerT33 <trestjeremiah@gmail.com>

fix tests

Signed-off-by: JerT33 <trestjeremiah@gmail.com>

fix formatting

Signed-off-by: JerT33 <trestjeremiah@gmail.com>
@JerT33 JerT33 force-pushed the feat/pipelinechannel_env_vars branch from 03b7bb9 to 8e436b9 Compare March 14, 2026 20:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants