Skip to content

Commit 2f1db59

Browse files
authored
fix(sdk): compile ParallelFor in a deterministic manner (#4926)
* fix(sdk): compile ParallelFor in a deterministic manner. During compilation, ParallelFor components end up with randomized names, which makes it very inconvenient to compare two versions of a pipeline. This commit fixes this issue. * fix(sdk): fix new parallel-for test cases
1 parent ce985bc commit 2f1db59

20 files changed

+859
-880
lines changed

sdk/python/kfp/dsl/_ops_group.py

+6-10
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def __init__(self, group_type: str, name: str=None, parallelism: int=None):
5353
def _get_matching_opsgroup_already_in_pipeline(group_type, name):
5454
"""Retrieves the opsgroup when the pipeline already contains it.
5555
the opsgroup might be already in the pipeline in case of recursive calls.
56-
56+
5757
Args:
5858
group_type (str): one of 'pipeline', 'exit_handler', 'condition', and 'graph'.
5959
name (str): the name before conversion.
@@ -193,27 +193,23 @@ class ParallelFor(OpsGroup):
193193
"""
194194
TYPE_NAME = 'for_loop'
195195

196-
@staticmethod
197-
def _get_unique_id_code():
198-
return uuid.uuid4().hex[:_for_loop.LoopArguments.NUM_CODE_CHARS]
199-
200196
def __init__(self, loop_args: Union[_for_loop.ItemList, _pipeline_param.PipelineParam],
201197
parallelism: int=None):
202198
if parallelism and parallelism < 1:
203199
raise ValueError('ParallelFor parallism set to < 1, allowed values are > 0')
204200

205201
self.items_is_pipeline_param = isinstance(loop_args, _pipeline_param.PipelineParam)
206202

207-
# use a random code to uniquely identify this loop
208-
code = self._get_unique_id_code()
209-
group_name = 'for-loop-{}'.format(code)
210-
super().__init__(self.TYPE_NAME, name=group_name, parallelism=parallelism)
203+
super().__init__(self.TYPE_NAME, parallelism=parallelism)
211204

212205
if self.items_is_pipeline_param:
213206
loop_args = _for_loop.LoopArguments.from_pipeline_param(loop_args)
214207
elif not self.items_is_pipeline_param and not isinstance(loop_args, _for_loop.LoopArguments):
215208
# we were passed a raw list, wrap it in loop args
216-
loop_args = _for_loop.LoopArguments(loop_args, code)
209+
loop_args = _for_loop.LoopArguments(
210+
loop_args,
211+
code=str(_pipeline.Pipeline.get_default_pipeline().get_next_group_id()),
212+
)
217213

218214
self.loop_args = loop_args
219215

sdk/python/tests/compiler/testdata/loop_over_lightweight_output.py

+1-12
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,6 @@
1717
from kfp import dsl
1818
from kfp.dsl import _for_loop
1919

20-
class Coder:
21-
def __init__(self, ):
22-
self._code_id = 0
23-
24-
def get_code(self, ):
25-
self._code_id += 1
26-
return '{code:0{num_chars:}d}'.format(code=self._code_id, num_chars=_for_loop.LoopArguments.NUM_CODE_CHARS)
27-
28-
29-
dsl.ParallelFor._get_unique_id_code = Coder().get_code
30-
3120
produce_op = kfp.components.load_component_from_text('''\
3221
name: Produce list
3322
outputs:
@@ -61,4 +50,4 @@ def get_code(self, ):
6150
def pipeline():
6251
source_task = produce_op()
6352
with dsl.ParallelFor(source_task.output) as item:
64-
consume_op(item)
53+
consume_op(item)

sdk/python/tests/compiler/testdata/loop_over_lightweight_output.yaml

+3-3
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
- "name": |-
6767
produce-list-data_list-loop-item
6868
"name": |-
69-
for-loop-for-loop-00000001-1
69+
for-loop-1
7070
- "dag":
7171
"tasks":
7272
- "arguments":
@@ -79,9 +79,9 @@
7979
- |-
8080
produce-list
8181
"name": |-
82-
for-loop-for-loop-00000001-1
82+
for-loop-1
8383
"template": |-
84-
for-loop-for-loop-00000001-1
84+
for-loop-1
8585
"withParam": |-
8686
{{tasks.produce-list.outputs.parameters.produce-list-data_list}}
8787
- "name": |-

sdk/python/tests/compiler/testdata/parallelfor_item_argument_resolving.py

+5-14
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,10 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
from typing import NamedTuple
17-
1816
import kfp
1917
from kfp.components import func_to_container_op
2018

2119

22-
# Stabilizing the test output
23-
class StableIDGenerator:
24-
def __init__(self, ):
25-
self._index = 0
26-
27-
def get_next_id(self, ):
28-
self._index += 1
29-
return '{code:0{num_chars:}d}'.format(code=self._index, num_chars=kfp.dsl._for_loop.LoopArguments.NUM_CODE_CHARS)
30-
31-
kfp.dsl.ParallelFor._get_unique_id_code = StableIDGenerator().get_next_id
32-
33-
3420
@func_to_container_op
3521
def produce_str() -> str:
3622
return "Hello"
@@ -77,6 +63,11 @@ def parallelfor_item_argument_resolving():
7763
#consume(loop_item) # Cannot use the full loop item when it's a dict
7864
consume(loop_item.aaa)
7965

66+
loop_args = [{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]
67+
with kfp.dsl.ParallelFor(loop_args) as loop_item:
68+
consume(loop_args)
69+
consume(loop_item)
70+
8071

8172
if __name__ == '__main__':
8273
import kfp.compiler as compiler

0 commit comments

Comments
 (0)