Skip to content

Commit eccccf4

Browse files
committed
scaffolding for the gateway api controller
Change-Id: I2bca38e888a281a8a325e8e242ce11b5edebc45d
1 parent b723384 commit eccccf4

File tree

207 files changed

+26997
-11
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

207 files changed

+26997
-11
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
k8s.io/controller-manager v0.32.3
1616
k8s.io/klog/v2 v2.130.1
1717
k8s.io/utils v0.0.0-20241210054802-24370beab758
18+
sigs.k8s.io/gateway-api v1.2.1
1819
sigs.k8s.io/kind v0.27.0
1920
)
2021

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7 h1:hcha5B1kVACrLujCKLbr8X
200200
k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7/go.mod h1:GewRfANuJ70iYzvn+i4lezLDAFzvjxZYK1gn1lWcfas=
201201
k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0=
202202
k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
203+
sigs.k8s.io/gateway-api v1.2.1 h1:fZZ/+RyRb+Y5tGkwxFKuYuSRQHu9dZtbjenblleOLHM=
204+
sigs.k8s.io/gateway-api v1.2.1/go.mod h1:EpNfEXNjiYfUJypf0eZ0P5iXA9ekSGWaS1WgPaM42X0=
203205
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE=
204206
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg=
205207
sigs.k8s.io/kind v0.27.0 h1:PQ3f0iAWNIj66LYkZ1ivhEg/+Zb6UPMbO+qVei/INZA=

