Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 22 additions & 20 deletions integrationtests/controller/bundle/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ var _ = Describe("Bundle Status Fields", func() {
})
})

When("Cluster changes", func() {
BeforeEach(func() {
DescribeTable("A Cluster change triggers a bundle status fields update",
func(updatedClusterLabels map[string]string) {
By("creating the cluster and bundle")
cluster, err := utils.CreateCluster(ctx, k8sClient, "cluster", namespace, nil, namespace)
Expect(err).NotTo(HaveOccurred())
Expect(cluster).To(Not(BeNil()))
Expand All @@ -120,30 +121,29 @@ var _ = Describe("Bundle Status Fields", func() {
bundle, err := utils.CreateBundle(ctx, k8sClient, "name", namespace, targets, targets)
Expect(err).NotTo(HaveOccurred())
Expect(bundle).To(Not(BeNil()))
})

AfterEach(func() {
Expect(k8sClient.Delete(ctx, &v1alpha1.Bundle{ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: namespace,
}})).NotTo(HaveOccurred())
defer func() {
Expect(k8sClient.Delete(ctx, &v1alpha1.Bundle{ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: namespace,
}})).NotTo(HaveOccurred())

})
}()

It("updates the status fields", func() {
cluster := &v1alpha1.Cluster{}
err := k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
By("checking that the bundle and cluster initially have 0 ready bundle deployments")
cluster = &v1alpha1.Cluster{}
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
Expect(err).NotTo(HaveOccurred())
Expect(cluster.Status.Summary.Ready).To(Equal(0))

bundle := &v1alpha1.Bundle{}
bundle = &v1alpha1.Bundle{}
Eventually(func() bool {
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "name"}, bundle)
Expect(err).NotTo(HaveOccurred())
return bundle.Status.Summary.Ready == 0
}).Should(BeTrue())

// prepare bundle deployment so it satisfies the status change
By("updating the bundle deployment's status to trigger a bundle status update to Ready")
bd := &v1alpha1.BundleDeployment{}
Eventually(func() error {
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "name"}, bd)
Expand Down Expand Up @@ -178,16 +178,15 @@ var _ = Describe("Bundle Status Fields", func() {
Expect(cluster.Status.Summary.Pending).To(Equal(0))
Expect(cluster.Status.Display.ReadyBundles).To(Equal("1/1"))

By("Modifying labels will change cluster state")
modifiedLabels := map[string]string{"foo": "bar"}
By("updating the cluster's labels")
Eventually(func() error {
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
Expect(err).NotTo(HaveOccurred())
cluster.Labels = modifiedLabels
cluster.Labels = updatedClusterLabels
return k8sClient.Update(ctx, cluster)
}).ShouldNot(HaveOccurred())

// Change in cluster state results in a bundle deployment update
By("validating that the bundle is re-deployed")
Eventually(func(g Gomega) {
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "name"}, bd)
g.Expect(err).NotTo(HaveOccurred())
Expand All @@ -199,6 +198,9 @@ var _ = Describe("Bundle Status Fields", func() {
g.Expect(bundle.Status.Display.ReadyClusters).To(Equal("1/1"))
g.Expect(bundle.Status.Display.State).To(BeEmpty()) // all resources ready
}).Should(Succeed())
})
})
},

Entry("cluster with default (empty) shard ID", map[string]string{"foo": "bar"}),
Entry("cluster with a different shard ID to the bundle's", map[string]string{"foo": "bar", "fleet.cattle.io/shard-ref": "non-default-shard"}),
)
})
158 changes: 158 additions & 0 deletions integrationtests/controller/schedule/schedule_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package schedule

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/rancher/fleet/integrationtests/utils"
"github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

