Skip to content

Commit 722788e

Browse files
authored
Refine conditions for reconciles from cluster updates across shards (#4730)
* Enable cluster-to-bundle updates across shards Filtering all incoming events in the bundle reconciler would prevent updates to relevant clusters, which may live in shards different to the bundle's, from impacting the bundle. Instead, all bundle controllers, regardless of their shards, may now receive updates from clusters which are relevant to the bundle, considering, as was already the case, the bundle and the cluster's respective namespaces, cluster groups, and bundle namespace mappings. * Keep shard filter on bundle reconciles from deployments A bundle deployment is created by Fleet, and bears the same shard ID as its originating bundle, as the sharding label is propagated along with all Fleet labels from a bundle to its bundle deployments. Therefore, filtering incoming bundle deployment-related events for bundle reconciles by shard ID is safe, and preserves the performance gain brought by the previous wider-reaching event filter. * Enable cluster updates to trigger schedule reconciles earlier When a cluster is created or updated, enabling it to newly match a schedule's targets, the schedule would not be reconciled as a result, because logic mapping clusters to schedules only considered schedules for which jobs had already been scheduled. That logic now takes all schedules into account within the cluster's namespace. * Enable schedule reconciles from clusters in different shards A cluster being updated would not make much of a difference, because mapping schedules to and from clusters works based on the schedules and clusters' respective namespaces. However, creating a new cluster in the same namespace still needs to be taken into account across shards, as a cluster living in a different shard may be relevant to a schedule, living in the same namespace and matching its targets.
* Prevent multiple reconciles per schedule per cluster event While the Schedule reconciler must be able to process cluster events coming from any shard, only Schedules matching the reconciler's shard must be reconciled as a result, to prevent a given Schedule from being reconciled concurrently across multiple controllers.
1 parent 3093a8d commit 722788e

File tree

5 files changed

+330
-28
lines changed

5 files changed

+330
-28
lines changed

integrationtests/controller/bundle/status_test.go

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ var _ = Describe("Bundle Status Fields", func() {
103103
})
104104
})
105105

