Skip to content

Commit c906c36

Browse files
support unwatchables in vai (#458)
* Create and use a synthetic watcher for non-watchable resources. * Write unit tests for the synthetic watcher. * Make the refresh interval for synthetic watchers configurable. The default is to call `client.List(...)` every 5 seconds for each unwatchable GVK. There are currently only 3 such GVKs right now so this will be informative enough but not really noticeable. * Pass the context into the synthetic watch func. * Restore changes lost in rebasing. --------- Co-authored-by: Tom Lebreux <tom.lebreux@suse.com>
1 parent 6559fa9 commit c906c36

6 files changed

Lines changed: 364 additions & 19 deletions

File tree

pkg/sqlcache/informer/factory/informer_factory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type guardedInformer struct {
4444
mutex *sync.Mutex
4545
}
4646

47-
type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool) (*informer.Informer, error)
47+
type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool) (*informer.Informer, error)
4848

4949
type Cache struct {
5050
informer.ByOptionsLister
@@ -121,7 +121,7 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, transfor
121121

122122
_, encryptResourceAlways := defaultEncryptedResourceTypes[gvk]
123123
shouldEncrypt := f.encryptAll || encryptResourceAlways
124-
i, err := f.newInformer(ctx, client, fields, transform, gvk, f.dbClient, shouldEncrypt, namespaced)
124+
i, err := f.newInformer(ctx, client, fields, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable)
125125
if err != nil {
126126
return Cache{}, err
127127
}

pkg/sqlcache/informer/factory/informer_factory_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func TestCacheFor(t *testing.T) {
7474
expectedC := Cache{
7575
ByOptionsLister: i,
7676
}
77-
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool) (*informer.Informer, error) {
77+
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*informer.Informer, error) {
7878
assert.Equal(t, client, dynamicClient)
7979
assert.Equal(t, fields, fields)
8080
assert.Equal(t, expectedGVK, gvk)
@@ -119,7 +119,7 @@ func TestCacheFor(t *testing.T) {
119119
// need to set this so Run function is not nil
120120
SharedIndexInformer: sii,
121121
}
122-
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) {
122+
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) {
123123
assert.Equal(t, client, dynamicClient)
124124
assert.Equal(t, fields, fields)
125125
assert.Equal(t, expectedGVK, gvk)
@@ -161,7 +161,7 @@ func TestCacheFor(t *testing.T) {
161161
expectedC := Cache{
162162
ByOptionsLister: i,
163163
}
164-
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) {
164+
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) {
165165
assert.Equal(t, client, dynamicClient)
166166
assert.Equal(t, fields, fields)
167167
assert.Equal(t, expectedGVK, gvk)
@@ -200,7 +200,7 @@ func TestCacheFor(t *testing.T) {
200200
expectedC := Cache{
201201
ByOptionsLister: i,
202202
}
203-
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) {
203+
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) {
204204
assert.Equal(t, client, dynamicClient)
205205
assert.Equal(t, fields, fields)
206206
assert.Equal(t, expectedGVK, gvk)
@@ -248,7 +248,7 @@ func TestCacheFor(t *testing.T) {
248248
expectedC := Cache{
249249
ByOptionsLister: i,
250250
}
251-
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) {
251+
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool) (*informer.Informer, error) {
252252
assert.Equal(t, client, dynamicClient)
253253
assert.Equal(t, fields, fields)
254254
assert.Equal(t, expectedGVK, gvk)
@@ -295,7 +295,7 @@ func TestCacheFor(t *testing.T) {
295295
expectedC := Cache{
296296
ByOptionsLister: i,
297297
}
298-
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool) (*informer.Informer, error) {
298+
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool) (*informer.Informer, error) {
299299
assert.Equal(t, client, dynamicClient)
300300
assert.Equal(t, fields, fields)
301301
assert.Equal(t, expectedGVK, gvk)
@@ -342,7 +342,7 @@ func TestCacheFor(t *testing.T) {
342342
expectedC := Cache{
343343
ByOptionsLister: i,
344344
}
345-
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool) (*informer.Informer, error) {
345+
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*informer.Informer, error) {
346346
// we can't test func == func, so instead we check if the output was as expected
347347
input := "someinput"
348348
ouput, err := transform(input)

pkg/sqlcache/informer/informer.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"k8s.io/client-go/tools/cache"
2121
)
2222

23+
var defaultRefreshTime = 5 * time.Second
24+
2325
// Informer is a SQLite-backed cache.SharedIndexInformer that can execute queries on listprocessor structs
2426
type Informer struct {
2527
cache.SharedIndexInformer
@@ -35,15 +37,22 @@ var newInformer = cache.NewSharedIndexInformer
3537

3638
// NewInformer returns a new SQLite-backed Informer for the type specified by schema in unstructured.Unstructured form
3739
// using the specified client
38-
func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool) (*Informer, error) {
40+
func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*Informer, error) {
41+
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
42+
return client.Watch(ctx, options)
43+
}
44+
if !watchable {
45+
watchFunc = func(options metav1.ListOptions) (watch.Interface, error) {
46+
ctx, cancel := context.WithCancel(ctx)
47+
return newSyntheticWatcher(ctx, cancel).watch(client, options, defaultRefreshTime)
48+
}
49+
}
3950
listWatcher := &cache.ListWatch{
4051
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
4152
a, err := client.List(ctx, options)
4253
return a, err
4354
},
44-
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
45-
return client.Watch(ctx, options)
46-
},
55+
WatchFunc: watchFunc,
4756
}
4857

4958
example := &unstructured.Unstructured{}
@@ -97,6 +106,11 @@ func (i *Informer) ListByOptions(ctx context.Context, lo ListOptions, partitions
97106
return i.ByOptionsLister.ListByOptions(ctx, lo, partitions, namespace)
98107
}
99108

109+
// SetSyntheticWatchableInterval - call this function to override the default interval time of 5 seconds
110+
func SetSyntheticWatchableInterval(interval time.Duration) {
111+
defaultRefreshTime = interval
112+
}
113+
100114
func informerNameFromGVK(gvk schema.GroupVersionKind) string {
101115
return gvk.Group + "_" + gvk.Version + "_" + gvk.Kind
102116
}

pkg/sqlcache/informer/informer_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func TestNewInformer(t *testing.T) {
7979
}
8080
})
8181

82-
informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true)
82+
informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true)
8383
assert.Nil(t, err)
8484
assert.NotNil(t, informer.ByOptionsLister)
8585
assert.NotNil(t, informer.SharedIndexInformer)
@@ -103,7 +103,7 @@ func TestNewInformer(t *testing.T) {
103103
}
104104
})
105105

