Skip to content

Commit cda0630

Browse files
committed
scaffolding for the gateway api controller
Change-Id: I2bca38e888a281a8a325e8e242ce11b5edebc45d
1 parent b723384 commit cda0630

File tree

207 files changed

+26940
-11
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

207 files changed

+26940
-11
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
k8s.io/controller-manager v0.32.3
1616
k8s.io/klog/v2 v2.130.1
1717
k8s.io/utils v0.0.0-20241210054802-24370beab758
18+
sigs.k8s.io/gateway-api v1.2.1
1819
sigs.k8s.io/kind v0.27.0
1920
)
2021

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7 h1:hcha5B1kVACrLujCKLbr8X
200200
k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7/go.mod h1:GewRfANuJ70iYzvn+i4lezLDAFzvjxZYK1gn1lWcfas=
201201
k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0=
202202
k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
203+
sigs.k8s.io/gateway-api v1.2.1 h1:fZZ/+RyRb+Y5tGkwxFKuYuSRQHu9dZtbjenblleOLHM=
204+
sigs.k8s.io/gateway-api v1.2.1/go.mod h1:EpNfEXNjiYfUJypf0eZ0P5iXA9ekSGWaS1WgPaM42X0=
203205
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE=
204206
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg=
205207
sigs.k8s.io/kind v0.27.0 h1:PQ3f0iAWNIj66LYkZ1ivhEg/+Zb6UPMbO+qVei/INZA=

