Description
Environment
VertexAI Pipelines
Steps to reproduce
Define a function that imports the DSL components inside a function component, like this:

from kfp.dsl import Output, Dataset, Model, Artifact, Metrics

def dataset_op(
    test_dataset: Output[Dataset],
    training_dataset: Output[Dataset],
    validation_dataset: Output[Dataset],
    brand: str,
    config: Dict,
):
    from kfp.dsl import Output, Dataset, Model, Artifact, Metrics
    # By removing the internal imports it works as expected

    def some_fn(
        test_dataset: Output[Dataset],
        training_dataset: Output[Dataset],
        validation_dataset: Output[Dataset],
    ):
        ....

    some_fn(test_dataset, training_dataset, validation_dataset)

This yields the following error:
AttributeError: partially initialized module 'kfp.dsl.pipeline_context' has no attribute 'Pipeline' (most likely due to a circular reference)
File "/usr/local/lib/python3.10/site-packages/kfp/dsl/graph_component.py", line 141, in GraphComponent
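For readers unfamiliar with this class of error, here is a minimal sketch of how a circular import produces a "partially initialized module" `AttributeError` in plain Python. The module names `demo_ctx` and `demo_graph` are made up for illustration and are not the actual KFP modules; this only shows the general mechanism, not a confirmed diagnosis of the KFP bug.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def reproduce_circular_import() -> str:
    """Import two throwaway modules that depend on each other.

    Returns the text of the resulting AttributeError, or "" if none is raised.
    """
    with tempfile.TemporaryDirectory() as tmp:
        # demo_ctx imports demo_graph *before* it defines Pipeline.
        Path(tmp, "demo_ctx.py").write_text(
            "import demo_graph\n"
            "class Pipeline:\n"
            "    pass\n"
        )
        # demo_graph imports demo_ctx back and reads Pipeline at import time,
        # while demo_ctx is still only partially initialized.
        Path(tmp, "demo_graph.py").write_text(
            "import demo_ctx\n"
            "default = demo_ctx.Pipeline\n"
        )
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            importlib.import_module("demo_ctx")
        except AttributeError as exc:
            return str(exc)
        finally:
            # Leave the interpreter state as we found it.
            sys.path.remove(tmp)
            sys.modules.pop("demo_ctx", None)
            sys.modules.pop("demo_graph", None)
    return ""


message = reproduce_circular_import()
print(message)
```

The attribute lookup fails because the importing module is already registered in `sys.modules` but has not finished executing its body.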
Expected result
I would expect it to work just as it did prior to the 1.15 release. I understand this might not be correct usage of the API. What we built is an abstraction around KFP pipelines: a user supplies a configuration pointing to an sdist package, and we invoke the user's function from a dynamically generated KFP Python component.
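The dynamic lookup described above could look roughly like the following. This is a minimal sketch using the standard library's `importlib`; the name `load_function` is hypothetical and this is not the actual `mlutils.support.get_function_to_invoke` implementation, only an assumption about its general shape (resolve a module path and a function name from configuration).

```python
import importlib
from typing import Any, Callable


def load_function(module_name: str, function_name: str) -> Callable[..., Any]:
    """Resolve a callable from a dotted module path and an attribute name.

    Hypothetical helper: the module is imported at call time, so any
    import-time failure in the user's package surfaces here.
    """
    module = importlib.import_module(module_name)
    fn = getattr(module, function_name)
    if not callable(fn):
        raise TypeError(f"{module_name}.{function_name} is not callable")
    return fn


# Usage with a standard-library function standing in for the user's code.
basename = load_function("os.path", "basename")
print(basename("/data/train.csv"))
```

Because the user's module is imported inside the running component, its top-level imports (including any `kfp.dsl` imports) execute in the component's container at that point.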
Here is the generated code:

@dsl.component(
    base_image=intrpl(global_dataset_op_config["image"]),
    packages_to_install=intrpl(global_dataset_op_config["packages_to_install"]),
    pip_index_urls=intrpl(global_dataset_op_config["pip_index_urls"]),
)
def dataset_sdist_op(
    config: Dict,
    brand: str,
    training_dataset: dsl.Output[dsl.Dataset],
    test_dataset: dsl.Output[dsl.Dataset],
    validation_dataset: dsl.Output[dsl.Dataset],
) -> bool:
    import logging
    import os

    from mlutils.support import get_dataset_op_config, get_function_to_invoke

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)
    logger.info("ENV: [%s]", os.environ)
    logger.info("Config: [%s]", config)
    dataset_op_config = config
    logger.info("Config dataset_op_config: [%s]", dataset_op_config)
    fn = get_function_to_invoke(
        dataset_op_config["module"], dataset_op_config["function"]
    )
    return fn(test_dataset, training_dataset, validation_dataset, brand, config)

Here is the function in the sdist:
import os
from kfp.dsl import Output, Dataset, Model, Artifact, Metrics
from typing import Dict, NamedTuple

def dataset_op(
    test_dataset: Output[Dataset],
    training_dataset: Output[Dataset],
    validation_dataset: Output[Dataset],
    brand: str,
    config: Dict,
):
    ...

Materials and Reference
Impacted by this bug? Give it a 👍.