var _ = Describe("Schedule updates triggered by cluster updates", func() {
BeforeEach(func() {
var err error
namespace, err = utils.NewNamespaceName()
Expect(err).ToNot(HaveOccurred())

ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
Expect(k8sClient.Create(ctx, ns)).ToNot(HaveOccurred())

DeferCleanup(func() {
Expect(k8sClient.Delete(ctx, ns)).ToNot(HaveOccurred())
})
})

When("a Cluster living in the same namespace as a schedule is updated to match the schedule's targets", func() {
It("schedules the cluster", func() {
By("creating the cluster and schedule")
cluster, err := utils.CreateCluster(ctx, k8sClient, "cluster", namespace, nil, namespace)
Expect(err).NotTo(HaveOccurred())
Expect(cluster).To(Not(BeNil()))

schedule := v1alpha1.Schedule{
ObjectMeta: metav1.ObjectMeta{
Name: "my-schedule",
Namespace: namespace,
},
Spec: v1alpha1.ScheduleSpec{
Schedule: "0 */1 * * * *", // Every minute
Duration: metav1.Duration{Duration: 30 * time.Second},
Targets: v1alpha1.ScheduleTargets{
Clusters: []v1alpha1.ScheduleTarget{
{
ClusterSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{"can-be-scheduled": "yes"}, // initially doesn't match any cluster
},
},
},
},
},
}
err = k8sClient.Create(ctx, &schedule)
Expect(err).NotTo(HaveOccurred())

Eventually(func(g Gomega) {
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "my-schedule"}, &schedule)
g.Expect(err).NotTo(HaveOccurred())
}).Should(Succeed())

defer func() {
Expect(k8sClient.Delete(ctx, &v1alpha1.Schedule{ObjectMeta: metav1.ObjectMeta{
Name: "my-schedule",
Namespace: namespace,
}})).NotTo(HaveOccurred())

}()

By("checking that the cluster has not been scheduled")
cluster = &v1alpha1.Cluster{}
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
Expect(err).NotTo(HaveOccurred())
Expect(cluster.Status.Scheduled).To(BeFalse())

By("updating the cluster's labels to match the schedule's selector")
Eventually(func() error {
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
Expect(err).NotTo(HaveOccurred())
cluster.Labels = map[string]string{"can-be-scheduled": "yes"}
return k8sClient.Update(ctx, cluster)
}).ShouldNot(HaveOccurred())

By("validating that the cluster is scheduled")
Eventually(func(g Gomega) {
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(cluster.Status.Scheduled).To(BeTrue())
}).Should(Succeed())
})
})

When("another Cluster with a different shard ID and matching the schedule's targets is added into the same namespace", func() {
It("schedules the cluster", func() {
By("creating the cluster and schedule")
cluster, err := utils.CreateCluster(ctx, k8sClient, "cluster", namespace, nil, namespace)
Expect(err).NotTo(HaveOccurred())
Expect(cluster).To(Not(BeNil()))

schedule := v1alpha1.Schedule{
ObjectMeta: metav1.ObjectMeta{
Name: "my-schedule",
Namespace: namespace,
},
Spec: v1alpha1.ScheduleSpec{
Schedule: "0 */1 * * * *", // Every minute
Duration: metav1.Duration{Duration: 30 * time.Second},
Targets: v1alpha1.ScheduleTargets{
Clusters: []v1alpha1.ScheduleTarget{
{
ClusterSelector: &metav1.LabelSelector{},
},
},
},
},
}
err = k8sClient.Create(ctx, &schedule)
Expect(err).NotTo(HaveOccurred())

Eventually(func(g Gomega) {
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "my-schedule"}, &schedule)
g.Expect(err).NotTo(HaveOccurred())
}).Should(Succeed())

defer func() {
Expect(k8sClient.Delete(ctx, &v1alpha1.Schedule{ObjectMeta: metav1.ObjectMeta{
Name: "my-schedule",
Namespace: namespace,
}})).NotTo(HaveOccurred())

}()

By("validating that the cluster is scheduled")
Eventually(func(g Gomega) {
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(cluster.Status.Scheduled).To(BeTrue())
}).Should(Succeed())

By("adding another cluster with a different shard ID to the same namespace")
labels := map[string]string{"fleet.cattle.io/shard-ref": "different-shard"}
shardedCluster, err := utils.CreateCluster(ctx, k8sClient, "cluster2", namespace, labels, namespace)
Expect(err).NotTo(HaveOccurred())
Expect(shardedCluster).To(Not(BeNil()))

By("validating that the cluster is scheduled")
Eventually(func(g Gomega) {
var shardedCluster v1alpha1.Cluster
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster2"}, &shardedCluster)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(shardedCluster.Status.Scheduled).To(BeTrue())
}).Should(Succeed())
})
})
})
82 changes: 82 additions & 0 deletions integrationtests/controller/schedule/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package schedule

