Skip to content

Commit ca4a064

Browse files
freeznetmaxsxu
andcommitted
fix(connection): honor always-update for ready resources (#407)
* fix(connection): honor always-update for ready resources * fix(connection): avoid noop connection updates * chore: add PLAN.md to .gitignore * build(deps): update go.sum with new dependencies --------- Co-authored-by: Max Xu <xuhuan@live.cn>
1 parent 9db41af commit ca4a064

8 files changed

Lines changed: 569 additions & 12 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,4 @@ node_modules/
5353
.cursor
5454
.envrc
5555
mise.toml
56+
/PLAN.md

charts/pulsar-resources-operator/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ kubectl apply -f https://raw.githubusercontent.com/streamnative/pulsar-resources
5858
|-----|------|---------|-------------|
5959
| affinity | object | `{}` | Add affinity for pod |
6060
| annotations | object | `{}` | Add annotations for the deployment |
61-
| features.alwaysUpdatePulsarResource | bool | `false` | |
61+
| features.alwaysUpdatePulsarResource | bool | `false` | Re-apply observed managed Pulsar resources even when their Kubernetes resources are already Ready. Prefer temporary use for upgrade remediation because it increases Pulsar admin API load on reconciliations and resyncs. |
6262
| fullnameOverride | string | `""` | It will override the name of deployment |
6363
| image.manager.registry | string | `"docker.io"` | Specififies the registry of images, especially when user want to use a different image hub |
6464
| image.manager.repository | string | `"streamnative/pulsar-resources-operator"` | The full repo name for image. |

charts/pulsar-resources-operator/values.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ imagePullSecrets: []
4141
# - name: test
4242

4343
features:
44-
# Enable to force always sync k8s resource status to pulsar.
44+
# -- Re-apply observed managed Pulsar resources even when their Kubernetes resources are already Ready. Prefer temporary use for upgrade remediation because it increases Pulsar admin API load on reconciliations and resyncs.
4545
alwaysUpdatePulsarResource: false
4646
# resyncPeriod determines the minimum frequency at which watched resources are reconciled. The unit is hour, default value is 10 hours.
4747
resyncPeriod: 10

docs/pulsar_namespace.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,8 @@ test-pulsar-namespace test-tenant/testns 1 1
540540

541541
You can update the namespace policies by editing the `namespace.yaml` file and then applying it again using `kubectl apply -f namespace.yaml`. This allows you to modify various settings of the Pulsar namespace.
542542

543+
If a namespace was already `Ready=True` before an operator upgrade introduced a new spec field, that new field may not be applied until the resource is reconciled again. See [Pulsar resource lifecycle reconciliation skip behavior](pulsar_resource_lifecycle.md#reconciliation-skip-behavior) for the skip contract and recovery options, including temporary use of `ALWAYS_UPDATE_PULSAR_RESOURCE`.
544+
543545
Please note the following important points:
544546

545547
1. The fields `name` and `bundles` cannot be updated after the namespace is created. These are immutable properties of the namespace.

docs/pulsar_resource_lifecycle.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,18 @@ When you need to delete the actual Pulsar resource (tenant, namespace, or topic)
107107

108108
Always ensure you have the necessary permissions and have considered the implications of deleting resources before proceeding with any deletion operation.
109109

110+
## Reconciliation Skip Behavior
111+
112+
For normal steady-state operation, the operator skips applying Pulsar API changes for a managed child resource when its Kubernetes status already has `Ready=True` and `status.observedGeneration` matches `metadata.generation`. This avoids unnecessary Pulsar admin requests during resyncs.
113+
114+
After upgrading the operator, a new spec field may be introduced while existing custom resources remain `Ready=True` at the same generation. In that case, the new field is not applied to Pulsar until the resource is reconciled again. Recovery options are:
115+
116+
1. Update the custom resource spec or metadata so Kubernetes increments the resource generation, then wait for `Ready=True` again.
117+
2. Temporarily enable `ALWAYS_UPDATE_PULSAR_RESOURCE=true` (Helm: `features.alwaysUpdatePulsarResource=true`) so the operator re-applies observed managed child resources even when they are already Ready.
118+
3. Disable `ALWAYS_UPDATE_PULSAR_RESOURCE` after remediation unless continuous re-application is intentionally required.
119+
120+
Use the always-update option carefully. It can apply all observed managed resources on every reconciliation or resync and may increase Pulsar broker/admin API load. The `PulsarConnection` deletion guard is still preserved: a deleting connection is kept until its remaining managed child resources are removed.
121+
110122
## Changing the Policy
111123

112124
You can change the policy of a Pulsar resource by updating the `lifecyclePolicy` field in the corresponding Kubernetes custom resource. However, there are important considerations to keep in mind when changing the policy:

go.sum

Lines changed: 243 additions & 0 deletions
Large diffs are not rendered by default.

pkg/connection/reconciler.go

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ import (
2222
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
2323

2424
"github.com/go-logr/logr"
25+
"github.com/streamnative/pulsar-resources-operator/pkg/feature"
2526
"github.com/streamnative/pulsar-resources-operator/pkg/utils"
2627
corev1 "k8s.io/api/core/v1"
28+
"k8s.io/apimachinery/pkg/api/equality"
2729
"k8s.io/apimachinery/pkg/api/meta"
2830
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2931
"k8s.io/apimachinery/pkg/types"
@@ -130,27 +132,34 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error {
130132
len(r.namespaces), "topics", len(r.topics), "geo", len(r.geoReplications))
131133
msg := fmt.Sprintf("remaining resources: tenants [%d], namespaces [%d], topics [%d], geoReplications [%d]",
132134
len(r.tenants), len(r.namespaces), len(r.topics), len(r.geoReplications))
135+
originalStatus := r.connection.Status.DeepCopy()
133136
meta.SetStatusCondition(&r.connection.Status.Conditions, *NewErrorCondition(r.connection.Generation, msg))
134-
if err := r.client.Status().Update(ctx, r.connection); err != nil {
135-
return err
136-
}
137-
return nil
137+
return r.updateConnectionStatusIfChanged(ctx, originalStatus)
138138
}
139139
return nil
140140
}
141-
log.Info("Doesn't have associated unready resource, reconcile completed")
142-
return nil
141+
if !r.shouldReconcileReadyResources() {
142+
log.Info("Doesn't have associated unready resource, reconcile completed")
143+
return nil
144+
}
145+
log.Info("AlwaysUpdatePulsarResource is enabled; reconciling ready pulsar resources")
143146
}
144147
log.Info("Reconciling pulsar resources", "resources", r.unreadyResources)
145148

