Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 50 additions & 26 deletions pkg/util/translate/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pkg/errors"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1clientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiequality "k8s.io/apimachinery/pkg/api/equality"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -23,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -376,44 +378,66 @@ func checkSubresourceStatus(ctx context.Context, vClient *apiextensionsv1clients
return isClusterScoped, hasStatusSubresource, err
}

func crdUpdateWithNewVersion(ctx context.Context, vClient *apiextensionsv1clientset.Clientset, pCrdDefinition, vCrdDefinition *apiextensionsv1.CustomResourceDefinition, groupVersionKind schema.GroupVersionKind) (bool, bool, error) {
var err error
isClusterScoped := vCrdDefinition.Spec.Scope == apiextensionsv1.ClusterScoped
hasStatusSubresource := false
func crdVersionsWithNewStorageVersion(crdVersions []apiextensionsv1.CustomResourceDefinitionVersion, storageVersion apiextensionsv1.CustomResourceDefinitionVersion) []apiextensionsv1.CustomResourceDefinitionVersion {
storageVersion.Storage = true

// CRD exists but with different version. Need to add the new version to it, and set as storage version if it is not already set.
klog.FromContext(ctx).Info("CRD found in virtual cluster, checking versions", "crd", vCrdDefinition.Name, "groupVersionKind", groupVersionKind)

newVersions := []apiextensionsv1.CustomResourceDefinitionVersion{}
for _, version := range vCrdDefinition.Spec.Versions {
if version.Name == groupVersionKind.Version {
newVersions := make([]apiextensionsv1.CustomResourceDefinitionVersion, 0, len(crdVersions)+1)
for _, version := range crdVersions {
if version.Name == storageVersion.Name {
continue
}
version.Storage = false
newVersions = append(newVersions, version)
}

// Version not found, we need to add it
klog.FromContext(ctx).Info("CRD version not found in virtual cluster, adding it", "version", groupVersionKind.Version, "crd", vCrdDefinition.Name)
return append(newVersions, storageVersion)
}

func crdUpdateWithNewVersion(ctx context.Context, vClient *apiextensionsv1clientset.Clientset, pCrdDefinition, vCrdDefinition *apiextensionsv1.CustomResourceDefinition, groupVersionKind schema.GroupVersionKind) (bool, bool, error) {
// CRD exists but with different version. Need to add the new version to it, and set as storage version if it is not already set.
isClusterScoped := vCrdDefinition.Spec.Scope == apiextensionsv1.ClusterScoped
hasStatusSubresource := false

klog.FromContext(ctx).Info("CRD found in virtual cluster, checking versions", "crd", vCrdDefinition.Name, "groupVersionKind", groupVersionKind)

newVersion := getCrdVersionByName(pCrdDefinition.Spec.Versions, groupVersionKind.Version)
if newVersion == nil {
err = fmt.Errorf("could not find version %q in physical CRD %q", groupVersionKind.Version, pCrdDefinition.Name)
return isClusterScoped, hasStatusSubresource, err
}
newVersion.Storage = true
newVersions = append(newVersions, *newVersion)
vCrdDefinition.Spec.Versions = newVersions
// Update the CRD in the virtual cluster
klog.FromContext(ctx).Info("Updating CRD in virtual cluster with new version", "crd", vCrdDefinition.Name, "version", groupVersionKind.Version)
_, err = vClient.ApiextensionsV1().CustomResourceDefinitions().Update(ctx, vCrdDefinition, metav1.UpdateOptions{})
if err != nil {
err = fmt.Errorf("update crd in virtual cluster: %w", err)
return isClusterScoped, hasStatusSubresource, err
return isClusterScoped, false, fmt.Errorf("could not find version %q in physical CRD %q", groupVersionKind.Version, pCrdDefinition.Name)
}
// Check if the status subresource is set
hasStatusSubresource = hasStatus(*newVersion)
klog.FromContext(ctx).Info("CRD updated in virtual cluster", "crd", vCrdDefinition.Name, "version", groupVersionKind.Version, "hasStatusSubresource", hasStatusSubresource)
return isClusterScoped, hasStatusSubresource, err

updated := false
// In HA mode all replicas run this code concurrently (before leader election), so we use RetryOnConflict to handle potential
// race occurring when multiple replicas try to update the CRD at the same time.
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
crdDefinition, err := vClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, vCrdDefinition.Name, metav1.GetOptions{})
if err != nil {
return err
}
isClusterScoped = crdDefinition.Spec.Scope == apiextensionsv1.ClusterScoped
newVersions := crdVersionsWithNewStorageVersion(crdDefinition.Spec.Versions, *newVersion)
if apiequality.Semantic.DeepEqual(crdDefinition.Spec.Versions, newVersions) {
klog.FromContext(ctx).V(1).Info("CRD in virtual cluster already has desired version", "crd", crdDefinition.Name, "version", groupVersionKind.Version)
return nil
}
crdDefinition.Spec.Versions = newVersions

klog.FromContext(ctx).V(1).Info("Updating CRD in virtual cluster with new version", "crd", crdDefinition.Name, "version", groupVersionKind.Version)
_, err = vClient.ApiextensionsV1().CustomResourceDefinitions().Update(ctx, crdDefinition, metav1.UpdateOptions{})
if kerrors.IsConflict(err) {
klog.FromContext(ctx).V(1).Info("CRD update conflict, retrying with latest ResourceVersion", "crd", crdDefinition.Name, "version", groupVersionKind.Version)
}
updated = err == nil
return err
})
if err != nil {
return isClusterScoped, false, fmt.Errorf("update crd in virtual cluster: %w", err)
}
if updated {
klog.FromContext(ctx).Info("CRD updated in virtual cluster", "crd", vCrdDefinition.Name, "version", groupVersionKind.Version, "hasStatusSubresource", hasStatusSubresource)
}
return isClusterScoped, hasStatusSubresource, nil
}

