Skip to content

Commit ec01927

Browse files
author
arpechenin
committed
test:
- add explicit failure test for compressed workflows in removeCompletedTaskSetStatus - add e2e coverage for WorkflowTaskSet cleanup in completed compressed workflows Signed-off-by: arpechenin <arpechenin@avito.ru>
1 parent 1bdcf97 commit ec01927

7 files changed

Lines changed: 178 additions & 6 deletions

File tree

test/e2e/executor_plugins_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
//go:build plugins
2-
31
package e2e
42

53
import (
@@ -82,6 +80,22 @@ func (s *ExecutorPluginsSuite) TestTemplateExecutor() {
8280
})
8381
}
8482

83+
func (s *ExecutorPluginsSuite) TestCompressedTemplateExecutor_WorkflowTaskSetIsProperlyCleaned() {
84+
s.Given().
85+
Workflow("@testdata/plugins/executor/massive-executor-workflow.yaml").
86+
When().
87+
SubmitWorkflow().
88+
WaitForWorkflow(fixtures.ToBeSucceeded).
89+
Then().
90+
ExpectWorkflowCompressed().
91+
ExpectWorkflowTaskSet(func(t *testing.T, wfts *wfv1.WorkflowTaskSet) {
92+
assert.NotNil(t, wfts)
93+
assert.Empty(t, wfts.Spec.Tasks)
94+
assert.Empty(t, wfts.Status.Nodes)
95+
assert.Equal(t, "true", wfts.Labels[common.LabelKeyCompleted])
96+
})
97+
}
98+
8599
func TestExecutorPluginsSuite(t *testing.T) {
86100
suite.Run(t, new(ExecutorPluginsSuite))
87101
}

test/e2e/fixtures/then.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,18 @@ func (t *Then) ExpectWorkflowDeleted() *Then {
8383
return t
8484
}
8585

86+
func (t *Then) ExpectWorkflowCompressed() *Then {
87+
ctx := logging.TestContext(t.t.Context())
88+
wf, err := t.client.Get(ctx, t.wf.Name, metav1.GetOptions{})
89+
if err != nil {
90+
t.t.Fatal(err)
91+
}
92+
if wf.Status.CompressedNodes == "" {
93+
t.t.Errorf("expected workflow to be compressed")
94+
}
95+
return t
96+
}
97+
8698
// ExpectWorkflowNode checks on a specific node in the workflow.
8799
// If no node matches the selector, then the NodeStatus and Pod will be nil.
88100
// If the pod does not exist (e.g. because it was deleted) then the Pod will be nil too.

test/e2e/manifests/plugins/hello-executor-plugin-configmap.yaml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,17 @@ data:
3030
self.end_headers()
3131
3232
def do_POST(self):
33-
if self.headers.get("Authorization") != "Bearer " + token:
34-
self.forbidden()
35-
elif self.path == '/api/v1/template.execute':
33+
if self.path == '/api/v1/template.execute':
3634
args = self.args()
3735
if 'hello' in args['template'].get('plugin', {}):
36+
if self.headers.get("Authorization") != "Bearer " + token:
37+
self.forbidden()
38+
return
3839
self.reply(
3940
{'node': {'phase': 'Succeeded', 'message': 'Hello template!',
4041
'outputs': {'parameters': [{'name': 'foo', 'value': 'bar'}]}}})
4142
else:
42-
self.reply({})
43+
self.reply(None)
4344
else:
4445
self.unsupported()
4546

test/e2e/manifests/plugins/kustomization.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ resources:
66
- hello-executor-plugin-serviceaccount.yaml
77
- hello-executor-plugin.service-account-token-secret.yaml
88
- hello-executor-plugin-configmap.yaml
9+
- massive-executor-plugin-configmap.yaml
910

