Skip to content

Commit b9602ca

Browse files
authored
Merge pull request #64 from kubescape/bugfix/watcher
Bugfix/watcher
2 parents cd6c81b + c277f49 commit b9602ca

File tree

1 file changed

+29
-26
lines changed

1 file changed

+29
-26
lines changed

pkg/watcher/watcher.go

+29-26
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"log"
77
"strconv"
8-
"time"
98

109
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1110
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -92,27 +91,29 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
9291
w.watcher = watcher
9392
w.running = true
9493
currentWatcherContext, cancelFunc := context.WithCancel(context.Background())
94+
95+
// Function to restart the watcher
96+
restartWatcher := func() {
97+
if currentWatcherContext != nil && cancelFunc != nil {
98+
cancelFunc()
99+
}
100+
listOptions.ResourceVersion = resourceVersion
101+
currentWatcherContext, cancelFunc = context.WithCancel(context.Background())
102+
w.watcher, err = w.client.Resource(gvr).Namespace("").Watch(currentWatcherContext, listOptions)
103+
if err != nil {
104+
log.Printf("watcher restart error: %v, on object: %+v", err, gvr)
105+
}
106+
watcher = w.watcher
107+
}
108+
95109
go func() {
96110
// Watch for events
97-
98111
for {
99112
event, ok := <-watcher.ResultChan()
100113
if !ok {
101-
if currentWatcherContext != nil && cancelFunc != nil {
102-
cancelFunc()
103-
}
104-
105114
if w.running {
106-
// Need to restart the watcher: wait a bit and restart
107-
time.Sleep(5 * time.Second)
108-
listOptions.ResourceVersion = resourceVersion
109-
currentWatcherContext, cancelFunc = context.WithCancel(context.Background())
110-
w.watcher, err = w.client.Resource(gvr).Namespace("").Watch(currentWatcherContext, listOptions)
111-
if err != nil {
112-
log.Printf("watcher restart error: %v", err)
113-
}
114-
watcher = w.watcher
115-
// Restart the loop
115+
log.Printf("Watcher channel closed on object %+v", gvr)
116+
restartWatcher()
116117
continue
117118
} else {
118119
// Stop the watcher
@@ -128,9 +129,7 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
128129
continue
129130
}
130131
// Update the resourceVersion
131-
if isResourceVersionHigher(addedObject.GetResourceVersion(), resourceVersion) {
132-
resourceVersion = addedObject.GetResourceVersion()
133-
}
132+
resourceVersion = addedObject.GetResourceVersion()
134133
notifyF.AddFunc(addedObject)
135134
addedObject = nil // Make sure the item is scraped by the GC
136135
case watch.Modified:
@@ -141,9 +140,7 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
141140
continue
142141
}
143142
// Update the resourceVersion
144-
if isResourceVersionHigher(modifiedObject.GetResourceVersion(), resourceVersion) {
145-
resourceVersion = modifiedObject.GetResourceVersion()
146-
}
143+
resourceVersion = modifiedObject.GetResourceVersion()
147144
notifyF.UpdateFunc(modifiedObject)
148145
modifiedObject = nil // Make sure the item is scraped by the GC
149146
case watch.Deleted:
@@ -154,9 +151,7 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
154151
continue
155152
}
156153
// Update the resourceVersion
157-
if isResourceVersionHigher(deletedObject.GetResourceVersion(), resourceVersion) {
158-
resourceVersion = deletedObject.GetResourceVersion()
159-
}
154+
resourceVersion = deletedObject.GetResourceVersion()
160155
notifyF.DeleteFunc(deletedObject)
161156
deletedObject = nil // Make sure the item is scraped by the GC
162157

@@ -171,7 +166,15 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
171166
bookmarkObject = nil // Make sure the item is scraped by the GC
172167

173168
case watch.Error:
174-
log.Printf("watcher error: %v", event.Object)
169+
// Convert the object to metav1.Status
170+
watchError := event.Object.(*metav1.Status)
171+
// Check if the object reason is "Expired" or "Gone" and restart the watcher
172+
if watchError.Reason == "Expired" || watchError.Reason == "Gone" || watchError.Code == 410 {
173+
restartWatcher()
174+
continue
175+
} else {
176+
log.Printf("watcher error: %v, on object %+v", event.Object, gvr)
177+
}
175178
}
176179
}
177180
}()

0 commit comments

Comments
 (0)