diff --git a/integrationtests/controller/bundle/status_test.go b/integrationtests/controller/bundle/status_test.go index 0423e6fa1d..d872f2b154 100644 --- a/integrationtests/controller/bundle/status_test.go +++ b/integrationtests/controller/bundle/status_test.go @@ -103,8 +103,9 @@ var _ = Describe("Bundle Status Fields", func() { }) }) - When("Cluster changes", func() { - BeforeEach(func() { + DescribeTable("A Cluster change triggers a bundle status fields update", + func(updatedClusterLabels map[string]string) { + By("creating the cluster and bundle") cluster, err := utils.CreateCluster(ctx, k8sClient, "cluster", namespace, nil, namespace) Expect(err).NotTo(HaveOccurred()) Expect(cluster).To(Not(BeNil())) @@ -120,30 +121,29 @@ var _ = Describe("Bundle Status Fields", func() { bundle, err := utils.CreateBundle(ctx, k8sClient, "name", namespace, targets, targets) Expect(err).NotTo(HaveOccurred()) Expect(bundle).To(Not(BeNil())) - }) - AfterEach(func() { - Expect(k8sClient.Delete(ctx, &v1alpha1.Bundle{ObjectMeta: metav1.ObjectMeta{ - Name: "name", - Namespace: namespace, - }})).NotTo(HaveOccurred()) + defer func() { + Expect(k8sClient.Delete(ctx, &v1alpha1.Bundle{ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: namespace, + }})).NotTo(HaveOccurred()) - }) + }() - It("updates the status fields", func() { - cluster := &v1alpha1.Cluster{} - err := k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster) + By("checking that the bundle and cluster initially have 0 ready bundle deployments") + cluster = &v1alpha1.Cluster{} + err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster) Expect(err).NotTo(HaveOccurred()) Expect(cluster.Status.Summary.Ready).To(Equal(0)) - bundle := &v1alpha1.Bundle{} + bundle = &v1alpha1.Bundle{} Eventually(func() bool { err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "name"}, bundle) Expect(err).NotTo(HaveOccurred()) return 
bundle.Status.Summary.Ready == 0 }).Should(BeTrue()) - // prepare bundle deployment so it satisfies the status change + By("updating the bundle deployment's status to trigger a bundle status update to Ready") bd := &v1alpha1.BundleDeployment{} Eventually(func() error { err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "name"}, bd) @@ -178,12 +178,11 @@ var _ = Describe("Bundle Status Fields", func() { Expect(cluster.Status.Summary.Pending).To(Equal(0)) Expect(cluster.Status.Display.ReadyBundles).To(Equal("1/1")) - By("Modifying labels will change cluster state") - modifiedLabels := map[string]string{"foo": "bar"} + By("updating the cluster's labels") Eventually(func() error { err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster) Expect(err).NotTo(HaveOccurred()) - cluster.Labels = modifiedLabels + cluster.Labels = updatedClusterLabels return k8sClient.Update(ctx, cluster) }).ShouldNot(HaveOccurred()) @@ -212,6 +211,9 @@ var _ = Describe("Bundle Status Fields", func() { g.Expect(bundle.Status.Display.ReadyClusters).To(Equal("1/1")) g.Expect(bundle.Status.Display.State).To(BeEmpty()) // all resources ready }).Should(Succeed()) - }) - }) + }, + + Entry("cluster with default (empty) shard ID", map[string]string{"foo": "bar"}), + Entry("cluster with a different shard ID to the bundle's", map[string]string{"foo": "bar", "fleet.cattle.io/shard-ref": "non-default-shard"}), + ) }) diff --git a/integrationtests/controller/schedule/schedule_test.go b/integrationtests/controller/schedule/schedule_test.go new file mode 100644 index 0000000000..10cd4ea3a2 --- /dev/null +++ b/integrationtests/controller/schedule/schedule_test.go @@ -0,0 +1,158 @@ +package schedule + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + "github.com/rancher/fleet/integrationtests/utils" + "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +var _ = Describe("Schedule updates triggered by cluster updates", func() { + BeforeEach(func() { + var err error + namespace, err = utils.NewNamespaceName() + Expect(err).ToNot(HaveOccurred()) + + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}} + Expect(k8sClient.Create(ctx, ns)).ToNot(HaveOccurred()) + + DeferCleanup(func() { + Expect(k8sClient.Delete(ctx, ns)).ToNot(HaveOccurred()) + }) + }) + + When("a Cluster living in the same namespace as a schedule is updated to match the schedule's targets", func() { + It("schedules the cluster", func() { + By("creating the cluster and schedule") + cluster, err := utils.CreateCluster(ctx, k8sClient, "cluster", namespace, nil, namespace) + Expect(err).NotTo(HaveOccurred()) + Expect(cluster).To(Not(BeNil())) + + schedule := v1alpha1.Schedule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-schedule", + Namespace: namespace, + }, + Spec: v1alpha1.ScheduleSpec{ + Schedule: "0 */1 * * * *", // Every minute + Duration: metav1.Duration{Duration: 30 * time.Second}, + Targets: v1alpha1.ScheduleTargets{ + Clusters: []v1alpha1.ScheduleTarget{ + { + ClusterSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"can-be-scheduled": "yes"}, // initially doesn't match any cluster + }, + }, + }, + }, + }, + } + err = k8sClient.Create(ctx, &schedule) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func(g Gomega) { + err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "my-schedule"}, &schedule) + g.Expect(err).NotTo(HaveOccurred()) + }).Should(Succeed()) + + defer func() { + Expect(k8sClient.Delete(ctx, &v1alpha1.Schedule{ObjectMeta: metav1.ObjectMeta{ + Name: "my-schedule", + Namespace: namespace, + }})).NotTo(HaveOccurred()) + 
+ }() + + By("checking that the cluster has not been scheduled") + cluster = &v1alpha1.Cluster{} + err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(cluster.Status.Scheduled).To(BeFalse()) + + By("updating the cluster's labels to match the schedule's selector") + Eventually(func() error { + err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster) + Expect(err).NotTo(HaveOccurred()) + cluster.Labels = map[string]string{"can-be-scheduled": "yes"} + return k8sClient.Update(ctx, cluster) + }).ShouldNot(HaveOccurred()) + + By("validating that the cluster is scheduled") + Eventually(func(g Gomega) { + err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(cluster.Status.Scheduled).To(BeTrue()) + }).Should(Succeed()) + }) + }) + + When("another Cluster with a different shard ID and matching the schedule's targets is added into the same namespace", func() { + It("schedules the cluster", func() { + By("creating the cluster and schedule") + cluster, err := utils.CreateCluster(ctx, k8sClient, "cluster", namespace, nil, namespace) + Expect(err).NotTo(HaveOccurred()) + Expect(cluster).To(Not(BeNil())) + + schedule := v1alpha1.Schedule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-schedule", + Namespace: namespace, + }, + Spec: v1alpha1.ScheduleSpec{ + Schedule: "0 */1 * * * *", // Every minute + Duration: metav1.Duration{Duration: 30 * time.Second}, + Targets: v1alpha1.ScheduleTargets{ + Clusters: []v1alpha1.ScheduleTarget{ + { + ClusterSelector: &metav1.LabelSelector{}, + }, + }, + }, + }, + } + err = k8sClient.Create(ctx, &schedule) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func(g Gomega) { + err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "my-schedule"}, &schedule) + g.Expect(err).NotTo(HaveOccurred()) + }).Should(Succeed()) + 
+ defer func() { + Expect(k8sClient.Delete(ctx, &v1alpha1.Schedule{ObjectMeta: metav1.ObjectMeta{ + Name: "my-schedule", + Namespace: namespace, + }})).NotTo(HaveOccurred()) + + }() + + By("validating that the cluster is scheduled") + Eventually(func(g Gomega) { + err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(cluster.Status.Scheduled).To(BeTrue()) + }).Should(Succeed()) + + By("adding another cluster with a different shard ID to the same namespace") + labels := map[string]string{"fleet.cattle.io/shard-ref": "different-shard"} + shardedCluster, err := utils.CreateCluster(ctx, k8sClient, "cluster2", namespace, labels, namespace) + Expect(err).NotTo(HaveOccurred()) + Expect(shardedCluster).To(Not(BeNil())) + + By("validating that the cluster is scheduled") + Eventually(func(g Gomega) { + var shardedCluster v1alpha1.Cluster + err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster2"}, &shardedCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(shardedCluster.Status.Scheduled).To(BeTrue()) + }).Should(Succeed()) + }) + }) +}) diff --git a/integrationtests/controller/schedule/suite_test.go b/integrationtests/controller/schedule/suite_test.go new file mode 100644 index 0000000000..973f9f5459 --- /dev/null +++ b/integrationtests/controller/schedule/suite_test.go @@ -0,0 +1,82 @@ +package schedule + +import ( + "bytes" + "context" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/reugn/go-quartz/quartz" + + "github.com/rancher/fleet/integrationtests/utils" + "github.com/rancher/fleet/internal/cmd/controller/reconciler" + + "k8s.io/client-go/rest" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +var ( + cancel context.CancelFunc + cfg *rest.Config + ctx context.Context + k8sClient client.Client + testenv *envtest.Environment + logsBuffer bytes.Buffer + + namespace string +) + +func TestFleet(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Fleet Schedule Suite") +} + +var _ = BeforeSuite(func() { + SetDefaultEventuallyTimeout(60 * time.Second) + SetDefaultEventuallyPollingInterval(1 * time.Second) + + ctx, cancel = context.WithCancel(context.TODO()) + testenv = utils.NewEnvTest("../../..") + + var err error + cfg, err = utils.StartTestEnv(testenv) + Expect(err).NotTo(HaveOccurred()) + + // Set up log capture + GinkgoWriter.TeeTo(&logsBuffer) + ctrl.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + k8sClient, err = utils.NewClient(cfg) + Expect(err).NotTo(HaveOccurred()) + + mgr, err := utils.NewManager(cfg) + Expect(err).ToNot(HaveOccurred()) + + sched, err := quartz.NewStdScheduler() + Expect(err).ToNot(HaveOccurred(), "failed to create scheduler") + + err = (&reconciler.ScheduleReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Workers: 50, + Scheduler: sched, + }).SetupWithManager(mgr) + Expect(err).ToNot(HaveOccurred(), "failed to set up manager") + + go func() { + defer GinkgoRecover() + err = mgr.Start(ctx) + Expect(err).ToNot(HaveOccurred(), "failed to run manager") + }() +}) + +var _ = AfterSuite(func() { + cancel() + Expect(testenv.Stop()).ToNot(HaveOccurred()) +}) diff --git a/internal/cmd/controller/reconciler/bundle_controller.go b/internal/cmd/controller/reconciler/bundle_controller.go index 
ae10f20b89..3f47f14319 100644 --- a/internal/cmd/controller/reconciler/bundle_controller.go +++ b/internal/cmd/controller/reconciler/bundle_controller.go @@ -94,6 +94,7 @@ func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error { predicate.AnnotationChangedPredicate{}, predicate.LabelChangedPredicate{}, ), + sharding.FilterByShardID(r.ShardID), ), ). // Note: Maybe improve with WatchesMetadata, does it have access to labels? @@ -101,7 +102,10 @@ func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error { // Fan out from bundledeployment to bundle, this is useful to update the // bundle's status fields. &fleet.BundleDeployment{}, handler.EnqueueRequestsFromMapFunc(BundleDeploymentMapFunc(r)), - builder.WithPredicates(bundleDeploymentStatusChangedPredicate()), + builder.WithPredicates( + bundleDeploymentStatusChangedPredicate(), + sharding.FilterByShardID(r.ShardID), + ), ). Watches( // Fan out from cluster to bundle, this is useful for targeting and templating. @@ -128,6 +132,8 @@ func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error { return requests }), builder.WithPredicates(clusterChangedPredicate()), + // Deliberately skipping the sharding filter here: a bundle may live in the namespace of a cluster with both + // bearing distinct shard IDs. ). Watches( // Fan out from secret to bundle, reconcile bundles when a secret @@ -143,7 +149,6 @@ func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error { handler.EnqueueRequestsFromMapFunc(r.downstreamResourceMapFunc("ConfigMap")), builder.WithPredicates(dataChangedPredicate()), ). - WithEventFilter(sharding.FilterByShardID(r.ShardID)). WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}). 
Complete(r) } diff --git a/internal/cmd/controller/reconciler/schedule_controller.go b/internal/cmd/controller/reconciler/schedule_controller.go index 2d4374b92d..5ae77c5c10 100644 --- a/internal/cmd/controller/reconciler/schedule_controller.go +++ b/internal/cmd/controller/reconciler/schedule_controller.go @@ -9,6 +9,8 @@ import ( fleetutil "github.com/rancher/fleet/internal/cmd/controller/errorutil" "github.com/rancher/fleet/internal/cmd/controller/finalize" + "github.com/rancher/fleet/internal/cmd/controller/target" + "github.com/rancher/fleet/internal/cmd/controller/target/matcher" fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" "github.com/rancher/fleet/pkg/sharding" "github.com/rancher/wrangler/v3/pkg/condition" @@ -52,14 +54,17 @@ func (r *ScheduleReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&fleet.Schedule{}, builder.WithPredicates( predicate.GenerationChangedPredicate{}, + sharding.FilterByShardID(r.ShardID), ), ). Watches( &fleet.Cluster{}, handler.EnqueueRequestsFromMapFunc(r.mapClustersToSchedules), builder.WithPredicates(clusterChangedPredicate()), + // Deliberately skipping the sharding filter here: a schedule may live in the namespace of a cluster with both + // bearing distinct shard IDs. Instead, mapClustersToSchedules maps clusters to schedules in the + // current shard only. ). - WithEventFilter(sharding.FilterByShardID(r.ShardID)). WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}). Complete(r) } @@ -152,7 +157,7 @@ func (r *ScheduleReconciler) handleDelete(ctx context.Context, schedule *fleet.S } // mapClustersToSchedules is a mapping function used to trigger a reconciliation of Schedules -// when a targeted Cluster changes. It finds all schedules that target the cluster +// when a targeted Cluster changes. It finds all schedules in r's shard that target the cluster // and enqueues a reconcile request for each of them. 
func (r *ScheduleReconciler) mapClustersToSchedules(ctx context.Context, a client.Object) []ctrl.Request { ns := a.GetNamespace() @@ -160,11 +165,12 @@ func (r *ScheduleReconciler) mapClustersToSchedules(ctx context.Context, a clien cluster := a.(*fleet.Cluster) // check if the cluster is scheduled - schedules, err := getClusterSchedules(r.Scheduler, cluster.Name, cluster.Namespace) + schedules, err := getClusterSchedules(ctx, r.Client, r.Scheduler, cluster, r.ShardID) if err != nil { logger.Error(err, "Failed to get cluster schedules") return nil } + requests := []ctrl.Request{} for _, schedule := range schedules { requests = append(requests, ctrl.Request{ @@ -352,14 +358,24 @@ func isClusterScheduled(scheduler quartz.Scheduler, cluster, namespace string) ( return len(keys) != 0, nil } -// getClusterSchedules returns all the fleet Schedules in which the given cluster is found as a matching target. -func getClusterSchedules(scheduler quartz.Scheduler, cluster, namespace string) ([]*fleet.Schedule, error) { - keys, err := getClusterScheduleKeys(scheduler, cluster, namespace) +// getClusterSchedules returns all the fleet Schedules with a matching shardID, in which the given cluster is found as a +// matching target. To this end, it looks at two sources of data: +// * keys of already scheduled jobs +// * schedules whose targets match the cluster, to include schedules for which no job may have been scheduled yet.
+func getClusterSchedules( + ctx context.Context, + c client.Client, + scheduler quartz.Scheduler, + cluster *fleet.Cluster, + shardID string, +) ([]*fleet.Schedule, error) { + keys, err := getClusterScheduleKeys(scheduler, cluster.Name, cluster.Namespace) if err != nil { return nil, err } schedules := []*fleet.Schedule{} + scheduleNames := map[string]struct{}{} for _, key := range keys { job, err := scheduler.GetScheduledJob(key) if err != nil { @@ -369,7 +385,46 @@ func getClusterSchedules(scheduler quartz.Scheduler, cluster, namespace string) if !ok { return nil, fmt.Errorf("unexpected job type for key: %s", key.String()) } + + if !sharding.ShouldProcess(cronDurationJob.Schedule, shardID) { + continue + } + schedules = append(schedules, cronDurationJob.Schedule) + scheduleNames[cronDurationJob.Schedule.Name] = struct{}{} + } + + // Consider schedules which may exist but for which no job may have been created yet. + allSchedules := &fleet.ScheduleList{} + if err := c.List(ctx, allSchedules, client.InNamespace(cluster.Namespace)); err != nil { + return nil, fmt.Errorf("%w, listing schedules: %w", fleetutil.ErrRetryable, err) + } + + groups, err := target.ClusterGroupsForCluster(ctx, c, cluster) + if err != nil { + return nil, fmt.Errorf("%w, getting cluster groups from clusters: %w", fleetutil.ErrRetryable, err) + } + + cgs := target.ClusterGroupsToLabelMap(groups) + + for i, s := range allSchedules.Items { + if !sharding.ShouldProcess(&s, shardID) { + continue + } + + // Skip already found schedules, to prevent duplicates and unnecessary computations. + if _, alreadyFound := scheduleNames[s.Name]; alreadyFound { + continue + } + + matcher, err := matcher.NewScheduleMatch(&s) + if err != nil { + return nil, err + } + + if matcher.MatchCluster(cluster.Name, cgs, cluster.Labels) { + schedules = append(schedules, &allSchedules.Items[i]) + } } return schedules, nil