1011
namespace: argo
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
apiVersion: v1
2+
kind: ConfigMap
3+
metadata:
4+
name: massive-executor-plugin
5+
labels:
6+
workflows.argoproj.io/configmap-type: ExecutorPlugin
7+
annotations:
8+
workflows.argoproj.io/description: |
9+
This plugin returns a massive string (~1000 chars).
10+
data:
11+
sidecar.automountServiceAccountToken: "false"
12+
sidecar.container: |
13+
args:
14+
- |
15+
import json
16+
import random
17+
import string
18+
from http.server import BaseHTTPRequestHandler, HTTPServer
19+
20+
with open("/var/run/argo/token") as f:
21+
token = f.read().strip()
22+
23+
def gen_text():
24+
base = "HELLO_WORLD_123 "
25+
return base * (100_000 // len(base))
26+
27+
class Plugin(BaseHTTPRequestHandler):
28+
29+
def args(self):
30+
return json.loads(self.rfile.read(int(self.headers.get('Content-Length'))))
31+
32+
def reply(self, reply):
33+
self.send_response(200)
34+
self.end_headers()
35+
self.wfile.write(json.dumps(reply).encode("UTF-8"))
36+
37+
def unsupported(self):
38+
self.send_response(404)
39+
self.end_headers()
40+
41+
def do_POST(self):
42+
if self.path == '/api/v1/template.execute':
43+
args = self.args()
44+
45+
if 'massive' in args['template'].get('plugin', {}):
46+
text = gen_text()
47+
self.reply({
48+
'node': {
49+
'phase': 'Succeeded',
50+
'message': 'Massive payload generated',
51+
'outputs': {
52+
'parameters': [
53+
{
54+
'name': 'big-text',
55+
'value': text
56+
}
57+
]
58+
}
59+
}
60+
})
61+
else:
62+
self.reply(None)
63+
else:
64+
self.unsupported()
65+
66+
if __name__ == '__main__':
67+
httpd = HTTPServer(('', 4355), Plugin)
68+
httpd.serve_forever()
69+
command:
70+
- python
71+
- -c
72+
image: python:alpine3.23
73+
name: massive-executor-plugin
74+
ports:
75+
- containerPort: 4356
76+
resources:
77+
limits:
78+
cpu: 200m
79+
memory: 64Mi
80+
requests:
81+
cpu: 100m
82+
memory: 32Mi
83+
securityContext:
84+
allowPrivilegeEscalation: false
85+
capabilities:
86+
drop:
87+
- ALL
88+
readOnlyRootFilesystem: true
89+
runAsNonRoot: true
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
apiVersion: argoproj.io/v1alpha1
2+
kind: Workflow
3+
metadata:
4+
generateName: massive-executor-
5+
spec:
6+
entrypoint: main
7+
parallelism: 6
8+
9+
templates:
10+
- name: main
11+
steps:
12+
- - name: fanout
13+
template: massive-plugin
14+
withSequence:
15+
count: "15"
16+
arguments:
17+
parameters:
18+
- name: idx
19+
value: "{{item}}"
20+
21+
- name: massive-plugin
22+
inputs:
23+
parameters:
24+
- name: idx
25+
plugin:
26+
massive: {}

workflow/controller/taskset_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,35 @@ status:
312312
assert.Empty(t, ts.Status.Nodes)
313313
}
314314
})
315+
t.Run("RemoveCompletedTaskSetStatusFailsForCompressedWorkflow", func(t *testing.T) {
316+
cancel, controller := newController(ctx, wf, ts)
317+
defer cancel()
318+
319+
_, err := controller.wfclientset.ArgoprojV1alpha1().
320+
WorkflowTaskSets("default").
321+
Create(ctx, &ts, v1.CreateOptions{})
322+
require.NoError(t, err)
323+
324+
compressedWf := wf.DeepCopy()
325+
compressedWf.Status.CompressedNodes = "compressed"
326+
327+
woc := newWorkflowOperationCtx(ctx, compressedWf, controller)
328+
329+
err = woc.removeCompletedTaskSetStatus(ctx)
330+
require.Error(t, err)
331+
require.ErrorContains(t, err, "workflow must be decompressed")
332+
333+
tslist, err := woc.controller.wfclientset.ArgoprojV1alpha1().
334+
WorkflowTaskSets("default").
335+
List(ctx, v1.ListOptions{})
336+
require.NoError(t, err)
337+
338+
require.Len(t, tslist.Items, 1)
339+
340+
// Ensure tasksets were not modified.
341+
assert.NotEmpty(t, tslist.Items[0].Spec.Tasks)
342+
assert.NotEmpty(t, tslist.Items[0].Status.Nodes)
343+
})
315344
}
316345

317346
func TestNonHTTPTemplateScenario(t *testing.T) {

0 commit comments

Comments
 (0)