pkg/controller/controller.go

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@ import (
2020
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
2121
ccmfeatures "k8s.io/controller-manager/pkg/features"
2222
"k8s.io/klog/v2"
23+
24+
gatewayclient "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
25+
gatewayinformers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions"
26+
2327
cpkconfig "sigs.k8s.io/cloud-provider-kind/pkg/config"
2428
"sigs.k8s.io/cloud-provider-kind/pkg/constants"
2529
"sigs.k8s.io/cloud-provider-kind/pkg/container"
30+
"sigs.k8s.io/cloud-provider-kind/pkg/gateway"
2631
"sigs.k8s.io/cloud-provider-kind/pkg/loadbalancer"
2732
"sigs.k8s.io/cloud-provider-kind/pkg/provider"
2833
"sigs.k8s.io/kind/pkg/cluster"
@@ -39,6 +44,7 @@ type ccm struct {
3944
factory informers.SharedInformerFactory
4045
serviceController *servicecontroller.Controller
4146
nodeController *nodecontroller.CloudNodeController
47+
gatewayController *gateway.Controller
4248
cancelFn context.CancelFunc
4349
}
4450

@@ -74,15 +80,15 @@ func (c *Controller) Run(ctx context.Context) {
7480
continue
7581
}
7682

77-
kubeClient, err := c.getKubeClient(ctx, cluster)
83+
restConfig, err := c.getRestConfig(ctx, cluster)
7884
if err != nil {
7985
klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
8086
continue
8187
}
8288

8389
klog.V(2).Infof("Creating new cloud provider for cluster %s", cluster)
8490
cloud := provider.New(cluster, c.kind)
85-
ccm, err := startCloudControllerManager(ctx, cluster, kubeClient, cloud)
91+
ccm, err := startCloudControllerManager(ctx, cluster, restConfig, cloud)
8692
if err != nil {
8793
klog.Errorf("Failed to start cloud controller for cluster %s: %v", cluster, err)
8894
continue
@@ -123,9 +129,9 @@ func (c *Controller) getKubeConfig(cluster string, internal bool) (*rest.Config,
123129
return config, nil
124130
}
125131

126-
// getKubeClient returns a kubeclient for the cluster passed as argument
132+
// getRestConfig returns a valid rest.Config for the cluster passed as argument
127133
// It tries first to connect to the internal endpoint.
128-
func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kubernetes.Interface, error) {
134+
func (c *Controller) getRestConfig(ctx context.Context, cluster string) (*rest.Config, error) {
129135
addresses := []string{}
130136
internalConfig, err := c.getKubeConfig(cluster, true)
131137
if err != nil {
@@ -172,17 +178,12 @@ func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kuberne
172178
return nil, fmt.Errorf("restConfig for host %s not avaliable", host)
173179
}
174180

175-
kubeClient, err := kubernetes.NewForConfig(config)
176-
if err != nil {
177-
klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
178-
return kubeClient, err
179-
}
180-
return kubeClient, nil
181+
return config, nil
181182
}
182183

183184
// TODO: implement leader election to not have problems with multiple providers
184185
// ref: https://github.com/kubernetes/kubernetes/blob/d97ea0f705847f90740cac3bc3dd8f6a4026d0b5/cmd/kube-scheduler/app/server.go#L211
185-
func startCloudControllerManager(ctx context.Context, clusterName string, kubeClient kubernetes.Interface, cloud cloudprovider.Interface) (*ccm, error) {
186+
func startCloudControllerManager(ctx context.Context, clusterName string, config *rest.Config, cloud cloudprovider.Interface) (*ccm, error) {
186187
// TODO: we need to set up the ccm specific feature gates
187188
// but try to avoid to expose this to users
188189
featureGates := utilfeature.DefaultMutableFeatureGate
@@ -191,6 +192,12 @@ func startCloudControllerManager(ctx context.Context, clusterName string, kubeCl
191192
return nil, err
192193
}
193194

195+
kubeClient, err := kubernetes.NewForConfig(config)
196+
if err != nil {
197+
klog.Errorf("Failed to create kubeClient for cluster %s: %v", clusterName, err)
198+
return nil, err
199+
}
200+
194201
client := kubeClient.Discovery().RESTClient()
195202
// wait for health
196203
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
@@ -246,6 +253,33 @@ func startCloudControllerManager(ctx context.Context, clusterName string, kubeCl
246253

247254
sharedInformers.Start(ctx.Done())
248255

256+
// Gateway setup
257+
gwClient, err := gatewayclient.NewForConfig(config)
258+
if err != nil {
259+
// This error shouldn't fail. It lives like this as a legacy.
260+
klog.Errorf("Failed to create Gateway API client: %v", err)
261+
cancel()
262+
return nil, err
263+
}
264+
265+
sharedGwInformers := gatewayinformers.NewSharedInformerFactory(gwClient, 60*time.Second)
266+
267+
gatewayController, err := gateway.New(
268+
gwClient,
269+
sharedGwInformers.Gateway().V1().Gateways(),
270+
sharedGwInformers.Gateway().V1().HTTPRoutes(),
271+
sharedGwInformers.Gateway().V1().GRPCRoutes(),
272+
)
273+
if err != nil {
274+
klog.Errorf("Failed to start gateway controller: %v", err)
275+
cancel()
276+
return nil, err
277+
}
278+
279+
go gatewayController.Run(ctx)
280+
281+
sharedGwInformers.Start(ctx.Done())
282+
249283
// This has to cleanup all the resources allocated by the cloud provider in this cluster
250284
// - containers as loadbalancers
251285
// - in windows and darwin ip addresses on the loopback interface
@@ -290,6 +324,7 @@ func startCloudControllerManager(ctx context.Context, clusterName string, kubeCl
290324
factory: sharedInformers,
291325
serviceController: serviceController,
292326
nodeController: nodeController,
327+
gatewayController: gatewayController,
293328
cancelFn: cancelFn}, nil
294329
}
295330

pkg/gateway/controller.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package gateway
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
apierrors "k8s.io/apimachinery/pkg/api/errors"
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/apimachinery/pkg/util/runtime"
11+
"k8s.io/apimachinery/pkg/util/wait"
12+
"k8s.io/client-go/tools/cache"
13+
"k8s.io/client-go/util/workqueue"
14+
"k8s.io/klog/v2"
15+
"k8s.io/utils/ptr"
16+
17+
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
18+
gatewayclient "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
19+
gatewayinformers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis/v1"
20+
gatewaylisters "sigs.k8s.io/gateway-api/pkg/client/listers/apis/v1"
21+
)
22+
23+
// Identity and tuning values of the Gateway API controller.
const (
	// controllerName is written to GatewayClass.Spec.ControllerName and is
	// also the name reported while waiting for the informer caches to sync.
	controllerName = "kind.sigs.k8s.io/gateway-controller"

	// gwClassName is the name of the GatewayClass object that Run creates
	// when it is missing.
	gwClassName = "cloud-provider-kind"

	// maxRetries bounds how many times a failing key is put back on its
	// queue before it is dropped.
	maxRetries = 5

	// workers is the number of worker goroutines Run starts per queue.
	workers = 5
)
29+
30+
type Controller struct {
31+
gwClient gatewayclient.Interface
32+
gatewayLister gatewaylisters.GatewayLister
33+
gatewayListerSynced cache.InformerSynced
34+
gatewayqueue workqueue.TypedRateLimitingInterface[string]
35+
36+
httprouteLister gatewaylisters.HTTPRouteLister
37+
httprouteListerSynced cache.InformerSynced
38+
httproutequeue workqueue.TypedRateLimitingInterface[string]
39+
40+
grpcrouteLister gatewaylisters.GRPCRouteLister
41+
grpcrouteListerSynced cache.InformerSynced
42+
grpcroutequeue workqueue.TypedRateLimitingInterface[string]
43+
}
44+
45+
func New(
46+
gwClient *gatewayclient.Clientset,
47+
gatewayInformer gatewayinformers.GatewayInformer,
48+
httprouteInformer gatewayinformers.HTTPRouteInformer,
49+
grpcrouteInformer gatewayinformers.GRPCRouteInformer,
50+
) (*Controller, error) {
51+
c := &Controller{
52+
gwClient: gwClient,
53+
gatewayLister: gatewayInformer.Lister(),
54+
gatewayListerSynced: gatewayInformer.Informer().HasSynced,
55+
gatewayqueue: workqueue.NewTypedRateLimitingQueueWithConfig(
56+
workqueue.DefaultTypedControllerRateLimiter[string](),
57+
workqueue.TypedRateLimitingQueueConfig[string]{Name: "gateway"},
58+
),
59+
httprouteLister: httprouteInformer.Lister(),
60+
httprouteListerSynced: httprouteInformer.Informer().HasSynced,
61+
httproutequeue: workqueue.NewTypedRateLimitingQueueWithConfig(
62+
workqueue.DefaultTypedControllerRateLimiter[string](),
63+
workqueue.TypedRateLimitingQueueConfig[string]{Name: "httproute"},
64+
),
65+
grpcrouteLister: grpcrouteInformer.Lister(),
66+
grpcrouteListerSynced: grpcrouteInformer.Informer().HasSynced,
67+
grpcroutequeue: workqueue.NewTypedRateLimitingQueueWithConfig(
68+
workqueue.DefaultTypedControllerRateLimiter[string](),
69+
workqueue.TypedRateLimitingQueueConfig[string]{Name: "grpcroute"},
70+
),
71+
}
72+
73+
_, err := gatewayInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
74+
AddFunc: func(obj interface{}) {},
75+
UpdateFunc: func(oldObj, newObj interface{}) {},
76+
DeleteFunc: func(obj interface{}) {},
77+
})
78+
if err != nil {
79+
return nil, err
80+
}
81+
82+
_, err = httprouteInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
83+
AddFunc: func(obj interface{}) {},
84+
UpdateFunc: func(oldObj, newObj interface{}) {},
85+
DeleteFunc: func(obj interface{}) {},
86+
})
87+
if err != nil {
88+
return nil, err
89+
}
90+
91+
_, err = grpcrouteInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
92+
AddFunc: func(obj interface{}) {},
93+
UpdateFunc: func(oldObj, newObj interface{}) {},
94+
DeleteFunc: func(obj interface{}) {},
95+
})
96+
if err != nil {
97+
return nil, err
98+
}
99+
100+
return c, nil
101+
}
102+
103+
// Run begins watching and syncing.
104+
func (c *Controller) Run(ctx context.Context) error {
105+
defer runtime.HandleCrashWithContext(ctx)
106+
107+
// Let the workers stop when we are done
108+
defer c.gatewayqueue.ShutDown()
109+
defer c.httproutequeue.ShutDown()
110+
defer c.grpcroutequeue.ShutDown()
111+
klog.Info("Starting Gateway API controller")
112+
113+
// Create GatewayClass if it does not exist
114+
gwClass := gatewayv1.GatewayClass{
115+
ObjectMeta: metav1.ObjectMeta{
116+
Name: gwClassName,
117+
},
118+
Spec: gatewayv1.GatewayClassSpec{
119+
ControllerName: controllerName,
120+
Description: ptr.To("cloud-provider-kind gateway API"),
121+
},
122+
}
123+
124+
if _, err := c.gwClient.GatewayV1().GatewayClasses().Get(ctx, gwClassName, metav1.GetOptions{}); apierrors.IsNotFound(err) {
125+
_, err := c.gwClient.GatewayV1().GatewayClasses().Create(ctx, &gwClass, metav1.CreateOptions{})
126+
if err != nil {
127+
klog.Infof("faile to create cloud-provider-kind GatewayClass: %v")
128+
return err
129+
}
130+
}
131+
132+
// Wait for all involved caches to be synced, before processing items from the queue is started
133+
if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), c.gatewayListerSynced, c.httprouteListerSynced, c.grpcrouteListerSynced) {
134+
return fmt.Errorf("Timed out waiting for caches to sync")
135+
}
136+
137+
for i := 0; i < workers; i++ {
138+
go wait.UntilWithContext(ctx, c.runGatewayWorker, time.Second)
139+
go wait.UntilWithContext(ctx, c.runHTTPRouteWorker, time.Second)
140+
go wait.UntilWithContext(ctx, c.runGRPCrouteWorker, time.Second)
141+
}
142+
143+
<-ctx.Done()
144+
klog.Info("Stopping Gateway API controller")
145+
return nil
146+
}

