diff --git a/elyra/kfp/bootstrapper.py b/elyra/kfp/bootstrapper.py
index 32c340fef..a63a57e28 100644
--- a/elyra/kfp/bootstrapper.py
+++ b/elyra/kfp/bootstrapper.py
@@ -50,6 +50,79 @@ operation_name = None  # global used in formatted logging
+def set_dist_train_config(rank, nranks, step_name, port=9888):
+    """
+    Set distributed-training environment variables for general use.
+    For TensorFlow: TF_CONFIG is configured.
+    For PyTorch: MASTER_ADDR and MASTER_PORT are configured.
+    For general use cases: NRANKS and RANK are configured.
+
+    TODO: this function is Argo-specific; Tekton support should be added.
+    """
+    from kubernetes import client, config
+
+    wf_id = os.getenv("WORKFLOW_ID")
+    ns = os.getenv("KFP_NAMESPACE")
+    if not wf_id or not ns:
+        raise ValueError("The WORKFLOW_ID and KFP_NAMESPACE env variables must be set in the workflow pod!")
+
+    config.load_incluster_config()
+    api = client.CustomObjectsApi()
+
+    worker_started = 0
+    # Poll the Argo Workflow resource until every worker pod for this step reports an IP.
+    while worker_started != nranks:
+        resource = api.get_namespaced_custom_object(
+            group="argoproj.io",
+            version="v1alpha1",
+            name=wf_id,
+            namespace=ns,
+            plural="workflows",
+        )
+        if not resource.get("status"):
+            time.sleep(2)
+            continue
+        if not resource["status"].get("nodes"):
+            time.sleep(2)
+            continue
+
+        nodes = resource["status"]["nodes"]
+        workers_spec = []
+        for nk in nodes:
+            node_info = nodes[nk]
+            OpUtil.log_operation_info(
+                "kfpdist: searching for {}, curr node: {}, templateName: {}, type: {}".format(
+                    step_name, nk, node_info["templateName"], node_info["type"]
+                )
+            )
+            if node_info["templateName"] == step_name and node_info["type"] == "Pod":
+                podid = node_info["id"]
+                for input_param in node_info["inputs"]["parameters"]:
+                    if input_param["name"].find("loop-item") >= 0:
+                        # FIXME: the Argo parameter whose name contains "loop-item" carries this worker's rank.
+                        curr_rank = int(input_param["value"])
+                        break
+                v1 = client.CoreV1Api()
+                podinfo = v1.read_namespaced_pod(podid, ns)
+                if podinfo.status.pod_ip:
+                    workers_spec.append((curr_rank, "%s:%d" % (podinfo.status.pod_ip, port)))
+        worker_started = len(workers_spec)
+        time.sleep(2)
+
+    workers_spec.sort(key=lambda item: item[0])
+    workers_spec_list = [i[1] for i in workers_spec]
+    # set the TF_CONFIG env variable for TensorFlow distributed training
+    os.environ["TF_CONFIG"] = json.dumps(
+        {"cluster": {"worker": workers_spec_list}, "task": {"type": "worker", "index": rank}}
+    )
+    OpUtil.log_operation_info("Setting TF_CONFIG: %s" % os.environ["TF_CONFIG"])
+    os.environ["MASTER_ADDR"] = workers_spec[0][1].split(":")[0]
+    os.environ["MASTER_PORT"] = workers_spec[0][1].split(":")[1]
+    OpUtil.log_operation_info(
+        "Setting MASTER_ADDR: {}, MASTER_PORT: {}".format(os.environ["MASTER_ADDR"], os.environ["MASTER_PORT"])
+    )
+    OpUtil.log_operation_info("Using RANK: {}, NRANKS: {}".format(os.environ["RANK"], os.environ["NRANKS"]))
+
+
 class FileOpBase(ABC):
     """Abstract base class for file-based operations"""
@@ -724,6 +797,22 @@ def main():
     )
     # Setup packages and gather arguments
     input_params = OpUtil.parse_arguments(sys.argv[1:])
+
+    if os.getenv("RANK"):
+        op_name = os.getenv("ELYRA_OP_NAME")
+        if not op_name:
+            raise ValueError(
+                "The ELYRA_OP_NAME env variable is not set. Please check that the Elyra version matches this bootstrapper.py."
+            )
+
+        # FIXME: the operation name is rewritten by KFP; normalize these characters so it matches the Argo template name.
+        op_name = op_name.replace("_", "-")
+        rank = int(os.getenv("RANK"))
+        nranks = os.getenv("NRANKS")
+        if not nranks:
+            raise ValueError("The RANK env variable is set but no NRANKS env variable was found!")
+        set_dist_train_config(rank, int(nranks), op_name, port=9888)
+
     OpUtil.log_operation_info("starting operation")
     t0 = time.time()
     OpUtil.package_install(user_volume_path=input_params.get("user-volume-path"))
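For reference, a minimal sketch of the environment that set_dist_train_config leaves behind for a two-worker run; the pod IPs below are illustrative placeholders, not values from a real cluster:

    import json
    import os

    # Hypothetical outcome for nranks=2, rank=0, port=9888; worker IPs are made up.
    workers = ["10.0.0.4:9888", "10.0.0.5:9888"]
    os.environ["TF_CONFIG"] = json.dumps(
        {"cluster": {"worker": workers}, "task": {"type": "worker", "index": 0}}
    )
    os.environ["MASTER_ADDR"], os.environ["MASTER_PORT"] = workers[0].split(":")
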
diff --git a/elyra/pipeline/kfp/processor_kfp.py b/elyra/pipeline/kfp/processor_kfp.py
index 2eb32621c..6116b0ed2 100644
--- a/elyra/pipeline/kfp/processor_kfp.py
+++ b/elyra/pipeline/kfp/processor_kfp.py
@@ -794,6 +794,7 @@ def _generate_workflow_tasks(
             }
 
             component_definition = generic_component_template.render(
+                op_name=sanitize_label_value(operation.name),
                 container_image=operation.runtime_image,
                 task_parameters=task_parameters,
                 command_args=self._compose_container_command_args(
@@ -847,6 +848,7 @@ def _generate_workflow_tasks(
                 if operation.gpu_vendor:
                     gpu_vendor = operation.gpu_vendor
                 workflow_task["task_modifiers"]["gpu_limit"] = {"size": operation.gpu, "vendor": gpu_vendor}
+            workflow_task["task_modifiers"]["parallel_count"] = operation.parallel_count
 
             if is_crio_runtime:
                 # Attach empty dir volume
@@ -880,6 +882,8 @@ def _generate_workflow_tasks(
             )
             # Pipeline node name
             workflow_task["task_modifiers"]["pod_labels"]["elyra/node-name"] = sanitize_label_value(operation.name)
+            # Original operation name for runtime lookups
+            workflow_task["task_modifiers"]["env_variables"]["ELYRA_OP_NAME"] = operation.name
 
             # Add non-identifying metadata
             if workflow_task["task_modifiers"].get("pod_annotations") is None:
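Sketched below is the extra task metadata a generic node carries after the changes above, for a hypothetical node named "train_model" with Parallel Count set to 2 (the dict layout follows workflow_task as built in _generate_workflow_tasks; only the new keys are shown):

    workflow_task = {"task_modifiers": {"env_variables": {}, "pod_labels": {}}}
    # Forwarded to the DSL template to decide whether a ParallelFor block is emitted.
    workflow_task["task_modifiers"]["parallel_count"] = 2
    # The unsanitized node name, so bootstrapper.py can locate its own Argo template at runtime.
    workflow_task["task_modifiers"]["env_variables"]["ELYRA_OP_NAME"] = "train_model"
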
diff --git a/elyra/pipeline/pipeline.py b/elyra/pipeline/pipeline.py
index ddfa81a6d..e428b6b5c 100644
--- a/elyra/pipeline/pipeline.py
+++ b/elyra/pipeline/pipeline.py
@@ -249,6 +249,7 @@ def __init__(
                  gpu: number of gpus requested to run the operation
                  parameters: a list of names of pipeline parameters that should be passed to this operation
                  gpu_vendor: gpu resource type, eg. nvidia.com/gpu, amd.com/gpu etc.
+                 parallel_count: number of parallel instances of this operation, used to run ParallelFor steps
              Entries for other (non-built-in) component types are a function of the respective component.
         :param elyra_props: dictionary of property key:value pairs that are owned by Elyra
@@ -276,6 +277,7 @@ def __init__(
         self._component_props["memory"] = component_props.get("memory")
         self._component_props["gpu"] = component_props.get("gpu")
         self._component_props["gpu_vendor"] = component_props.get("gpu_vendor")
+        self._component_props["parallel_count"] = component_props.get("parallel_count")
         self._component_props["parameters"] = component_props.get(PIPELINE_PARAMETERS, [])
 
         if not elyra_props:
@@ -332,6 +334,10 @@ def parameters(self) -> Optional[List[str]]:
     def gpu_vendor(self) -> Optional[str]:
         return self._component_props.get("gpu_vendor")
 
+    @property
+    def parallel_count(self) -> Optional[int]:
+        return self._component_props.get("parallel_count", 1)
+
     def __eq__(self, other: GenericOperation) -> bool:
         if isinstance(self, other.__class__):
             return super().__eq__(other)
diff --git a/elyra/templates/components/generic_properties_template.jinja2 b/elyra/templates/components/generic_properties_template.jinja2
index c500c4d6c..07ecc2bea 100644
--- a/elyra/templates/components/generic_properties_template.jinja2
+++ b/elyra/templates/components/generic_properties_template.jinja2
@@ -61,6 +61,13 @@
         "ui:placeholder": "nvidia.com/gpu"
       }
     },
+    "parallel_count": {
+      "type": "integer",
+      "title": "Parallel Count",
+      "description": "Each component can be run as a parallel step; set this to a value greater than 1 for a ParallelFor-like operation.",
+      "minimum": 1,
+      "default": 1
+    },
     "pipeline_parameters": {
       "type": "array",
       "title": "Pipeline Parameters",
diff --git a/elyra/templates/kubeflow/v1/generic_component_definition_template.jinja2 b/elyra/templates/kubeflow/v1/generic_component_definition_template.jinja2
index 85c8eb086..6f3e1e25d 100644
--- a/elyra/templates/kubeflow/v1/generic_component_definition_template.jinja2
+++ b/elyra/templates/kubeflow/v1/generic_component_definition_template.jinja2
@@ -1,4 +1,4 @@
-name: Run a file
+name: {{ op_name }}
 description: Run a Jupyter notebook or Python/R script
 {% if task_parameters %}
 inputs:
diff --git a/elyra/templates/kubeflow/v1/python_dsl_template.jinja2 b/elyra/templates/kubeflow/v1/python_dsl_template.jinja2
index 4ff84e884..9ebce539c 100644
--- a/elyra/templates/kubeflow/v1/python_dsl_template.jinja2
+++ b/elyra/templates/kubeflow/v1/python_dsl_template.jinja2
@@ -33,6 +33,16 @@ def generated_pipeline(
 {% for workflow_task in workflow_tasks.values() %}
 {% set task_name = "task_" + workflow_task.escaped_task_id %}
     # Task for node '{{ workflow_task.name }}'
+    {% set parallel_indent = 0 %}
+{% if 'parallel_count' in workflow_task.task_modifiers and workflow_task.task_modifiers.parallel_count is not none %}
+{% if workflow_task.task_modifiers.parallel_count > 1 %}
+    {% set parallel_indent = 4 %}
+    parallel_count = {{ workflow_task.task_modifiers.parallel_count }}
+    with kfp.dsl.ParallelFor(list(range(parallel_count))) as rank:
+{% endif %}
+{% endif %}
+
+{% filter indent(width=parallel_indent) %}
     {{ task_name }} = factory_{{ workflow_task.component_definition_hash | python_safe }}(
 {% for task_input_name, task_input_spec in workflow_task.task_inputs.items() %}
 {% if task_input_spec.task_output_reference %}
@@ -73,6 +83,46 @@ def generated_pipeline(
 {% for env_var_name, env_var_value in workflow_task.task_modifiers.env_variables.items() %}
     {{ task_name }}.add_env_variable(V1EnvVar(name="{{ env_var_name }}", value="{{ env_var_value | string_delimiter_safe }}"))
 {% endfor %}
+{% if 'parallel_count' in workflow_task.task_modifiers and workflow_task.task_modifiers.parallel_count is not none %}
+{% if workflow_task.task_modifiers.parallel_count > 1 %}
+    {{ task_name }}.add_env_variable(V1EnvVar(name="NRANKS", value=str(parallel_count)))
+    {{ task_name }}.add_env_variable(V1EnvVar(name="RANK", value=str(rank)))
+{% endif %}
+{% endif %}
+{% if workflow_engine == "argo" %}
+    {{ task_name }}.add_env_variable(V1EnvVar(
+        name="WORKFLOW_ID",
+        value_from=V1EnvVarSource(
+            field_ref=V1ObjectFieldSelector(
+                api_version="v1", field_path="metadata.labels['workflows.argoproj.io/workflow']"
+            )
+        ),
+    ))
+    {{ task_name }}.add_env_variable(V1EnvVar(
+        name="KFP_NAMESPACE",
+        value_from=V1EnvVarSource(
+            field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.namespace")
+        ),
+    ))
+    {{ task_name }}.add_env_variable(V1EnvVar(
+        name="KFP_POD_NAME",
+        value_from=V1EnvVarSource(
+            field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.name")
+        ),
+    ))
+    {{ task_name }}.add_env_variable(V1EnvVar(
+        name="KFP_POD_UID",
+        value_from=V1EnvVarSource(
+            field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.uid")
+        ),
+    ))
+    {{ task_name }}.add_env_variable(V1EnvVar(
+        name="KFP_RUN_ID",
+        value_from=V1EnvVarSource(
+            field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.labels['pipeline/runid']")
+        ),
+    ))
+{% endif %}
 {% endif %}
 {% if workflow_task.task_modifiers.set_run_name %}
 {% if workflow_engine == "tekton" %}
@@ -163,6 +213,8 @@ def generated_pipeline(
     {{ task_name }}.after(task_{{ upstream_workflow_task_id | python_safe }})
 {% endfor %}
 {% endif %}
+{% endfilter %}
+
 {% endfor %}
 
 if __name__ == "__main__":
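Rendered for a node whose parallel_count is 2, the template above produces DSL along these lines; this is an excerpt of the generated pipeline function rather than a standalone script, and the task variable and factory name are shortened placeholders:

    parallel_count = 2
    with kfp.dsl.ParallelFor(list(range(parallel_count))) as rank:
        task_abc123 = factory_0123abcd(...)  # placeholder for the generated factory call
        # Each replica learns the world size and its own rank through env variables.
        task_abc123.add_env_variable(V1EnvVar(name="NRANKS", value=str(parallel_count)))
        task_abc123.add_env_variable(V1EnvVar(name="RANK", value=str(rank)))
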
diff --git a/elyra/tests/kfp/test_bootstrapper.py b/elyra/tests/kfp/test_bootstrapper.py
index d4d788380..18a8482dd 100644
--- a/elyra/tests/kfp/test_bootstrapper.py
+++ b/elyra/tests/kfp/test_bootstrapper.py
@@ -192,6 +192,21 @@ def test_main_method(monkeypatch, s3_setup, tmpdir):
     main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict)
 
 
+def test_main_method_with_parallel_count(monkeypatch, s3_setup, tmpdir):
+    argument_dict = {
+        "cos-endpoint": "http://" + MINIO_HOST_PORT,
+        "cos-bucket": "test-bucket",
+        "cos-directory": "test-directory",
+        "cos-dependencies-archive": "test-archive.tgz",
+        "filepath": os.path.join(RESOURCES_DIR, "test-notebookA.ipynb"),
+        "inputs": "test-file.txt;test,file.txt",
+        "outputs": "test-file/test-file-copy.txt;test-file/test,file/test,file-copy.txt",
+        "user-volume-path": None,
+        "parallel_count": 2,
+    }
+    main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict)
+
+
 def test_main_method_with_wildcard_outputs(monkeypatch, s3_setup, tmpdir):
     argument_dict = {
         "cos-endpoint": "http://" + MINIO_HOST_PORT,
diff --git a/elyra/tests/pipeline/kfp/test_processor_kfp.py b/elyra/tests/pipeline/kfp/test_processor_kfp.py
index 0750a15a2..6340538bf 100644
--- a/elyra/tests/pipeline/kfp/test_processor_kfp.py
+++ b/elyra/tests/pipeline/kfp/test_processor_kfp.py
@@ -735,7 +735,7 @@ def test_generate_pipeline_dsl_compile_pipeline_dsl_one_generic_node_pipeline_te
     # Verify component definition information (see generic_component_definition_template.jinja2)
     # - property 'name'
-    assert node_template["name"] == "run-a-file"
+    assert node_template["name"] == sanitize_label_value(op.name)
     # - property 'implementation.container.command'
     assert node_template["container"]["command"] == ["sh", "-c"]
     # - property 'implementation.container.args'
@@ -1416,11 +1416,9 @@ def test_generate_pipeline_dsl_compile_pipeline_dsl_generic_components_data_exch
     assert len(compiled_spec["spec"]["templates"]) >= 3
     template_specs = {}
     for node_template in compiled_spec["spec"]["templates"]:
-        if node_template["name"] == compiled_spec["spec"]["entrypoint"] or not node_template["name"].startswith(
-            "run-a-file"
-        ):
+        if node_template["name"] == compiled_spec["spec"]["entrypoint"]:
             continue
-        template_specs[node_template["name"]] = node_template
+        template_specs[sanitize_label_value(node_template["name"])] = node_template
 
     # Iterate through sorted operations and verify that their inputs
     # and outputs are properly represented in their respective template
@@ -1430,10 +1428,8 @@ def test_generate_pipeline_dsl_compile_pipeline_dsl_generic_components_data_exch
         if not op.is_generic:
             # ignore custom nodes
             continue
-        if template_index == 1:
-            template_name = "run-a-file"
-        else:
-            template_name = f"run-a-file-{template_index}"
+        template_name = sanitize_label_value(op.name)
+        template_name = template_name.replace("_", "-")  # Kubernetes applies this replacement to template names
         template_index = template_index + 1
         # compare outputs
         if len(op.outputs) > 0:
diff --git a/packages/pipeline-editor/src/FileSubmissionDialog.tsx b/packages/pipeline-editor/src/FileSubmissionDialog.tsx
index e94d23bb3..52bfae928 100644
--- a/packages/pipeline-editor/src/FileSubmissionDialog.tsx
+++ b/packages/pipeline-editor/src/FileSubmissionDialog.tsx
@@ -113,6 +113,10 @@ export const FileSubmissionDialog: React.FC = ({
+
+
+
+