106-
When("Cluster changes", func() {
107-
BeforeEach(func() {
106+
DescribeTable("A Cluster change triggers a bundle status fields update",
107+
func(updatedClusterLabels map[string]string) {
108+
By("creating the cluster and bundle")
108109
cluster, err := utils.CreateCluster(ctx, k8sClient, "cluster", namespace, nil, namespace)
109110
Expect(err).NotTo(HaveOccurred())
110111
Expect(cluster).To(Not(BeNil()))
@@ -120,30 +121,29 @@ var _ = Describe("Bundle Status Fields", func() {
120121
bundle, err := utils.CreateBundle(ctx, k8sClient, "name", namespace, targets, targets)
121122
Expect(err).NotTo(HaveOccurred())
122123
Expect(bundle).To(Not(BeNil()))
123-
})
124124

125-
AfterEach(func() {
126-
Expect(k8sClient.Delete(ctx, &v1alpha1.Bundle{ObjectMeta: metav1.ObjectMeta{
127-
Name: "name",
128-
Namespace: namespace,
129-
}})).NotTo(HaveOccurred())
125+
defer func() {
126+
Expect(k8sClient.Delete(ctx, &v1alpha1.Bundle{ObjectMeta: metav1.ObjectMeta{
127+
Name: "name",
128+
Namespace: namespace,
129+
}})).NotTo(HaveOccurred())
130130

131-
})
131+
}()
132132

133-
It("updates the status fields", func() {
134-
cluster := &v1alpha1.Cluster{}
135-
err := k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
133+
By("checking that the bundle and cluster initially have 0 ready bundle deployments")
134+
cluster = &v1alpha1.Cluster{}
135+
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
136136
Expect(err).NotTo(HaveOccurred())
137137
Expect(cluster.Status.Summary.Ready).To(Equal(0))
138138

139-
bundle := &v1alpha1.Bundle{}
139+
bundle = &v1alpha1.Bundle{}
140140
Eventually(func() bool {
141141
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "name"}, bundle)
142142
Expect(err).NotTo(HaveOccurred())
143143
return bundle.Status.Summary.Ready == 0
144144
}).Should(BeTrue())
145145

146-
// prepare bundle deployment so it satisfies the status change
146+
By("updating the bundle deployment's status to trigger a bundle status update to Ready")
147147
bd := &v1alpha1.BundleDeployment{}
148148
Eventually(func() error {
149149
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "name"}, bd)
@@ -178,16 +178,15 @@ var _ = Describe("Bundle Status Fields", func() {
178178
Expect(cluster.Status.Summary.Pending).To(Equal(0))
179179
Expect(cluster.Status.Display.ReadyBundles).To(Equal("1/1"))
180180

181-
By("Modifying labels will change cluster state")
182-
modifiedLabels := map[string]string{"foo": "bar"}
181+
By("updating the cluster's labels")
183182
Eventually(func() error {
184183
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
185184
Expect(err).NotTo(HaveOccurred())
186-
cluster.Labels = modifiedLabels
185+
cluster.Labels = updatedClusterLabels
187186
return k8sClient.Update(ctx, cluster)
188187
}).ShouldNot(HaveOccurred())
189188

190-
// Change in cluster state results in a bundle deployment update
189+
By("validating that the bundle is re-deployed")
191190
Eventually(func(g Gomega) {
192191
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "name"}, bd)
193192
g.Expect(err).NotTo(HaveOccurred())
@@ -199,6 +198,9 @@ var _ = Describe("Bundle Status Fields", func() {
199198
g.Expect(bundle.Status.Display.ReadyClusters).To(Equal("1/1"))
200199
g.Expect(bundle.Status.Display.State).To(BeEmpty()) // all resources ready
201200
}).Should(Succeed())
202-
})
203-
})
201+
},
202+
203+
Entry("cluster with default (empty) shard ID", map[string]string{"foo": "bar"}),
204+
Entry("cluster with a different shard ID to the bundle's", map[string]string{"foo": "bar", "fleet.cattle.io/shard-ref": "non-default-shard"}),
205+
)
204206
})
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package schedule
2+
3+
import (
4+
"time"
5+
6+
. "github.com/onsi/ginkgo/v2"
7+
. "github.com/onsi/gomega"
8+
9+
"github.com/rancher/fleet/integrationtests/utils"
10+
"github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"
11+
12+
corev1 "k8s.io/api/core/v1"
13+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
"k8s.io/apimachinery/pkg/types"
15+
)
16+
17+
var _ = Describe("Schedule updates triggered by cluster updates", func() {
18+
BeforeEach(func() {
19+
var err error
20+
namespace, err = utils.NewNamespaceName()
21+
Expect(err).ToNot(HaveOccurred())
22+
23+
ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
24+
Expect(k8sClient.Create(ctx, ns)).ToNot(HaveOccurred())
25+
26+
DeferCleanup(func() {
27+
Expect(k8sClient.Delete(ctx, ns)).ToNot(HaveOccurred())
28+
})
29+
})
30+
31+
When("a Cluster living in the same namespace as a schedule is updated to match the schedule's targets", func() {
32+
It("schedules the cluster", func() {
33+
By("creating the cluster and schedule")
34+
cluster, err := utils.CreateCluster(ctx, k8sClient, "cluster", namespace, nil, namespace)
35+
Expect(err).NotTo(HaveOccurred())
36+
Expect(cluster).To(Not(BeNil()))
37+
38+
schedule := v1alpha1.Schedule{
39+
ObjectMeta: metav1.ObjectMeta{
40+
Name: "my-schedule",
41+
Namespace: namespace,
42+
},
43+
Spec: v1alpha1.ScheduleSpec{
44+
Schedule: "0 */1 * * * *", // Every minute
45+
Duration: metav1.Duration{Duration: 30 * time.Second},
46+
Targets: v1alpha1.ScheduleTargets{
47+
Clusters: []v1alpha1.ScheduleTarget{
48+
{
49+
ClusterSelector: &metav1.LabelSelector{
50+
MatchLabels: map[string]string{"can-be-scheduled": "yes"}, // initially doesn't match any cluster
51+
},
52+
},
53+
},
54+
},
55+
},
56+
}
57+
err = k8sClient.Create(ctx, &schedule)
58+
Expect(err).NotTo(HaveOccurred())
59+
60+
Eventually(func(g Gomega) {
61+
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "my-schedule"}, &schedule)
62+
g.Expect(err).NotTo(HaveOccurred())
63+
}).Should(Succeed())
64+
65+
defer func() {
66+
Expect(k8sClient.Delete(ctx, &v1alpha1.Schedule{ObjectMeta: metav1.ObjectMeta{
67+
Name: "my-schedule",
68+
Namespace: namespace,
69+
}})).NotTo(HaveOccurred())
70+
71+
}()
72+
73+
By("checking that the cluster has not been scheduled")
74+
cluster = &v1alpha1.Cluster{}
75+
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
76+
Expect(err).NotTo(HaveOccurred())
77+
Expect(cluster.Status.Scheduled).To(BeFalse())
78+
79+
By("updating the cluster's labels to match the schedule's selector")
80+
Eventually(func() error {
81+
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
82+
Expect(err).NotTo(HaveOccurred())
83+
cluster.Labels = map[string]string{"can-be-scheduled": "yes"}
84+
return k8sClient.Update(ctx, cluster)
85+
}).ShouldNot(HaveOccurred())
86+
87+
By("validating that the cluster is scheduled")
88+
Eventually(func(g Gomega) {
89+
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
90+
g.Expect(err).NotTo(HaveOccurred())
91+
g.Expect(cluster.Status.Scheduled).To(BeTrue())
92+
}).Should(Succeed())
93+
})
94+
})
95+
96+
When("another Cluster with a different shard ID and matching the schedule's targets is added into the same namespace", func() {
97+
It("schedules the cluster", func() {
98+
By("creating the cluster and schedule")
99+
cluster, err := utils.CreateCluster(ctx, k8sClient, "cluster", namespace, nil, namespace)
100+
Expect(err).NotTo(HaveOccurred())
101+
Expect(cluster).To(Not(BeNil()))
102+
103+
schedule := v1alpha1.Schedule{
104+
ObjectMeta: metav1.ObjectMeta{
105+
Name: "my-schedule",
106+
Namespace: namespace,
107+
},
108+
Spec: v1alpha1.ScheduleSpec{
109+
Schedule: "0 */1 * * * *", // Every minute
110+
Duration: metav1.Duration{Duration: 30 * time.Second},
111+
Targets: v1alpha1.ScheduleTargets{
112+
Clusters: []v1alpha1.ScheduleTarget{
113+
{
114+
ClusterSelector: &metav1.LabelSelector{},
115+
},
116+
},
117+
},
118+
},
119+
}
120+
err = k8sClient.Create(ctx, &schedule)
121+
Expect(err).NotTo(HaveOccurred())
122+
123+
Eventually(func(g Gomega) {
124+
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "my-schedule"}, &schedule)
125+
g.Expect(err).NotTo(HaveOccurred())
126+
}).Should(Succeed())
127+
128+
defer func() {
129+
Expect(k8sClient.Delete(ctx, &v1alpha1.Schedule{ObjectMeta: metav1.ObjectMeta{
130+
Name: "my-schedule",
131+
Namespace: namespace,
132+
}})).NotTo(HaveOccurred())
133+
134+
}()
135+
136+
By("validating that the cluster is scheduled")
137+
Eventually(func(g Gomega) {
138+
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster"}, cluster)
139+
g.Expect(err).NotTo(HaveOccurred())
140+
g.Expect(cluster.Status.Scheduled).To(BeTrue())
141+
}).Should(Succeed())
142+
143+
By("adding another cluster with a different shard ID to the same namespace")
144+
labels := map[string]string{"fleet.cattle.io/shard-ref": "different-shard"}
145+
shardedCluster, err := utils.CreateCluster(ctx, k8sClient, "cluster2", namespace, labels, namespace)
146+
Expect(err).NotTo(HaveOccurred())
147+
Expect(shardedCluster).To(Not(BeNil()))
148+
149+
By("validating that the cluster is scheduled")
150+
Eventually(func(g Gomega) {
151+
var shardedCluster v1alpha1.Cluster
152+
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: "cluster2"}, &shardedCluster)
153+
g.Expect(err).NotTo(HaveOccurred())
154+
g.Expect(shardedCluster.Status.Scheduled).To(BeTrue())
155+
}).Should(Succeed())
156+
})
157+
})
158+
})
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package schedule
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"testing"
7+
"time"
8+
9+
. "github.com/onsi/ginkgo/v2"
10+
. "github.com/onsi/gomega"
11+
"github.com/reugn/go-quartz/quartz"
12+
13+
"github.com/rancher/fleet/integrationtests/utils"
14+
"github.com/rancher/fleet/internal/cmd/controller/reconciler"
15+
16+
"k8s.io/client-go/rest"
17+
18+
ctrl "sigs.k8s.io/controller-runtime"
19+
"sigs.k8s.io/controller-runtime/pkg/client"
20+
"sigs.k8s.io/controller-runtime/pkg/envtest"
21+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
22+
)
23+
24+
var (
25+
cancel context.CancelFunc
26+
cfg *rest.Config
27+
ctx context.Context
28+
k8sClient client.Client
29+
testenv *envtest.Environment
30+
logsBuffer bytes.Buffer
31+
32+
namespace string
33+
)
34+
35+
func TestFleet(t *testing.T) {
36+
RegisterFailHandler(Fail)
37+
RunSpecs(t, "Fleet Schedule Suite")
38+
}
39+
40+
var _ = BeforeSuite(func() {
41+
SetDefaultEventuallyTimeout(60 * time.Second)
42+
SetDefaultEventuallyPollingInterval(1 * time.Second)
43+
44+
ctx, cancel = context.WithCancel(context.TODO())
45+
testenv = utils.NewEnvTest("../../..")
46+
47+
var err error
48+
cfg, err = utils.StartTestEnv(testenv)
49+
Expect(err).NotTo(HaveOccurred())
50+
51+
// Set up log capture
52+
GinkgoWriter.TeeTo(&logsBuffer)
53+
ctrl.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
54+
55+
k8sClient, err = utils.NewClient(cfg)
56+
Expect(err).NotTo(HaveOccurred())
57+
58+
mgr, err := utils.NewManager(cfg)
59+
Expect(err).ToNot(HaveOccurred())
60+
61+
sched, err := quartz.NewStdScheduler()
62+
Expect(err).ToNot(HaveOccurred(), "failed to create scheduler")
63+
64+
err = (&reconciler.ScheduleReconciler{
65+
Client: mgr.GetClient(),
66+
Scheme: mgr.GetScheme(),
67+
Workers: 50,
68+
Scheduler: sched,
69+
}).SetupWithManager(mgr)
70+
Expect(err).ToNot(HaveOccurred(), "failed to set up manager")
71+
72+
go func() {
73+
defer GinkgoRecover()
74+
err = mgr.Start(ctx)
75+
Expect(err).ToNot(HaveOccurred(), "failed to run manager")
76+
}()
77+
})
78+
79+
var _ = AfterSuite(func() {
80+
cancel()
81+
Expect(testenv.Stop()).ToNot(HaveOccurred())
82+
})

