Skip to content

Commit fc3443f

Browse files
leonardocearmrumnenciaGabriFedi97
committed
feat: reload CNPG-i plugins automatically when their pods are rolled (cloudnative-pg#10836)
Previously, after upgrading a CNPG-i plugin (re-applying its manifest with a new image), the operator kept talking to the plugin through the connection it had opened during the most recent reconciliation. Clusters using that plugin would not see the new version until something else triggered a reconcile (a spec change, the periodic resync, or an operator restart), which could leave clusters running against an old plugin implementation for a long time (up to 10 hours, the default cache resync period). With this change, the operator watches the EndpointSlices that back CNPG-i plugin Services in the operator namespace. When the plugin Deployment finishes rolling out and the new Pods become Ready, the EndpointSlice changes and every cluster using that plugin is enqueued for reconciliation, ensuring the operator interacts with the updated plugin promptly. A new field index on Cluster (".spec.usedPlugins") is used to look up all clusters affected by a given plugin without scanning the full cluster list on every EndpointSlice event. Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com> Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com> Signed-off-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com> Co-authored-by: Armando Ruocco <armando.ruocco@enterprisedb.com> Co-authored-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com> Co-authored-by: Gabriele Fedi <gabriele.fedi@enterprisedb.com> (cherry picked from commit ec3305d) Signed-off-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com>
1 parent 8a77eab commit fc3443f

8 files changed

Lines changed: 575 additions & 8 deletions

File tree

config/rbac/role.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,14 @@ rules:
111111
- create
112112
- get
113113
- update
114+
- apiGroups:
115+
- discovery.k8s.io
116+
resources:
117+
- endpointslices
118+
verbs:
119+
- get
120+
- list
121+
- watch
114122
- apiGroups:
115123
- monitoring.coreos.com
116124
resources:

internal/cmd/manager/controller/controller.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ import (
2828

2929
"github.com/cloudnative-pg/machinery/pkg/log"
3030
corev1 "k8s.io/api/core/v1"
31+
discoveryv1 "k8s.io/api/discovery/v1"
3132
apierrs "k8s.io/apimachinery/pkg/api/errors"
3233
"k8s.io/apimachinery/pkg/types"
3334
ctrl "sigs.k8s.io/controller-runtime"
35+
"sigs.k8s.io/controller-runtime/pkg/cache"
3436
"sigs.k8s.io/controller-runtime/pkg/client"
3537
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
3638
"sigs.k8s.io/controller-runtime/pkg/webhook"
@@ -132,6 +134,31 @@ func RunController(
132134
LeaderElectionReleaseOnCancel: true,
133135
}
134136

137+
// EndpointSlices are watched only to detect CNPG-i plugin rollouts, and
138+
// plugins are deployed in the operator namespace. Restricting the informer
139+
// to that namespace avoids listing and watching every EndpointSlice in the
140+
// cluster (one or more per Service in every namespace), which would
141+
// otherwise be cached just to be dropped by the controller predicate.
142+
//
143+
// The restriction is only applied when the operator namespace is known: an
144+
// empty namespace key is interpreted by controller-runtime as "all
145+
// namespaces", which would silently widen the cache instead of narrowing
146+
// it. When the namespace is unknown (e.g. local development without
147+
// OPERATOR_NAMESPACE set) we simply fall back to the default caching.
148+
if conf.OperatorNamespace != "" {
149+
managerOptions.Cache = cache.Options{
150+
ByObject: map[client.Object]cache.ByObject{
151+
&discoveryv1.EndpointSlice{}: {
152+
Namespaces: map[string]cache.Config{
153+
conf.OperatorNamespace: {},
154+
},
155+
},
156+
},
157+
}
158+
} else {
159+
setupLog.Info("Plugin EndpointSlice watch disabled: OPERATOR_NAMESPACE not set")
160+
}
161+
135162
if conf.WatchNamespace != "" {
136163
namespaces := conf.WatchedNamespaces()
137164
managerOptions.NewCache = multicache.DelegatingMultiNamespacedCacheBuilder(

internal/controller/cluster_controller.go

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/cloudnative-pg/machinery/pkg/stringset"
3232
batchv1 "k8s.io/api/batch/v1"
3333
corev1 "k8s.io/api/core/v1"
34+
discoveryv1 "k8s.io/api/discovery/v1"
3435
policyv1 "k8s.io/api/policy/v1"
3536
apierrs "k8s.io/apimachinery/pkg/api/errors"
3637
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -71,6 +72,9 @@ const (
7172
poolerClusterKey = ".spec.cluster.name"
7273
disableDefaultQueriesSpecPath = ".spec.monitoring.disableDefaultQueries"
7374
imageCatalogKey = ".spec.imageCatalog.name"
75+
// usedPluginsClusterKey is a synthetic index key, not a real Cluster spec field;
76+
// it is populated by getPluginsNeededForReconcile.
77+
usedPluginsClusterKey = ".spec.usedPlugins"
7478
)
7579

7680
var apiSGVString = apiv1.SchemeGroupVersion.String()
@@ -121,6 +125,7 @@ var ErrNextLoop = utils.ErrNextLoop
121125
// +kubebuilder:rbac:groups=admissionregistration.k8s.io,resources=validatingwebhookconfigurations,verbs=get;patch
122126
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;delete;patch;create;watch
123127
// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;create;update
128+
// +kubebuilder:rbac:groups=discovery.k8s.io,resources=endpointslices,verbs=get;list;watch
124129
// +kubebuilder:rbac:groups=monitoring.coreos.com,resources=podmonitors,verbs=get;create;list;watch;delete;patch
125130
// +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=create;delete;get;list;watch;update;patch
126131
// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete
@@ -190,11 +195,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
190195
ctx = cluster.SetInContext(ctx)
191196

192197
// Load the plugins required to bootstrap and reconcile this cluster
193-
enabledPluginNames := apiv1.GetPluginConfigurationEnabledPluginNames(cluster.Spec.Plugins)
194-
enabledPluginNames = append(
195-
enabledPluginNames,
196-
apiv1.GetExternalClustersEnabledPluginNames(cluster.Spec.ExternalClusters)...,
197-
)
198+
enabledPluginNames := getPluginsNeededForReconcile(cluster)
198199

199200
pluginLoadingContext, cancelPluginLoading := context.WithTimeout(ctx, 5*time.Second)
200201
defer cancelPluginLoading()
@@ -238,6 +239,18 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
238239
return result, nil
239240
}
240241

242+
// getPluginsNeededForReconcile returns the names of the plugins that must be
243+
// loaded for the reconciliation loop to interact with this cluster, collected
244+
// from both the cluster's plugin configuration and its external clusters.
245+
func getPluginsNeededForReconcile(cluster *apiv1.Cluster) []string {
246+
names := apiv1.GetPluginConfigurationEnabledPluginNames(cluster.Spec.Plugins)
247+
names = append(
248+
names,
249+
apiv1.GetExternalClustersEnabledPluginNames(cluster.Spec.ExternalClusters)...,
250+
)
251+
return stringset.From(names).ToSortedList()
252+
}
253+
241254
// Inner reconcile loop. Anything inside can require the reconciliation loop to stop by returning ErrNextLoop
242255
//
243256
//nolint:gocognit,gocyclo
@@ -1176,7 +1189,7 @@ func (r *ClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
11761189
return err
11771190
}
11781191

1179-
return ctrl.NewControllerManagedBy(mgr).
1192+
ctrlBuilder := ctrl.NewControllerManagedBy(mgr).
11801193
WithOptions(controller.Options{
11811194
MaxConcurrentReconciles: maxConcurrentReconciles,
11821195
}).
@@ -1215,8 +1228,22 @@ func (r *ClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
12151228
&apiv1.ClusterImageCatalog{},
12161229
handler.EnqueueRequestsFromMapFunc(r.mapClusterImageCatalogsToClusters()),
12171230
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
1218-
).
1219-
Complete(r)
1231+
)
1232+
1233+
if configuration.Current.OperatorNamespace != "" {
1234+
ctrlBuilder = ctrlBuilder.Watches(
1235+
&discoveryv1.EndpointSlice{},
1236+
handler.EnqueueRequestsFromMapFunc(
1237+
r.mapPluginEndpointSlicesToClusters(configuration.Current.OperatorNamespace),
1238+
),
1239+
builder.WithPredicates(
1240+
predicate.GenerationChangedPredicate{},
1241+
operatorNamespaceServiceEndpointSlicePredicate(configuration.Current.OperatorNamespace),
1242+
),
1243+
)
1244+
}
1245+
1246+
return ctrlBuilder.Complete(r)
12201247
}
12211248

12221249
// createFieldIndexes creates the indexes needed by this controller
@@ -1256,6 +1283,19 @@ func (r *ClusterReconciler) createFieldIndexes(ctx context.Context, mgr ctrl.Man
12561283
return err
12571284
}
12581285

1286+
// Create a new index field that allows mapping a cluster to all the
1287+
// plugins that are required to reconcile it
1288+
if err := mgr.GetFieldIndexer().IndexField(
1289+
ctx,
1290+
&apiv1.Cluster{},
1291+
usedPluginsClusterKey, func(rawObj client.Object) []string {
1292+
cluster := rawObj.(*apiv1.Cluster)
1293+
return getPluginsNeededForReconcile(cluster)
1294+
},
1295+
); err != nil {
1296+
return err
1297+
}
1298+
12591299
// Create a new indexed field on Pods. This field will be used to easily
12601300
// find all the Pods created by node
12611301
if err := mgr.GetFieldIndexer().IndexField(

internal/controller/cluster_controller_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,3 +507,112 @@ var _ = Describe("evaluatePodReadinessGuards", func() {
507507
"kubelet-stale branch must stay event-less to avoid noise")
508508
})
509509
})
510+
511+
var _ = Describe("getPluginsNeededForReconcile", func() {
512+
ptrBool := func(b bool) *bool { return &b }
513+
514+
It("returns an empty slice when no plugins or external clusters are configured", func() {
515+
cluster := &apiv1.Cluster{}
516+
Expect(getPluginsNeededForReconcile(cluster)).To(BeEmpty())
517+
})
518+
519+
It("returns the names of enabled plugins from Spec.Plugins", func() {
520+
cluster := &apiv1.Cluster{
521+
Spec: apiv1.ClusterSpec{
522+
Plugins: []apiv1.PluginConfiguration{
523+
{Name: "plugin-a"},
524+
{Name: "plugin-b", Enabled: ptrBool(true)},
525+
},
526+
},
527+
}
528+
Expect(getPluginsNeededForReconcile(cluster)).
529+
To(ConsistOf("plugin-a", "plugin-b"))
530+
})
531+
532+
It("skips plugins that are explicitly disabled", func() {
533+
cluster := &apiv1.Cluster{
534+
Spec: apiv1.ClusterSpec{
535+
Plugins: []apiv1.PluginConfiguration{
536+
{Name: "plugin-a"},
537+
{Name: "plugin-b", Enabled: ptrBool(false)},
538+
},
539+
},
540+
}
541+
Expect(getPluginsNeededForReconcile(cluster)).
542+
To(ConsistOf("plugin-a"))
543+
})
544+
545+
It("includes plugins referenced by external clusters", func() {
546+
cluster := &apiv1.Cluster{
547+
Spec: apiv1.ClusterSpec{
548+
ExternalClusters: []apiv1.ExternalCluster{
549+
{
550+
Name: "source",
551+
PluginConfiguration: &apiv1.PluginConfiguration{Name: "plugin-ext"},
552+
},
553+
{Name: "no-plugin"},
554+
},
555+
},
556+
}
557+
Expect(getPluginsNeededForReconcile(cluster)).
558+
To(ConsistOf("plugin-ext"))
559+
})
560+
561+
It("merges plugins from Spec.Plugins and Spec.ExternalClusters", func() {
562+
cluster := &apiv1.Cluster{
563+
Spec: apiv1.ClusterSpec{
564+
Plugins: []apiv1.PluginConfiguration{
565+
{Name: "plugin-a"},
566+
{Name: "plugin-disabled", Enabled: ptrBool(false)},
567+
},
568+
ExternalClusters: []apiv1.ExternalCluster{
569+
{
570+
Name: "source",
571+
PluginConfiguration: &apiv1.PluginConfiguration{Name: "plugin-ext"},
572+
},
573+
},
574+
},
575+
}
576+
Expect(getPluginsNeededForReconcile(cluster)).
577+
To(ConsistOf("plugin-a", "plugin-ext"))
578+
})
579+
580+
It("includes external-cluster plugins even when explicitly disabled", func() {
581+
// GetExternalClustersEnabledPluginNames does not consult
582+
// PluginConfiguration.Enabled — any non-nil PluginConfiguration on an
583+
// external cluster contributes its name. This test locks that in.
584+
cluster := &apiv1.Cluster{
585+
Spec: apiv1.ClusterSpec{
586+
ExternalClusters: []apiv1.ExternalCluster{
587+
{
588+
Name: "source",
589+
PluginConfiguration: &apiv1.PluginConfiguration{
590+
Name: "plugin-ext",
591+
Enabled: ptrBool(false),
592+
},
593+
},
594+
},
595+
},
596+
}
597+
Expect(getPluginsNeededForReconcile(cluster)).
598+
To(ConsistOf("plugin-ext"))
599+
})
600+
601+
It("deduplicates plugin names that appear in both Spec.Plugins and external clusters", func() {
602+
cluster := &apiv1.Cluster{
603+
Spec: apiv1.ClusterSpec{
604+
Plugins: []apiv1.PluginConfiguration{
605+
{Name: "plugin-shared"},
606+
},
607+
ExternalClusters: []apiv1.ExternalCluster{
608+
{
609+
Name: "source",
610+
PluginConfiguration: &apiv1.PluginConfiguration{Name: "plugin-shared"},
611+
},
612+
},
613+
},
614+
}
615+
Expect(getPluginsNeededForReconcile(cluster)).
616+
To(Equal([]string{"plugin-shared"}))
617+
})
618+
})

