diff --git a/pkg/util/translate/translate.go b/pkg/util/translate/translate.go index 7531466993..777398062a 100644 --- a/pkg/util/translate/translate.go +++ b/pkg/util/translate/translate.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1clientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apiequality "k8s.io/apimachinery/pkg/api/equality" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -23,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/rest" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" @@ -376,44 +378,66 @@ func checkSubresourceStatus(ctx context.Context, vClient *apiextensionsv1clients return isClusterScoped, hasStatusSubresource, err } -func crdUpdateWithNewVersion(ctx context.Context, vClient *apiextensionsv1clientset.Clientset, pCrdDefinition, vCrdDefinition *apiextensionsv1.CustomResourceDefinition, groupVersionKind schema.GroupVersionKind) (bool, bool, error) { - var err error - isClusterScoped := vCrdDefinition.Spec.Scope == apiextensionsv1.ClusterScoped - hasStatusSubresource := false +func crdVersionsWithNewStorageVersion(crdVersions []apiextensionsv1.CustomResourceDefinitionVersion, storageVersion apiextensionsv1.CustomResourceDefinitionVersion) []apiextensionsv1.CustomResourceDefinitionVersion { + storageVersion.Storage = true - // CRD exists but with different version. Need to add the new version to it, and set as storage version if it is not already set. - klog.FromContext(ctx).Info("CRD found in virtual cluster, checking versions", "crd", vCrdDefinition.Name, "groupVersionKind", groupVersionKind) - - newVersions := []apiextensionsv1.CustomResourceDefinitionVersion{} - for _, version := range vCrdDefinition.Spec.Versions { - if version.Name == groupVersionKind.Version { + newVersions := make([]apiextensionsv1.CustomResourceDefinitionVersion, 0, len(crdVersions)+1) + for _, version := range crdVersions { + if version.Name == storageVersion.Name { continue } version.Storage = false newVersions = append(newVersions, version) } - // Version not found, we need to add it - klog.FromContext(ctx).Info("CRD version not found in virtual cluster, adding it", "version", groupVersionKind.Version, "crd", vCrdDefinition.Name) + return append(newVersions, storageVersion) +} + +func crdUpdateWithNewVersion(ctx context.Context, vClient *apiextensionsv1clientset.Clientset, pCrdDefinition, vCrdDefinition *apiextensionsv1.CustomResourceDefinition, groupVersionKind schema.GroupVersionKind) (bool, bool, error) { + // CRD exists but with different version. Need to add the new version to it, and set as storage version if it is not already set. + isClusterScoped := vCrdDefinition.Spec.Scope == apiextensionsv1.ClusterScoped + hasStatusSubresource := false + + klog.FromContext(ctx).Info("CRD found in virtual cluster, checking versions", "crd", vCrdDefinition.Name, "groupVersionKind", groupVersionKind) + newVersion := getCrdVersionByName(pCrdDefinition.Spec.Versions, groupVersionKind.Version) if newVersion == nil { - err = fmt.Errorf("could not find version %q in physical CRD %q", groupVersionKind.Version, pCrdDefinition.Name) - return isClusterScoped, hasStatusSubresource, err - } - newVersion.Storage = true - newVersions = append(newVersions, *newVersion) - vCrdDefinition.Spec.Versions = newVersions - // Update the CRD in the virtual cluster - klog.FromContext(ctx).Info("Updating CRD in virtual cluster with new version", "crd", vCrdDefinition.Name, "version", groupVersionKind.Version) - _, err = vClient.ApiextensionsV1().CustomResourceDefinitions().Update(ctx, vCrdDefinition, metav1.UpdateOptions{}) - if err != nil { - err = fmt.Errorf("update crd in virtual cluster: %w", err) - return isClusterScoped, hasStatusSubresource, err + return isClusterScoped, false, fmt.Errorf("could not find version %q in physical CRD %q", groupVersionKind.Version, pCrdDefinition.Name) } // Check if the status subresource is set hasStatusSubresource = hasStatus(*newVersion) - klog.FromContext(ctx).Info("CRD updated in virtual cluster", "crd", vCrdDefinition.Name, "version", groupVersionKind.Version, "hasStatusSubresource", hasStatusSubresource) - return isClusterScoped, hasStatusSubresource, err + + updated := false + // In HA mode all replicas run this code concurrently (before leader election), so we use RetryOnConflict to handle potential + // race occurring when multiple replicas try to update the CRD at the same time. + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + crdDefinition, err := vClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, vCrdDefinition.Name, metav1.GetOptions{}) + if err != nil { + return err + } + isClusterScoped = crdDefinition.Spec.Scope == apiextensionsv1.ClusterScoped + newVersions := crdVersionsWithNewStorageVersion(crdDefinition.Spec.Versions, *newVersion) + if apiequality.Semantic.DeepEqual(crdDefinition.Spec.Versions, newVersions) { + klog.FromContext(ctx).V(1).Info("CRD in virtual cluster already has desired version", "crd", crdDefinition.Name, "version", groupVersionKind.Version) + return nil + } + crdDefinition.Spec.Versions = newVersions + + klog.FromContext(ctx).V(1).Info("Updating CRD in virtual cluster with new version", "crd", crdDefinition.Name, "version", groupVersionKind.Version) + _, err = vClient.ApiextensionsV1().CustomResourceDefinitions().Update(ctx, crdDefinition, metav1.UpdateOptions{}) + if kerrors.IsConflict(err) { + klog.FromContext(ctx).V(1).Info("CRD update conflict, retrying with latest ResourceVersion", "crd", crdDefinition.Name, "version", groupVersionKind.Version) + } + updated = err == nil + return err + }) + if err != nil { + return isClusterScoped, false, fmt.Errorf("update crd in virtual cluster: %w", err) + } + if updated { + klog.FromContext(ctx).Info("CRD updated in virtual cluster", "crd", vCrdDefinition.Name, "version", groupVersionKind.Version, "hasStatusSubresource", hasStatusSubresource) + } + return isClusterScoped, hasStatusSubresource, nil } func createCrdFromPhysicalCluster(ctx context.Context, vClient *apiextensionsv1clientset.Clientset, pCrdDefinition *apiextensionsv1.CustomResourceDefinition, groupVersionResource schema.GroupVersionResource, groupVersionKind schema.GroupVersionKind) (bool, bool, error) { diff --git a/pkg/util/translate/translate_test.go b/pkg/util/translate/translate_test.go index 2068cfa797..d1b1dba56b 100644 --- a/pkg/util/translate/translate_test.go +++ b/pkg/util/translate/translate_test.go @@ -1,13 +1,23 @@ package translate import ( + "bytes" + "context" + "encoding/json" "fmt" + "io" "maps" + "net/http" "testing" "gotest.tools/assert" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsv1clientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + clientsetscheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + fakerest "k8s.io/client-go/rest/fake" ) func TestVirtualLabels(t *testing.T) { @@ -213,3 +223,127 @@ func TestLabelSelector(t *testing.T) { }, }, pMap) } + +func TestCRDUpdateWithNewVersionRetriesConflictAndConverges(t *testing.T) { + ctx := context.Background() + crdName := "widgets.example.com" + gvk := schema.GroupVersionKind{Group: "example.com", Version: "v1beta1", Kind: "Widget"} + oldVersion := apiextensionsv1.CustomResourceDefinitionVersion{Name: "v1alpha1", Served: true, Storage: true} + desiredVersion := apiextensionsv1.CustomResourceDefinitionVersion{ + Name: "v1beta1", + Served: true, + Storage: false, + Subresources: &apiextensionsv1.CustomResourceSubresources{ + Status: &apiextensionsv1.CustomResourceSubresourceStatus{}, + }, + } + staleVersion := apiextensionsv1.CustomResourceDefinitionVersion{Name: "v1beta1", Served: false, Storage: false} + pCrdDefinition := testCRD(crdName, apiextensionsv1.NamespaceScoped, oldVersion, desiredVersion) + vCrdDefinition := testCRD(crdName, apiextensionsv1.NamespaceScoped, oldVersion) + staleVCrdDefinition := testCRD(crdName, apiextensionsv1.NamespaceScoped, oldVersion, staleVersion) + + getCalls := 0 + updateCalls := 0 + var updatedCRD *apiextensionsv1.CustomResourceDefinition + var serverErr error + vClient := apiextensionsv1clientset.New(&fakerest.RESTClient{ + Client: fakerest.CreateHTTPClient(func(r *http.Request) (*http.Response, error) { + assert.Equal(t, r.URL.Path, "/apis/apiextensions.k8s.io/v1/customresourcedefinitions/"+crdName) + + switch r.Method { + case http.MethodGet: + getCalls++ + return jsonResponse(http.StatusOK, staleVCrdDefinition) + case http.MethodPut: + updateCalls++ + if updateCalls == 1 { + return jsonResponse(http.StatusConflict, &metav1.Status{ + TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"}, + Status: metav1.StatusFailure, + Code: http.StatusConflict, + Reason: metav1.StatusReasonConflict, + Message: "conflict", + Details: &metav1.StatusDetails{ + Group: "apiextensions.k8s.io", + Kind: "customresourcedefinitions", + Name: crdName, + }, + }) + } + + updatedCRD = &apiextensionsv1.CustomResourceDefinition{} + serverErr = json.NewDecoder(r.Body).Decode(updatedCRD) + if serverErr != nil { + return jsonResponse(http.StatusInternalServerError, &metav1.Status{ + TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"}, + Status: metav1.StatusFailure, + Code: http.StatusInternalServerError, + Reason: metav1.StatusReasonInternalError, + Message: serverErr.Error(), + }) + } + return jsonResponse(http.StatusOK, updatedCRD) + default: + return jsonResponse(http.StatusMethodNotAllowed, &metav1.Status{ + TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"}, + Status: metav1.StatusFailure, + Code: http.StatusMethodNotAllowed, + Reason: metav1.StatusReasonMethodNotAllowed, + }) + } + }), + NegotiatedSerializer: clientsetscheme.Codecs.WithoutConversion(), + GroupVersion: apiextensionsv1.SchemeGroupVersion, + VersionedAPIPath: "/apis/apiextensions.k8s.io/v1", + }) + + isClusterScoped, hasStatusSubresource, err := crdUpdateWithNewVersion(ctx, vClient, pCrdDefinition, vCrdDefinition, gvk) + assert.NilError(t, err) + assert.NilError(t, serverErr) + assert.Assert(t, !isClusterScoped) + assert.Assert(t, hasStatusSubresource) + assert.Equal(t, getCalls, 2) + assert.Equal(t, updateCalls, 2) + assert.Assert(t, updatedCRD != nil) + + expectedOldVersion := oldVersion + expectedOldVersion.Storage = false + expectedDesiredVersion := desiredVersion + expectedDesiredVersion.Storage = true + assert.DeepEqual(t, updatedCRD.Spec.Versions, []apiextensionsv1.CustomResourceDefinitionVersion{ + expectedOldVersion, + expectedDesiredVersion, + }) +} + +func testCRD(name string, scope apiextensionsv1.ResourceScope, versions ...apiextensionsv1.CustomResourceDefinitionVersion) *apiextensionsv1.CustomResourceDefinition { + return &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: "example.com", + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Plural: "widgets", + Kind: "Widget", + }, + Scope: scope, + Versions: versions, + }, + } +} + +func jsonResponse(statusCode int, obj interface{}) (*http.Response, error) { + data, err := json.Marshal(obj) + if err != nil { + return nil, err + } + + return &http.Response{ + StatusCode: statusCode, + Header: http.Header{ + "Content-Type": []string{"application/json"}, + }, + Body: io.NopCloser(bytes.NewReader(data)), + }, nil +}