Skip to content

Commit 46e22c9

Browse files
committed
Adding new watcher and fixed finalization timer
Signed-off-by: Amit Schendel <[email protected]>
1 parent db0e53b commit 46e22c9

File tree

2 files changed

+12
-103
lines changed

2 files changed

+12
-103
lines changed

pkg/collector/collector.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/kubescape/kapprofiler/pkg/eventsink"
1212
"github.com/kubescape/kapprofiler/pkg/tracing"
13+
"github.com/kubescape/kapprofiler/pkg/watcher"
1314

1415
"golang.org/x/exp/slices"
1516
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -64,7 +65,7 @@ type CollectorManager struct {
6465
config CollectorManagerConfig
6566

6667
// Pod finalizer watcher
67-
podFinalizerControl chan struct{}
68+
podFinalizerWatcher watcher.WatcherInterface
6869

6970
// Pod finalizer state table
7071
podFinalizerState map[string]*PodProfileFinalizerState

pkg/collector/pod_finalizer.go

+10-102
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,11 @@ func (cm *CollectorManager) StartFinalizerWatcher() {
3333
cm.podFinalizerStateMutex = &sync.Mutex{}
3434
// Initialize map
3535
cm.podFinalizerState = make(map[string]*PodProfileFinalizerState)
36-
3736
// Initialize watcher
38-
podFinalizerWatcher := watcher.NewWatcher(cm.dynamicClient, false)
37+
cm.podFinalizerWatcher = watcher.NewWatcher(cm.dynamicClient, false)
3938

4039
// Start watcher
41-
err := podFinalizerWatcher.Start(watcher.WatchNotifyFunctions{
40+
err := cm.podFinalizerWatcher.Start(watcher.WatchNotifyFunctions{
4241
AddFunc: func(obj *unstructured.Unstructured) {
4342
cm.handlePodAddEvent(obj)
4443
},
@@ -56,35 +55,9 @@ func (cm *CollectorManager) StartFinalizerWatcher() {
5655

5756
if err != nil {
5857
log.Printf("Error starting watcher: %v", err)
58+
cm.podFinalizerWatcher = nil
5959
return
6060
}
61-
62-
// Initialize factory and informer
63-
// factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(cm.dynamicClient, 0, metav1.NamespaceAll, func(lo *metav1.ListOptions) {
64-
// lo.FieldSelector = "spec.nodeName=" + cm.config.NodeName
65-
// })
66-
// // Informer for Pods
67-
// informer := factory.ForResource(schema.GroupVersionResource{
68-
// Group: "",
69-
// Version: "v1",
70-
// Resource: "pods",
71-
// }).Informer()
72-
73-
// // Add event handlers to informer
74-
// informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
75-
// AddFunc: func(obj interface{}) {
76-
// cm.handlePodAddEvent(obj)
77-
// },
78-
// UpdateFunc: func(oldObj, newObj interface{}) {
79-
// cm.handlePodUpdateEvent(oldObj, newObj)
80-
// },
81-
// DeleteFunc: func(obj interface{}) {
82-
// cm.handlePodDeleteEvent(obj)
83-
// },
84-
// })
85-
86-
// // Run the informer
87-
// go informer.Run(cm.podFinalizerControl)
8861
}
8962

9063
func generateTableKey(obj metav1.Object) string {
@@ -93,8 +66,6 @@ func generateTableKey(obj metav1.Object) string {
9366

9467
func (cm *CollectorManager) handlePodAddEvent(obj interface{}) {
9568
// Add pod to finalizer map
96-
97-
// Convert object to Pod
9869
pod, err := ConvertInterfaceToPod(obj)
9970
if err != nil {
10071
log.Printf("the interface is not a Pod %v", err)
@@ -127,7 +98,6 @@ func (cm *CollectorManager) handlePodAddEvent(obj interface{}) {
12798
cm.startFinalizationTimer(pod)
12899
}
129100
}
130-
131101
}
132102

133103
func (cm *CollectorManager) handlePodUpdateEvent(obj interface{}) {
@@ -182,76 +152,10 @@ func (cm *CollectorManager) handlePodUpdateEvent(obj interface{}) {
182152
}
183153
}
184154

185-
// func (cm *CollectorManager) handlePodUpdateEvent(oldObj, newObj interface{}) {
186-
// // Need to access the status of the old and new pod to check if the pod became ready
187-
188-
// // Convert interface to Pod object
189-
// oldPod, err := ConvertInterfaceToPod(oldObj)
190-
// if err != nil {
191-
// log.Printf("the interface is not a Pod %v", err)
192-
// return
193-
// }
194-
// newPod, err := ConvertInterfaceToPod(newObj)
195-
// if err != nil {
196-
// log.Printf("the interface is not a Pod %v", err)
197-
// return
198-
// }
199-
200-
// // Check if recoding
201-
// cm.podFinalizerStateMutex.Lock()
202-
// finalizerState, ok := cm.podFinalizerState[generateTableKey(oldPod)]
203-
// if !ok || !finalizerState.Recording {
204-
// // Discard
205-
// cm.podFinalizerStateMutex.Unlock()
206-
// return
207-
// }
208-
// cm.podFinalizerStateMutex.Unlock()
209-
210-
// // Check old pod status
211-
// oldPodReady := false
212-
// for _, condition := range oldPod.Status.Conditions {
213-
// if condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue {
214-
// oldPodReady = true
215-
// }
216-
// }
217-
218-
// newPodReady := false
219-
// for _, condition := range newPod.Status.Conditions {
220-
// if condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue {
221-
// newPodReady = true
222-
// }
223-
// }
224-
225-
// if newPodReady && !oldPodReady {
226-
// // Pod became ready, add finalizer
227-
// // Get mutex
228-
// cm.podFinalizerStateMutex.Lock()
229-
// defer cm.podFinalizerStateMutex.Unlock()
230-
231-
// // Check if pod is in map
232-
// podState, ok := cm.podFinalizerState[generateTableKey(newPod)]
233-
// if !ok {
234-
// log.Printf("Pod %s in namespace %s not in finalizer map", newPod.GetName(), newPod.GetNamespace())
235-
// return
236-
// }
237-
238-
// // Check if timer is running
239-
// if podState.FinalizationTimer != nil {
240-
// // Timer is running, no need to add finalizer
241-
// return
242-
// }
243-
244-
// // Timer is not running, add finalizer
245-
// podState.FinalizationTimer = cm.startFinalizationTimer(newPod)
246-
// } else if !newPodReady && oldPodReady {
247-
// cm.stopTimer(&newPod.ObjectMeta)
248-
// }
249-
// }
250-
251155
// Timer function
252156
func (cm *CollectorManager) startFinalizationTimer(pod *v1.Pod) *time.Timer {
253-
jitter := time.Duration(rand.Intn(int(cm.config.FinalizeJitter))) * time.Second
254-
finalizationTimer := time.NewTimer(time.Duration(cm.config.FinalizeTime+uint64(jitter)) * time.Second)
157+
jitter := uint64(rand.Intn(int(cm.config.FinalizeJitter)))
158+
finalizationTimer := time.NewTimer(time.Duration(cm.config.FinalizeTime+jitter) * time.Second)
255159

256160
// This goroutine waits for the timer to finish.
257161
go func() {
@@ -379,7 +283,11 @@ func (cm *CollectorManager) MarkPodNotRecording(pod, namespace string) {
379283
}
380284

381285
func (cm *CollectorManager) StopFinalizerWatcher() {
382-
close(cm.podFinalizerControl)
286+
if cm.podFinalizerWatcher != nil {
287+
cm.podFinalizerWatcher.Stop()
288+
} else {
289+
log.Printf("Pod finalizer watcher not started")
290+
}
383291
}
384292

385293
func ConvertInterfaceToPod(obj interface{}) (*v1.Pod, error) {

0 commit comments

Comments
 (0)