Skip to content

Commit d0564de

Browse files
committed
fix: CRD sync race in vcluster HA
1 parent f608d31 commit d0564de

2 files changed

Lines changed: 184 additions & 25 deletions

File tree

pkg/util/translate/translate.go

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/pkg/errors"
1616
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
1717
apiextensionsv1clientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
18+
apiequality "k8s.io/apimachinery/pkg/api/equality"
1819
kerrors "k8s.io/apimachinery/pkg/api/errors"
1920
"k8s.io/apimachinery/pkg/api/meta"
2021
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -23,6 +24,7 @@ import (
2324
"k8s.io/apimachinery/pkg/util/wait"
2425
"k8s.io/client-go/discovery"
2526
"k8s.io/client-go/rest"
27+
"k8s.io/client-go/util/retry"
2628
"k8s.io/klog/v2"
2729
"sigs.k8s.io/controller-runtime/pkg/client"
2830
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
@@ -376,44 +378,67 @@ func checkSubresourceStatus(ctx context.Context, vClient *apiextensionsv1clients
376378
return isClusterScoped, hasStatusSubresource, err
377379
}
378380

379-
func crdUpdateWithNewVersion(ctx context.Context, vClient *apiextensionsv1clientset.Clientset, pCrdDefinition, vCrdDefinition *apiextensionsv1.CustomResourceDefinition, groupVersionKind schema.GroupVersionKind) (bool, bool, error) {
380-
var err error
381-
isClusterScoped := vCrdDefinition.Spec.Scope == apiextensionsv1.ClusterScoped
382-
hasStatusSubresource := false
381+
func crdVersionsWithNewStorageVersion(crdVersions []apiextensionsv1.CustomResourceDefinitionVersion, storageVersion apiextensionsv1.CustomResourceDefinitionVersion) []apiextensionsv1.CustomResourceDefinitionVersion {
382+
storageVersion.Storage = true
383383

384-
// CRD exists but with different version. Need to add the new version to it, and set as storage version if it is not already set.
385-
klog.FromContext(ctx).Info("CRD found in virtual cluster, checking versions", "crd", vCrdDefinition.Name, "groupVersionKind", groupVersionKind)
386-
387-
newVersions := []apiextensionsv1.CustomResourceDefinitionVersion{}
388-
for _, version := range vCrdDefinition.Spec.Versions {
389-
if version.Name == groupVersionKind.Version {
384+
newVersions := make([]apiextensionsv1.CustomResourceDefinitionVersion, 0, len(crdVersions)+1)
385+
for _, version := range crdVersions {
386+
if version.Name == storageVersion.Name {
390387
continue
391388
}
392389
version.Storage = false
393390
newVersions = append(newVersions, version)
394391
}
395392

396-
// Version not found, we need to add it
397-
klog.FromContext(ctx).Info("CRD version not found in virtual cluster, adding it", "version", groupVersionKind.Version, "crd", vCrdDefinition.Name)
393+
return append(newVersions, storageVersion)
394+
}
395+
396+
func crdUpdateWithNewVersion(ctx context.Context, vClient *apiextensionsv1clientset.Clientset, pCrdDefinition, vCrdDefinition *apiextensionsv1.CustomResourceDefinition, groupVersionKind schema.GroupVersionKind) (bool, bool, error) {
397+
// CRD exists but with different version. Need to add the new version to it, and set as storage version if it is not already set.
398+
isClusterScoped := vCrdDefinition.Spec.Scope == apiextensionsv1.ClusterScoped
399+
hasStatusSubresource := false
400+
401+
klog.FromContext(ctx).Info("CRD found in virtual cluster, checking versions", "crd", vCrdDefinition.Name, "groupVersionKind", groupVersionKind)
402+
398403
newVersion := getCrdVersionByName(pCrdDefinition.Spec.Versions, groupVersionKind.Version)
399404
if newVersion == nil {
400-
err = fmt.Errorf("could not find version %q in physical CRD %q", groupVersionKind.Version, pCrdDefinition.Name)
401-
return isClusterScoped, hasStatusSubresource, err
405+
return isClusterScoped, false, fmt.Errorf("could not find version %q in physical CRD %q", groupVersionKind.Version, pCrdDefinition.Name)
402406
}
403407
newVersion.Storage = true
404-
newVersions = append(newVersions, *newVersion)
405-
vCrdDefinition.Spec.Versions = newVersions
406-
// Update the CRD in the virtual cluster
407-
klog.FromContext(ctx).Info("Updating CRD in virtual cluster with new version", "crd", vCrdDefinition.Name, "version", groupVersionKind.Version)
408-
_, err = vClient.ApiextensionsV1().CustomResourceDefinitions().Update(ctx, vCrdDefinition, metav1.UpdateOptions{})
408+
hasStatusSubresource = hasStatus(*newVersion)
409+
410+
updated := false
411+
// In HA mode all replicas run this code concurrently (before leader election), so we use RetryOnConflict to handle potential
412+
// race occurring when multiple replicas try to update the CRD at the same time.
413+
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
414+
crdDefinition, err := vClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, vCrdDefinition.Name, metav1.GetOptions{})
415+
if err != nil {
416+
return err
417+
}
418+
isClusterScoped = crdDefinition.Spec.Scope == apiextensionsv1.ClusterScoped
419+
newVersions := crdVersionsWithNewStorageVersion(crdDefinition.Spec.Versions, *newVersion)
420+
if apiequality.Semantic.DeepEqual(crdDefinition.Spec.Versions, newVersions) {
421+
klog.FromContext(ctx).Info("CRD in virtual cluster already has desired version", "crd", crdDefinition.Name, "version", groupVersionKind.Version)
422+
return nil
423+
}
424+
crdDefinition.Spec.Versions = newVersions
425+
426+
klog.FromContext(ctx).Info("Updating CRD in virtual cluster with new version", "crd", crdDefinition.Name, "version", groupVersionKind.Version)
427+
_, err = vClient.ApiextensionsV1().CustomResourceDefinitions().Update(ctx, crdDefinition, metav1.UpdateOptions{})
428+
if kerrors.IsConflict(err) {
429+
klog.FromContext(ctx).Info("CRD update conflict, retrying with latest ResourceVersion", "crd", crdDefinition.Name, "version", groupVersionKind.Version)
430+
}
431+
updated = err == nil
432+
return err
433+
})
409434
if err != nil {
410-
err = fmt.Errorf("update crd in virtual cluster: %w", err)
411-
return isClusterScoped, hasStatusSubresource, err
435+
return isClusterScoped, false, fmt.Errorf("update crd in virtual cluster: %w", err)
412436
}
413-
// Check if the status subresource is set
414-
hasStatusSubresource = hasStatus(*newVersion)
415-
klog.FromContext(ctx).Info("CRD updated in virtual cluster", "crd", vCrdDefinition.Name, "version", groupVersionKind.Version, "hasStatusSubresource", hasStatusSubresource)
416-
return isClusterScoped, hasStatusSubresource, err
437+
if updated {
438+
// Check if the status subresource is set
439+
klog.FromContext(ctx).Info("CRD updated in virtual cluster", "crd", vCrdDefinition.Name, "version", groupVersionKind.Version, "hasStatusSubresource", hasStatusSubresource)
440+
}
441+
return isClusterScoped, hasStatusSubresource, nil
417442
}
418443

419444
func createCrdFromPhysicalCluster(ctx context.Context, vClient *apiextensionsv1clientset.Clientset, pCrdDefinition *apiextensionsv1.CustomResourceDefinition, groupVersionResource schema.GroupVersionResource, groupVersionKind schema.GroupVersionKind) (bool, bool, error) {

pkg/util/translate/translate_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
11
package translate
22

33
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
47
"fmt"
8+
"io"
59
"maps"
10+
"net/http"
611
"testing"
712

813
"gotest.tools/assert"
914
corev1 "k8s.io/api/core/v1"
15+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
16+
apiextensionsv1clientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
17+
clientsetscheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
1018
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19+
"k8s.io/apimachinery/pkg/runtime/schema"
20+
fakerest "k8s.io/client-go/rest/fake"
1121
)
1222

1323
func TestVirtualLabels(t *testing.T) {
@@ -213,3 +223,127 @@ func TestLabelSelector(t *testing.T) {
213223
},
214224
}, pMap)
215225
}
226+
227+
func TestCRDUpdateWithNewVersionRetriesConflictAndConverges(t *testing.T) {
228+
ctx := context.Background()
229+
crdName := "widgets.example.com"
230+
gvk := schema.GroupVersionKind{Group: "example.com", Version: "v1beta1", Kind: "Widget"}
231+
oldVersion := apiextensionsv1.CustomResourceDefinitionVersion{Name: "v1alpha1", Served: true, Storage: true}
232+
desiredVersion := apiextensionsv1.CustomResourceDefinitionVersion{
233+
Name: "v1beta1",
234+
Served: true,
235+
Storage: false,
236+
Subresources: &apiextensionsv1.CustomResourceSubresources{
237+
Status: &apiextensionsv1.CustomResourceSubresourceStatus{},
238+
},
239+
}
240+
staleVersion := apiextensionsv1.CustomResourceDefinitionVersion{Name: "v1beta1", Served: false, Storage: false}
241+
pCrdDefinition := testCRD(crdName, apiextensionsv1.NamespaceScoped, oldVersion, desiredVersion)
242+
vCrdDefinition := testCRD(crdName, apiextensionsv1.NamespaceScoped, oldVersion)
243+
staleVCrdDefinition := testCRD(crdName, apiextensionsv1.NamespaceScoped, oldVersion, staleVersion)
244+
245+
getCalls := 0
246+
updateCalls := 0
247+
var updatedCRD *apiextensionsv1.CustomResourceDefinition
248+
var serverErr error
249+
vClient := apiextensionsv1clientset.New(&fakerest.RESTClient{
250+
Client: fakerest.CreateHTTPClient(func(r *http.Request) (*http.Response, error) {
251+
assert.Equal(t, r.URL.Path, "/apis/apiextensions.k8s.io/v1/customresourcedefinitions/"+crdName)
252+
253+
switch r.Method {
254+
case http.MethodGet:
255+
getCalls++
256+
return jsonResponse(http.StatusOK, staleVCrdDefinition)
257+
case http.MethodPut:
258+
updateCalls++
259+
if updateCalls == 1 {
260+
return jsonResponse(http.StatusConflict, &metav1.Status{
261+
TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"},
262+
Status: metav1.StatusFailure,
263+
Code: http.StatusConflict,
264+
Reason: metav1.StatusReasonConflict,
265+
Message: "conflict",
266+
Details: &metav1.StatusDetails{
267+
Group: "apiextensions.k8s.io",
268+
Kind: "customresourcedefinitions",
269+
Name: crdName,
270+
},
271+
})
272+
}
273+
274+
updatedCRD = &apiextensionsv1.CustomResourceDefinition{}
275+
serverErr = json.NewDecoder(r.Body).Decode(updatedCRD)
276+
if serverErr != nil {
277+
return jsonResponse(http.StatusInternalServerError, &metav1.Status{
278+
TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"},
279+
Status: metav1.StatusFailure,
280+
Code: http.StatusInternalServerError,
281+
Reason: metav1.StatusReasonInternalError,
282+
Message: serverErr.Error(),
283+
})
284+
}
285+
return jsonResponse(http.StatusOK, updatedCRD)
286+
default:
287+
return jsonResponse(http.StatusMethodNotAllowed, &metav1.Status{
288+
TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"},
289+
Status: metav1.StatusFailure,
290+
Code: http.StatusMethodNotAllowed,
291+
Reason: metav1.StatusReasonMethodNotAllowed,
292+
})
293+
}
294+
}),
295+
NegotiatedSerializer: clientsetscheme.Codecs.WithoutConversion(),
296+
GroupVersion: apiextensionsv1.SchemeGroupVersion,
297+
VersionedAPIPath: "/apis/apiextensions.k8s.io/v1",
298+
})
299+
300+
isClusterScoped, hasStatusSubresource, err := crdUpdateWithNewVersion(ctx, vClient, pCrdDefinition, vCrdDefinition, gvk)
301+
assert.NilError(t, err)
302+
assert.NilError(t, serverErr)
303+
assert.Assert(t, !isClusterScoped)
304+
assert.Assert(t, hasStatusSubresource)
305+
assert.Equal(t, getCalls, 2)
306+
assert.Equal(t, updateCalls, 2)
307+
assert.Assert(t, updatedCRD != nil)
308+
309+
expectedOldVersion := oldVersion
310+
expectedOldVersion.Storage = false
311+
expectedDesiredVersion := desiredVersion
312+
expectedDesiredVersion.Storage = true
313+
assert.DeepEqual(t, updatedCRD.Spec.Versions, []apiextensionsv1.CustomResourceDefinitionVersion{
314+
expectedOldVersion,
315+
expectedDesiredVersion,
316+
})
317+
}
318+
319+
func testCRD(name string, scope apiextensionsv1.ResourceScope, versions ...apiextensionsv1.CustomResourceDefinitionVersion) *apiextensionsv1.CustomResourceDefinition {
320+
return &apiextensionsv1.CustomResourceDefinition{
321+
ObjectMeta: metav1.ObjectMeta{
322+
Name: name,
323+
},
324+
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
325+
Group: "example.com",
326+
Names: apiextensionsv1.CustomResourceDefinitionNames{
327+
Plural: "widgets",
328+
Kind: "Widget",
329+
},
330+
Scope: scope,
331+
Versions: versions,
332+
},
333+
}
334+
}
335+
// jsonResponse builds an in-memory *http.Response whose body is the JSON
// encoding of obj.
//
// The response carries statusCode, a "Content-Type: application/json" header
// and a ContentLength equal to the encoded body size. If obj cannot be
// marshalled, the response is nil and the json.Marshal error is returned.
func jsonResponse(statusCode int, obj any) (*http.Response, error) {
	payload, err := json.Marshal(obj)
	if err != nil {
		return nil, err
	}

	return &http.Response{
		StatusCode: statusCode,
		Header: http.Header{
			"Content-Type": []string{"application/json"},
		},
		// A zero ContentLength would claim an empty body; report the real size.
		ContentLength: int64(len(payload)),
		Body:          io.NopCloser(bytes.NewReader(payload)),
	}, nil
}

0 commit comments

Comments
 (0)