1717import json
1818import logging
1919import os
20+ import re
2021from shutil import copyfile
2122import tempfile
2223import time
2324from typing import Any
2425
2526import kfp
27+ from kfp_server_api .exceptions import ApiException
2628
2729from kale .common import utils
2830
@@ -58,7 +60,10 @@ def get_pipeline_id(pipeline_name: str, host: str = None) -> str:
5860 while pipeline_id is None and token is not None :
5961 pipelines = client .list_pipelines (page_token = token )
6062 token = pipelines .next_page_token
61- f = next (filter (lambda x : x .display_name == pipeline_name , pipelines .pipelines or []), None )
63+ f = next (
64+ filter (lambda x : x .display_name == pipeline_name , pipelines .pipelines or []),
65+ None ,
66+ )
6267 if f is not None :
6368 pipeline_id = f .pipeline_id
6469 return pipeline_id
@@ -80,7 +85,9 @@ def get_pipeline_version_id(version_name: str, pipeline_id: str, host: str = Non
8085 version_id = None
8186 while version_id is None and page_token is not None :
8287 versions = client .pipelines .list_pipeline_versions (
83- resource_key_type = "PIPELINE" , resource_key_id = pipeline_id , page_token = page_token
88+ resource_key_type = "PIPELINE" ,
89+ resource_key_id = pipeline_id ,
90+ page_token = page_token ,
8491 )
8592 page_token = versions .next_page_token
8693 f = next (filter (lambda x : x .name == version_name , versions .versions ), None )
@@ -180,7 +187,10 @@ def run_pipeline(
180187 pipeline_id = pipeline_id , pipeline_version_id = version_id
181188 ).display_name
182189 except Exception :
183- log .debug ("Could not retrieve pipeline version with ID '%s'. Using 'unknown'." , version_id )
190+ log .debug (
191+ "Could not retrieve pipeline version with ID '%s'. Using 'unknown'." ,
192+ version_id ,
193+ )
184194 version_name = "unknown"
185195
186196 if not run_name :
@@ -193,12 +203,41 @@ def run_pipeline(
193203 display_version ,
194204 )
195205
196- run = client .create_run_from_pipeline_package (
197- pipeline_file = pipeline_package_path ,
198- arguments = kwargs ,
199- run_name = run_name ,
200- experiment_name = experiment_name ,
201- )
206+ try :
207+ run = client .create_run_from_pipeline_package (
208+ pipeline_file = pipeline_package_path ,
209+ arguments = kwargs ,
210+ run_name = run_name ,
211+ experiment_name = experiment_name ,
212+ )
213+ except ApiException as e :
214+ try :
215+ body = json .loads (e .body or "{}" )
216+
217+ if body .get ("code" ) == 13 :
218+ details = body .get ("details" ) or []
219+
220+ for d in details :
221+ if (
222+ d .get ("@type" ) == "type.googleapis.com/google.rpc.Status"
223+ and d .get ("code" ) == 2
224+ ):
225+ message = body .get ("message" , "" ).lower ()
226+
227+ if "failed to unmarshal kubernetes config" in message and re .search (
228+ r"unknown field.*securitycontext" ,
229+ message ,
230+ re .IGNORECASE ,
231+ ):
232+ raise RuntimeError (
233+ "Your KFP server does not support the 'securityContext' field. "
234+ "Please upgrade Kubeflow Pipelines to version >= 2.16.0."
235+ ) from e
236+
237+ except (ValueError , TypeError ):
238+ pass
239+
240+ raise
202241
203242 print ("Pipeline submitted!" )
204243 log .info ("Run ID: %s" , run .run_id )
@@ -323,7 +362,11 @@ def compute_component_id(pod):
323362 Kale steps are KFP SDK Components. This is the way MetadataWriter generates
324363 unique names for such components.
325364 """
326- log .info ("Computing component ID for pod %s/%s..." , pod .metadata .namespace , pod .metadata .name )
365+ log .info (
366+ "Computing component ID for pod %s/%s..." ,
367+ pod .metadata .namespace ,
368+ pod .metadata .name ,
369+ )
327370 component_spec_text = pod .metadata .annotations .get (KFP_COMPONENT_SPEC_ANNOTATION_KEY )
328371 if not component_spec_text :
329372 raise ValueError ("KFP component spec annotation not found in pod" )
0 commit comments