pkg/controller/controller.go

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@ import (
2020
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
2121
ccmfeatures "k8s.io/controller-manager/pkg/features"
2222
"k8s.io/klog/v2"
23+
24+
gatewayclient "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
25+
gatewayinformers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions"
26+
2327
cpkconfig "sigs.k8s.io/cloud-provider-kind/pkg/config"
2428
"sigs.k8s.io/cloud-provider-kind/pkg/constants"
2529
"sigs.k8s.io/cloud-provider-kind/pkg/container"
30+
"sigs.k8s.io/cloud-provider-kind/pkg/gateway"
2631
"sigs.k8s.io/cloud-provider-kind/pkg/loadbalancer"
2732
"sigs.k8s.io/cloud-provider-kind/pkg/provider"
2833
"sigs.k8s.io/kind/pkg/cluster"
@@ -39,6 +44,7 @@ type ccm struct {
3944
factory informers.SharedInformerFactory
4045
serviceController *servicecontroller.Controller
4146
nodeController *nodecontroller.CloudNodeController
47+
gatewayController *gateway.Controller
4248
cancelFn context.CancelFunc
4349
}
4450

@@ -74,15 +80,15 @@ func (c *Controller) Run(ctx context.Context) {
7480
continue
7581
}
7682

77-
kubeClient, err := c.getKubeClient(ctx, cluster)
83+
restConfig, err := c.getRestConfig(ctx, cluster)
7884
if err != nil {
7985
klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
8086
continue
8187
}
8288

8389
klog.V(2).Infof("Creating new cloud provider for cluster %s", cluster)
8490
cloud := provider.New(cluster, c.kind)
85-
ccm, err := startCloudControllerManager(ctx, cluster, kubeClient, cloud)
91+
ccm, err := startCloudControllerManager(ctx, cluster, restConfig, cloud)
8692
if err != nil {
8793
klog.Errorf("Failed to start cloud controller for cluster %s: %v", cluster, err)
8894
continue
@@ -123,9 +129,9 @@ func (c *Controller) getKubeConfig(cluster string, internal bool) (*rest.Config,
123129
return config, nil
124130
}
125131

126-
// getKubeClient returns a kubeclient for the cluster passed as argument
132+
// getRestConfig returns a valid rest.Config for the cluster passed as argument
127133
// It tries first to connect to the internal endpoint.
128-
func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kubernetes.Interface, error) {
134+
func (c *Controller) getRestConfig(ctx context.Context, cluster string) (*rest.Config, error) {
129135
addresses := []string{}
130136
internalConfig, err := c.getKubeConfig(cluster, true)
131137
if err != nil {
@@ -172,17 +178,12 @@ func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kuberne
172178
return nil, fmt.Errorf("restConfig for host %s not avaliable", host)
173179
}
174180

175-
kubeClient, err := kubernetes.NewForConfig(config)
176-
if err != nil {
177-
klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
178-
return kubeClient, err
179-
}
180-
return kubeClient, nil
181+
return config, nil
181182
}
182183

183184
// TODO: implement leader election to not have problems with multiple providers
184185
// ref: https://github.com/kubernetes/kubernetes/blob/d97ea0f705847f90740cac3bc3dd8f6a4026d0b5/cmd/kube-scheduler/app/server.go#L211
185-
func startCloudControllerManager(ctx context.Context, clusterName string, kubeClient kubernetes.Interface, cloud cloudprovider.Interface) (*ccm, error) {
186+
func startCloudControllerManager(ctx context.Context, clusterName string, config *rest.Config, cloud cloudprovider.Interface) (*ccm, error) {
186187
// TODO: we need to set up the ccm specific feature gates
187188
// but try to avoid to expose this to users
188189
featureGates := utilfeature.DefaultMutableFeatureGate
@@ -191,6 +192,12 @@ func startCloudControllerManager(ctx context.Context, clusterName string, kubeCl
191192
return nil, err
192193
}
193194

195+
kubeClient, err := kubernetes.NewForConfig(config)
196+
if err != nil {
197+
klog.Errorf("Failed to create kubeClient for cluster %s: %v", clusterName, err)
198+
return nil, err
199+
}
200+
194201
client := kubeClient.Discovery().RESTClient()
195202
// wait for health
196203
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
@@ -246,6 +253,33 @@ func startCloudControllerManager(ctx context.Context, clusterName string, kubeCl
246253

247254
sharedInformers.Start(ctx.Done())
248255

256+
// Gateway setup
257+
gwClient, err := gatewayclient.NewForConfig(config)
258+
if err != nil {
259+
// This error shouldn't fail. It lives like this as a legacy.
260+
klog.Errorf("Failed to create Gateway API client: %v", err)
261+
cancel()
262+
return nil, err
263+
}
264+
265+
sharedGwInformers := gatewayinformers.NewSharedInformerFactory(gwClient, 60*time.Second)
266+
267+
gatewayController, err := gateway.New(
268+
gwClient,
269+
sharedGwInformers.Gateway().V1().Gateways(),
270+
sharedGwInformers.Gateway().V1().HTTPRoutes(),
271+
sharedGwInformers.Gateway().V1().GRPCRoutes(),
272+
)
273+
if err != nil {
274+
klog.Errorf("Failed to start gateway controller: %v", err)
275+
cancel()
276+
return nil, err
277+
}
278+
279+
go gatewayController.Run(ctx)
280+
281+
sharedGwInformers.Start(ctx.Done())
282+
249283
// This has to cleanup all the resources allocated by the cloud provider in this cluster
250284
// - containers as loadbalancers
251285
// - in windows and darwin ip addresses on the loopback interface
@@ -290,6 +324,7 @@ func startCloudControllerManager(ctx context.Context, clusterName string, kubeCl
290324
factory: sharedInformers,
291325
serviceController: serviceController,
292326
nodeController: nodeController,
327+
gatewayController: gatewayController,
293328
cancelFn: cancelFn}, nil
294329
}
295330

pkg/gateway/controller.go

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package gateway
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
apierrors "k8s.io/apimachinery/pkg/api/errors"
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/apimachinery/pkg/util/runtime"
11+
"k8s.io/apimachinery/pkg/util/wait"
12+
"k8s.io/client-go/tools/cache"
13+
"k8s.io/client-go/util/workqueue"
14+
"k8s.io/klog/v2"
15+
"k8s.io/utils/ptr"
16+
17+
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
18+
gatewayclient "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
19+
gatewayinformers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis/v1"
20+
gatewaylisters "sigs.k8s.io/gateway-api/pkg/client/listers/apis/v1"
21+
)
22+
23+
const (
24+
controllerName = "kind.sigs.k8s.io/gateway-controller"
25+
gwClassName = "cloud-provider-kind"
26+
maxRetries = 5
27+
workers = 5
28+
)
29+
30+
type Controller struct {
31+
gwClient gatewayclient.Interface
32+
gatewayLister gatewaylisters.GatewayLister
33+
gatewayListerSynced cache.InformerSynced
34+
gatewayqueue workqueue.TypedRateLimitingInterface[string]
35+
36+
httprouteLister gatewaylisters.HTTPRouteLister
37+
httprouteListerSynced cache.InformerSynced
38+
httproutequeue workqueue.TypedRateLimitingInterface[string]
39+
40+
grpcrouteLister gatewaylisters.GRPCRouteLister
41+
grpcrouteListerSynced cache.InformerSynced
42+
grpcroutequeue workqueue.TypedRateLimitingInterface[string]
43+
}
44+
45+
func New(
46+
gwClient *gatewayclient.Clientset,
47+
gatewayInformer gatewayinformers.GatewayInformer,
48+
httprouteInformer gatewayinformers.HTTPRouteInformer,
49+
grpcrouteInformer gatewayinformers.GRPCRouteInformer,
50+
) (*Controller, error) {
51+
c := &Controller{
52+
gwClient: gwClient,
53+
gatewayLister: gatewayInformer.Lister(),
54+
gatewayListerSynced: gatewayInformer.Informer().HasSynced,
55+
gatewayqueue: workqueue.NewTypedRateLimitingQueueWithConfig(
56+
workqueue.DefaultTypedControllerRateLimiter[string](),
57+
workqueue.TypedRateLimitingQueueConfig[string]{Name: "gateway"},
58+
),
59+
httprouteLister: httprouteInformer.Lister(),
60+
httprouteListerSynced: httprouteInformer.Informer().HasSynced,
61+
httproutequeue: workqueue.NewTypedRateLimitingQueueWithConfig(
62+
workqueue.DefaultTypedControllerRateLimiter[string](),
63+
workqueue.TypedRateLimitingQueueConfig[string]{Name: "httproute"},
64+
),
65+
grpcrouteLister: grpcrouteInformer.Lister(),
66+
grpcrouteListerSynced: grpcrouteInformer.Informer().HasSynced,
67+
grpcroutequeue: workqueue.NewTypedRateLimitingQueueWithConfig(
68+
workqueue.DefaultTypedControllerRateLimiter[string](),
69+
workqueue.TypedRateLimitingQueueConfig[string]{Name: "grpcroute"},
70+
),
71+
}
72+
73+
_, err := gatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
74+
AddFunc: func(obj interface{}) {
75+
gw := obj.(*gatewayv1.Gateway)
76+
if gw.Spec.GatewayClassName != gwClassName {
77+
return
78+
}
79+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
80+
if err == nil {
81+
c.gatewayqueue.Add(key)
82+
}
83+
},
84+
UpdateFunc: func(oldObj, newObj interface{}) {
85+
gw := newObj.(*gatewayv1.Gateway)
86+
if gw.Spec.GatewayClassName != gwClassName {
87+
return
88+
}
89+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(newObj)
90+
if err == nil {
91+
c.gatewayqueue.Add(key)
92+
}
93+
},
94+
DeleteFunc: func(obj interface{}) {
95+
gw := obj.(*gatewayv1.Gateway)
96+
if gw.Spec.GatewayClassName != gwClassName {
97+
return
98+
}
99+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
100+
if err == nil {
101+
c.gatewayqueue.Add(key)
102+
}
103+
},
104+
})
105+
if err != nil {
106+
return nil, err
107+
}
108+
109+
_, err = httprouteInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
110+
AddFunc: func(obj interface{}) {
111+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
112+
if err == nil {
113+
c.httproutequeue.Add(key)
114+
}
115+
},
116+
UpdateFunc: func(oldObj, newObj interface{}) {
117+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(newObj)
118+
if err == nil {
119+
c.httproutequeue.Add(key)
120+
}
121+
},
122+
DeleteFunc: func(obj interface{}) {
123+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
124+
if err == nil {
125+
c.httproutequeue.Add(key)
126+
}
127+
},
128+
})
129+
if err != nil {
130+
return nil, err
131+
}
132+
133+
_, err = grpcrouteInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
134+
AddFunc: func(obj interface{}) {
135+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
136+
if err == nil {
137+
c.grpcroutequeue.Add(key)
138+
}
139+
},
140+
UpdateFunc: func(oldObj, newObj interface{}) {
141+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(newObj)
142+
if err == nil {
143+
c.grpcroutequeue.Add(key)
144+
}
145+
},
146+
DeleteFunc: func(obj interface{}) {
147+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
148+
if err == nil {
149+
c.grpcroutequeue.Add(key)
150+
}
151+
},
152+
})
153+
if err != nil {
154+
return nil, err
155+
}
156+
157+
return c, nil
158+
}
159+
160+
// Run begins watching and syncing.
161+
func (c *Controller) Run(ctx context.Context) error {
162+
defer runtime.HandleCrashWithContext(ctx)
163+
164+
// Let the workers stop when we are done
165+
defer c.gatewayqueue.ShutDown()
166+
defer c.httproutequeue.ShutDown()
167+
defer c.grpcroutequeue.ShutDown()
168+
klog.Info("Starting Gateway API controller")
169+
170+
// Create GatewayClass if it does not exist
171+
gwClass := gatewayv1.GatewayClass{
172+
ObjectMeta: metav1.ObjectMeta{
173+
Name: gwClassName,
174+
},
175+
Spec: gatewayv1.GatewayClassSpec{
176+
ControllerName: controllerName,
177+
Description: ptr.To("cloud-provider-kind gateway API"),
178+
},
179+
}
180+
181+
if _, err := c.gwClient.GatewayV1().GatewayClasses().Get(ctx, gwClassName, metav1.GetOptions{}); apierrors.IsNotFound(err) {
182+
_, err := c.gwClient.GatewayV1().GatewayClasses().Create(ctx, &gwClass, metav1.CreateOptions{})
183+
if err != nil {
184+
klog.Infof("faile to create cloud-provider-kind GatewayClass: %v")
185+
return err
186+
}
187+
}
188+
189+
// Wait for all involved caches to be synced, before processing items from the queue is started
190+
if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), c.gatewayListerSynced, c.httprouteListerSynced, c.grpcrouteListerSynced) {
191+
return fmt.Errorf("Timed out waiting for caches to sync")
192+
}
193+
194+
for i := 0; i < workers; i++ {
195+
go wait.UntilWithContext(ctx, c.runGatewayWorker, time.Second)
196+
go wait.UntilWithContext(ctx, c.runHTTPRouteWorker, time.Second)
197+
go wait.UntilWithContext(ctx, c.runGRPCrouteWorker, time.Second)
198+
}
199+
200+
<-ctx.Done()
201+
klog.Info("Stopping Gateway API controller")
202+
return nil
203+
}

0 commit comments

Comments
 (0)