12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- package e2e2
15
+ package kube
16
16
17
17
import (
18
18
"context"
19
+ "errors"
19
20
"fmt"
20
- "os"
21
- "strconv"
21
+ "time"
22
22
23
23
corev1 "k8s.io/api/core/v1"
24
24
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
25
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
26
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25
27
"k8s.io/apimachinery/pkg/runtime"
26
28
ctrl "sigs.k8s.io/controller-runtime"
27
29
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -30,30 +32,30 @@ import (
30
32
akov2next "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/nextapi/v1"
31
33
)
32
34
33
- // InitK8sTest initializes a test enviroment on Kubernetes.
34
- // It requires:
35
- // - A running kubernetes cluster with a local configuration accessing it.
36
- // - The given target CRD being installed in that cluster
37
- // - Certain environment variables, such as OPERATOR_NAMESPACE, being set
38
- // - The operator running on a given Pod or as a process with a given PID
39
- func InitK8sTest (ctx context.Context , targetCRD string ) (client.Client , error ) {
40
- if err := assertRequiredEnvVars ("OPERATOR_NAMESPACE" ); err != nil {
41
- return nil , fmt .Errorf ("missing required test Kubernetes env vars: %w" , err )
42
- }
35
+ const (
36
+ Pause = time .Second
37
+ )
43
38
39
+ type ObjectWithStatus interface {
40
+ client.Object
41
+ GetConditions () []metav1.Condition
42
+ }
43
+
44
+ // NewK8sTest initializes a test environment on Kubernetes.
45
+ // It requires:
46
+ // - A running Kubernetes cluster with a local configuration bound to it.
47
+ // - The given set CRDs installed in that cluster
48
+ func NewK8sTest (ctx context.Context , crds ... string ) (client.Client , error ) {
44
49
kubeClient , err := TestKubeClient ()
45
50
if err != nil {
46
51
return nil , fmt .Errorf ("failed to setup Kubernetes test env client: %w" , err )
47
52
}
48
53
49
- if err := assertCRD (ctx , kubeClient , targetCRD ); err != nil {
50
- return nil , fmt .Errorf ("failed to asert for test-required CRD: %w" , err )
51
- }
52
-
53
- if err := assertOperator (ctx , kubeClient ); err != nil {
54
- return nil , fmt .Errorf ("failed to asert for test operator running: %w" , err )
54
+ for _ , targetCRD := range crds {
55
+ if err := assertCRD (ctx , kubeClient , targetCRD ); err != nil {
56
+ return nil , fmt .Errorf ("failed to asert for test-required CRD: %w" , err )
57
+ }
55
58
}
56
-
57
59
return kubeClient , nil
58
60
}
59
61
@@ -72,6 +74,72 @@ func TestKubeClient() (client.Client, error) {
72
74
return getKubeClient (testScheme )
73
75
}
74
76
77
+ func Apply (ctx context.Context , kubeClient client.Client , defaultNamespace string , objs ... client.Object ) error {
78
+ for i , obj := range objs {
79
+ if obj .GetNamespace () == "" {
80
+ obj = obj .DeepCopyObject ().(client.Object )
81
+ obj .SetNamespace (defaultNamespace )
82
+ }
83
+ if err := apply (ctx , kubeClient , obj ); err != nil {
84
+ return fmt .Errorf ("failed to apply object %d: %w" , (i + 1 ), err )
85
+ }
86
+ }
87
+ return nil
88
+ }
89
+
90
+ func apply (ctx context.Context , kubeClient client.Client , obj client.Object ) error {
91
+ key := client .ObjectKeyFromObject (obj )
92
+ old := obj .DeepCopyObject ().(client.Object )
93
+ err := kubeClient .Get (ctx , key , old )
94
+ switch {
95
+ case err == nil :
96
+ obj = obj .DeepCopyObject ().(client.Object )
97
+ obj .SetResourceVersion (old .GetResourceVersion ())
98
+ if err := kubeClient .Update (ctx , obj ); err != nil {
99
+ return fmt .Errorf ("failed to update %s: %w" , key , err )
100
+ }
101
+ case apierrors .IsNotFound (err ):
102
+ if err := kubeClient .Create (ctx , obj ); err != nil {
103
+ return fmt .Errorf ("failed to create %s: %w" , key , err )
104
+ }
105
+ default :
106
+ return fmt .Errorf ("failed to apply %s: %w" , key , err )
107
+ }
108
+ return nil
109
+ }
110
+
111
+ type OKOrFailureFunc func () (bool , error )
112
+
113
+ func WaitConditionOrFailure (timeout time.Duration , okOrFailFn OKOrFailureFunc ) error {
114
+ start := time .Now ()
115
+ for {
116
+ ok , err := okOrFailFn ()
117
+ if ok {
118
+ return nil
119
+ }
120
+ if err != nil {
121
+ return fmt .Errorf ("failed to check condition: %w" , err )
122
+ }
123
+ if time .Since (start ) > timeout {
124
+ return errors .New ("wait condition timed out" )
125
+ }
126
+ time .Sleep (Pause )
127
+ }
128
+ }
129
+
130
+ func AssertObjReady (ctx context.Context , kubeClient client.Client , key client.ObjectKey , obj ObjectWithStatus ) (bool , error ) {
131
+ err := kubeClient .Get (ctx , key , obj )
132
+ if err != nil {
133
+ return false , fmt .Errorf ("failed to get object %v: %w" , key , err )
134
+ }
135
+ for _ , condition := range obj .GetConditions () {
136
+ if condition .Type == "Ready" && condition .Status == metav1 .ConditionTrue {
137
+ return true , nil
138
+ }
139
+ }
140
+ return false , nil
141
+ }
142
+
75
143
func getTestScheme (addToSchemeFunctions ... func (* runtime.Scheme ) error ) (* runtime.Scheme , error ) {
76
144
testScheme := runtime .NewScheme ()
77
145
for _ , addToSchemeFn := range addToSchemeFunctions {
@@ -106,50 +174,3 @@ func assertCRD(ctx context.Context, kubeClient client.Client, targetCRD string)
106
174
}
107
175
return fmt .Errorf ("%s not found" , targetCRD )
108
176
}
109
-
110
- func assertRequiredEnvVars (envVars ... string ) error {
111
- missing := make ([]string , 0 , len (envVars ))
112
- for _ , envVar := range envVars {
113
- _ , ok := os .LookupEnv (envVar )
114
- if ! ok {
115
- missing = append (missing , envVar )
116
- }
117
- }
118
- if len (missing ) > 0 {
119
- return fmt .Errorf ("missing required env vars: %v" , missing )
120
- }
121
- return nil
122
- }
123
-
124
- func assertOperator (ctx context.Context , kubeClient client.Client ) error {
125
- pid := os .Getenv ("OPERATOR_PID" )
126
- if pid != "" {
127
- return assertProcessIsRunning (pid )
128
- }
129
- pod := os .Getenv ("OPERATOR_POD_NAME" )
130
- if pod != "" {
131
- ns := os .Getenv ("OPERATOR_NAMESPACE" )
132
- return assertPod (ctx , kubeClient , pod , ns )
133
- }
134
- return fmt .Errorf ("please set OPERATOR_PID or OPERATOR_POD_NAME to allow to check he operator is running" )
135
- }
136
-
137
- func assertProcessIsRunning (pidString string ) error {
138
- pid , err := strconv .Atoi (pidString )
139
- if err != nil {
140
- return fmt .Errorf ("failed to convert %s to a numeric PID: %w" , pidString , err )
141
- }
142
- if _ , err := os .FindProcess (pid ); err != nil {
143
- return fmt .Errorf ("failed to find process for PID %d: %w" , pid , err )
144
- }
145
- return nil
146
- }
147
-
148
- func assertPod (ctx context.Context , kubeClient client.Client , pod , ns string ) error {
149
- podObj := corev1.Pod {}
150
- key := client.ObjectKey {Name : pod , Namespace : ns }
151
- if err := kubeClient .Get (ctx , key , & podObj , & client.GetOptions {}); err != nil {
152
- return fmt .Errorf ("failed to get POD %s/%s: %w" , ns , pod , err )
153
- }
154
- return nil
155
- }
0 commit comments