Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,18 @@ deploy-kind: ko ## Deploy kro to a kind cluster
kubectl wait --for=condition=ready --timeout=1m pod -n kro-system -l app.kubernetes.io/component=controller
$(KUBECTL) --context kind-${KIND_CLUSTER_NAME} get pods -A

.PHONY: deploy-kind-aggregation
deploy-kind-aggregation: export KO_DOCKER_REPO=kind.local
deploy-kind-aggregation: ko ## Deploy kro to a kind cluster with aggregation RBAC mode
$(KIND) delete clusters ${KIND_CLUSTER_NAME} || true
$(KIND) create cluster --name ${KIND_CLUSTER_NAME}
$(KUBECTL) --context kind-$(KIND_CLUSTER_NAME) create namespace kro-system
make install
# Deploy with aggregation RBAC mode - requires ClusterRoles with rbac.kro.run/aggregate-to-controller label
${HELM} template kro ./helm --namespace kro-system --set image.pullPolicy=Never --set image.ko=true --set config.allowCRDDeletion=true --set rbac.mode=aggregation | $(KO) apply -f -
kubectl wait --for=condition=ready --timeout=1m pod -n kro-system -l app.kubernetes.io/component=controller
$(KUBECTL) --context kind-${KIND_CLUSTER_NAME} get pods -A

.PHONY: ko-apply
ko-apply: ko
${HELM} template kro ./helm --namespace kro-system --set image.pullPolicy=Never --set image.ko=true | $(KO) apply -f -
Expand All @@ -288,4 +300,12 @@ test-e2e: chainsaw ## Run e2e tests

.PHONY: test-e2e-kind
test-e2e-kind: deploy-kind
make test-e2e
make test-e2e

.PHONY: test-e2e-kind-aggregation
test-e2e-kind-aggregation: deploy-kind-aggregation ## Run e2e tests with aggregation RBAC mode
make test-e2e

