Skip to content

Commit 83f5aeb

Browse files
authored
Make virtual transformers idempotent (#1039)
* Revert "Disable WatchList for SQL cache informer (#1019)" This reverts commit 77a377e. * Revert "Disable WatchListClient for Tokens and Kubeconfigs (#1034)" This reverts commit e0180e2. * Make our virtual transformers idempotent * Send bookmark event in our synthetic watcher * Add unit tests
1 parent 7d8eafa commit 83f5aeb

7 files changed

Lines changed: 211 additions & 43 deletions

File tree

pkg/clustercache/controller.go

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -150,17 +150,7 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
150150
opts := &client.Options{
151151
Schema: schema.Schema,
152152
}
153-
kubeconfigGVK := schema2.GroupVersionKind{Group: "ext.cattle.io", Version: "v1", Kind: "Kubeconfig"}
154-
tokenGVK := schema2.GroupVersionKind{Group: "ext.cattle.io", Version: "v1", Kind: "Token"}
155-
client := h.summaryClient
156-
// Due to a bug in Rancher's extension apiserver for the token and kubeconfig APIs, we
157-
// must disable the WatchList features for those APIs.
158-
if gvk == kubeconfigGVK || gvk == tokenGVK {
159-
client = &noWatchListClient{
160-
ExtendedInterface: h.summaryClient,
161-
}
162-
}
163-
summaryInformer := informer.NewFilteredSummaryInformerWithOptions(client, gvr, opts, metav1.NamespaceAll, 2*time.Hour,
153+
summaryInformer := informer.NewFilteredSummaryInformerWithOptions(h.summaryClient, gvr, opts, metav1.NamespaceAll, 2*time.Hour,
164154
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil)
165155
ctx, cancel := context.WithCancel(h.ctx)
166156
w := &watcher{
@@ -310,12 +300,3 @@ func callAll(handlers []interface{}, gvr schema2.GroupVersionKind, key string, o
310300

311301
return obj, merr.NewErrors(errs...)
312302
}
313-
314-
// noWatchListListWatch disables WatchList feature
315-
type noWatchListClient struct {
316-
client.ExtendedInterface
317-
}
318-
319-
func (n *noWatchListClient) IsWatchListSemanticsUnSupported() bool {
320-
return true
321-
}

pkg/resources/virtual/common/common.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func addIDField(raw *unstructured.Unstructured) *unstructured.Unstructured {
6767
objectID = fmt.Sprintf("%s/%s", namespace, objectID)
6868
}
6969
currentIDValue, ok := raw.Object["id"]
70-
if ok {
70+
if ok && raw.Object["_id"] == nil {
7171
raw.Object["_id"] = currentIDValue
7272
}
7373
raw.Object["id"] = objectID

pkg/resources/virtual/dates/dates.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ func (d *Converter) Transform(obj *unstructured.Unstructured) (*unstructured.Uns
8787
}
8888
}
8989

90+
if isUnixMilli(value) {
91+
continue
92+
}
93+
9094
// Handle duration strings (e.g. "5m", "1d") by converting to absolute timestamp
9195
duration, err := rescommon.ParseTimestampOrHumanReadableDuration(value)
9296
if err != nil {
@@ -129,3 +133,15 @@ func getValueFromJSONPath(obj map[string]any, jp *jsonpath.JSONPath) (string, bo
129133
}
130134
return "", false
131135
}
136+
137+
func isUnixMilli(s string) bool {
138+
if len(s) != 13 {
139+
return false
140+
}
141+
for _, c := range s {
142+
if c < '0' || c > '9' {
143+
return false
144+
}
145+
}
146+
return true
147+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package dates
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
rescommon "github.com/rancher/steve/pkg/resources/common"
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
13+
"k8s.io/apimachinery/pkg/runtime/schema"
14+
)
15+
16+
func TestTransformIdempotency(t *testing.T) {
17+
mockTime := func() time.Time { return time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) }
18+
Now = mockTime
19+
20+
tests := []struct {
21+
name string
22+
converter Converter
23+
input *unstructured.Unstructured
24+
wantField any
25+
}{
26+
{
27+
name: "built-in duration field is stable after two transforms",
28+
converter: Converter{
29+
GVK: schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"},
30+
Columns: []rescommon.ColumnDefinition{
31+
{
32+
TableColumnDefinition: v1.TableColumnDefinition{Name: "Age"},
33+
Field: "$.metadata.fields[0]",
34+
},
35+
},
36+
},
37+
input: &unstructured.Unstructured{
38+
Object: map[string]any{
39+
"apiVersion": "apps/v1",
40+
"kind": "Deployment",
41+
"metadata": map[string]any{
42+
"fields": []any{"1d"},
43+
},
44+
},
45+
},
46+
wantField: fmt.Sprintf("%d", mockTime().Add(-24*time.Hour).UnixMilli()),
47+
},
48+
{
49+
name: "CRD duration field is stable after two transforms",
50+
converter: Converter{
51+
GVK: schema.GroupVersionKind{Group: "test.cattle.io", Version: "v1", Kind: "TestResource"},
52+
Columns: []rescommon.ColumnDefinition{
53+
{
54+
TableColumnDefinition: v1.TableColumnDefinition{Name: "Age", Type: "date"},
55+
Field: "$.metadata.fields[0]",
56+
},
57+
},
58+
IsCRD: true,
59+
},
60+
input: &unstructured.Unstructured{
61+
Object: map[string]any{
62+
"apiVersion": "test.cattle.io/v1",
63+
"kind": "TestResource",
64+
"metadata": map[string]any{
65+
"fields": []any{"5m"},
66+
},
67+
},
68+
},
69+
wantField: fmt.Sprintf("%d", mockTime().Add(-5*time.Minute).UnixMilli()),
70+
},
71+
}
72+
73+
for _, tt := range tests {
74+
t.Run(tt.name, func(t *testing.T) {
75+
obj := tt.input
76+
77+
first, err := tt.converter.Transform(obj)
78+
require.NoError(t, err)
79+
80+
fields1, ok, err := unstructured.NestedSlice(first.Object, "metadata", "fields")
81+
require.NoError(t, err)
82+
require.True(t, ok)
83+
require.Equal(t, tt.wantField, fields1[0])
84+
85+
second, err := tt.converter.Transform(first)
86+
require.NoError(t, err)
87+
88+
fields2, ok, err := unstructured.NestedSlice(second.Object, "metadata", "fields")
89+
require.NoError(t, err)
90+
require.True(t, ok)
91+
require.Equal(t, fields1[0], fields2[0], "second transform should produce the same result as the first")
92+
})
93+
}
94+
}
95+
96+
func TestIsUnixMilli(t *testing.T) {
97+
tests := []struct {
98+
name string
99+
input string
100+
want bool
101+
}{
102+
{
103+
name: "valid unix milli timestamp",
104+
input: "1714567890123",
105+
want: true,
106+
},
107+
{
108+
name: "too short",
109+
input: "171456789012",
110+
want: false,
111+
},
112+
{
113+
name: "too long",
114+
input: "17145678901234",
115+
want: false,
116+
},
117+
{
118+
name: "contains non-digit character",
119+
input: "171456789012a",
120+
want: false,
121+
},
122+
{
123+
name: "empty string",
124+
input: "",
125+
want: false,
126+
},
127+
{
128+
name: "duration string",
129+
input: "5m",
130+
want: false,
131+
},
132+
{
133+
name: "RFC3339 timestamp",
134+
input: "2024-05-01T12:00:00Z",
135+
want: false,
136+
},
137+
{
138+
name: "negative number 13 chars",
139+
input: "-171456789012",
140+
want: false,
141+
},
142+
}
143+
144+
for _, tt := range tests {
145+
t.Run(tt.name, func(t *testing.T) {
146+
assert.Equal(t, tt.want, isUnixMilli(tt.input))
147+
})
148+
}
149+
}

pkg/sqlcache/informer/informer.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields m
8787
if !watchable {
8888
watchFunc = func(options metav1.ListOptions) (watch.Interface, error) {
8989
ctx, cancel := context.WithCancel(ctx)
90-
return newSyntheticWatcher(ctx, cancel).watch(client, options, defaultRefreshTime)
90+
return newSyntheticWatcher(ctx, cancel, gvk).watch(client, options, defaultRefreshTime)
9191
}
9292
}
9393
listWatcher := &cache.ListWatch{
@@ -130,11 +130,7 @@ func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields m
130130

131131
// In non-test mode `newInformer` is cache.NewSharedIndexInformer
132132
// defined in k8s.io/client-go/tools/cache/shared_informer.go : func NewSharedIndexInformer(lw ...
133-
134-
// We disable the WatchList feature here for two reasons:
135-
// 1. The WatchList feature makes our virtual function run twice for the initial objects, causing issues
136-
// 2. THe synthetic watcher doesn't support it
137-
sii := newInformer(&noWatchListListWatch{ListWatch: listWatcher}, example, resyncPeriod, cache.Indexers{})
133+
sii := newInformer(listWatcher, example, resyncPeriod, cache.Indexers{})
138134
if transform != nil {
139135
if err := sii.SetTransform(transform); err != nil {
140136
return nil, err
@@ -204,12 +200,3 @@ func SetSyntheticWatchableInterval(interval time.Duration) {
204200
func informerNameFromGVK(gvk schema.GroupVersionKind) string {
205201
return gvk.Group + "_" + gvk.Version + "_" + gvk.Kind
206202
}
207-
208-
// noWatchListListWatch disables WatchList feature
209-
type noWatchListListWatch struct {
210-
*cache.ListWatch
211-
}
212-
213-
func (n *noWatchListListWatch) IsWatchListSemanticsUnSupported() bool {
214-
return true
215-
}

pkg/sqlcache/informer/synthetic_watcher.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/sirupsen/logrus"
1010
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1111
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
12+
"k8s.io/apimachinery/pkg/runtime/schema"
1213
"k8s.io/apimachinery/pkg/watch"
1314
"k8s.io/client-go/dynamic"
1415
)
@@ -20,15 +21,17 @@ type SyntheticWatcher struct {
2021
stopChanLock sync.Mutex
2122
context context.Context
2223
cancelFunc context.CancelFunc
24+
gvk schema.GroupVersionKind
2325
}
2426

25-
func newSyntheticWatcher(context context.Context, cancel context.CancelFunc) *SyntheticWatcher {
27+
func newSyntheticWatcher(context context.Context, cancel context.CancelFunc, gvk schema.GroupVersionKind) *SyntheticWatcher {
2628
return &SyntheticWatcher{
2729
stopChan: make(chan struct{}),
2830
doneChan: make(chan struct{}),
2931
resultChan: make(chan watch.Event, 0),
3032
context: context,
3133
cancelFunc: cancel,
34+
gvk: gvk,
3235
}
3336
}
3437

@@ -51,9 +54,16 @@ func (rw *SyntheticWatcher) receive(client dynamic.ResourceInterface, options me
5154
previousState := make(map[string]objectHolder)
5255
ticker := time.NewTicker(interval)
5356

57+
initialSyncSent := false
58+
5459
for {
5560
select {
5661
case <-ticker.C:
62+
// Clear Watch-specific fields set by the WatchList flow since
63+
// the synthetic watcher is only used for non watchable resources.
64+
options.SendInitialEvents = nil
65+
options.ResourceVersionMatch = ""
66+
options.AllowWatchBookmarks = false
5767
list, err := client.List(rw.context, options)
5868
if err != nil {
5969
logrus.Errorf("synthetic watcher: client.List => error: %s", err)
@@ -103,6 +113,12 @@ func (rw *SyntheticWatcher) receive(client dynamic.ResourceInterface, options me
103113
}
104114
previousState = currentState
105115

116+
if !initialSyncSent {
117+
sendInitialSyncBookmark(list.GetResourceVersion(), rw.gvk, rw.resultChan)
118+
initialSyncSent = true
119+
logrus.Debugf("synthetic watcher: sent initial events end bookmark for %v", rw.gvk)
120+
}
121+
106122
case <-rw.stopChan:
107123
rw.cancelFunc()
108124
return
@@ -114,6 +130,23 @@ func (rw *SyntheticWatcher) receive(client dynamic.ResourceInterface, options me
114130
}()
115131
}
116132

133+
// sendInitialSyncBookmark constructs and sends a synthetic watch.Bookmark event.
134+
// This satisfies the client-go Reflector's requirement for the KEP-3157 WatchList protocol,
135+
// signaling that the initial stream of events has finished and stopping the timeout ticker.
136+
func sendInitialSyncBookmark(resourceVersion string, gvk schema.GroupVersionKind, resultChan chan watch.Event) {
137+
bookmarkObj := &unstructured.Unstructured{}
138+
bookmarkObj.SetAnnotations(map[string]string{
139+
metav1.InitialEventsAnnotationKey: "true",
140+
})
141+
bookmarkObj.SetResourceVersion(resourceVersion)
142+
bookmarkObj.SetGroupVersionKind(gvk)
143+
144+
resultChan <- watch.Event{
145+
Type: watch.Bookmark,
146+
Object: bookmarkObj,
147+
}
148+
}
149+
117150
func createWatchEvent(event watch.EventType, u *unstructured.Unstructured) (watch.Event, error) {
118151
return watch.Event{Type: event, Object: u}, nil
119152
}

pkg/sqlcache/informer/synthetic_watcher_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1818
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1919
"k8s.io/apimachinery/pkg/runtime"
20+
"k8s.io/apimachinery/pkg/runtime/schema"
2021
"k8s.io/apimachinery/pkg/watch"
2122
)
2223

@@ -84,7 +85,7 @@ func TestSyntheticWatcher(t *testing.T) {
8485
dynamicClient.EXPECT().List(gomock.Any(), gomock.Any()).AnyTimes().Return(list2, nil)
8586

8687
ctx, cancel := context.WithCancel(context.Background())
87-
sw := newSyntheticWatcher(ctx, cancel)
88+
sw := newSyntheticWatcher(ctx, cancel, schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ComponentStatus"})
8889
pollingInterval := 10 * time.Millisecond
8990
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
9091
return sw.watch(dynamicClient, options, pollingInterval)
@@ -108,20 +109,21 @@ func TestSyntheticWatcher(t *testing.T) {
108109
}()
109110
wg.Wait()
110111
// Verify we get what we expected to see
111-
assert.Len(t, results, 8)
112+
assert.Len(t, results, 9)
112113
for i, _ := range list.Items {
113114
assert.Equal(t, "added-result", results[i].eventName)
114115
}
115-
assert.Equal(t, "modified-result", results[len(list.Items)].eventName)
116+
assert.Equal(t, "bookmark-result", results[len(list.Items)].eventName)
116117
assert.Equal(t, "modified-result", results[len(list.Items)+1].eventName)
117-
assert.Equal(t, "deleted-result", results[len(list.Items)+2].eventName)
118-
assert.Equal(t, "stop", results[7].eventName)
118+
assert.Equal(t, "modified-result", results[len(list.Items)+2].eventName)
119+
assert.Equal(t, "deleted-result", results[len(list.Items)+3].eventName)
120+
assert.Equal(t, "stop", results[8].eventName)
119121
// We can't really assert that the events get the correct timestamps on them
120122
// because they can be held up in the channel for unexpected durations. I did have
121123
// assert.Greater(t, float64(timeDelta), 0.9*float64(pollingInterval))
122124
// but saw a failure -- the interval was actually 0.75 * pollingInterval.
123125
// So there's no point testing that.
124-
assert.Greater(t, results[4].createdAt, results[0].createdAt)
126+
assert.Greater(t, results[5].createdAt, results[0].createdAt)
125127
}
126128

127129
func TestIsUpdateObjectScenarios(t *testing.T) {

0 commit comments

Comments
 (0)