Skip to content

Commit 2f0a091

Browse files
authored
Merge pull request #56 from kubescape/feature/finalizer-watcher
Feature/finalizer watcher
2 parents e3874f8 + 46e22c9 commit 2f0a091

File tree

2 files changed

+41
-56
lines changed

2 files changed

+41
-56
lines changed

pkg/collector/collector.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/kubescape/kapprofiler/pkg/eventsink"
1212
"github.com/kubescape/kapprofiler/pkg/tracing"
13+
"github.com/kubescape/kapprofiler/pkg/watcher"
1314

1415
"golang.org/x/exp/slices"
1516
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -64,7 +65,7 @@ type CollectorManager struct {
6465
config CollectorManagerConfig
6566

6667
// Pod finalizer watcher
67-
podFinalizerControl chan struct{}
68+
podFinalizerWatcher watcher.WatcherInterface
6869

6970
// Pod finalizer state table
7071
podFinalizerState map[string]*PodProfileFinalizerState

pkg/collector/pod_finalizer.go

+39-55
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@ import (
88
"sync"
99
"time"
1010

11+
"github.com/kubescape/kapprofiler/pkg/watcher"
1112
v1 "k8s.io/api/core/v1"
1213
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1314
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1415
"k8s.io/apimachinery/pkg/runtime"
1516
"k8s.io/apimachinery/pkg/runtime/schema"
1617
apitypes "k8s.io/apimachinery/pkg/types"
17-
"k8s.io/client-go/dynamic/dynamicinformer"
18-
"k8s.io/client-go/tools/cache"
1918
)
2019

2120
type PodProfileFinalizerState struct {
@@ -34,33 +33,31 @@ func (cm *CollectorManager) StartFinalizerWatcher() {
3433
cm.podFinalizerStateMutex = &sync.Mutex{}
3534
// Initialize map
3635
cm.podFinalizerState = make(map[string]*PodProfileFinalizerState)
36+
// Initialize watcher
37+
cm.podFinalizerWatcher = watcher.NewWatcher(cm.dynamicClient, false)
3738

38-
// Initialize factory and informer
39-
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(cm.dynamicClient, 0, metav1.NamespaceAll, func(lo *metav1.ListOptions) {
40-
lo.FieldSelector = "spec.nodeName=" + cm.config.NodeName
41-
})
42-
// Informer for Pods
43-
informer := factory.ForResource(schema.GroupVersionResource{
44-
Group: "",
45-
Version: "v1",
46-
Resource: "pods",
47-
}).Informer()
48-
49-
// Add event handlers to informer
50-
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
51-
AddFunc: func(obj interface{}) {
39+
// Start watcher
40+
err := cm.podFinalizerWatcher.Start(watcher.WatchNotifyFunctions{
41+
AddFunc: func(obj *unstructured.Unstructured) {
5242
cm.handlePodAddEvent(obj)
5343
},
54-
UpdateFunc: func(oldObj, newObj interface{}) {
55-
cm.handlePodUpdateEvent(oldObj, newObj)
44+
UpdateFunc: func(obj *unstructured.Unstructured) {
45+
cm.handlePodUpdateEvent(obj)
5646
},
57-
DeleteFunc: func(obj interface{}) {
47+
DeleteFunc: func(obj *unstructured.Unstructured) {
5848
cm.handlePodDeleteEvent(obj)
5949
},
60-
})
50+
}, schema.GroupVersionResource{
51+
Group: "",
52+
Version: "v1",
53+
Resource: "pods",
54+
}, metav1.ListOptions{})
6155

62-
// Run the informer
63-
go informer.Run(cm.podFinalizerControl)
56+
if err != nil {
57+
log.Printf("Error starting watcher: %v", err)
58+
cm.podFinalizerWatcher = nil
59+
return
60+
}
6461
}
6562

6663
func generateTableKey(obj metav1.Object) string {
@@ -69,8 +66,6 @@ func generateTableKey(obj metav1.Object) string {
6966

7067
func (cm *CollectorManager) handlePodAddEvent(obj interface{}) {
7168
// Add pod to finalizer map
72-
73-
// Convert object to Pod
7469
pod, err := ConvertInterfaceToPod(obj)
7570
if err != nil {
7671
log.Printf("the interface is not a Pod %v", err)
@@ -103,59 +98,44 @@ func (cm *CollectorManager) handlePodAddEvent(obj interface{}) {
10398
cm.startFinalizationTimer(pod)
10499
}
105100
}
106-
107101
}
108102

109-
func (cm *CollectorManager) handlePodUpdateEvent(oldObj, newObj interface{}) {
110-
// Need to access the status of the old and new pod to check if the pod became ready
111-
103+
func (cm *CollectorManager) handlePodUpdateEvent(obj interface{}) {
112104
// Convert interface to Pod object
113-
oldPod, err := ConvertInterfaceToPod(oldObj)
114-
if err != nil {
115-
log.Printf("the interface is not a Pod %v", err)
116-
return
117-
}
118-
newPod, err := ConvertInterfaceToPod(newObj)
105+
pod, err := ConvertInterfaceToPod(obj)
119106
if err != nil {
120107
log.Printf("the interface is not a Pod %v", err)
121108
return
122109
}
123110

124111
// Check if recoding
125112
cm.podFinalizerStateMutex.Lock()
126-
finalizerState, ok := cm.podFinalizerState[generateTableKey(oldPod)]
113+
finalizerState, ok := cm.podFinalizerState[generateTableKey(&pod.ObjectMeta)]
127114
if !ok || !finalizerState.Recording {
128115
// Discard
129116
cm.podFinalizerStateMutex.Unlock()
130117
return
131118
}
132119
cm.podFinalizerStateMutex.Unlock()
133120

134-
// Check old pod status
135-
oldPodReady := false
136-
for _, condition := range oldPod.Status.Conditions {
137-
if condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue {
138-
oldPodReady = true
139-
}
140-
}
141-
142-
newPodReady := false
143-
for _, condition := range newPod.Status.Conditions {
121+
// Check pod status
122+
podReady := false
123+
for _, condition := range pod.Status.Conditions {
144124
if condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue {
145-
newPodReady = true
125+
podReady = true
146126
}
147127
}
148128

149-
if newPodReady && !oldPodReady {
129+
if podReady {
150130
// Pod became ready, add finalizer
151131
// Get mutex
152132
cm.podFinalizerStateMutex.Lock()
153133
defer cm.podFinalizerStateMutex.Unlock()
154134

155135
// Check if pod is in map
156-
podState, ok := cm.podFinalizerState[generateTableKey(newPod)]
136+
podState, ok := cm.podFinalizerState[generateTableKey(&pod.ObjectMeta)]
157137
if !ok {
158-
log.Printf("Pod %s in namespace %s not in finalizer map", newPod.GetName(), newPod.GetNamespace())
138+
log.Printf("Pod %s in namespace %s not in finalizer map", pod.GetName(), pod.GetNamespace())
159139
return
160140
}
161141

@@ -166,16 +146,16 @@ func (cm *CollectorManager) handlePodUpdateEvent(oldObj, newObj interface{}) {
166146
}
167147

168148
// Timer is not running, add finalizer
169-
podState.FinalizationTimer = cm.startFinalizationTimer(newPod)
170-
} else if !newPodReady && oldPodReady {
171-
cm.stopTimer(&newPod.ObjectMeta)
149+
podState.FinalizationTimer = cm.startFinalizationTimer(pod)
150+
} else {
151+
cm.stopTimer(&pod.ObjectMeta)
172152
}
173153
}
174154

175155
// Timer function
176156
func (cm *CollectorManager) startFinalizationTimer(pod *v1.Pod) *time.Timer {
177-
jitter := time.Duration(rand.Intn(int(cm.config.FinalizeJitter))) * time.Second
178-
finalizationTimer := time.NewTimer(time.Duration(cm.config.FinalizeTime+uint64(jitter)) * time.Second)
157+
jitter := uint64(rand.Intn(int(cm.config.FinalizeJitter)))
158+
finalizationTimer := time.NewTimer(time.Duration(cm.config.FinalizeTime+jitter) * time.Second)
179159

180160
// This goroutine waits for the timer to finish.
181161
go func() {
@@ -303,7 +283,11 @@ func (cm *CollectorManager) MarkPodNotRecording(pod, namespace string) {
303283
}
304284

305285
func (cm *CollectorManager) StopFinalizerWatcher() {
306-
close(cm.podFinalizerControl)
286+
if cm.podFinalizerWatcher != nil {
287+
cm.podFinalizerWatcher.Stop()
288+
} else {
289+
log.Printf("Pod finalizer watcher not started")
290+
}
307291
}
308292

309293
func ConvertInterfaceToPod(obj interface{}) (*v1.Pod, error) {

0 commit comments

Comments
 (0)