Skip to content

Commit 5bec433

Browse files
CARRY: Add watcher to controller and webhook
1 parent fdafd7a commit 5bec433

File tree

3 files changed

+139
-19
lines changed

3 files changed

+139
-19
lines changed

config/components/rbac/role.yaml

+8
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,14 @@ rules:
9292
- list
9393
- update
9494
- watch
95+
- apiGroups:
96+
- apiextensions.k8s.io
97+
resources:
98+
- customresourcedefinitions
99+
verbs:
100+
- get
101+
- list
102+
- watch
95103
- apiGroups:
96104
- autoscaling.x-k8s.io
97105
resources:

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ require (
1515
github.com/prometheus/client_model v0.5.0
1616
github.com/ray-project/kuberay/ray-operator v1.1.0-alpha.0
1717
go.uber.org/zap v1.26.0
18+
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
1819
k8s.io/api v0.29.2
20+
k8s.io/apiextensions-apiserver v0.29.0
1921
k8s.io/apimachinery v0.29.2
2022
k8s.io/apiserver v0.29.1
2123
k8s.io/autoscaler/cluster-autoscaler/apis v0.0.0-20240325113845-0130d33747bb
@@ -100,7 +102,6 @@ require (
100102
go.uber.org/atomic v1.11.0 // indirect
101103
go.uber.org/multierr v1.11.0 // indirect
102104
golang.org/x/crypto v0.18.0 // indirect
103-
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
104105
golang.org/x/mod v0.14.0 // indirect
105106
golang.org/x/net v0.20.0 // indirect
106107
golang.org/x/oauth2 v0.12.0 // indirect
@@ -121,7 +122,6 @@ require (
121122
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
122123
gopkg.in/yaml.v2 v2.4.0 // indirect
123124
gopkg.in/yaml.v3 v3.0.1 // indirect
124-
k8s.io/apiextensions-apiserver v0.29.0 // indirect
125125
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect
126126
k8s.io/kms v0.29.1 // indirect
127127
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 // indirect

pkg/controller/jobframework/setup.go

+129-17
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,38 @@ import (
2222
"fmt"
2323
"os"
2424

25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/runtime/schema"
29+
"k8s.io/client-go/tools/cache"
30+
2531
"github.com/go-logr/logr"
32+
"golang.org/x/exp/slices"
2633
"k8s.io/apimachinery/pkg/api/meta"
2734
ctrl "sigs.k8s.io/controller-runtime"
2835
"sigs.k8s.io/controller-runtime/pkg/client"
2936
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3037

38+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
39+
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
40+
"k8s.io/apimachinery/pkg/watch"
41+
retrywatch "k8s.io/client-go/tools/watch"
42+
3143
"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
3244
)
3345

46+
const (
47+
pytorchjobAPI = "pytorchjobs.kubeflow.org"
48+
rayclusterAPI = "rayclusters.ray.io"
49+
)
50+
3451
var (
3552
errFailedMappingResource = errors.New("restMapper failed mapping resource")
3653
)
3754

55+
// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch
56+
3857
// SetupControllers setups all controllers and webhooks for integrations.
3958
// When the platform developers implement a separate kueue-manager to manage the in-house custom jobs,
4059
// they can easily setup controllers and webhooks for the in-house custom jobs.
@@ -62,24 +81,14 @@ func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
6281
if err != nil {
6382
return fmt.Errorf("%s: %w: %w", fwkNamePrefix, errFailedMappingResource, err)
6483
}
65-
if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
66-
if !meta.IsNoMatchError(err) {
67-
return fmt.Errorf("%s: %w", fwkNamePrefix, err)
68-
}
69-
logger.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
84+
if !isAPIAvailable(context.TODO(), mgr, rayclusterAPI) {
85+
logger.Info("API not available, waiting for it to become available... - Skipping setup of controller and webhook")
86+
waitForAPI(context.TODO(), logger, mgr, rayclusterAPI, func() {
87+
setupComponents(mgr, logger, gvk, fwkNamePrefix, cb, opts...)
88+
})
7089
} else {
71-
if err = cb.NewReconciler(
72-
mgr.GetClient(),
73-
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, options.ManagerName)),
74-
opts...,
75-
).SetupWithManager(mgr); err != nil {
76-
return fmt.Errorf("%s: %w", fwkNamePrefix, err)
77-
}
78-
if err = cb.SetupWebhook(mgr, opts...); err != nil {
79-
return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, err)
80-
}
81-
logger.Info("Set up controller and webhook for job framework")
82-
return nil
90+
logger.Info("API is available, setting up components...")
91+
setupComponents(mgr, logger, gvk, fwkNamePrefix, cb, opts...)
8392
}
8493
}
8594
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
@@ -89,6 +98,39 @@ func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
8998
})
9099
}
91100