internal/cmd/controller/reconciler/bundle_controller.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,18 @@ func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error {
9393
predicate.AnnotationChangedPredicate{},
9494
predicate.LabelChangedPredicate{},
9595
),
96+
sharding.FilterByShardID(r.ShardID),
9697
),
9798
).
9899
// Note: Maybe improve with WatchesMetadata, does it have access to labels?
99100
Watches(
100101
// Fan out from bundledeployment to bundle, this is useful to update the
101102
// bundle's status fields.
102103
&fleet.BundleDeployment{}, handler.EnqueueRequestsFromMapFunc(BundleDeploymentMapFunc(r)),
103-
builder.WithPredicates(bundleDeploymentStatusChangedPredicate()),
104+
builder.WithPredicates(
105+
bundleDeploymentStatusChangedPredicate(),
106+
sharding.FilterByShardID(r.ShardID),
107+
),
104108
).
105109
Watches(
106110
// Fan out from cluster to bundle, this is useful for targeting and templating.
@@ -127,6 +131,8 @@ func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error {
127131
return requests
128132
}),
129133
builder.WithPredicates(clusterChangedPredicate()),
134+
// Deliberately skipping the sharding filter here: a bundle may live in the namespace of a cluster with both
135+
// bearing distinct shard IDs.
130136
).
131137
Watches(
132138
// Fan out from secret to bundle, reconcile bundles when a secret
@@ -142,7 +148,6 @@ func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error {
142148
handler.EnqueueRequestsFromMapFunc(r.downstreamResourceMapFunc("ConfigMap")),
143149
builder.WithPredicates(dataChangedPredicate()),
144150
).
145-
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
146151
WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}).
147152
Complete(r)
148153
}

0 commit comments

Comments
 (0)