internal/controller/cluster_plugins.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,18 @@ import (
2424
"context"
2525
"reflect"
2626

27+
"github.com/cloudnative-pg/machinery/pkg/log"
28+
corev1 "k8s.io/api/core/v1"
29+
discoveryv1 "k8s.io/api/discovery/v1"
30+
apierrs "k8s.io/apimachinery/pkg/api/errors"
31+
"k8s.io/apimachinery/pkg/types"
2732
"sigs.k8s.io/controller-runtime/pkg/client"
33+
"sigs.k8s.io/controller-runtime/pkg/handler"
34+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2835

2936
apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
3037
cnpgiclient "github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/client"
38+
"github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
3139
)
3240

3341
// updatePluginsStatus ensures that we load the plugins that are required to reconcile
@@ -57,3 +65,76 @@ func (r *ClusterReconciler) updatePluginsStatus(ctx context.Context, cluster *ap
5765

5866
return r.Client.Status().Patch(ctx, cluster, client.MergeFrom(oldCluster))
5967
}
68+
69+
// mapPluginEndpointSlicesToClusters enqueues a reconcile request for every
70+
// cluster that uses a plugin whose backing Service has its endpoints changing.
71+
// EndpointSlice events fire when Pods behind the Service become Ready or
72+
// NotReady, so the operator picks up plugin rollouts after the new Pods are
73+
// actually serving traffic.
74+
func (r *ClusterReconciler) mapPluginEndpointSlicesToClusters(
75+
operatorNamespace string,
76+
) handler.MapFunc {
77+
return func(ctx context.Context, obj client.Object) []reconcile.Request {
78+
logger := log.FromContext(ctx)
79+
80+
slice, ok := obj.(*discoveryv1.EndpointSlice)
81+
if !ok {
82+
return nil
83+
}
84+
85+
serviceName := slice.Labels[discoveryv1.LabelServiceName]
86+
if serviceName == "" {
87+
return nil
88+
}
89+
90+
var service corev1.Service
91+
err := r.Get(
92+
ctx,
93+
types.NamespacedName{Namespace: slice.Namespace, Name: serviceName},
94+
&service,
95+
)
96+
if apierrs.IsNotFound(err) {
97+
return nil
98+
}
99+
if err != nil {
100+
logger.Error(
101+
err,
102+
"Error while resolving the Service owning a plugin EndpointSlice, skipping",
103+
"endpointSlice", client.ObjectKeyFromObject(slice),
104+
"serviceName", serviceName,
105+
)
106+
return nil
107+
}
108+
109+
if !isPluginService(&service, operatorNamespace) {
110+
return nil
111+
}
112+
113+
pluginName := service.Labels[utils.PluginNameLabelName]
114+
if pluginName == "" {
115+
return nil
116+
}
117+
118+
var clusterList apiv1.ClusterList
119+
if err := r.List(
120+
ctx,
121+
&clusterList,
122+
client.MatchingFields{usedPluginsClusterKey: pluginName},
123+
); err != nil {
124+
logger.Error(
125+
err,
126+
"Error while looking up clusters using a plugin whose endpoints changed, skipping",
127+
"endpointSlice", client.ObjectKeyFromObject(slice),
128+
"pluginName", pluginName,
129+
)
130+
return nil
131+
}
132+
133+
result := make([]reconcile.Request, len(clusterList.Items))
134+
for i := range clusterList.Items {
135+
result[i].Name = clusterList.Items[i].Name
136+
result[i].Namespace = clusterList.Items[i].Namespace
137+
}
138+
return result
139+
}
140+
}

0 commit comments

Comments
 (0)