forked from Azure/AgentBaker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpollers.go
More file actions
124 lines (102 loc) · 3.96 KB
/
pollers.go
File metadata and controls
124 lines (102 loc) · 3.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package e2e_test
import (
"context"
"log"
"strings"
"testing"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)
const (
// Polling intervals
execOnVMPollInterval = 10 * time.Second
extractClusterParametersPollInterval = 15 * time.Second
extractVMLogsPollInterval = 15 * time.Second
waitUntilPodRunningPollInterval = 5 * time.Second
waitUntilPodDeletedPollInterval = 5 * time.Second
// Polling timeouts
execOnVMPollingTimeout = 3 * time.Minute
extractClusterParametersPollingTimeout = 3 * time.Minute
extractVMLogsPollingTimeout = 5 * time.Minute
waitUntilPodRunningPollingTimeout = 3 * time.Minute
waitUntilPodDeletedPollingTimeout = 1 * time.Minute
)
func pollExecOnVM(ctx context.Context, kube *kubeclient, vmPrivateIP, jumpboxPodName string, sshPrivateKey, command string) (*podExecResult, error) {
var execResult *podExecResult
err := wait.PollImmediateWithContext(ctx, execOnVMPollInterval, execOnVMPollingTimeout, func(ctx context.Context) (bool, error) {
res, err := execOnVM(ctx, kube, vmPrivateIP, jumpboxPodName, sshPrivateKey, command)
if err != nil {
log.Printf("unable to execute command on VM: %s", err)
// fail hard on non-retriable error
if strings.Contains(err.Error(), "error extracting exit code") {
return false, err
}
return false, nil
}
// this denotes a retriable SSH failure
if res.exitCode == "255" {
return false, nil
}
execResult = res
return true, nil
})
if err != nil {
return nil, err
}
return execResult, nil
}
// Wraps extractClusterParameters in a poller with a 15-second wait interval and 5-minute timeout
func pollExtractClusterParameters(ctx context.Context, t *testing.T, kube *kubeclient) (map[string]string, error) {
var clusterParams map[string]string
err := wait.PollImmediateWithContext(ctx, extractClusterParametersPollInterval, extractClusterParametersPollingTimeout, func(ctx context.Context) (bool, error) {
params, err := extractClusterParameters(ctx, t, kube)
if err != nil {
t.Logf("error extracting cluster parameters: %q", err)
return false, nil
}
clusterParams = params
return true, nil
})
if err != nil {
return nil, err
}
return clusterParams, nil
}
// Wraps exctracLogsFromVM and dumpFileMapToDir in a poller with a 15-second wait interval and 5-minute timeout
func pollExtractVMLogs(ctx context.Context, t *testing.T, vmssName string, privateKeyBytes []byte, opts *scenarioRunOpts) error {
err := wait.PollImmediateWithContext(ctx, extractVMLogsPollInterval, extractVMLogsPollingTimeout, func(ctx context.Context) (bool, error) {
t.Log("attempting to extract VM logs")
logFiles, err := extractLogsFromVM(ctx, t, vmssName, string(privateKeyBytes), opts)
if err != nil {
t.Logf("error extracting VM logs: %q", err)
return false, nil
}
t.Logf("dumping VM logs to local directory: %s", opts.loggingDir)
if err = dumpFileMapToDir(opts.loggingDir, logFiles); err != nil {
t.Logf("error extracting VM logs: %q", err)
return false, nil
}
return true, nil
})
if err != nil {
return err
}
return nil
}
func waitUntilPodRunning(ctx context.Context, kube *kubeclient, podName string) error {
return wait.PollImmediateWithContext(ctx, waitUntilPodRunningPollInterval, waitUntilPodRunningPollingTimeout, func(ctx context.Context) (bool, error) {
pod, err := kube.typed.CoreV1().Pods(defaultNamespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return false, err
}
return pod.Status.Phase == corev1.PodPhase("Running"), nil
})
}
func waitUntilPodDeleted(ctx context.Context, kube *kubeclient, podName string) error {
return wait.PollImmediateWithContext(ctx, waitUntilPodDeletedPollInterval, waitUntilPodDeletedPollingTimeout, func(ctx context.Context) (bool, error) {
err := kube.typed.CoreV1().Pods(defaultNamespace).Delete(ctx, podName, metav1.DeleteOptions{})
return err == nil, err
})
}