Skip to content

Commit adc3c0c

Browse files
committed
tiny changes
Signed-off-by: Marco Ma <qingjin_ma@163.com>
1 parent 9856d03 commit adc3c0c

File tree

4 files changed

+690
-325
lines changed

4 files changed

+690
-325
lines changed

pkg/controller/nativedaemonset/nativedaemonset_controller.go

Lines changed: 47 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"flag"
2222
"fmt"
23-
"sync"
2423
"time"
2524

2625
appsv1 "k8s.io/api/apps/v1"
@@ -121,10 +120,24 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
121120
return false
122121
}
123122

123+
// Filter DaemonSet create events (these also fire for every existing DaemonSet when the controller restarts) - only process those with partition annotation
124+
createHandler := func(e event.CreateEvent) bool {
125+
ds := e.Object.(*appsv1.DaemonSet)
126+
_, hasPartition := ds.Annotations[util.DaemonSetPartitionAnnotation]
127+
if hasPartition {
128+
klog.Infof("Observed DaemonSet with partition annotation on controller restart: %s/%s", ds.Namespace, ds.Name)
129+
return true
130+
}
131+
return false
132+
}
133+
124134
// Watch for changes to DaemonSet
125135
if err = c.Watch(source.Kind(mgr.GetCache(), &appsv1.DaemonSet{}),
126136
&handler.EnqueueRequestForObject{},
127-
predicate.Funcs{UpdateFunc: updateHandler}); err != nil {
137+
predicate.Funcs{
138+
UpdateFunc: updateHandler,
139+
CreateFunc: createHandler,
140+
}); err != nil {
128141
return err
129142
}
130143

@@ -173,7 +186,8 @@ func (r *ReconcileNativeDaemonSet) Reconcile(ctx context.Context, request reconc
173186
err := r.Get(ctx, request.NamespacedName, daemon)
174187
if err != nil {
175188
if errors.IsNotFound(err) {
176-
// Object not found, return.
189+
// Object not found, clean up expectations and return.
190+
r.expectations.DeleteExpectations(dsKey)
177191
return ctrl.Result{}, nil
178192
}
179193
// Error reading the object - requeue the request.
@@ -289,48 +303,41 @@ func (r *ReconcileNativeDaemonSet) executePodDeletion(ctx context.Context, podsT
289303

290304
klog.Infof("Pods to delete for DaemonSet %s/%s, count: %d", daemon.Namespace, daemon.Name, deleteDiff)
291305

292-
// error channel to communicate back failures. make the buffer big enough to avoid any blocking
293-
errCh := make(chan error, deleteDiff)
294-
deleteWait := sync.WaitGroup{}
295-
deleteWait.Add(deleteDiff)
296-
297-
for i := 0; i < deleteDiff; i++ {
298-
go func(ix int) {
299-
defer deleteWait.Done()
300-
pod := actualPodsToDelete[ix]
301-
302-
klog.Infof("About to delete pod %s/%s for DaemonSet %s/%s (currentRevision=%s)",
303-
pod.Namespace, pod.Name, daemon.Namespace, daemon.Name, pod.Labels[appsv1.ControllerRevisionHashLabelKey])
304-
305-
if err := r.Delete(ctx, pod); err != nil {
306-
// On deletion failure, observe immediately to decrease expectation
307-
r.expectations.Observe(dsKey, expectations.Delete, string(pod.UID))
308-
if !errors.IsNotFound(err) {
309-
klog.Infof("Failed deletion, decremented expectations for DaemonSet %s/%s, pod: %s/%s", daemon.Namespace, daemon.Name, pod.Namespace, pod.Name)
310-
errCh <- err
311-
utilruntime.HandleError(err)
312-
} else {
313-
// NotFound means pod is already deleted, which is fine
314-
klog.Infof("Pod already deleted (NotFound): %s/%s", pod.Namespace, pod.Name)
315-
}
306+
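// Delete the pods through SlowStartBatch, starting with a batch of 1 (presumably growing while deletes succeed - confirm against daemonsetutil), for better performance and error handling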
307+
successCount, err := daemonsetutil.SlowStartBatch(deleteDiff, 1, func(index int) error {
308+
pod := actualPodsToDelete[index]
309+
310+
klog.Infof("About to delete pod %s/%s for DaemonSet %s/%s (currentRevision=%s)",
311+
pod.Namespace, pod.Name, daemon.Namespace, daemon.Name, pod.Labels[appsv1.ControllerRevisionHashLabelKey])
312+
313+
if deleteErr := r.Delete(ctx, pod); deleteErr != nil {
314+
// On deletion failure, observe immediately to decrease expectation
315+
r.expectations.Observe(dsKey, expectations.Delete, string(pod.UID))
316+
if !errors.IsNotFound(deleteErr) {
317+
klog.Infof("Failed deletion, decremented expectations for DaemonSet %s/%s, pod: %s/%s", daemon.Namespace, daemon.Name, pod.Namespace, pod.Name)
318+
utilruntime.HandleError(deleteErr)
319+
return deleteErr
316320
} else {
317-
// On successful deletion, the expectation will be observed by the Pod Delete watch handler
318-
r.eventRecorder.Event(daemon, corev1.EventTypeNormal, "PodDeleted",
319-
fmt.Sprintf("Deleted pod %s/%s for batch update", pod.Namespace, pod.Name))
320-
klog.Infof("Successfully deleted pod %s/%s for DaemonSet %s/%s", pod.Namespace, pod.Name, daemon.Namespace, daemon.Name)
321+
// NotFound means pod is already deleted, which is fine
322+
klog.Infof("Pod already deleted (NotFound): %s/%s", pod.Namespace, pod.Name)
321323
}
322-
}(i)
323-
}
324-
325-
deleteWait.Wait()
324+
} else {
325+
// On successful deletion, the expectation will be observed by the Pod Delete watch handler
326+
r.eventRecorder.Event(daemon, corev1.EventTypeNormal, "PodDeleted",
327+
fmt.Sprintf("Deleted pod %s/%s for batch update", pod.Namespace, pod.Name))
328+
klog.Infof("Successfully deleted pod %s/%s for DaemonSet %s/%s", pod.Namespace, pod.Name, daemon.Namespace, daemon.Name)
329+
}
330+
return nil
331+
})
326332

327-
// Collect errors if any for proper reporting/retry logic in the controller
328-
close(errCh)
329-
if len(errCh) > 0 {
330-
// Return the first error encountered
331-
return <-errCh
333+
if err != nil {
334+
klog.Errorf("Failed to delete pods for DaemonSet %s/%s, successfully deleted: %d out of %d planned, error: %v",
335+
daemon.Namespace, daemon.Name, successCount, deleteDiff, err)
336+
return err
332337
}
333338

339+
klog.Infof("Successfully deleted %d/%d pods for DaemonSet %s/%s", successCount, deleteDiff, daemon.Namespace, daemon.Name)
340+
334341
return nil
335342
}
336343

0 commit comments

Comments
 (0)