Skip to content

Commit 7811b49

Browse files
committed
Added DynamicRESTMapper
1 parent 9ea768b commit 7811b49

File tree

3 files changed

+401
-0
lines changed

3 files changed

+401
-0
lines changed
+142
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
Copyright 2025 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dynamicrestmapper
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
23+
"k8s.io/apimachinery/pkg/api/meta"
24+
"k8s.io/apimachinery/pkg/runtime/schema"
25+
)
26+
27+
type DynamicRESTMapper struct {
28+
defaultGroupVersions []schema.GroupVersion
29+
30+
resourceToKind map[schema.GroupVersionResource]schema.GroupVersionKind
31+
kindToPluralResource map[schema.GroupVersionKind]schema.GroupVersionResource
32+
kindToScope map[schema.GroupVersionKind]meta.RESTScope
33+
singularToPlural map[schema.GroupVersionResource]schema.GroupVersionResource
34+
pluralToSingular map[schema.GroupVersionResource]schema.GroupVersionResource
35+
36+
// Protects all DynamicRESTMapper's mappings.
37+
mapperLock sync.RWMutex
38+
}
39+
40+
var _ meta.RESTMapper = &DynamicRESTMapper{}
41+
42+
func NewDynamicRESTMapper(defaultGroupVersions []schema.GroupVersion) *DynamicRESTMapper {
43+
resourceToKind := make(map[schema.GroupVersionResource]schema.GroupVersionKind)
44+
kindToPluralResource := make(map[schema.GroupVersionKind]schema.GroupVersionResource)
45+
kindToScope := make(map[schema.GroupVersionKind]meta.RESTScope)
46+
singularToPlural := make(map[schema.GroupVersionResource]schema.GroupVersionResource)
47+
pluralToSingular := make(map[schema.GroupVersionResource]schema.GroupVersionResource)
48+
// TODO: verify name mappings work correctly when versions differ
49+
50+
return &DynamicRESTMapper{
51+
resourceToKind: resourceToKind,
52+
kindToPluralResource: kindToPluralResource,
53+
kindToScope: kindToScope,
54+
defaultGroupVersions: defaultGroupVersions,
55+
singularToPlural: singularToPlural,
56+
pluralToSingular: pluralToSingular,
57+
}
58+
}
59+
60+
func (m *DynamicRESTMapper) String() string {
61+
if m == nil {
62+
return "<nil>"
63+
}
64+
return fmt.Sprintf("DynamicRESTMapper{kindToPluralResource=%v}", m.kindToPluralResource)
65+
}
66+
67+
func (m *DynamicRESTMapper) add(typeMeta gvkr, scope meta.RESTScope) {
68+
var (
69+
kind = typeMeta.groupVersionKind()
70+
singular schema.GroupVersionResource
71+
plural schema.GroupVersionResource
72+
)
73+
74+
if typeMeta.ResourceSingular == "" || typeMeta.ResourcePlural == "" {
75+
singular, singular = meta.UnsafeGuessKindToResource(kind)
76+
} else {
77+
singular = typeMeta.groupVersionResourceSingular()
78+
plural = typeMeta.groupVersionResourcePlural()
79+
}
80+
81+
m.mapperLock.Lock()
82+
defer m.mapperLock.Unlock()
83+
84+
m.singularToPlural[singular] = plural
85+
m.pluralToSingular[plural] = singular
86+
87+
m.resourceToKind[singular] = kind
88+
m.resourceToKind[plural] = kind
89+
90+
m.kindToPluralResource[kind] = plural
91+
m.kindToScope[kind] = scope
92+
}
93+
94+
func (m *DynamicRESTMapper) remove(gvk schema.GroupVersionKind) {
95+
m.mapperLock.Lock()
96+
defer m.mapperLock.Unlock()
97+
98+
singular := m.kindToPluralResource[gvk]
99+
plural := m.singularToPlural[singular]
100+
101+
delete(m.kindToPluralResource, gvk)
102+
delete(m.kindToScope, gvk)
103+
delete(m.singularToPlural, singular)
104+
delete(m.pluralToSingular, plural)
105+
delete(m.resourceToKind, plural)
106+
}
107+
108+
// KindFor takes a partial resource and returns the single match. Returns an error if there are multiple matches
109+
func (m *DynamicRESTMapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) {
110+
return schema.GroupVersionKind{}, nil
111+
}
112+
113+
// KindsFor takes a partial resource and returns the list of potential kinds in priority order
114+
func (m *DynamicRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
115+
return nil, nil
116+
}
117+
118+
// ResourceFor takes a partial resource and returns the single match. Returns an error if there are multiple matches
119+
func (m *DynamicRESTMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) {
120+
return schema.GroupVersionResource{}, nil
121+
}
122+
123+
// ResourcesFor takes a partial resource and returns the list of potential resource in priority order
124+
func (m *DynamicRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
125+
return nil, nil
126+
}
127+
128+
// RESTMapping identifies a preferred resource mapping for the provided group kind.
129+
func (m *DynamicRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) {
130+
return nil, nil
131+
}
132+
133+
// RESTMappings returns all resource mappings for the provided group kind if no
134+
// version search is provided. Otherwise identifies a preferred resource mapping for
135+
// the provided version(s).
136+
func (m *DynamicRESTMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) {
137+
return nil, nil
138+
}
139+
140+
func (m *DynamicRESTMapper) ResourceSingularizer(resource string) (singular string, err error) {
141+
return "", nil
142+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
Copyright 2025 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dynamicrestmapper
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"fmt"
23+
"sync"
24+
"time"
25+
26+
kcpapiextensionsclientset "github.com/kcp-dev/client-go/apiextensions/client"
27+
kcpapiextensionsv1informers "github.com/kcp-dev/client-go/apiextensions/informers/apiextensions/v1"
28+
29+
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
30+
31+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
32+
"k8s.io/apimachinery/pkg/api/meta"
33+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
34+
"k8s.io/apimachinery/pkg/util/wait"
35+
"k8s.io/client-go/tools/cache"
36+
"k8s.io/client-go/util/workqueue"
37+
"k8s.io/klog/v2"
38+
39+
"github.com/kcp-dev/kcp/pkg/logging"
40+
)
41+
42+
const (
43+
ControllerName = "kcp-dynamicrestmapper"
44+
)
45+
46+
func NewController(
47+
ctx context.Context,
48+
state *DynamicRESTMapper,
49+
crdClusterClient *kcpapiextensionsclientset.ClusterClientset,
50+
crdInformer kcpapiextensionsv1informers.CustomResourceDefinitionClusterInformer,
51+
) (*Controller, error) {
52+
queue := workqueue.NewTypedRateLimitingQueueWithConfig(
53+
workqueue.DefaultTypedControllerRateLimiter[string](),
54+
workqueue.TypedRateLimitingQueueConfig[string]{
55+
Name: "controllerName",
56+
},
57+
)
58+
59+
c := &Controller{
60+
queue: queue,
61+
62+
crdInformer: crdInformer,
63+
64+
state: state,
65+
}
66+
67+
_, _ = crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
68+
AddFunc: func(obj interface{}) {
69+
c.enqueueCRD(obj.(*apiextensionsv1.CustomResourceDefinition), false)
70+
},
71+
DeleteFunc: func(obj interface{}) {
72+
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
73+
obj = tombstone.Obj
74+
}
75+
c.enqueueCRD(obj.(*apiextensionsv1.CustomResourceDefinition), true)
76+
},
77+
})
78+
79+
return c, nil
80+
}
81+
82+
// Controller watches Shards on the root shard, and then starts informers
83+
// for every Shard, watching the Workspaces on them. It then
84+
// updates the workspace index, which maps logical clusters to shard URLs.
85+
type Controller struct {
86+
queue workqueue.TypedRateLimitingInterface[string]
87+
88+
crdInformer kcpapiextensionsv1informers.CustomResourceDefinitionClusterInformer
89+
90+
lock sync.RWMutex
91+
92+
state *DynamicRESTMapper
93+
}
94+
95+
type gvkrKey struct {
96+
Gvkr gvkr
97+
Key string
98+
Deleted bool
99+
}
100+
101+
func encodeGvkrKey(k gvkrKey) (string, error) {
102+
bs, err := json.Marshal(k)
103+
return string(bs), err
104+
}
105+
106+
func decodeGvkrKey(k string) (gvkrKey, error) {
107+
var decoded gvkrKey
108+
err := json.Unmarshal([]byte(k), &decoded)
109+
return decoded, err
110+
}
111+
112+
func (c *Controller) enqueueCRD(crd *apiextensionsv1.CustomResourceDefinition, deleted bool) {
113+
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(crd)
114+
if err != nil {
115+
utilruntime.HandleError(err)
116+
return
117+
}
118+
119+
for _, crdVersion := range crd.Spec.Versions {
120+
encodedGvkKey, err := encodeGvkrKey(
121+
gvkrKey{
122+
Gvkr: gvkr{
123+
Group: crd.Spec.Group,
124+
Version: crdVersion.Name,
125+
Kind: crd.Spec.Names.Kind,
126+
ResourcePlural: crd.Spec.Names.Plural,
127+
ResourceSingular: crd.Spec.Names.Singular,
128+
},
129+
Key: key,
130+
Deleted: deleted,
131+
},
132+
)
133+
if err != nil {
134+
utilruntime.HandleError(fmt.Errorf("%q controller failed encode CRD object with key %q, err: %w", ControllerName, key, err))
135+
}
136+
logging.WithQueueKey(logging.WithReconciler(klog.Background(), ControllerName), encodedGvkKey).V(4).Info("queueing CRD")
137+
c.queue.Add(encodedGvkKey)
138+
}
139+
}
140+
141+
func (c *Controller) Start(ctx context.Context, numThreads int) {
142+
defer utilruntime.HandleCrash()
143+
defer c.queue.ShutDown()
144+
145+
logger := logging.WithReconciler(klog.FromContext(ctx), ControllerName)
146+
ctx = klog.NewContext(ctx, logger)
147+
logger.Info("Starting controller")
148+
defer logger.Info("Shutting down controller")
149+
150+
for i := 0; i < numThreads; i++ {
151+
go wait.UntilWithContext(ctx, c.startWorker, time.Second)
152+
}
153+
154+
<-ctx.Done()
155+
}
156+
157+
func (c *Controller) startWorker(ctx context.Context) {
158+
for c.processNextWorkItem(ctx) {
159+
}
160+
}
161+
162+
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
163+
// Wait until there is a new item in the working queue
164+
k, quit := c.queue.Get()
165+
if quit {
166+
return false
167+
}
168+
key := k
169+
170+
logger := logging.WithQueueKey(klog.FromContext(ctx), key)
171+
ctx = klog.NewContext(ctx, logger)
172+
logger.V(4).Info("processing key")
173+
174+
// No matter what, tell the queue we're done with this key, to unblock
175+
// other workers.
176+
defer c.queue.Done(key)
177+
178+
if err := c.process(ctx, key); err != nil {
179+
utilruntime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", ControllerName, key, err))
180+
c.queue.AddRateLimited(key)
181+
return true
182+
}
183+
c.queue.Forget(key)
184+
return true
185+
}
186+
187+
func (c *Controller) process(ctx context.Context, key string) error {
188+
gvkrKey, err := decodeGvkrKey(key)
189+
if err != nil {
190+
return err
191+
}
192+
193+
logger := logging.WithQueueKey(klog.FromContext(ctx), key)
194+
ctx = klog.NewContext(ctx, logger)
195+
196+
if !gvkrKey.Deleted {
197+
logger.V(4).Info("adding mapping", "kind", gvkrKey.Gvkr.groupVersionKind())
198+
c.state.add(gvkrKey.Gvkr, meta.RESTScopeRoot)
199+
} else {
200+
logger.V(4).Info("removing mapping", "kind", gvkrKey.Gvkr.groupVersionKind())
201+
c.state.remove(gvkrKey.Gvkr.groupVersionKind())
202+
}
203+
204+
return nil
205+
}

0 commit comments

Comments
 (0)