diff --git a/test/e2e/functional/parameter-aggregation-dag-with-retry.yaml b/test/e2e/functional/parameter-aggregation-dag-with-retry.yaml new file mode 100644 index 000000000000..10d96848c20e --- /dev/null +++ b/test/e2e/functional/parameter-aggregation-dag-with-retry.yaml @@ -0,0 +1,34 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: parameter-aggregation-dag-with-retry +spec: + retryStrategy: + limit: 1 + entrypoint: fanout-dag-with-output + templates: + - name: echo-value + inputs: + parameters: + - name: message + container: + image: argoproj/argosay:v2 + outputs: + parameters: + - name: dummy-output + value: '{{inputs.parameters.message}}' + - name: fanout-dag-with-output + dag: + tasks: + - name: echo-list + template: echo-value + arguments: + parameters: + - name: message + value: '{{item}}' + withItems: [1, 2, 3] + outputs: + parameters: + - name: dummy-dag-output + valueFrom: + parameter: '{{tasks.echo-list.outputs.parameters.dummy-output}}' \ No newline at end of file diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index 2848ed700183..42425ce97b80 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -582,6 +582,24 @@ func (s *FunctionalSuite) TestParameterAggregationFromOutputs() { }) } +func (s *FunctionalSuite) TestParameterAggregationDAGWithRetry() { + s.Given(). + Workflow("@functional/parameter-aggregation-dag-with-retry.yaml"). + When(). + SubmitWorkflow(). + WaitForWorkflow(time.Second * 90). + Then(). + ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase) + nodeStatus := status.Nodes.FindByDisplayName("parameter-aggregation-dag-with-retry(0)") + require.NotNil(t, nodeStatus) + assert.Equal(t, wfv1.NodeSucceeded, nodeStatus.Phase) + require.NotNil(t, nodeStatus.Outputs) + assert.Len(t, nodeStatus.Outputs.Parameters, 1) + assert.Equal(t, `["1","2","3"]`, nodeStatus.Outputs.Parameters[0].Value.String()) + }) +} + func (s *FunctionalSuite) TestDAGDepends() { s.Given(). Workflow("@functional/dag-depends.yaml"). diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 8f708eb02d05..7cf9c0b680f1 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -659,6 +659,10 @@ func (woc *wfOperationCtx) buildLocalScopeFromTask(dagCtx *dagContext, task *wfv var ancestorNodes []wfv1.NodeStatus for _, node := range woc.wf.Status.Nodes { if node.BoundaryID == dagCtx.boundaryID && strings.HasPrefix(node.Name, ancestorNode.Name+"(") { + // Filter retried nodes and only aggregate outputs of their parent nodes. + if node.NodeFlag != nil && node.NodeFlag.Retried { + continue + } ancestorNodes = append(ancestorNodes, node) } } diff --git a/workflow/controller/dag_test.go b/workflow/controller/dag_test.go index dee2ef413f64..249ba8c604c7 100644 --- a/workflow/controller/dag_test.go +++ b/workflow/controller/dag_test.go @@ -2565,6 +2565,8 @@ status: - name: chunk value: "7" name: reproduce-bug-9tpfr.process-tasks(7:7)(0) + nodeFlag: + retried: true outputs: exitCode: "0" phase: Succeeded @@ -2588,6 +2590,8 @@ status: - name: chunk value: "5" name: reproduce-bug-9tpfr.process-tasks(5:5)(0) + nodeFlag: + retried: true outputs: exitCode: "0" phase: Succeeded @@ -2630,6 +2634,8 @@ status: - name: chunk value: "1" name: reproduce-bug-9tpfr.process-tasks(1:1)(0) + nodeFlag: + retried: true outputs: exitCode: "0" phase: Succeeded @@ -2653,6 +2659,8 @@ status: - name: chunk value: "8" name: reproduce-bug-9tpfr.process-tasks(8:8)(0) + nodeFlag: + retried: true outputs: exitCode: "0" phase: Succeeded @@ -2732,6 +2740,8 @@ status: value: "3" message: failed with exit code 1 name: reproduce-bug-9tpfr.process-tasks(3:3)(0) + nodeFlag: + retried: true outputs: exitCode: "1" phase: Failed @@ -2755,6 +2765,8 @@ status: - name: chunk value: "9" name: reproduce-bug-9tpfr.process-tasks(9:9)(0) + nodeFlag: + retried: true outputs: exitCode: "0" phase: Succeeded @@ -2777,6 +2789,8 @@ status: value: "3" message: failed with exit code 1 name: reproduce-bug-9tpfr.process-tasks(3:3)(1) + nodeFlag: + retried: true outputs: exitCode: "1" phase: Failed @@ -2819,6 +2833,8 @@ status: - name: chunk value: "6" name: reproduce-bug-9tpfr.process-tasks(6:6)(0) + nodeFlag: + retried: true outputs: exitCode: "0" phase: Succeeded @@ -2899,6 +2915,8 @@ status: - name: chunk value: "2" name: reproduce-bug-9tpfr.process-tasks(2:2)(0) + nodeFlag: + retried: true outputs: exitCode: "0" phase: Succeeded @@ -2961,6 +2979,8 @@ status: - name: chunk value: "4" name: reproduce-bug-9tpfr.process-tasks(4:4)(0) + nodeFlag: + retried: true outputs: exitCode: "0" phase: Succeeded @@ -3004,6 +3024,8 @@ status: - name: chunk value: "0" name: reproduce-bug-9tpfr.process-tasks(0:0)(0) + nodeFlag: + retried: true outputs: exitCode: "0" phase: Succeeded @@ -3047,6 +3069,8 @@ status: value: "3" message: failed with exit code 1 name: reproduce-bug-9tpfr.process-tasks(3:3)(2) + nodeFlag: + retried: true outputs: exitCode: "1" phase: Failed diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 6cfdfa1e18b9..f3542edf1ac0 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -3236,7 +3236,8 @@ func (n loopNodes) Len() int { } func parseLoopIndex(s string) int { - s = strings.SplitN(s, "(", 2)[1] + splits := strings.Split(s, "(") + s = splits[len(splits)-1] s = strings.SplitN(s, ":", 2)[0] val, err := strconv.Atoi(s) if err != nil { @@ -3276,7 +3277,7 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(scope *wfScope, prefix st outputParamValueLists := make(map[string][]string) resultsList := make([]wfv1.Item, 0) for _, node := range childNodes { - if node.Outputs == nil || node.Phase != wfv1.NodeSucceeded || node.Type == wfv1.NodeTypeRetry { + if node.Outputs == nil || node.Phase != wfv1.NodeSucceeded { continue } if len(node.Outputs.Parameters) > 0 {