Skip to content

Commit 18f55ac

Browse files
jswxstwoninowang
authored and
oninowang
committed
fix: process aggregate outputs for nodes with retries. Fixes #14228
Signed-off-by: oninowang <[email protected]>
1 parent 8691b9b commit 18f55ac

File tree

3 files changed

+55
-2
lines changed

3 files changed

+55
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
apiVersion: argoproj.io/v1alpha1
2+
kind: Workflow
3+
metadata:
4+
name: parameter-aggregation-dag-with-retry
5+
spec:
6+
retryStrategy:
7+
limit: 1
8+
entrypoint: fanout-dag-with-output
9+
templates:
10+
- name: echo-value
11+
inputs:
12+
parameters:
13+
- name: message
14+
container:
15+
image: argoproj/argosay:v2
16+
outputs:
17+
parameters:
18+
- name: dummy-output
19+
value: '{{inputs.parameters.message}}'
20+
- name: fanout-dag-with-output
21+
dag:
22+
tasks:
23+
- name: echo-list
24+
template: echo-value
25+
arguments:
26+
parameters:
27+
- name: message
28+
value: '{{item}}'
29+
withItems: [1, 2, 3]
30+
outputs:
31+
parameters:
32+
- name: dummy-dag-output
33+
valueFrom:
34+
parameter: '{{tasks.echo-list.outputs.parameters.dummy-output}}'

test/e2e/functional_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,24 @@ func (s *FunctionalSuite) TestParameterAggregationFromOutputs() {
582582
})
583583
}
584584

585+
func (s *FunctionalSuite) TestParameterAggregationDAGWithRetry() {
586+
s.Given().
587+
Workflow("@functional/parameter-aggregation-dag-with-retry.yaml").
588+
When().
589+
SubmitWorkflow().
590+
WaitForWorkflow(time.Second * 90).
591+
Then().
592+
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
593+
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
594+
nodeStatus := status.Nodes.FindByDisplayName("parameter-aggregation-dag-with-retry(0)")
595+
require.NotNil(t, nodeStatus)
596+
assert.Equal(t, wfv1.NodeSucceeded, nodeStatus.Phase)
597+
require.NotNil(t, nodeStatus.Outputs)
598+
assert.Len(t, nodeStatus.Outputs.Parameters, 1)
599+
assert.Equal(t, `["1","2","3"]`, nodeStatus.Outputs.Parameters[0].Value.String())
600+
})
601+
}
602+
585603
func (s *FunctionalSuite) TestDAGDepends() {
586604
s.Given().
587605
Workflow("@functional/dag-depends.yaml").

workflow/controller/operator.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -3236,7 +3236,8 @@ func (n loopNodes) Len() int {
32363236
}
32373237

32383238
func parseLoopIndex(s string) int {
3239-
s = strings.SplitN(s, "(", 2)[1]
3239+
splits := strings.Split(s, "(")
3240+
s = splits[len(splits)-1]
32403241
s = strings.SplitN(s, ":", 2)[0]
32413242
val, err := strconv.Atoi(s)
32423243
if err != nil {
@@ -3276,7 +3277,7 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(scope *wfScope, prefix st
32763277
outputParamValueLists := make(map[string][]string)
32773278
resultsList := make([]wfv1.Item, 0)
32783279
for _, node := range childNodes {
3279-
if node.Outputs == nil || node.Phase != wfv1.NodeSucceeded || node.Type == wfv1.NodeTypeRetry {
3280+
if node.Outputs == nil || node.Phase != wfv1.NodeSucceeded {
32803281
continue
32813282
}
32823283
if len(node.Outputs.Parameters) > 0 {

0 commit comments

Comments
 (0)