.PHONY: test-e2e-non-blocking
test-e2e-non-blocking: chainsaw ## Run the non-blocking RGD test (requires aggregation RBAC mode)
$(CHAINSAW) test ./test/e2e/chainsaw/check-non-blocking-rgd
20 changes: 18 additions & 2 deletions api/v1alpha1/resourcegraphdefinition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,25 @@ type Resource struct {
type ResourceGraphDefinitionState string

const (
// ResourceGraphDefinitionStateActive represents the active state of the resource definition.
// ResourceGraphDefinitionStateActive indicates the RGD is fully operational.
// The CRD is established and the controller is running and healthy.
ResourceGraphDefinitionStateActive ResourceGraphDefinitionState = "Active"
// ResourceGraphDefinitionStateInactive represents the inactive state of the resource graph definition

// ResourceGraphDefinitionStateActivating indicates the RGD is starting up.
// This includes CRD creation/establishment and controller synchronization
ResourceGraphDefinitionStateActivating ResourceGraphDefinitionState = "Activating"

// ResourceGraphDefinitionStateDeactivating indicates the RGD is shutting down.
// The controller is being stopped and CRD is being deleted, if the controller
// is configured to do so.
ResourceGraphDefinitionStateDeactivating ResourceGraphDefinitionState = "Deactivating"

// ResourceGraphDefinitionStateDegraded indicates the RGD is operational but
// experiencing issues such as watch errors on some resources.
ResourceGraphDefinitionStateDegraded ResourceGraphDefinitionState = "Degraded"

// ResourceGraphDefinitionStateInactive indicates the RGD has encountered an error
// and is not operational. Check conditions for details.
ResourceGraphDefinitionStateInactive ResourceGraphDefinitionState = "Inactive"
)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/sync v0.12.0
golang.org/x/time v0.9.0
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb
k8s.io/api v0.34.1
k8s.io/apiextensions-apiserver v0.31.0
k8s.io/apimachinery v0.34.1
Expand Down Expand Up @@ -92,6 +91,7 @@ require (
golang.org/x/text v0.23.0 // indirect
golang.org/x/tools v0.28.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
Expand Down
206 changes: 8 additions & 198 deletions pkg/client/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,208 +15,18 @@
package client

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/kubernetes-sigs/kro/pkg/metadata"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
logr "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
// DefaultPollInterval is the default interval for polling CRD status
defaultPollInterval = 150 * time.Millisecond
// DefaultTimeout is the default timeout for waiting for CRD status
defaultTimeout = 2 * time.Minute
)

var _ CRDClient = &CRDWrapper{}

// CRDClient represents operations for managing CustomResourceDefinitions
type CRDClient interface {
// EnsureCreated ensures a CRD exists and is ready
Ensure(ctx context.Context, crd v1.CustomResourceDefinition) error

// Delete removes a CRD if it exists
Delete(ctx context.Context, name string) error

// Get retrieves a CRD by name
Get(ctx context.Context, name string) (*v1.CustomResourceDefinition, error)
}

// CRDInterface provides a simplified interface for CRD operations
type CRDInterface interface {
// Ensure ensures a CRD exists, up-to-date, and is ready. This can be
// a dangerous operation as it will update the CRD if it already exists.
//
// The caller is responsible for ensuring the CRD, isn't introducing
// breaking changes.
Ensure(ctx context.Context, crd v1.CustomResourceDefinition) error

// Get retrieves a CRD by name
Get(ctx context.Context, name string) (*v1.CustomResourceDefinition, error)

// Delete removes a CRD if it exists
Delete(ctx context.Context, name string) error
}

// CRDWrapper provides a simplified interface for CRD operations
type CRDWrapper struct {
client apiextensionsv1.CustomResourceDefinitionInterface
pollInterval time.Duration
timeout time.Duration
}

var _ CRDInterface = (*CRDWrapper)(nil)

// CRDWrapperConfig contains configuration for the CRD wrapper
type CRDWrapperConfig struct {
Client apiextensionsv1.ApiextensionsV1Interface
PollInterval time.Duration
Timeout time.Duration
}

// DefaultConfig returns a CRDWrapperConfig with default values
func DefaultCRDWrapperConfig() CRDWrapperConfig {
return CRDWrapperConfig{
PollInterval: defaultPollInterval,
Timeout: defaultTimeout,
}
}

// newCRDWrapper creates a new CRD wrapper
func newCRDWrapper(cfg CRDWrapperConfig) *CRDWrapper {
if cfg.PollInterval == 0 {
cfg.PollInterval = defaultPollInterval
}
if cfg.Timeout == 0 {
cfg.Timeout = defaultTimeout
// IsEstablished checks if a CRD is established and ready to use.
func IsEstablished(crd *v1.CustomResourceDefinition) bool {
if crd == nil {
return false
}

return &CRDWrapper{
client: cfg.Client.CustomResourceDefinitions(),
pollInterval: cfg.PollInterval,
timeout: cfg.Timeout,
}
}

// Ensure ensures a CRD exists, up-to-date, and is ready. This can be
// a dangerous operation as it will update the CRD if it already exists.
//
// The caller is responsible for ensuring the CRD, isn't introducing
// breaking changes.
func (w *CRDWrapper) Ensure(ctx context.Context, crd v1.CustomResourceDefinition) error {
log := logr.FromContext(ctx)
existing, err := w.Get(ctx, crd.Name)
if err != nil {
if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to check for existing CRD: %w", err)
for _, cond := range crd.Status.Conditions {
if cond.Type == v1.Established && cond.Status == v1.ConditionTrue {
return true
}

log.Info("Creating CRD", "name", crd.Name)
if err := w.create(ctx, crd); err != nil {
return fmt.Errorf("failed to create CRD: %w", err)
}
} else {
kroOwned, nameMatch, idMatch := metadata.CompareRGDOwnership(existing.ObjectMeta, crd.ObjectMeta)
if !kroOwned {
return fmt.Errorf(
"failed to update CRD %s: CRD already exists and is not owned by KRO", crd.Name,
)
}

if !nameMatch {
existingRGDName := existing.Labels[metadata.ResourceGraphDefinitionNameLabel]
return fmt.Errorf(
"failed to update CRD %s: CRD is owned by another ResourceGraphDefinition %s",
crd.Name, existingRGDName,
)
}

if nameMatch && !idMatch {
log.Info(
"Adopting CRD with different RGD ID - RGD may have been deleted and recreated",
"crd", crd.Name,
"existingRGDID", existing.Labels[metadata.ResourceGraphDefinitionIDLabel],
"newRGDID", crd.Labels[metadata.ResourceGraphDefinitionIDLabel],
)
}

log.Info("Updating existing CRD", "name", crd.Name)
if err := w.patch(ctx, crd); err != nil {
return fmt.Errorf("failed to patch CRD: %w", err)
}
}

return w.waitForReady(ctx, crd.Name)
}

// Get retrieves a CRD by name
func (w *CRDWrapper) Get(ctx context.Context, name string) (*v1.CustomResourceDefinition, error) {
return w.client.Get(ctx, name, metav1.GetOptions{})
}

func (w *CRDWrapper) create(ctx context.Context, crd v1.CustomResourceDefinition) error {
_, err := w.client.Create(ctx, &crd, metav1.CreateOptions{})
return err
}

func (w *CRDWrapper) patch(ctx context.Context, newCRD v1.CustomResourceDefinition) error {
patchBytes, err := json.Marshal(newCRD)
if err != nil {
return fmt.Errorf("failed to marshal CRD for patch: %w", err)
}

_, err = w.client.Patch(
ctx,
newCRD.Name,
types.MergePatchType,
patchBytes,
metav1.PatchOptions{},
)
return err
}

// Delete removes a CRD if it exists
func (w *CRDWrapper) Delete(ctx context.Context, name string) error {
log := logr.FromContext(ctx)
log.Info("Deleting CRD", "name", name)

err := w.client.Delete(ctx, name, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete CRD: %w", err)
}
return nil
}

// waitForReady waits for a CRD to become ready
func (w *CRDWrapper) waitForReady(ctx context.Context, name string) error {
log := logr.FromContext(ctx)
log.Info("Waiting for CRD to become ready", "name", name)

return wait.PollUntilContextTimeout(ctx, w.pollInterval, w.timeout, true,
func(ctx context.Context) (bool, error) {
crd, err := w.Get(ctx, name)
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}

for _, cond := range crd.Status.Conditions {
if cond.Type == v1.Established && cond.Status == v1.ConditionTrue {
return true, nil
}
}

return false, nil
})
return false
}
56 changes: 20 additions & 36 deletions pkg/client/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
package fake

import (
"context"
"net/http"

"github.com/kubernetes-sigs/kro/pkg/client"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/dynamic"
Expand All @@ -29,12 +28,13 @@ import (

// FakeSet is a fake implementation of SetInterface for testing
type FakeSet struct {
DynamicClient dynamic.Interface
KubernetesClient kubernetes.Interface
ApiExtensionsClient apiextensionsv1.ApiextensionsV1Interface
Config *rest.Config
restMapper meta.RESTMapper
HTTP *http.Client
DynamicClient dynamic.Interface
KubernetesClient kubernetes.Interface
ApiExtensionsClientV1 apiextensionsv1.ApiextensionsV1Interface
ApiExtensionsClient apiextensions.Interface
Config *rest.Config
restMapper meta.RESTMapper
HTTP *http.Client
}

var _ client.SetInterface = (*FakeSet)(nil)
Expand All @@ -61,20 +61,27 @@ func (f *FakeSet) Dynamic() dynamic.Interface {
return f.DynamicClient
}

// APIExtensions returns the full API extensions clientset
func (f *FakeSet) APIExtensions() apiextensions.Interface {
return f.ApiExtensionsClient
}

// APIExtensionsV1 returns the API extensions client
func (f *FakeSet) APIExtensionsV1() apiextensionsv1.ApiextensionsV1Interface {
return f.ApiExtensionsClient
return f.ApiExtensionsClientV1
}

// RESTConfig returns a copy of the underlying REST config
func (f *FakeSet) RESTConfig() *rest.Config {
return f.Config
}

// CRD returns a new CRD interface instance
func (f *FakeSet) CRD(cfg client.CRDWrapperConfig) client.CRDInterface {
// Return a fake CRD implementation for testing
return &FakeCRD{}
// CRD returns the CustomResourceDefinition client
func (f *FakeSet) CRD() apiextensionsv1.CustomResourceDefinitionInterface {
if f.ApiExtensionsClientV1 == nil {
return nil
}
return f.ApiExtensionsClientV1.CustomResourceDefinitions()
}

// WithImpersonation returns a new client that impersonates the given user
Expand All @@ -90,26 +97,3 @@ func (f *FakeSet) RESTMapper() meta.RESTMapper {
func (f *FakeSet) SetRESTMapper(restMapper meta.RESTMapper) {
f.restMapper = restMapper
}

// FakeCRD is a fake implementation of CRDInterface for testing
type FakeCRD struct{}

var _ client.CRDInterface = (*FakeCRD)(nil)

// Ensure ensures a CRD exists, up-to-date, and is ready
func (f *FakeCRD) Ensure(ctx context.Context, crd v1.CustomResourceDefinition) error {
// For testing, just return success
return nil
}

// Get retrieves a CRD by name
func (f *FakeCRD) Get(ctx context.Context, name string) (*v1.CustomResourceDefinition, error) {
// For testing, return an empty CRD
return &v1.CustomResourceDefinition{}, nil
}

// Delete removes a CRD if it exists
func (f *FakeCRD) Delete(ctx context.Context, name string) error {
// For testing, just return success
return nil
}
Loading