import (
"bytes"
"context"
"testing"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/reugn/go-quartz/quartz"

"github.com/rancher/fleet/integrationtests/utils"
"github.com/rancher/fleet/internal/cmd/controller/reconciler"

"k8s.io/client-go/rest"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

var (
cancel context.CancelFunc
cfg *rest.Config
ctx context.Context
k8sClient client.Client
testenv *envtest.Environment
logsBuffer bytes.Buffer

namespace string
)

func TestFleet(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Fleet Schedule Suite")
}

var _ = BeforeSuite(func() {
SetDefaultEventuallyTimeout(60 * time.Second)
SetDefaultEventuallyPollingInterval(1 * time.Second)

ctx, cancel = context.WithCancel(context.TODO())
testenv = utils.NewEnvTest("../../..")

var err error
cfg, err = utils.StartTestEnv(testenv)
Expect(err).NotTo(HaveOccurred())

// Set up log capture
GinkgoWriter.TeeTo(&logsBuffer)
ctrl.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

k8sClient, err = utils.NewClient(cfg)
Expect(err).NotTo(HaveOccurred())

mgr, err := utils.NewManager(cfg)
Expect(err).ToNot(HaveOccurred())

sched, err := quartz.NewStdScheduler()
Expect(err).ToNot(HaveOccurred(), "failed to create scheduler")

err = (&reconciler.ScheduleReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Workers: 50,
Scheduler: sched,
}).SetupWithManager(mgr)
Expect(err).ToNot(HaveOccurred(), "failed to set up manager")

go func() {
defer GinkgoRecover()
err = mgr.Start(ctx)
Expect(err).ToNot(HaveOccurred(), "failed to run manager")
}()
})

var _ = AfterSuite(func() {
cancel()
Expect(testenv.Stop()).ToNot(HaveOccurred())
})
9 changes: 7 additions & 2 deletions internal/cmd/controller/reconciler/bundle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,18 @@ func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error {
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
),
sharding.FilterByShardID(r.ShardID),
),
).
// Note: Maybe improve with WatchesMetadata, does it have access to labels?
Watches(
// Fan out from bundledeployment to bundle, this is useful to update the
// bundle's status fields.
&fleet.BundleDeployment{}, handler.EnqueueRequestsFromMapFunc(BundleDeploymentMapFunc(r)),
builder.WithPredicates(bundleDeploymentStatusChangedPredicate()),
builder.WithPredicates(
bundleDeploymentStatusChangedPredicate(),
sharding.FilterByShardID(r.ShardID),
),
).
Watches(
// Fan out from cluster to bundle, this is useful for targeting and templating.
Expand All @@ -127,6 +131,8 @@ func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return requests
}),
builder.WithPredicates(clusterChangedPredicate()),
// Deliberately skipping the sharding filter here: a bundle may live in the namespace of a cluster with both
// bearing distinct shard IDs.
).
Watches(
// Fan out from secret to bundle, reconcile bundles when a secret
Expand All @@ -142,7 +148,6 @@ func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error {
handler.EnqueueRequestsFromMapFunc(r.downstreamResourceMapFunc("ConfigMap")),
builder.WithPredicates(dataChangedPredicate()),
).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}).
Complete(r)
}
Expand Down
Loading