func createCrdFromPhysicalCluster(ctx context.Context, vClient *apiextensionsv1clientset.Clientset, pCrdDefinition *apiextensionsv1.CustomResourceDefinition, groupVersionResource schema.GroupVersionResource, groupVersionKind schema.GroupVersionKind) (bool, bool, error) {
Expand Down
134 changes: 134 additions & 0 deletions pkg/util/translate/translate_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package translate

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"maps"
"net/http"
"testing"

"gotest.tools/assert"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1clientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
clientsetscheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
fakerest "k8s.io/client-go/rest/fake"
)

func TestVirtualLabels(t *testing.T) {
Expand Down Expand Up @@ -213,3 +223,127 @@ func TestLabelSelector(t *testing.T) {
},
}, pMap)
}

func TestCRDUpdateWithNewVersionRetriesConflictAndConverges(t *testing.T) {
ctx := context.Background()
crdName := "widgets.example.com"
gvk := schema.GroupVersionKind{Group: "example.com", Version: "v1beta1", Kind: "Widget"}
oldVersion := apiextensionsv1.CustomResourceDefinitionVersion{Name: "v1alpha1", Served: true, Storage: true}
desiredVersion := apiextensionsv1.CustomResourceDefinitionVersion{
Name: "v1beta1",
Served: true,
Storage: false,
Subresources: &apiextensionsv1.CustomResourceSubresources{
Status: &apiextensionsv1.CustomResourceSubresourceStatus{},
},
}
staleVersion := apiextensionsv1.CustomResourceDefinitionVersion{Name: "v1beta1", Served: false, Storage: false}
pCrdDefinition := testCRD(crdName, apiextensionsv1.NamespaceScoped, oldVersion, desiredVersion)
vCrdDefinition := testCRD(crdName, apiextensionsv1.NamespaceScoped, oldVersion)
staleVCrdDefinition := testCRD(crdName, apiextensionsv1.NamespaceScoped, oldVersion, staleVersion)

getCalls := 0
updateCalls := 0
var updatedCRD *apiextensionsv1.CustomResourceDefinition
var serverErr error
vClient := apiextensionsv1clientset.New(&fakerest.RESTClient{
Client: fakerest.CreateHTTPClient(func(r *http.Request) (*http.Response, error) {
assert.Equal(t, r.URL.Path, "/apis/apiextensions.k8s.io/v1/customresourcedefinitions/"+crdName)

switch r.Method {
case http.MethodGet:
getCalls++
return jsonResponse(http.StatusOK, staleVCrdDefinition)
case http.MethodPut:
updateCalls++
if updateCalls == 1 {
return jsonResponse(http.StatusConflict, &metav1.Status{
TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"},
Status: metav1.StatusFailure,
Code: http.StatusConflict,
Reason: metav1.StatusReasonConflict,
Message: "conflict",
Details: &metav1.StatusDetails{
Group: "apiextensions.k8s.io",
Kind: "customresourcedefinitions",
Name: crdName,
},
})
}

updatedCRD = &apiextensionsv1.CustomResourceDefinition{}
serverErr = json.NewDecoder(r.Body).Decode(updatedCRD)
if serverErr != nil {
return jsonResponse(http.StatusInternalServerError, &metav1.Status{
TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"},
Status: metav1.StatusFailure,
Code: http.StatusInternalServerError,
Reason: metav1.StatusReasonInternalError,
Message: serverErr.Error(),
})
}
return jsonResponse(http.StatusOK, updatedCRD)
default:
return jsonResponse(http.StatusMethodNotAllowed, &metav1.Status{
TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"},
Status: metav1.StatusFailure,
Code: http.StatusMethodNotAllowed,
Reason: metav1.StatusReasonMethodNotAllowed,
})
}
}),
NegotiatedSerializer: clientsetscheme.Codecs.WithoutConversion(),
GroupVersion: apiextensionsv1.SchemeGroupVersion,
VersionedAPIPath: "/apis/apiextensions.k8s.io/v1",
})

isClusterScoped, hasStatusSubresource, err := crdUpdateWithNewVersion(ctx, vClient, pCrdDefinition, vCrdDefinition, gvk)
assert.NilError(t, err)
assert.NilError(t, serverErr)
assert.Assert(t, !isClusterScoped)
assert.Assert(t, hasStatusSubresource)
assert.Equal(t, getCalls, 2)
assert.Equal(t, updateCalls, 2)
assert.Assert(t, updatedCRD != nil)

expectedOldVersion := oldVersion
expectedOldVersion.Storage = false
expectedDesiredVersion := desiredVersion
expectedDesiredVersion.Storage = true
assert.DeepEqual(t, updatedCRD.Spec.Versions, []apiextensionsv1.CustomResourceDefinitionVersion{
expectedOldVersion,
expectedDesiredVersion,
})
}

func testCRD(name string, scope apiextensionsv1.ResourceScope, versions ...apiextensionsv1.CustomResourceDefinitionVersion) *apiextensionsv1.CustomResourceDefinition {
return &apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: "example.com",
Names: apiextensionsv1.CustomResourceDefinitionNames{
Plural: "widgets",
Kind: "Widget",
},
Scope: scope,
Versions: versions,
},
}
}

func jsonResponse(statusCode int, obj interface{}) (*http.Response, error) {
data, err := json.Marshal(obj)
if err != nil {
return nil, err
}

return &http.Response{
StatusCode: statusCode,
Header: http.Header{
"Content-Type": []string{"application/json"},
},
Body: io.NopCloser(bytes.NewReader(data)),
}, nil
}
Loading