Skip to content

Commit 2f7bdb8

Browse files
authored
Merge pull request #65 from kubescape/bugfix/watcher-final
Fixing watcher
2 parents b9602ca + 0a600f6 commit 2f7bdb8

File tree

1 file changed

+41
-55
lines changed

1 file changed

+41
-55
lines changed

pkg/watcher/watcher.go

+41-55
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ import (
1313
"k8s.io/client-go/dynamic"
1414
)
1515

16+
type resourceVersionGetter interface {
17+
GetResourceVersion() string
18+
}
19+
1620
type WatchNotifyFunctions struct {
1721
AddFunc func(obj *unstructured.Unstructured)
1822
UpdateFunc func(obj *unstructured.Unstructured)
@@ -26,14 +30,15 @@ type WatcherInterface interface {
2630
}
2731

2832
type Watcher struct {
29-
preList bool
30-
client dynamic.Interface
31-
watcher watch.Interface
32-
running bool
33+
preList bool
34+
client dynamic.Interface
35+
watcher watch.Interface
36+
lastResourceVersion string
37+
running bool
3338
}
3439

3540
func NewWatcher(k8sClient dynamic.Interface, preList bool) WatcherInterface {
36-
return &Watcher{client: k8sClient, watcher: nil, running: false, preList: preList}
41+
return &Watcher{client: k8sClient, watcher: nil, running: false, preList: preList, lastResourceVersion: ""}
3742
}
3843

3944
func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionResource, listOptions metav1.ListOptions) error {
@@ -55,7 +60,7 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
5560
}
5661

5762
// List of current objects
58-
resourceVersion := ""
63+
w.lastResourceVersion = ""
5964

6065
if w.preList {
6166
listOptions.Watch = false
@@ -65,9 +70,9 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
6570
return err
6671
}
6772
for i, item := range list.Items {
68-
if isResourceVersionHigher(item.GetResourceVersion(), resourceVersion) {
73+
if isResourceVersionHigher(item.GetResourceVersion(), w.lastResourceVersion) {
6974
// Update the resourceVersion to the latest
70-
resourceVersion = item.GetResourceVersion()
75+
w.lastResourceVersion = item.GetResourceVersion()
7176
if w.preList {
7277
notifyF.AddFunc(&item)
7378
}
@@ -78,48 +83,45 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
7883
list.Items = nil
7984
list = nil
8085
}
81-
} else {
82-
resourceVersion = "0"
8386
}
8487

8588
// Start the watcher
86-
listOptions.ResourceVersion = resourceVersion
87-
watcher, err := w.client.Resource(gvr).Namespace("").Watch(context.Background(), listOptions)
89+
listOptions.ResourceVersion = w.lastResourceVersion
90+
listOptions.Watch = true
91+
listOptions.AllowWatchBookmarks = true
92+
w.watcher, err = w.client.Resource(gvr).Namespace("").Watch(context.TODO(), listOptions)
8893
if err != nil {
8994
return err
9095
}
91-
w.watcher = watcher
9296
w.running = true
93-
currentWatcherContext, cancelFunc := context.WithCancel(context.Background())
94-
95-
// Function to restart the watcher
96-
restartWatcher := func() {
97-
if currentWatcherContext != nil && cancelFunc != nil {
98-
cancelFunc()
99-
}
100-
listOptions.ResourceVersion = resourceVersion
101-
currentWatcherContext, cancelFunc = context.WithCancel(context.Background())
102-
w.watcher, err = w.client.Resource(gvr).Namespace("").Watch(currentWatcherContext, listOptions)
103-
if err != nil {
104-
log.Printf("watcher restart error: %v, on object: %+v", err, gvr)
105-
}
106-
watcher = w.watcher
107-
}
108-
10997
go func() {
11098
// Watch for events
11199
for {
112-
event, ok := <-watcher.ResultChan()
100+
event, ok := <-w.watcher.ResultChan()
113101
if !ok {
114102
if w.running {
115-
log.Printf("Watcher channel closed on object %+v", gvr)
116-
restartWatcher()
103+
log.Printf("Restarting watcher on object %+v", gvr)
104+
w.watcher.Stop()
105+
w.watcher = nil
106+
listOptions.ResourceVersion = w.lastResourceVersion
107+
w.watcher, err = w.client.Resource(gvr).Namespace("").Watch(context.TODO(), listOptions)
108+
if err != nil {
109+
log.Printf("watcher restart error: %v, on object: %+v", err, gvr)
110+
}
117111
continue
118112
} else {
119113
// Stop the watcher
120114
return
121115
}
122116
}
117+
118+
if metaObject, ok := event.Object.(resourceVersionGetter); ok {
119+
// Update the resourceVersion to the latest
120+
if metaObject.GetResourceVersion() != "" && isResourceVersionHigher(metaObject.GetResourceVersion(), w.lastResourceVersion) {
121+
w.lastResourceVersion = metaObject.GetResourceVersion()
122+
}
123+
}
124+
123125
switch event.Type {
124126
case watch.Added:
125127
// Convert the object to unstructured
@@ -128,8 +130,6 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
128130
log.Printf("watcher error: addedObject is nil")
129131
continue
130132
}
131-
// Update the resourceVersion
132-
resourceVersion = addedObject.GetResourceVersion()
133133
notifyF.AddFunc(addedObject)
134134
addedObject = nil // Make sure the item is scraped by the GC
135135
case watch.Modified:
@@ -139,8 +139,6 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
139139
log.Printf("watcher error: modifiedObject is nil")
140140
continue
141141
}
142-
// Update the resourceVersion
143-
resourceVersion = modifiedObject.GetResourceVersion()
144142
notifyF.UpdateFunc(modifiedObject)
145143
modifiedObject = nil // Make sure the item is scraped by the GC
146144
case watch.Deleted:
@@ -150,31 +148,19 @@ func (w *Watcher) Start(notifyF WatchNotifyFunctions, gvr schema.GroupVersionRes
150148
log.Printf("watcher error: deletedObject is nil")
151149
continue
152150
}
153-
// Update the resourceVersion
154-
resourceVersion = deletedObject.GetResourceVersion()
155151
notifyF.DeleteFunc(deletedObject)
156152
deletedObject = nil // Make sure the item is scraped by the GC
157-
158-
case watch.Bookmark:
159-
// Update the resourceVersion
160-
bookmarkObject := event.Object.(*unstructured.Unstructured)
161-
if bookmarkObject == nil {
162-
log.Printf("watcher error: bookmarkObject is nil")
163-
continue
164-
}
165-
resourceVersion = bookmarkObject.GetResourceVersion()
166-
bookmarkObject = nil // Make sure the item is scraped by the GC
167-
168153
case watch.Error:
169154
// Convert the object to metav1.Status
170155
watchError := event.Object.(*metav1.Status)
171-
// Check if the object reason is "Expired" or "Gone" and restart the watcher
172-
if watchError.Reason == "Expired" || watchError.Reason == "Gone" || watchError.Code == 410 {
173-
restartWatcher()
174-
continue
175-
} else {
176-
log.Printf("watcher error: %v, on object %+v", event.Object, gvr)
156+
if watchError != nil {
157+
if watchError.Code == 410 {
158+
// If the resourceVersion is too old, reset the resourceVersion to the latest.
159+
// https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions
160+
w.lastResourceVersion = ""
161+
}
177162
}
163+
continue
178164
}
179165
}
180166
}()

0 commit comments

Comments
 (0)