Environment
- Deployment commands:
export PIPELINE_VERSION=2.14.3
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION"
- KFP version:
kubectl get deployment ml-pipeline -n kubeflow -o jsonpath='{.spec.template.spec.containers[0].image}'
ghcr.io/kubeflow/kfp-api-server:master
- KFP SDK version:
pip list | grep kfp
kfp 2.15.1
kfp-kubernetes 2.15.1
kfp-pipeline-spec 2.15.1
kfp-server-api 2.15.1
Steps to reproduce
import os

from kfp import dsl

try:
    from kfp import kubernetes
    _HAS_KFP_K8S = True
except Exception:
    _HAS_KFP_K8S = False

from kfp.compiler import Compiler
from kfp.client import Client
from kfp.components import load_component_from_file


def get_component_op(yaml_file):
    # Component YAMLs live one directory above this script.
    return load_component_from_file(os.path.join(os.path.dirname(__file__), "..", yaml_file))


try:
    preprocess_op = get_component_op('mnist_preprocess_component.yaml')
    train_op = get_component_op('mnist_train_component.yaml')
except FileNotFoundError:
    print("Error: Component YAML files not found. Please run the component generation step first.")
    exit(1)


@dsl.pipeline(
    name='mnist-volcano-training',
    pipeline_root='s3://mlpipeline/mnist-runs'
)
def mnist_pipeline(
    epochs: int = 10,
    lr: float = 0.001
):
    # Import the raw dataset from MinIO via its S3-compatible URI.
    raw_data_importer = dsl.importer(
        artifact_uri='s3://mlpipeline/raw/train_data.csv',
        artifact_class=dsl.Dataset
    )

    prep_task = preprocess_op(raw_data=raw_data_importer.outputs['artifact'])
    prep_task.set_caching_options(False)

    train_task = train_op(
        training_data=prep_task.outputs['processed_data'],
        epochs=epochs,
        lr=lr
    )
    train_task.set_caching_options(False)

    if _HAS_KFP_K8S:
        # Inject MinIO credentials and try to force path-style S3
        # addressing for both tasks.
        kubernetes.use_secret_as_env(
            task=prep_task,
            secret_name='mlpipeline-minio-artifact',
            secret_key_to_env={
                'accesskey': 'AWS_ACCESS_KEY_ID',
                'secretkey': 'AWS_SECRET_ACCESS_KEY'
            }
        )
        prep_task.set_env_variable('AWS_REGION', 'us-east-1')
        prep_task.set_env_variable('AWS_ENDPOINT_URL', 'http://minio-service.kubeflow:9000')
        prep_task.set_env_variable('S3_FORCE_PATH_STYLE', 'true')
        prep_task.set_env_variable('AWS_S3_FORCE_PATH_STYLE', 'true')
        prep_task.set_env_variable('AWS_USE_PATH_STYLE_REQUESTS', 'true')
        prep_task.set_env_variable('AWS_S3_USE_PATH_STYLE', 'true')

        kubernetes.use_secret_as_env(
            task=train_task,
            secret_name='mlpipeline-minio-artifact',
            secret_key_to_env={
                'accesskey': 'AWS_ACCESS_KEY_ID',
                'secretkey': 'AWS_SECRET_ACCESS_KEY'
            }
        )
        train_task.set_env_variable('AWS_REGION', 'us-east-1')
        train_task.set_env_variable('AWS_ENDPOINT_URL', 'http://minio-service.kubeflow:9000')
        train_task.set_env_variable('S3_FORCE_PATH_STYLE', 'true')
        train_task.set_env_variable('AWS_S3_FORCE_PATH_STYLE', 'true')
        train_task.set_env_variable('AWS_USE_PATH_STYLE_REQUESTS', 'true')
        train_task.set_env_variable('AWS_S3_USE_PATH_STYLE', 'true')

        # Ask Volcano to gang-schedule the training pod.
        kubernetes.add_pod_annotation(
            task=train_task,
            annotation_key='scheduling.k8s.io/group-name',
            annotation_value='mnist-gpu-group'
        )
        kubernetes.add_pod_annotation(
            task=train_task,
            annotation_key='scheduling.volcano.sh/schedulerName',
            annotation_value='volcano'
        )

    train_task.set_cpu_limit('4').set_memory_limit('16G').set_gpu_limit(1)


Compiler().compile(
    pipeline_func=mnist_pipeline,
    package_path='mnist_pipeline.yaml'
)
print("Pipeline compiled successfully: mnist_pipeline.yaml")

try:
    KFP_API_HOST = os.environ.get("KFP_HOST", "http://localhost:30088")
    client = Client(host=KFP_API_HOST)
    run = client.create_run_from_pipeline_func(
        mnist_pipeline,
        arguments={'epochs': 15, 'lr': 0.0001},
        experiment_name='MNIST Volcano Training Run',
    )
    print("\n--- Workflow run successful! ---")
    print(f"Run ID: {run.run_id}")
    print("Please check the KFP UI for run status.")
except Exception as e:
    print("\n--- Error: Failed to connect or submit workflow to KFP API Server ---")
    print(f"Please check if KFP_API_HOST ({KFP_API_HOST}) is correct and if the KFP backend is running.")
    print(f"Detailed error: {e}")

Expected result
kubectl -nkubeflow logs -f mnist-volcano-training-n52cg-system-container-impl-2651597712
time="2025-12-05T02:58:24.510Z" level=info msg="capturing logs" argo=true
I1205 02:58:24.563569 28 main.go:66] Setting log level to: '1'
I1205 02:58:24.564487 28 cache.go:89] Connecting to cache endpoint ml-pipeline.kubeflow:8887
I1205 02:58:24.612201 28 launcher_v2.go:193] publish success.
I1205 02:58:24.633929 28 client.go:750] Attempting to update DAG state
F1205 02:58:24.649376 28 main.go:58] failed to execute component: failed to download input artifact "raw_data" from remote storage URI "s3://mlpipeline/raw/train_data.csv": failed to list objects in remote storage "raw/train_data.csv": blob (code=Unknown): operation error S3: ListObjectsV2, https response error StatusCode: 0, RequestID: , HostID: , request send failed, Get "http://mlpipeline.minio-service.kubeflow:9000/?list-type=2&max-keys=1000&prefix=raw%2Ftrain_data.csv": dial tcp: lookup mlpipeline.minio-service.kubeflow on 10.96.0.10:53: no such host
time="2025-12-05T02:58:25.512Z" level=info msg="sub-process exited" argo=true error="<nil>"
Error: exit status 1
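The DNS failure in the log comes from virtual-hosted-style S3 addressing: the bucket name mlpipeline was prepended to the endpoint host, producing mlpipeline.minio-service.kubeflow, which cluster DNS cannot resolve. As a sanity check that the object itself is reachable when path-style addressing is forced, here is a minimal sketch (not part of the repro; assumes boto3 is installed, the credentials come from the mlpipeline-minio-artifact secret, and it runs from a pod inside the cluster where minio-service.kubeflow resolves):

import os

import boto3
from botocore.config import Config

# Force path-style addressing, i.e. requests go to
# http://minio-service.kubeflow:9000/mlpipeline/... instead of the
# http://mlpipeline.minio-service.kubeflow:9000/... host the launcher tried.
s3 = boto3.client(
    "s3",
    endpoint_url="http://minio-service.kubeflow:9000",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    region_name="us-east-1",
    config=Config(s3={"addressing_style": "path"}),
)

# Same listing the launcher attempted in the failing log above.
resp = s3.list_objects_v2(Bucket="mlpipeline", Prefix="raw/train_data.csv", MaxKeys=1)
print(resp.get("KeyCount", 0))  # expect 1 if the object exists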