101+
func setupComponents(mgr ctrl.Manager, log logr.Logger, gvk schema.GroupVersionKind, fwkNamePrefix string, cb IntegrationCallbacks, opts ...Option) {
102+
// Attempt to get the REST mapping for the GVK
103+
if _, err := mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
104+
if !meta.IsNoMatchError(err) {
105+
log.Error(err, fmt.Sprintf("%s: unable to get REST mapping", fwkNamePrefix))
106+
return
107+
}
108+
log.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
109+
} else {
110+
if err := setupControllerAndWebhook(mgr, gvk, fwkNamePrefix, cb, opts...); err != nil {
111+
log.Error(err, "Failed to set up controller and webhook")
112+
} else {
113+
log.Info("Set up controller and webhook for job framework")
114+
}
115+
}
116+
}
117+
118+
func setupControllerAndWebhook(mgr ctrl.Manager, gvk schema.GroupVersionKind, fwkNamePrefix string, cb IntegrationCallbacks, opts ...Option) error {
119+
if err := cb.NewReconciler(
120+
mgr.GetClient(),
121+
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", gvk.Kind, "managerName")), // Ensure managerName is defined or fetched
122+
opts...,
123+
).SetupWithManager(mgr); err != nil {
124+
return fmt.Errorf("%s: %w", fwkNamePrefix, err)
125+
}
126+
127+
if err := cb.SetupWebhook(mgr, opts...); err != nil {
128+
return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, err)
129+
}
130+
131+
return nil
132+
}
133+
92134
// SetupIndexes setups the indexers for integrations.
93135
// When the platform developers implement a separate kueue-manager to manage the in-house custom jobs,
94136
// they can easily setup indexers for the in-house custom jobs.
@@ -105,3 +147,73 @@ func SetupIndexes(ctx context.Context, indexer client.FieldIndexer, opts ...Opti
105147
return nil
106148
})
107149
}
150+
151+
func isAPIAvailable(ctx context.Context, mgr ctrl.Manager, apiName string) bool {
152+
crdClient, err := apiextensionsclientset.NewForConfig(mgr.GetConfig())
153+
exitOnError(err, "unable to create CRD client")
154+
155+
crdList, err := crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
156+
exitOnError(err, "unable to list CRDs")
157+
158+
return slices.ContainsFunc(crdList.Items, func(crd apiextensionsv1.CustomResourceDefinition) bool {
159+
return crd.Name == apiName
160+
})
161+
}
162+
163+
func waitForAPI(ctx context.Context, log logr.Logger, mgr ctrl.Manager, apiName string, action func()) {
164+
crdClient, err := apiextensionsclientset.NewForConfig(mgr.GetConfig())
165+
exitOnError(err, "unable to create CRD client")
166+
167+
crdList, err := crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
168+
exitOnError(err, "unable to list CRDs")
169+
170+
// If API is already available, just invoke action
171+
if slices.ContainsFunc(crdList.Items, func(crd apiextensionsv1.CustomResourceDefinition) bool {
172+
return crd.Name == apiName
173+
}) {
174+
action()
175+
return
176+
}
177+
178+
// Wait for the API to become available then invoke action
179+
log.Info(fmt.Sprintf("API %v not available, setting up retry watcher", apiName))
180+
retryWatcher, err := retrywatch.NewRetryWatcher(crdList.ResourceVersion, &cache.ListWatch{
181+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
182+
return crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
183+
},
184+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
185+
return crdClient.ApiextensionsV1().CustomResourceDefinitions().Watch(ctx, metav1.ListOptions{})
186+
},
187+
})
188+
exitOnError(err, "unable to create retry watcher")
189+
190+
defer retryWatcher.Stop()
191+
for {
192+
select {
193+
case <-ctx.Done():
194+
return
195+
case event := <-retryWatcher.ResultChan():
196+
switch event.Type {
197+
case watch.Error:
198+
exitOnError(apierrors.FromObject(event.Object), fmt.Sprintf("error watching for API %v", apiName))
199+
200+
case watch.Added, watch.Modified:
201+
if crd := event.Object.(*apiextensionsv1.CustomResourceDefinition); crd.Name == apiName &&
202+
slices.ContainsFunc(crd.Status.Conditions, func(condition apiextensionsv1.CustomResourceDefinitionCondition) bool {
203+
return condition.Type == apiextensionsv1.Established && condition.Status == apiextensionsv1.ConditionTrue
204+
}) {
205+
log.Info(fmt.Sprintf("API %v installed, invoking deferred action", apiName))
206+
action()
207+
return
208+
}
209+
}
210+
}
211+
}
212+
}
213+
214+
func exitOnError(err error, msg string) {
215+
if err != nil {
216+
fmt.Sprint(err, msg)
217+
os.Exit(1)
218+
}
219+
}

0 commit comments

Comments
 (0)