pkg/gateway/gateway.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package gateway
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
apierrors "k8s.io/apimachinery/pkg/api/errors"
9+
"k8s.io/apimachinery/pkg/util/runtime"
10+
"k8s.io/client-go/tools/cache"
11+
"k8s.io/klog/v2"
12+
)
13+
14+
func (c *Controller) processNextGatewayItem() bool {
15+
// Wait until there is a new item in the working queue
16+
key, quit := c.gatewayqueue.Get()
17+
if quit {
18+
return false
19+
}
20+
defer c.gatewayqueue.Done(key)
21+
22+
err := c.syncGateway(key)
23+
24+
c.handleGatewayErr(err, key)
25+
return true
26+
}
27+
28+
// syncToStdout is the business logic of the controller. In this controller it simply prints
29+
// information about the pod to stdout. In case an error happened, it has to simply return the error.
30+
// The retry logic should not be part of the business logic.
31+
func (c *Controller) syncGateway(key string) error {
32+
startTime := time.Now()
33+
defer func() {
34+
klog.V(4).Infof("Finished syncing gateway %q (%v)", key, time.Since(startTime))
35+
}()
36+
37+
namespace, name, err := cache.SplitMetaNamespaceKey(key)
38+
if err != nil {
39+
return err
40+
}
41+
42+
gw, err := c.gatewayLister.Gateways(namespace).Get(name)
43+
if err != nil {
44+
klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
45+
return err
46+
}
47+
48+
if apierrors.IsNotFound(err) {
49+
// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
50+
fmt.Printf("Gateway %s does not exist anymore\n", key)
51+
} else {
52+
// Note that you also have to check the uid if you have a local controlled resource, which
53+
// is dependent on the actual instance, to detect that a Pod was recreated with the same name
54+
fmt.Printf("Sync/Add/Update for Gateway %s\n", gw.GetName())
55+
}
56+
return nil
57+
}
58+
59+
// handleErr checks if an error happened and makes sure we will retry later.
60+
func (c *Controller) handleGatewayErr(err error, key string) {
61+
if err == nil {
62+
c.gatewayqueue.Forget(key)
63+
return
64+
}
65+
66+
if c.gatewayqueue.NumRequeues(key) < maxRetries {
67+
klog.Infof("Error syncing Gateway %v: %v", key, err)
68+
69+
// Re-enqueue the key rate limited. Based on the rate limiter on the
70+
// queue and the re-enqueue history, the key will be processed later again.
71+
c.gatewayqueue.AddRateLimited(key)
72+
return
73+
}
74+
75+
c.gatewayqueue.Forget(key)
76+
// Report to an external entity that, even after several retries, we could not successfully process this key
77+
runtime.HandleError(err)
78+
klog.Infof("Dropping Gateway %q out of the queue: %v", key, err)
79+
}
80+
81+
func (c *Controller) runGatewayWorker(ctx context.Context) {
82+
for c.processNextGatewayItem() {
83+
}
84+
}

0 commit comments

Comments
 (0)