Skip to content

Commit df0b746

Browse files
Add CAPI library that include all required internal code
Signed-off-by: Alex Demicev <alex.demicev@lambdal.com>
1 parent d081f56 commit df0b746

16 files changed

Lines changed: 2760 additions & 0 deletions

File tree

controlplane/config/manager/manager.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ spec:
2626
- "--insecure-diagnostics=${CAPRKE2_INSECURE_DIAGNOSTICS:=false}"
2727
- "--v=${CAPRKE2_DEBUG_LEVEL:=0}"
2828
- "--feature-gates=InPlaceUpdates=${EXP_IN_PLACE_UPDATES:=false}"
29+
- "--concurrency=${CONCURRENCY_NUMBER:=10}"
2930
image: controller:latest
3031
name: manager
3132
env:

controlplane/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
274274
var runtimeClient runtimeclient.Client
275275

276276
if feature.Gates.Enabled(feature.InPlaceUpdates) {
277+
setupLog.Info("InPlaceUpdates feature gate is enabled, setting up RuntimeSDK client and ExtensionConfig controller")
278+
277279
var certWatcher *certwatcher.CertWatcher
278280

279281
runtimeClient, certWatcher, err = capiruntimeclient.New(capiruntimeclient.Options{

pkg/capi/client/client.go

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package client provides utilities for controller-runtime client operations,
18+
// including cache consistency helpers.
19+
//
20+
// Copied from sigs.k8s.io/cluster-api/internal/util/client.
21+
// Remove when upstream CAPI exposes these APIs publicly.
22+
package client
23+
24+
import (
25+
"context"
26+
"fmt"
27+
"time"
28+
29+
"github.com/pkg/errors"
30+
apierrors "k8s.io/apimachinery/pkg/api/errors"
31+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
32+
kerrors "k8s.io/apimachinery/pkg/util/errors"
33+
"k8s.io/apimachinery/pkg/util/resourceversion"
34+
"k8s.io/apimachinery/pkg/util/wait"
35+
"k8s.io/klog/v2"
36+
ctrl "sigs.k8s.io/controller-runtime"
37+
"sigs.k8s.io/controller-runtime/pkg/client"
38+
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
39+
)
40+
41+
const (
42+
waitBackoffDuration = 25 * time.Microsecond
43+
waitBackoffFactor = 1.2
44+
waitBackoffSteps = 63
45+
)
46+
47+
// waitBackoff is the timeout used when waiting for the cache to become up-to-date.
48+
// This adds up to ~ 10 seconds max wait duration.
49+
var waitBackoff = wait.Backoff{
50+
Duration: waitBackoffDuration,
51+
Cap: 2 * time.Second,
52+
Factor: waitBackoffFactor,
53+
Steps: waitBackoffSteps,
54+
}
55+
56+
// WaitForCacheToBeUpToDate waits until the cache is up-to-date in the sense of that the cache contains
57+
// all passed in objects with at least the passed in resourceVersion.
58+
// This is done by retrieving objects from the cache via the client and then comparing resourceVersions.
59+
// Note: This func will update the passed in objects while polling.
60+
// Note: resourceVersion must be set on the passed in objects.
61+
// Note: The generic parameter enforces that all objects have the same type.
62+
func WaitForCacheToBeUpToDate[T client.Object](ctx context.Context, c client.Client, action string, objs ...T) error {
63+
return waitFor(ctx, c, action, checkIfObjectUpToDate, objs...)
64+
}
65+
66+
// WaitForObjectsToBeAddedToTheCache waits until the cache is up-to-date in the sense of that the
67+
// passed in objects exist in the cache.
68+
// Note: This func will update the passed in objects while polling.
69+
// Note: The generic parameter enforces that all objects have the same type.
70+
func WaitForObjectsToBeAddedToTheCache[T client.Object](ctx context.Context, c client.Client, action string, objs ...T) error {
71+
return waitFor(ctx, c, action, checkIfObjectAdded, objs...)
72+
}
73+
74+
// WaitForObjectsToBeDeletedFromTheCache waits until the cache is up-to-date in the sense of that the
75+
// passed in objects have been either removed from the cache or they have a deletionTimestamp set.
76+
// Note: This func will update the passed in objects while polling.
77+
// Note: The generic parameter enforces that all objects have the same type.
78+
func WaitForObjectsToBeDeletedFromTheCache[T client.Object](ctx context.Context, c client.Client, action string, objs ...T) error {
79+
return waitFor(ctx, c, action, checkIfObjectDeleted, objs...)
80+
}
81+
82+
// checkIfObjectUpToDate checks if an object is up-to-date and returns an error if it is not.
83+
func checkIfObjectUpToDate(ctx context.Context, c client.Client, desiredObj desiredObject) (isErrorRetryable bool, err error) {
84+
if desiredObj.MinimumResourceVersion == "" {
85+
// Unexpected error occurred: resourceVersion not set on passed in object (not retryable).
86+
return false, errors.Errorf("%s: cannot compare with invalid resourceVersion: resourceVersion not set",
87+
klog.KObj(desiredObj.Object))
88+
}
89+
90+
if err := c.Get(ctx, desiredObj.Key, desiredObj.Object); err != nil {
91+
if apierrors.IsNotFound(err) {
92+
// Done, object was deleted in the meantime.
93+
return false, nil
94+
}
95+
// Unexpected error occurred (not retryable).
96+
return false, err
97+
}
98+
99+
cmp, err := resourceversion.CompareResourceVersion(desiredObj.Object.GetResourceVersion(), desiredObj.MinimumResourceVersion)
100+
if err != nil {
101+
// Unexpected error occurred: invalid resourceVersion (not retryable).
102+
return false, errors.Wrapf(err, "%s: cannot compare with invalid resourceVersion: current: %s, expected to be >= %s",
103+
klog.KObj(desiredObj.Object), desiredObj.Object.GetResourceVersion(), desiredObj.MinimumResourceVersion)
104+
}
105+
106+
if cmp < 0 {
107+
// resourceVersion < MinimumResourceVersion (retryable).
108+
return true, errors.Errorf("%s: resourceVersion not yet up-to-date: current: %s, expected to be >= %s",
109+
klog.KObj(desiredObj.Object), desiredObj.Object.GetResourceVersion(), desiredObj.MinimumResourceVersion)
110+
}
111+
112+
// Done, resourceVersion is new enough.
113+
return false, nil
114+
}
115+
116+
func checkIfObjectAdded(ctx context.Context, c client.Client, desiredObj desiredObject) (isErrorRetryable bool, err error) {
117+
if err := c.Get(ctx, desiredObj.Key, desiredObj.Object); err != nil {
118+
if apierrors.IsNotFound(err) {
119+
// Object is not yet in the cache (retryable).
120+
return true, err
121+
}
122+
// Unexpected error occurred (not retryable).
123+
return false, err
124+
}
125+
126+
// Done, object exists in the cache.
127+
return false, nil
128+
}
129+
130+
func checkIfObjectDeleted(ctx context.Context, c client.Client, desiredObj desiredObject) (isErrorRetryable bool, err error) {
131+
if err := c.Get(ctx, desiredObj.Key, desiredObj.Object); err != nil {
132+
if apierrors.IsNotFound(err) {
133+
// Done, object has been removed from the cache.
134+
return false, nil
135+
}
136+
// Unexpected error occurred (not retryable).
137+
return false, err
138+
}
139+
140+
if !desiredObj.Object.GetDeletionTimestamp().IsZero() {
141+
// Done, object has deletionTimestamp set.
142+
return false, nil
143+
}
144+
145+
// Object does not have deletionTimestamp set yet (retryable).
146+
return true, fmt.Errorf("%s still exists", klog.KObj(desiredObj.Object))
147+
}
148+
149+
type desiredObject struct {
150+
Object client.Object
151+
Key client.ObjectKey
152+
MinimumResourceVersion string
153+
}
154+
155+
type checkFunc func(ctx context.Context, c client.Client, desiredObj desiredObject) (retryableErr bool, err error)
156+
157+
func waitFor[T client.Object](ctx context.Context, c client.Client, action string, checkFunc checkFunc, objs ...T) error {
158+
// Done, if there are no objects.
159+
if len(objs) == 0 {
160+
return nil
161+
}
162+
163+
var o any = objs[0]
164+
if _, ok := o.(*unstructured.Unstructured); ok {
165+
return errors.Errorf("failed to wait for up-to-date objects in the cache after %s: Unstructured is not supported", action)
166+
}
167+
168+
// All objects have the same type, so we can just take the GVK of the first object.
169+
objGVK, err := apiutil.GVKForObject(objs[0], c.Scheme())
170+
if err != nil {
171+
return errors.Wrapf(err, "failed to wait for up-to-date objects in the cache after %s", action)
172+
}
173+
174+
log := ctrl.LoggerFrom(ctx)
175+
176+
desiredObjects := make([]desiredObject, len(objs))
177+
for i, obj := range objs {
178+
desiredObjects[i] = desiredObject{
179+
Object: obj,
180+
Key: client.ObjectKeyFromObject(obj),
181+
MinimumResourceVersion: obj.GetResourceVersion(),
182+
}
183+
}
184+
185+
now := time.Now()
186+
187+
var pollErrs []error
188+
189+
err = wait.ExponentialBackoffWithContext(ctx, waitBackoff, func(ctx context.Context) (bool, error) {
190+
pollErrs = nil
191+
192+
for _, desiredObj := range desiredObjects {
193+
if isErrorRetryable, err := checkFunc(ctx, c, desiredObj); err != nil {
194+
pollErrs = append(pollErrs, err)
195+
196+
if !isErrorRetryable {
197+
// Stop polling, non-retryable error occurred.
198+
return true, nil
199+
}
200+
}
201+
}
202+
203+
if len(pollErrs) > 0 {
204+
// Continue polling, only retryable errors occurred.
205+
return false, nil
206+
}
207+
208+
// Stop polling, all objects are up-to-date.
209+
return true, nil
210+
})
211+
212+
waitDuration := time.Since(now)
213+
214+
if err != nil || len(pollErrs) > 0 {
215+
waitDurationMetric.WithLabelValues(objGVK.Kind, "error").Observe(waitDuration.Seconds())
216+
217+
var errSuffix string
218+
219+
if err != nil {
220+
if wait.Interrupted(err) {
221+
errSuffix = ": timed out"
222+
} else {
223+
errSuffix = ": " + err.Error()
224+
}
225+
}
226+
227+
err := errors.Errorf(
228+
"failed to wait for up-to-date %s objects in the cache after %s%s: %s",
229+
objGVK.Kind, action, errSuffix, kerrors.NewAggregate(pollErrs))
230+
log.Error(err, "Failed to wait for cache to be up-to-date", "kind", objGVK.Kind, "waitDuration", waitDuration)
231+
232+
return err
233+
}
234+
235+
waitDurationMetric.WithLabelValues(objGVK.Kind, "success").Observe(waitDuration.Seconds())
236+
237+
// Log on a high log-level if it took a long time for the cache to be up-to-date.
238+
if waitDuration >= 1*time.Second {
239+
log.Info("Successfully waited for cache to be up-to-date (>=1s)", "kind", objGVK.Kind, "waitDuration", waitDuration)
240+
} else {
241+
log.V(10).Info("Successfully waited for cache to be up-to-date", "kind", objGVK.Kind, "waitDuration", waitDuration)
242+
}
243+
244+
return nil
245+
}

pkg/capi/client/metrics.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package client
18+
19+
import (
20+
"time"
21+
22+
"github.com/prometheus/client_golang/prometheus"
23+
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
24+
)
25+
26+
const (
27+
nativeHistogramBucketFactor = 1.1
28+
nativeHistogramMaxBucketNumber = 100
29+
)
30+
31+
func init() {
32+
// Register the metrics at the controller-runtime metrics registry.
33+
ctrlmetrics.Registry.MustRegister(waitDurationMetric)
34+
}
35+
36+
var waitDurationMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
37+
Name: "caprke2_client_cache_wait_duration_seconds",
38+
Help: "Duration that we waited for the cache to be up-to-date in seconds, broken down by kind and status",
39+
Buckets: []float64{
40+
0.00001, 0.000025, 0.00005, 0.0001, 0.00025, 0.0005, 0.001, 0.005,
41+
0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 2.5, 5, 10,
42+
},
43+
NativeHistogramBucketFactor: nativeHistogramBucketFactor,
44+
NativeHistogramMaxBucketNumber: nativeHistogramMaxBucketNumber,
45+
NativeHistogramMinResetDuration: 1 * time.Hour,
46+
}, []string{"kind", "status"})

pkg/capi/compare/equal.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package compare contains utilities to compare objects.
18+
//
19+
// Copied from sigs.k8s.io/cluster-api/internal/util/compare.
20+
// Remove when upstream CAPI exposes these APIs publicly.
21+
package compare
22+
23+
import (
24+
"strings"
25+
26+
"github.com/google/go-cmp/cmp"
27+
"github.com/pkg/errors"
28+
)
29+
30+
// Diff uses cmp.Diff to compare x and y.
31+
// If cmp.Diff panics, Diff returns an error.
32+
func Diff(x any, y any) (equal bool, diff string, matchErr error) {
33+
defer func() {
34+
if r := recover(); r != nil {
35+
equal = false
36+
37+
if err, ok := r.(error); ok {
38+
matchErr = errors.Wrapf(err, "error diffing objects")
39+
40+
return
41+
}
42+
43+
if errMsg, ok := r.(string); ok {
44+
matchErr = errors.Errorf("error diffing objects: %s", errMsg)
45+
46+
return
47+
}
48+
49+
matchErr = errors.Errorf("error diffing objects: panic of unknown type %T", r)
50+
}
51+
}()
52+
53+
diff = cmp.Diff(x, y)
54+
55+
if diff != "" {
56+
// Replace non-breaking space (NBSP) through a regular space.
57+
// This prevents output like this: "\u00a0\u00a0int(\n-\u00a0\t1,\n+\u00a0\t2,\n\u00a0\u00a0)\n"
58+
diff = strings.ReplaceAll(diff, "\u00a0", " ")
59+
// Replace \t through " " because it's easier to read in log output
60+
diff = strings.ReplaceAll(diff, "\t", " ")
61+
diff = strings.TrimSpace(diff)
62+
}
63+
64+
return diff == "", diff, nil
65+
}

pkg/capi/doc.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
Copyright 2025 SUSE.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package capi contains utilities copied from sigs.k8s.io/cluster-api internal packages.
18+
// These are needed by CAPRKE2 for in-place update support but are not yet public in upstream CAPI.
19+
//
20+
// This package should be removed once the upstream CAPI exposes these APIs publicly.
21+
// Tracking: https://github.com/kubernetes-sigs/cluster-api/issues/12291
22+
package capi

0 commit comments

Comments
 (0)