149+
connectionChanged := false
146150
if r.connection.Spec.AdminServiceURL == "" && r.connection.Spec.AdminServiceSecureURL != "" {
147151
r.connection.Spec.AdminServiceURL = r.connection.Spec.AdminServiceSecureURL
152+
connectionChanged = true
148153
}
149154

150155
// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
151-
controllerutil.AddFinalizer(r.connection, resourcev1alpha1.FinalizerName)
152-
if err := r.client.Update(ctx, r.connection); err != nil {
153-
return err
156+
if controllerutil.AddFinalizer(r.connection, resourcev1alpha1.FinalizerName) {
157+
connectionChanged = true
158+
}
159+
if connectionChanged {
160+
if err := r.client.Update(ctx, r.connection); err != nil {
161+
return err
162+
}
154163
}
155164

156165
pulsarConfig, err := r.MakePulsarAdminConfig(ctx)
@@ -204,6 +213,7 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error {
204213
return fmt.Errorf("PulsarConnectionReconciler Reconcile errors: %v", errs)
205214
}
206215

216+
originalStatus := r.connection.Status.DeepCopy()
207217
auth := r.connection.Spec.Authentication
208218
if auth != nil && auth.Token != nil && auth.Token.SecretRef != nil {
209219
// calculate secret key hash
@@ -237,17 +247,41 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error {
237247
}
238248
r.connection.Status.ObservedGeneration = r.connection.Generation
239249
meta.SetStatusCondition(&r.connection.Status.Conditions, *NewReadyCondition(r.connection.Generation))
240-
if err := r.client.Status().Update(ctx, r.connection); err != nil {
250+
if err := r.updateConnectionStatusIfChanged(ctx, originalStatus); err != nil {
241251
return err
242252
}
243253

244254
return nil
245255
}
246256

257+
func (r *PulsarConnectionReconciler) updateConnectionStatusIfChanged(ctx context.Context, originalStatus *resourcev1alpha1.PulsarConnectionStatus) error {
258+
if equality.Semantic.DeepEqual(*originalStatus, r.connection.Status) {
259+
return nil
260+
}
261+
return r.client.Status().Update(ctx, r.connection)
262+
}
263+
247264
func (r *PulsarConnectionReconciler) hasUnreadyResource() bool {
248265
return len(r.unreadyResources) > 0
249266
}
250267

268+
func (r *PulsarConnectionReconciler) shouldReconcileReadyResources() bool {
269+
return feature.DefaultFeatureGate.Enabled(feature.AlwaysUpdatePulsarResource) && r.hasObservedResource()
270+
}
271+
272+
func (r *PulsarConnectionReconciler) hasObservedResource() bool {
273+
return len(r.tenants) > 0 ||
274+
len(r.namespaces) > 0 ||
275+
len(r.topics) > 0 ||
276+
len(r.permissions) > 0 ||
277+
len(r.geoReplications) > 0 ||
278+
len(r.packages) > 0 ||
279+
len(r.sinks) > 0 ||
280+
len(r.sources) > 0 ||
281+
len(r.functions) > 0 ||
282+
len(r.nsIsolationPolicies) > 0
283+
}
284+
251285
func (r *PulsarConnectionReconciler) addUnreadyResource(obj reconciler.Object) {
252286
if len(r.unreadyResources) == 30 {
253287
// avoid add too many unready resources

0 commit comments

Comments
 (0)