106-
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true)
106+
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true)
107107
assert.NotNil(t, err)
108108
}})
109109
tests = append(tests, testCase{description: "NewInformer() with errors returned from NewIndexer(), should return an error", test: func(t *testing.T) {
@@ -138,7 +138,7 @@ func TestNewInformer(t *testing.T) {
138138
}
139139
})
140140

141-
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true)
141+
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true)
142142
assert.NotNil(t, err)
143143
}})
144144
tests = append(tests, testCase{description: "NewInformer() with errors returned from NewListOptionIndexer(), should return an error", test: func(t *testing.T) {
@@ -190,7 +190,7 @@ func TestNewInformer(t *testing.T) {
190190
}
191191
})
192192

193-
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true)
193+
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true)
194194
assert.NotNil(t, err)
195195
}})
196196
tests = append(tests, testCase{description: "NewInformer() with transform func", test: func(t *testing.T) {
@@ -253,7 +253,7 @@ func TestNewInformer(t *testing.T) {
253253
transformFunc := func(input interface{}) (interface{}, error) {
254254
return "someoutput", nil
255255
}
256-
informer, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true)
256+
informer, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true)
257257
assert.Nil(t, err)
258258
assert.NotNil(t, informer.ByOptionsLister)
259259
assert.NotNil(t, informer.SharedIndexInformer)
@@ -289,7 +289,7 @@ func TestNewInformer(t *testing.T) {
289289
transformFunc := func(input interface{}) (interface{}, error) {
290290
return "someoutput", nil
291291
}
292-
_, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true)
292+
_, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true)
293293
assert.Error(t, err)
294294
newInformer = cache.NewSharedIndexInformer
295295
}})
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package informer
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/sirupsen/logrus"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
12+
"k8s.io/apimachinery/pkg/watch"
13+
"k8s.io/client-go/dynamic"
14+
)
15+
16+
type SyntheticWatcher struct {
17+
resultChan chan watch.Event
18+
stopChan chan struct{}
19+
doneChan chan struct{}
20+
stopChanLock sync.Mutex
21+
context context.Context
22+
cancelFunc context.CancelFunc
23+
}
24+
25+
func newSyntheticWatcher(context context.Context, cancel context.CancelFunc) *SyntheticWatcher {
26+
return &SyntheticWatcher{
27+
stopChan: make(chan struct{}),
28+
doneChan: make(chan struct{}),
29+
resultChan: make(chan watch.Event, 0),
30+
context: context,
31+
cancelFunc: cancel,
32+
}
33+
}
34+
35+
func (rw *SyntheticWatcher) watch(client dynamic.ResourceInterface, options metav1.ListOptions, interval time.Duration) (*SyntheticWatcher, error) {
36+
go rw.receive(client, options, interval)
37+
return rw, nil
38+
}
39+
40+
type objectHolder struct {
41+
version string
42+
unstructuredObject *unstructured.Unstructured
43+
}
44+
45+
// receive periodically calls client.List(), and converts the returned items into Watch Events
46+
func (rw *SyntheticWatcher) receive(client dynamic.ResourceInterface, options metav1.ListOptions, interval time.Duration) {
47+
go func() {
48+
defer close(rw.doneChan)
49+
defer close(rw.resultChan)
50+
defer rw.cancelFunc()
51+
previousState := make(map[string]objectHolder)
52+
ticker := time.NewTicker(interval)
53+
54+
for {
55+
select {
56+
case <-ticker.C:
57+
list, err := client.List(rw.context, options)
58+
if err != nil {
59+
logrus.Errorf("synthetic watcher: client.List => error: %s", err)
60+
continue
61+
}
62+
newObjects := make(map[string]objectHolder)
63+
for _, uItem := range list.Items {
64+
namespace := uItem.GetNamespace()
65+
name := uItem.GetName()
66+
key := name
67+
if namespace != "" {
68+
key = fmt.Sprintf("%s/%s", namespace, name)
69+
}
70+
version := uItem.GetResourceVersion()
71+
newObjects[key] = objectHolder{version: version, unstructuredObject: &uItem}
72+
}
73+
// Now determine whether items were added, deleted, or modified
74+
currentState := make(map[string]objectHolder)
75+
for key, newObject := range newObjects {
76+
currentState[key] = newObject
77+
if oldItem, ok := previousState[key]; !ok {
78+
w, err := createWatchEvent(watch.Added, newObject.unstructuredObject)
79+
if err != nil {
80+
logrus.Errorf("can't convert unstructured obj into runtime: %s", err)
81+
continue
82+
}
83+
rw.resultChan <- w
84+
} else {
85+
delete(previousState, key)
86+
if oldItem.version != newObject.version {
87+
w, err := createWatchEvent(watch.Modified, oldItem.unstructuredObject)
88+
if err != nil {
89+
logrus.Errorf("can't convert unstructured obj into runtime: %s", err)
90+
continue
91+
}
92+
rw.resultChan <- w
93+
}
94+
}
95+
}
96+
// And anything left in the previousState didn't show up in currentState and can be deleted.
97+
for _, item := range previousState {
98+
w, err := createWatchEvent(watch.Deleted, item.unstructuredObject)
99+
if err != nil {
100+
continue
101+
}
102+
rw.resultChan <- w
103+
}
104+
previousState = currentState
105+
106+
case <-rw.stopChan:
107+
rw.cancelFunc()
108+
return
109+
110+
case <-rw.context.Done():
111+
return
112+
}
113+
}
114+
}()
115+
}
116+
117+
func createWatchEvent(event watch.EventType, u *unstructured.Unstructured) (watch.Event, error) {
118+
return watch.Event{Type: event, Object: u}, nil
119+
}
120+
121+
// ResultChan implements [k8s.io/apimachinery/pkg/watch].Interface.
122+
func (rw *SyntheticWatcher) ResultChan() <-chan watch.Event {
123+
return rw.resultChan
124+
}
125+
126+
// Stop implements [k8s.io/apimachinery/pkg/watch].Interface.
127+
func (rw *SyntheticWatcher) Stop() {
128+
rw.stopChanLock.Lock()
129+
defer rw.stopChanLock.Unlock()
130+
131+
// Prevent closing an already closed channel to prevent a panic
132+
select {
133+
case <-rw.stopChan:
134+
default:
135+
close(rw.stopChan)
136+
}
137+
}
138+
139+
func (rw *SyntheticWatcher) Done() <-chan struct{} {
140+
return rw.doneChan
141+
}

0 commit comments

Comments
 (0)