Skip to content

Commit 5b02b1e

Browse files
committed
Rewrite test for workload listener.
Signed-off-by: Thomas Hallgren <thomas@tada.se>
1 parent ffcdaf9 commit 5b02b1e

File tree

1 file changed

+80
-81
lines changed

1 file changed

+80
-81
lines changed

integration_test/workload_watch_test.go

Lines changed: 80 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,43 @@ import (
1010
"github.com/telepresenceio/telepresence/v2/pkg/version"
1111
)
1212

13-
//nolint:gocognit // complex
13+
func (s *notConnectedSuite) createIntercept(ctx context.Context, client manager.ManagerClient, session *manager.SessionInfo) (*manager.InterceptInfo, error) {
14+
ir := &manager.CreateInterceptRequest{
15+
Session: session,
16+
InterceptSpec: &manager.InterceptSpec{
17+
Name: "echo-easy",
18+
Client: "telepresence@datawire.io",
19+
Agent: "echo-easy",
20+
WorkloadKind: "Deployment",
21+
Namespace: s.AppNamespace(),
22+
Mechanism: "tcp",
23+
TargetHost: "127.0.0.1",
24+
TargetPort: 8080,
25+
},
26+
}
27+
pi, err := client.PrepareIntercept(ctx, ir)
28+
if err != nil {
29+
return nil, err
30+
}
31+
spec := ir.InterceptSpec
32+
spec.ServicePort = pi.ServicePort
33+
spec.ServicePortName = pi.ServicePortName
34+
spec.ServiceUid = pi.ServiceUid
35+
spec.ContainerPort = pi.ContainerPort
36+
spec.Protocol = pi.Protocol
37+
spec.ContainerName = pi.ContainerName
38+
if pi.ServiceUid != "" {
39+
if pi.ServicePortName != "" {
40+
spec.PortIdentifier = pi.ServicePortName
41+
} else {
42+
spec.PortIdentifier = strconv.Itoa(int(pi.ServicePort))
43+
}
44+
} else {
45+
spec.PortIdentifier = strconv.Itoa(int(pi.ContainerPort))
46+
}
47+
return client.CreateIntercept(ctx, ir)
48+
}
49+
1450
func (s *notConnectedSuite) Test_WorkloadListener() {
1551
if !s.ClientVersion().EQ(version.Structured) {
1652
s.T().Skip(`Not part of compatibility tests. DoWithTrafficManager assumes compiled executable`)
@@ -24,69 +60,25 @@ func (s *notConnectedSuite) Test_WorkloadListener() {
2460
// 3. Create an intercept (changes state to INTERCEPTED)
2561
// 4. Leave the intercept (state goes back to INSTALLED)
2662
// 5. Remove the deployment
27-
go func() {
28-
defer cancel()
29-
s.ApplyApp(ctx, "echo-easy", "deploy/echo-easy")
30-
ir := &manager.CreateInterceptRequest{
31-
Session: session,
32-
InterceptSpec: &manager.InterceptSpec{
33-
Name: "echo-easy",
34-
Client: "telepresence@datawire.io",
35-
Agent: "echo-easy",
36-
WorkloadKind: "Deployment",
37-
Namespace: s.AppNamespace(),
38-
Mechanism: "tcp",
39-
TargetHost: "127.0.0.1",
40-
TargetPort: 8080,
41-
},
42-
}
43-
_, err := client.SetLogLevel(ctx, &manager.LogLevelRequest{LogLevel: "trace"})
44-
if !s.NoError(err) {
45-
return
46-
}
47-
defer func() {
48-
_, _ = client.SetLogLevel(ctx, &manager.LogLevelRequest{LogLevel: "debug"})
49-
}()
50-
pi, err := client.PrepareIntercept(ctx, ir)
51-
if !s.NoError(err) {
52-
return
53-
}
54-
spec := ir.InterceptSpec
55-
spec.ServicePort = pi.ServicePort
56-
spec.ServicePortName = pi.ServicePortName
57-
spec.ServiceUid = pi.ServiceUid
58-
spec.ContainerPort = pi.ContainerPort
59-
spec.Protocol = pi.Protocol
60-
spec.ContainerName = pi.ContainerName
61-
if pi.ServiceUid != "" {
62-
if pi.ServicePortName != "" {
63-
spec.PortIdentifier = pi.ServicePortName
64-
} else {
65-
spec.PortIdentifier = strconv.Itoa(int(pi.ServicePort))
66-
}
67-
} else {
68-
spec.PortIdentifier = strconv.Itoa(int(pi.ContainerPort))
69-
}
70-
_, err = client.CreateIntercept(ctx, ir)
71-
if !s.NoError(err) {
72-
return
73-
}
74-
time.Sleep(5 * time.Second)
75-
_, err = client.RemoveIntercept(ctx, &manager.RemoveInterceptRequest2{
76-
Session: session,
77-
Name: spec.Name,
78-
})
79-
s.NoError(err)
80-
time.Sleep(5 * time.Second)
81-
s.DeleteSvcAndWorkload(ctx, "deploy", "echo-easy")
82-
time.Sleep(5 * time.Second)
83-
}()
63+
defer cancel()
64+
_, err := client.SetLogLevel(ctx, &manager.LogLevelRequest{LogLevel: "trace"})
65+
if !s.NoError(err) {
66+
return
67+
}
8468

85-
wwStream, err := client.WatchWorkloads(ctx, &manager.WorkloadEventsRequest{
69+
toCtx, toCancel := context.WithTimeout(ctx, time.Minute)
70+
defer toCancel()
71+
wwStream, err := client.WatchWorkloads(toCtx, &manager.WorkloadEventsRequest{
8672
SessionInfo: session,
8773
})
8874
rq.NoError(err)
8975

76+
defer func() {
77+
_, _ = client.SetLogLevel(ctx, &manager.LogLevelRequest{LogLevel: "debug"})
78+
}()
79+
80+
s.ApplyApp(ctx, "echo-easy", "deploy/echo-easy")
81+
9082
// This map contains a key for each expected event from the workload watcher
9183
expectations := map[string]bool{
9284
"added": false,
@@ -98,8 +90,9 @@ func (s *notConnectedSuite) Test_WorkloadListener() {
9890
"deleted": false,
9991
}
10092

93+
var spec *manager.InterceptSpec
10194
var interceptingClient string
102-
for {
95+
for !(s.T().Failed() || expectations["deleted"]) {
10396
delta, err := wwStream.Recv()
10497
if err != nil {
10598
dlog.Infof(ctx, "watcher ended with %v", err)
@@ -108,33 +101,39 @@ func (s *notConnectedSuite) Test_WorkloadListener() {
108101
for _, ev := range delta.Events {
109102
dlog.Infof(ctx, "watcher event: %s %v", ev.Type, ev.Workload)
110103
switch ev.Type {
111-
case manager.WorkloadEvent_ADDED_UNSPECIFIED:
104+
case manager.WorkloadEvent_ADDED_UNSPECIFIED, manager.WorkloadEvent_MODIFIED:
112105
expectations["added"] = true
113106
switch ev.Workload.State {
114107
case manager.WorkloadInfo_PROGRESSING:
115108
expectations["progressing"] = true
116109
case manager.WorkloadInfo_AVAILABLE:
117-
expectations["available"] = true
118-
}
119-
case manager.WorkloadEvent_MODIFIED:
120-
switch ev.Workload.State {
121-
case manager.WorkloadInfo_PROGRESSING:
122-
expectations["progressing"] = true
123-
case manager.WorkloadInfo_AVAILABLE:
124-
expectations["available"] = true
125-
}
126-
switch ev.Workload.AgentState {
127-
case manager.WorkloadInfo_INSTALLED:
128-
if expectations["agent intercepted"] {
129-
expectations["agent installed again"] = true
130-
} else {
131-
expectations["agent installed"] = true
110+
if !expectations["available"] {
111+
expectations["available"] = true
112+
ii, err := s.createIntercept(ctx, client, session)
113+
if !s.NoError(err) {
114+
return
115+
}
116+
spec = ii.Spec
132117
}
133-
case manager.WorkloadInfo_INTERCEPTED:
134-
expectations["agent installed"] = true
135-
expectations["agent intercepted"] = true
136-
if ics := ev.Workload.InterceptClients; len(ics) == 1 {
137-
interceptingClient = ics[0].Client
118+
switch ev.Workload.AgentState {
119+
case manager.WorkloadInfo_INSTALLED:
120+
if expectations["agent intercepted"] {
121+
expectations["agent installed again"] = true
122+
s.DeleteSvcAndWorkload(ctx, "deploy", "echo-easy")
123+
} else {
124+
expectations["agent installed"] = true
125+
}
126+
case manager.WorkloadInfo_INTERCEPTED:
127+
expectations["agent installed"] = true
128+
expectations["agent intercepted"] = true
129+
if ics := ev.Workload.InterceptClients; len(ics) == 1 {
130+
interceptingClient = ics[0].Client
131+
}
132+
_, err = client.RemoveIntercept(ctx, &manager.RemoveInterceptRequest2{
133+
Session: session,
134+
Name: spec.Name,
135+
})
136+
s.NoError(err)
138137
}
139138
}
140139
case manager.WorkloadEvent_DELETED:

0 commit comments

Comments
 (0)