Commit ef1b41f

Support parallelfor operations (re-push)
Signed-off-by: typhoonzero <[email protected]>
1 parent b558afd commit ef1b41f

8 files changed: +174 -1 lines changed


elyra/kfp/bootstrapper.py

Lines changed: 89 additions & 0 deletions
@@ -50,6 +50,79 @@
 operation_name = None  # global used in formatted logging


+def set_dist_train_config(rank, nranks, step_name, port=9888):
+    """
+    Set distributed training envs for general uses.
+    For Tensorflow: TF_CONFIG is configured.
+    For Pytorch: MASTER_ADDR and MASTER_PORT is configured.
+    For general use cases: NRANKS and RANK is configured.
+
+    TODO: this function is Argo specific, should add Tekton support.
+    """
+    from kubernetes import client, config
+
+    wf_id = os.getenv("WORKFLOW_ID")
+    ns = os.getenv("KFP_NAMESPACE")
+    if not wf_id or not ns:
+        raise ValueError("WORKFLOW_ID and KFP_NAMESPACE env must be set in the workflow pod!")
+
+    config.load_incluster_config()
+    api = client.CustomObjectsApi()
+
+    worker_started = 0
+    while worker_started != nranks:
+        resource = api.get_namespaced_custom_object(
+            group="argoproj.io",
+            version="v1alpha1",
+            name=wf_id,
+            namespace=ns,
+            plural="workflows",
+        )
+        if not resource.get("status"):
+            time.sleep(2)
+            continue
+        if not resource["status"].get("nodes"):
+            time.sleep(2)
+            continue
+
+        nodes = resource["status"]["nodes"]
+        workers_spec = []
+        for nk in nodes:
+            node_info = nodes[nk]
+            OpUtil.log_operation_info(
+                "kfpdist: searching for {}, curr node: {}, templateName: {}, type: {}".format(
+                    step_name, nk, node_info["templateName"], node_info["type"]
+                )
+            )
+            if node_info["templateName"] == step_name and node_info["type"] == "Pod":
+                podid = node_info["id"]
+                for input_param in node_info["inputs"]["parameters"]:
+                    if input_param["name"].find("loop-item") >= 0:
+                        # FIXME: argo parameter with "loop-item" is the rank.
+                        curr_rank = int(input_param["value"])
+                        break
+                v1 = client.CoreV1Api()
+                podinfo = v1.read_namespaced_pod(podid, ns)
+                if podinfo.status.pod_ip:
+                    workers_spec.append((curr_rank, "%s:%d" % (podinfo.status.pod_ip, port)))
+        worker_started = len(workers_spec)
+        time.sleep(2)
+
+    workers_spec.sort(key=lambda item: item[0])
+    workers_spec_list = [i[1] for i in workers_spec]
+    # set TF_CONFIG env for tf dist train
+    os.environ["TF_CONFIG"] = json.dumps(
+        {"cluster": {"worker": workers_spec_list}, "task": {"type": "worker", "index": rank}}
+    )
+    OpUtil.log_operation_info("Setting TF_CONFIG: %s" % os.environ["TF_CONFIG"])
+    os.environ["MASTER_ADDR"] = workers_spec[0][1].split(":")[0]
+    os.environ["MASTER_PORT"] = workers_spec[0][1].split(":")[1]
+    OpUtil.log_operation_info(
+        "Setting MASTER_ADDR: {}, MASTER_PORT: {}".format(os.environ["MASTER_ADDR"], os.environ["MASTER_PORT"])
+    )
+    OpUtil.log_operation_info("Setting RANK: {}, NRANKS: {}".format(os.environ["RANK"], os.environ["NRANKS"]))
+
+
 class FileOpBase(ABC):
     """Abstract base class for file-based operations"""

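For orientation (not part of this commit): a minimal sketch of how training code launched by the bootstrapper could consume the environment variables that set_dist_train_config exports. TF_CONFIG is read automatically by tf.distribute.MultiWorkerMirroredStrategy; the PyTorch calls below are an assumption about the user's notebook or script, not something this change requires.

import os

import torch.distributed as dist  # assumed to be available in the runtime image

# MASTER_ADDR and MASTER_PORT are picked up from the environment by the default
# env:// init method; RANK and NRANKS come from the ParallelFor loop item.
dist.init_process_group(
    backend="gloo",                        # illustrative backend choice
    rank=int(os.environ["RANK"]),
    world_size=int(os.environ["NRANKS"]),
)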
@@ -724,6 +797,22 @@ def main():
     )
     # Setup packages and gather arguments
     input_params = OpUtil.parse_arguments(sys.argv[1:])
+
+    if os.getenv("RANK"):
+        op_name = os.getenv("ELYRA_OP_NAME")
+        if not op_name:
+            raise ValueError(
+                "env ELYRA_OP_NAME is not set. please check whether elyra version is matching bootstrapper.py"
+            )
+
+        # FIXME: operation name will be updated by kfp, replace these chars for matching.
+        op_name = op_name.replace("_", "-")
+        rank = int(os.getenv("RANK"))
+        nranks = int(os.getenv("NRANKS"))
+        if not nranks:
+            raise ValueError("rank argument setted but no NRANKS env found!")
+        set_dist_train_config(rank, nranks, op_name, port=9888)
+
     OpUtil.log_operation_info("starting operation")
     t0 = time.time()
     OpUtil.package_install(user_volume_path=input_params.get("user-volume-path"))
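The guard above only fires when the generated DSL injects the rank-related variables. A small illustration of that environment contract, with hypothetical values for rank 0 of a 2-way ParallelFor node (the real values are set by processor_kfp.py and python_dsl_template.jinja2 further down):

import os

os.environ["RANK"] = "0"                     # this pod's loop item
os.environ["NRANKS"] = "2"                   # total parallel count
os.environ["ELYRA_OP_NAME"] = "train_model"  # hypothetical node name
# main() then replaces "_" with "-" to match the Argo template name and
# calls set_dist_train_config(0, 2, "train-model", port=9888)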

elyra/pipeline/kfp/processor_kfp.py

Lines changed: 4 additions & 0 deletions
@@ -794,6 +794,7 @@ def _generate_workflow_tasks(
             }

             component_definition = generic_component_template.render(
+                op_name=sanitize_label_value(operation.name),
                 container_image=operation.runtime_image,
                 task_parameters=task_parameters,
                 command_args=self._compose_container_command_args(
@@ -847,6 +848,7 @@ def _generate_workflow_tasks(
                 if operation.gpu_vendor:
                     gpu_vendor = operation.gpu_vendor
                 workflow_task["task_modifiers"]["gpu_limit"] = {"size": operation.gpu, "vendor": gpu_vendor}
+            workflow_task["task_modifiers"]["parallel_count"] = operation.parallel_count

             if is_crio_runtime:
                 # Attach empty dir volume
@@ -880,6 +882,8 @@
             )
             # Pipeline node name
             workflow_task["task_modifiers"]["pod_labels"]["elyra/node-name"] = sanitize_label_value(operation.name)
+            # Original operation name for runtime lookups
+            workflow_task["task_modifiers"]["env_variables"]["ELYRA_OP_NAME"] = operation.name

             # Add non-identifying metadata
             if workflow_task["task_modifiers"].get("pod_annotations") is None:
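For reference, a sketch of the task_modifiers a ParallelFor-enabled node carries after these changes, before the Jinja template consumes them. The key names come from the diff; the concrete values and the surrounding dict shape are illustrative assumptions.

# Assumed shape of one workflow_task entry handed to python_dsl_template.jinja2
workflow_task = {
    "task_modifiers": {
        "gpu_limit": {"size": 1, "vendor": "nvidia.com/gpu"},  # unchanged behaviour
        "parallel_count": 2,                                   # new: drives the ParallelFor fan-out
        "pod_labels": {"elyra/node-name": "train-model"},      # sanitized node name
        "env_variables": {"ELYRA_OP_NAME": "train_model"},     # new: original node name for runtime lookups
    },
}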

elyra/pipeline/pipeline.py

Lines changed: 6 additions & 0 deletions
@@ -249,6 +249,7 @@ def __init__(
            gpu: number of gpus requested to run the operation
            parameters: a list of names of pipeline parameters that should be passed to this operation
            gpu_vendor: gpu resource type, eg. nvidia.com/gpu, amd.com/gpu etc.
+           parallel_count: operation parallel count to run parallelfor steps.
        Entries for other (non-built-in) component types are a function of the respective component.

        :param elyra_props: dictionary of property key:value pairs that are owned by Elyra
@@ -276,6 +277,7 @@ def __init__(
         self._component_props["memory"] = component_props.get("memory")
         self._component_props["gpu"] = component_props.get("gpu")
         self._component_props["gpu_vendor"] = component_props.get("gpu_vendor")
+        self._component_props["parallel_count"] = component_props.get("parallel_count")
         self._component_props["parameters"] = component_props.get(PIPELINE_PARAMETERS, [])

         if not elyra_props:
@@ -332,6 +334,10 @@ def parameters(self) -> Optional[List[str]]:
     def gpu_vendor(self) -> Optional[str]:
         return self._component_props.get("gpu_vendor")

+    @property
+    def parallel_count(self) -> Optional[str]:
+        return self._component_props.get("parallel_count")
+
     def __eq__(self, other: GenericOperation) -> bool:
         if isinstance(self, other.__class__):
             return super().__eq__(other)
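A minimal, self-contained sketch of how downstream code can read the new property. The SimpleNamespace stand-in and the fall-back to 1 are illustrative assumptions (parallel_count is simply None for nodes that never set it):

from types import SimpleNamespace

# Stand-in for a GenericOperation; only the property relevant here is modelled.
operation = SimpleNamespace(parallel_count=2)

parallel_count = int(operation.parallel_count or 1)  # treat None as "not parallel"
if parallel_count > 1:
    print(f"emit a ParallelFor fan-out of width {parallel_count}")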

elyra/templates/components/generic_properties_template.jinja2

Lines changed: 7 additions & 0 deletions
@@ -61,6 +61,13 @@
         "ui:placeholder": "nvidia.com/gpu"
       }
     },
+    "parallel_count": {
+      "type": "integer",
+      "title": "ParallelCount",
+      "description": "Each component can be run as parallel step, set this >1 to do parallelfor-like operation.",
+      "minimum": 1,
+      "default": 1
+    },
     "pipeline_parameters": {
       "type": "array",
       "title": "Pipeline Parameters",

elyra/templates/kubeflow/v1/generic_component_definition_template.jinja2

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-name: Run a file
+name: {{ op_name }}
 description: Run a Jupyter notebook or Python/R script
 {% if task_parameters %}
 inputs:
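This rename feeds the runtime lookup in the bootstrapper above: set_dist_train_config matches Argo's templateName against the node's sanitized name, which only works when the component definition carries that name instead of the fixed "Run a file". A quick illustration of the rendered header (the op_name value is hypothetical):

from jinja2 import Template

# Prints: name: Train-Model
print(Template("name: {{ op_name }}").render(op_name="Train-Model"))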

elyra/templates/kubeflow/v1/python_dsl_template.jinja2

Lines changed: 48 additions & 0 deletions
@@ -33,6 +33,14 @@ def generated_pipeline(
 {% for workflow_task in workflow_tasks.values() %}
 {% set task_name = "task_" + workflow_task.escaped_task_id %}
     # Task for node '{{ workflow_task.name }}'
+{% set parallel_indent = 0 %}
+{% if workflow_task.task_modifiers.parallel_count > 1 %}
+{% set parallel_indent = 4 %}
+    parallel_count = {{workflow_task.task_modifiers.parallel_count}}
+    with kfp.dsl.ParallelFor(list(range(parallel_count))) as rank:
+{% endif %}
+
+{% filter indent(width=parallel_indent) %}
     {{ task_name }} = factory_{{ workflow_task.component_definition_hash | python_safe }}(
 {% for task_input_name, task_input_spec in workflow_task.task_inputs.items() %}
 {% if task_input_spec.task_output_reference %}
@@ -73,6 +81,44 @@ def generated_pipeline(
 {% for env_var_name, env_var_value in workflow_task.task_modifiers.env_variables.items() %}
     {{ task_name }}.add_env_variable(V1EnvVar(name="{{ env_var_name }}", value="{{ env_var_value | string_delimiter_safe }}"))
 {% endfor %}
+{% if workflow_task.task_modifiers.parallel_count > 1 %}
+    {{ task_name }}.add_env_variable(V1EnvVar(name="NRANKS", value=str(parallel_count)))
+    {{ task_name }}.add_env_variable(V1EnvVar(name="RANK", value=str(rank)))
+{% endif %}
+{% if workflow_engine == "argo" %}
+    {{ task_name }}.add_env_variable(V1EnvVar(
+        name="WORKFLOW_ID",
+        value_from=V1EnvVarSource(
+            field_ref=V1ObjectFieldSelector(
+                api_version="v1", field_path="metadata.labels['workflows.argoproj.io/workflow']"
+            )
+        ),
+    ))
+    {{ task_name }}.add_env_variable(V1EnvVar(
+        name="KFP_NAMESPACE",
+        value_from=V1EnvVarSource(
+            field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.namespace")
+        ),
+    ))
+    {{ task_name }}.add_env_variable(V1EnvVar(
+        name="KFP_POD_NAME",
+        value_from=V1EnvVarSource(
+            field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.name")
+        ),
+    ))
+    {{ task_name }}.add_env_variable(V1EnvVar(
+        name="KFP_POD_UID",
+        value_from=V1EnvVarSource(
+            field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.uid")
+        ),
+    ))
+    {{ task_name }}.add_env_variable(V1EnvVar(
+        name="KFP_RUN_ID",
+        value_from=V1EnvVarSource(
+            field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.labels['pipeline/runid']")
+        ),
+    ))
+{% endif %}
 {% endif %}
 {% if workflow_task.task_modifiers.set_run_name %}
 {% if workflow_engine == "tekton" %}
@@ -163,6 +209,8 @@ def generated_pipeline(
     {{ task_name }}.after(task_{{ upstream_workflow_task_id | python_safe }})
 {% endfor %}
 {% endif %}
+{% endfilter %}
+
 {% endfor %}

 if __name__ == "__main__":
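To make the template change concrete: roughly what the rendered DSL boils down to for a node with parallel_count = 2, assuming the Argo engine and the KFP v1 SDK. The busybox stand-in component, the pipeline name, and the omission of the other task modifiers are simplifications for illustration only.

import kfp
import kfp.compiler
import kfp.dsl
from kubernetes.client import V1EnvVar

# Minimal stand-in for the factory the real template generates from the component YAML
factory_train = kfp.components.load_component_from_text("""
name: train-model
implementation:
  container:
    image: busybox
    command: [echo, hello]
""")


@kfp.dsl.pipeline(name="parallel-example")
def generated_pipeline():
    parallel_count = 2
    with kfp.dsl.ParallelFor(list(range(parallel_count))) as rank:
        task_train = factory_train()
        # Mirrors the new template block: fan-out width and per-pod rank as env vars
        task_train.add_env_variable(V1EnvVar(name="NRANKS", value=str(parallel_count)))
        task_train.add_env_variable(V1EnvVar(name="RANK", value=str(rank)))


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(generated_pipeline, "parallel-example.yaml")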

elyra/tests/kfp/test_bootstrapper.py

Lines changed: 15 additions & 0 deletions
@@ -192,6 +192,21 @@ def test_main_method(monkeypatch, s3_setup, tmpdir):
     main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict)


+def test_main_method_with_parallel_count(monkeypatch, s3_setup, tmpdir):
+    argument_dict = {
+        "cos-endpoint": "http://" + MINIO_HOST_PORT,
+        "cos-bucket": "test-bucket",
+        "cos-directory": "test-directory",
+        "cos-dependencies-archive": "test-archive.tgz",
+        "filepath": os.path.join(RESOURCES_DIR, "test-notebookA.ipynb"),
+        "inputs": "test-file.txt;test,file.txt",
+        "outputs": "test-file/test-file-copy.txt;test-file/test,file/test,file-copy.txt",
+        "user-volume-path": None,
+        "parallel_count": 2,
+    }
+    main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict)
+
+
 def test_main_method_with_wildcard_outputs(monkeypatch, s3_setup, tmpdir):
     argument_dict = {
         "cos-endpoint": "http://" + MINIO_HOST_PORT,

packages/pipeline-editor/src/FileSubmissionDialog.tsx

Lines changed: 4 additions & 0 deletions
@@ -113,6 +113,10 @@ export const FileSubmissionDialog: React.FC<IProps> = ({
         </div>
       </div>
       <br />
+
+      <label htmlFor="parallel_count"> Parallel Count:</label>
+      <input id="parallel_count" type="number" name="parallel_count" />
+
       <input
         type="checkbox"
         className="elyra-Dialog-checkbox"
