@@ -20,14 +20,8 @@ import (
20
20
"context"
21
21
"encoding/json"
22
22
"fmt"
23
- "sync"
24
23
"time"
25
24
26
- kcpapiextensionsclientset "github.com/kcp-dev/client-go/apiextensions/client"
27
- kcpapiextensionsv1informers "github.com/kcp-dev/client-go/apiextensions/informers/apiextensions/v1"
28
-
29
- kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
30
-
31
25
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
32
26
"k8s.io/apimachinery/pkg/api/meta"
33
27
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -36,7 +30,15 @@ import (
36
30
"k8s.io/client-go/util/workqueue"
37
31
"k8s.io/klog/v2"
38
32
33
+ kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
34
+ kcpapiextensionsv1informers "github.com/kcp-dev/client-go/apiextensions/informers/apiextensions/v1"
35
+ "github.com/kcp-dev/logicalcluster/v3"
36
+
37
+ "github.com/kcp-dev/kcp/pkg/indexers"
38
+ "github.com/kcp-dev/kcp/pkg/informer"
39
39
"github.com/kcp-dev/kcp/pkg/logging"
40
+ apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
41
+ apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1"
40
42
)
41
43
42
44
const (
@@ -46,8 +48,12 @@ const (
46
48
func NewController (
47
49
ctx context.Context ,
48
50
state * DynamicRESTMapper ,
49
- crdClusterClient * kcpapiextensionsclientset.ClusterClientset ,
50
51
crdInformer kcpapiextensionsv1informers.CustomResourceDefinitionClusterInformer ,
52
+ apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer ,
53
+ apiExportInformer apisv1alpha1informers.APIExportClusterInformer ,
54
+ apiResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer ,
55
+ globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer ,
56
+ globalAPIResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer ,
51
57
) (* Controller , error ) {
52
58
queue := workqueue .NewTypedRateLimitingQueueWithConfig (
53
59
workqueue .DefaultTypedControllerRateLimiter [string ](),
@@ -58,10 +64,19 @@ func NewController(
58
64
59
65
c := & Controller {
60
66
queue : queue ,
67
+ state : state ,
61
68
62
- crdInformer : crdInformer ,
69
+ getAPIExportByPath : func (path logicalcluster.Path , name string ) (* apisv1alpha1.APIExport , error ) {
70
+ return indexers .ByPathAndNameWithFallback [* apisv1alpha1.APIExport ](
71
+ apisv1alpha1 .Resource ("apiexports" ),
72
+ apiExportInformer .Informer ().GetIndexer (),
73
+ globalAPIExportInformer .Informer ().GetIndexer (),
74
+ path ,
75
+ name ,
76
+ )
77
+ },
63
78
64
- state : state ,
79
+ getAPIResourceSchema : informer . NewScopedGetterWithFallback ( apiResourceSchemaInformer . Lister (), globalAPIResourceSchemaInformer . Lister ()) ,
65
80
}
66
81
67
82
_ , _ = crdInformer .Informer ().AddEventHandler (cache.ResourceEventHandlerFuncs {
@@ -76,6 +91,18 @@ func NewController(
76
91
},
77
92
})
78
93
94
+ _ , _ = apiBindingInformer .Informer ().AddEventHandler (cache.ResourceEventHandlerFuncs {
95
+ AddFunc : func (obj interface {}) {
96
+ c .enqueueAPIBinding (obj .(* apisv1alpha1.APIBinding ), false )
97
+ },
98
+ DeleteFunc : func (obj interface {}) {
99
+ if tombstone , ok := obj .(cache.DeletedFinalStateUnknown ); ok {
100
+ obj = tombstone .Obj
101
+ }
102
+ c .enqueueAPIBinding (obj .(* apisv1alpha1.APIBinding ), true )
103
+ },
104
+ })
105
+
79
106
return c , nil
80
107
}
81
108
@@ -84,12 +111,11 @@ func NewController(
84
111
// updates the workspace index, which maps logical clusters to shard URLs.
85
112
type Controller struct {
86
113
queue workqueue.TypedRateLimitingInterface [string ]
114
+ state * DynamicRESTMapper
87
115
88
- crdInformer kcpapiextensionsv1informers.CustomResourceDefinitionClusterInformer
89
-
90
- lock sync.RWMutex
116
+ getAPIExportByPath func (path logicalcluster.Path , name string ) (* apisv1alpha1.APIExport , error )
91
117
92
- state * DynamicRESTMapper
118
+ getAPIResourceSchema func ( clusterName logicalcluster. Name , name string ) ( * apisv1alpha1. APIResourceSchema , error )
93
119
}
94
120
95
121
type gvkrKey struct {
@@ -138,6 +164,56 @@ func (c *Controller) enqueueCRD(crd *apiextensionsv1.CustomResourceDefinition, d
138
164
}
139
165
}
140
166
167
+ func (c * Controller ) enqueueAPIBinding (apiBinding * apisv1alpha1.APIBinding , deleted bool ) {
168
+ key , err := kcpcache .DeletionHandlingMetaClusterNamespaceKeyFunc (apiBinding )
169
+ if err != nil {
170
+ utilruntime .HandleError (err )
171
+ return
172
+ }
173
+
174
+ apiExportPath := logicalcluster .NewPath (apiBinding .Spec .Reference .Export .Path )
175
+ if apiExportPath .Empty () {
176
+ apiExportPath = logicalcluster .From (apiBinding ).Path ()
177
+ }
178
+
179
+ apiExport , err := c .getAPIExportByPath (apiExportPath , apiBinding .Spec .Reference .Export .Name )
180
+ if err != nil {
181
+ utilruntime .HandleError (err )
182
+ return
183
+ }
184
+
185
+ for _ , schemaName := range apiExport .Spec .LatestResourceSchemas {
186
+ sch , err := c .getAPIResourceSchema (logicalcluster .From (apiExport ), schemaName )
187
+ if err != nil {
188
+ fmt .Printf ("\n \n !!! X key=%s,apiExportPath=%s,logicalcluster.From(apiExport)=%s\n \n " , key , apiExportPath , logicalcluster .From (apiExport ))
189
+ utilruntime .HandleError (err )
190
+ return
191
+ }
192
+
193
+ for _ , schVersion := range sch .Spec .Versions {
194
+ encodedGvkKey , err := encodeGvkrKey (
195
+ gvkrKey {
196
+ Gvkr : gvkr {
197
+ Group : sch .Spec .Group ,
198
+ Version : schVersion .Name ,
199
+ Kind : sch .Spec .Names .Kind ,
200
+ ResourcePlural : sch .Spec .Names .Plural ,
201
+ ResourceSingular : sch .Spec .Names .Singular ,
202
+ },
203
+ Key : key ,
204
+ Deleted : deleted ,
205
+ },
206
+ )
207
+ if err != nil {
208
+ utilruntime .HandleError (fmt .Errorf ("%q controller failed encode APIResourceSchema object with key %q, err: %w" , ControllerName , key , err ))
209
+ }
210
+ logging .WithQueueKey (logging .WithReconciler (klog .Background (), ControllerName ), encodedGvkKey ).V (4 ).
211
+ Info ("queueing APIResourceSchema because of APIBinding" )
212
+ c .queue .Add (encodedGvkKey )
213
+ }
214
+ }
215
+ }
216
+
141
217
func (c * Controller ) Start (ctx context.Context , numThreads int ) {
142
218
defer utilruntime .HandleCrash ()
143
219
defer c .queue .ShutDown ()
0 commit comments