Skip to content

Commit 84a46c1

Browse files
committed
scaffolding for the gateway api controller
Change-Id: I2bca38e888a281a8a325e8e242ce11b5edebc45d
1 parent b723384 commit 84a46c1

File tree

204 files changed

+26711
-11
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

204 files changed

+26711
-11
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
k8s.io/controller-manager v0.32.3
1616
k8s.io/klog/v2 v2.130.1
1717
k8s.io/utils v0.0.0-20241210054802-24370beab758
18+
sigs.k8s.io/gateway-api v1.2.1
1819
sigs.k8s.io/kind v0.27.0
1920
)
2021

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7 h1:hcha5B1kVACrLujCKLbr8X
200200
k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7/go.mod h1:GewRfANuJ70iYzvn+i4lezLDAFzvjxZYK1gn1lWcfas=
201201
k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0=
202202
k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
203+
sigs.k8s.io/gateway-api v1.2.1 h1:fZZ/+RyRb+Y5tGkwxFKuYuSRQHu9dZtbjenblleOLHM=
204+
sigs.k8s.io/gateway-api v1.2.1/go.mod h1:EpNfEXNjiYfUJypf0eZ0P5iXA9ekSGWaS1WgPaM42X0=
203205
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE=
204206
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg=
205207
sigs.k8s.io/kind v0.27.0 h1:PQ3f0iAWNIj66LYkZ1ivhEg/+Zb6UPMbO+qVei/INZA=

pkg/controller/controller.go

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@ import (
2020
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
2121
ccmfeatures "k8s.io/controller-manager/pkg/features"
2222
"k8s.io/klog/v2"
23+
24+
gatewayclient "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
25+
gatewayinformers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions"
26+
2327
cpkconfig "sigs.k8s.io/cloud-provider-kind/pkg/config"
2428
"sigs.k8s.io/cloud-provider-kind/pkg/constants"
2529
"sigs.k8s.io/cloud-provider-kind/pkg/container"
30+
"sigs.k8s.io/cloud-provider-kind/pkg/gateway"
2631
"sigs.k8s.io/cloud-provider-kind/pkg/loadbalancer"
2732
"sigs.k8s.io/cloud-provider-kind/pkg/provider"
2833
"sigs.k8s.io/kind/pkg/cluster"
@@ -39,6 +44,7 @@ type ccm struct {
3944
factory informers.SharedInformerFactory
4045
serviceController *servicecontroller.Controller
4146
nodeController *nodecontroller.CloudNodeController
47+
gatewayController *gateway.Controller
4248
cancelFn context.CancelFunc
4349
}
4450

@@ -74,15 +80,15 @@ func (c *Controller) Run(ctx context.Context) {
7480
continue
7581
}
7682

77-
kubeClient, err := c.getKubeClient(ctx, cluster)
83+
restConfig, err := c.getRestConfig(ctx, cluster)
7884
if err != nil {
7985
klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
8086
continue
8187
}
8288

8389
klog.V(2).Infof("Creating new cloud provider for cluster %s", cluster)
8490
cloud := provider.New(cluster, c.kind)
85-
ccm, err := startCloudControllerManager(ctx, cluster, kubeClient, cloud)
91+
ccm, err := startCloudControllerManager(ctx, cluster, restConfig, cloud)
8692
if err != nil {
8793
klog.Errorf("Failed to start cloud controller for cluster %s: %v", cluster, err)
8894
continue
@@ -123,9 +129,9 @@ func (c *Controller) getKubeConfig(cluster string, internal bool) (*rest.Config,
123129
return config, nil
124130
}
125131

126-
// getKubeClient returns a kubeclient for the cluster passed as argument
132+
// getRestConfig returns a valid rest.Config for the cluster passed as argument
127133
// It tries first to connect to the internal endpoint.
128-
func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kubernetes.Interface, error) {
134+
func (c *Controller) getRestConfig(ctx context.Context, cluster string) (*rest.Config, error) {
129135
addresses := []string{}
130136
internalConfig, err := c.getKubeConfig(cluster, true)
131137
if err != nil {
@@ -172,17 +178,12 @@ func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kuberne
172178
return nil, fmt.Errorf("restConfig for host %s not avaliable", host)
173179
}
174180

175-
kubeClient, err := kubernetes.NewForConfig(config)
176-
if err != nil {
177-
klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
178-
return kubeClient, err
179-
}
180-
return kubeClient, nil
181+
return config, nil
181182
}
182183

183184
// TODO: implement leader election to not have problems with multiple providers
184185
// ref: https://github.com/kubernetes/kubernetes/blob/d97ea0f705847f90740cac3bc3dd8f6a4026d0b5/cmd/kube-scheduler/app/server.go#L211
185-
func startCloudControllerManager(ctx context.Context, clusterName string, kubeClient kubernetes.Interface, cloud cloudprovider.Interface) (*ccm, error) {
186+
func startCloudControllerManager(ctx context.Context, clusterName string, config *rest.Config, cloud cloudprovider.Interface) (*ccm, error) {
186187
// TODO: we need to set up the ccm specific feature gates
187188
// but try to avoid to expose this to users
188189
featureGates := utilfeature.DefaultMutableFeatureGate
@@ -191,6 +192,12 @@ func startCloudControllerManager(ctx context.Context, clusterName string, kubeCl
191192
return nil, err
192193
}
193194

195+
kubeClient, err := kubernetes.NewForConfig(config)
196+
if err != nil {
197+
klog.Errorf("Failed to create kubeClient for cluster %s: %v", clusterName, err)
198+
return nil, err
199+
}
200+
194201
client := kubeClient.Discovery().RESTClient()
195202
// wait for health
196203
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
@@ -246,6 +253,33 @@ func startCloudControllerManager(ctx context.Context, clusterName string, kubeCl
246253

247254
sharedInformers.Start(ctx.Done())
248255

256+
// Gateway setup
257+
gwClient, err := gatewayclient.NewForConfig(config)
258+
if err != nil {
259+
// This error shouldn't fail. It lives like this as a legacy.
260+
klog.Errorf("Failed to create Gateway API client: %v", err)
261+
cancel()
262+
return nil, err
263+
}
264+
265+
sharedGwInformers := gatewayinformers.NewSharedInformerFactory(gwClient, 60*time.Second)
266+
267+
gatewayController, err := gateway.New(
268+
gwClient,
269+
sharedGwInformers.Gateway().V1().Gateways(),
270+
sharedGwInformers.Gateway().V1().HTTPRoutes(),
271+
sharedGwInformers.Gateway().V1().GRPCRoutes(),
272+
)
273+
if err != nil {
274+
klog.Errorf("Failed to start gateway controller: %v", err)
275+
cancel()
276+
return nil, err
277+
}
278+
279+
go gatewayController.Run(ctx)
280+
281+
sharedGwInformers.Start(ctx.Done())
282+
249283
// This has to cleanup all the resources allocated by the cloud provider in this cluster
250284
// - containers as loadbalancers
251285
// - in windows and darwin ip addresses on the loopback interface
@@ -290,6 +324,7 @@ func startCloudControllerManager(ctx context.Context, clusterName string, kubeCl
290324
factory: sharedInformers,
291325
serviceController: serviceController,
292326
nodeController: nodeController,
327+
gatewayController: gatewayController,
293328
cancelFn: cancelFn}, nil
294329
}
295330

pkg/gateway/controller.go

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package gateway
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
apierrors "k8s.io/apimachinery/pkg/api/errors"
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/apimachinery/pkg/util/runtime"
11+
"k8s.io/apimachinery/pkg/util/wait"
12+
"k8s.io/client-go/tools/cache"
13+
"k8s.io/client-go/util/workqueue"
14+
"k8s.io/klog/v2"
15+
"k8s.io/utils/ptr"
16+
17+
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
18+
gatewayclient "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
19+
gatewayinformers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis/v1"
20+
gatewaylisters "sigs.k8s.io/gateway-api/pkg/client/listers/apis/v1"
21+
)
22+
23+
const (
24+
controllerName = "kind.sigs.k8s.io/gateway-controller"
25+
gwClassName = "cloud-provider-kind"
26+
maxRetries = 5
27+
workers = 5
28+
)
29+
30+
type Controller struct {
31+
gwClient gatewayclient.Interface
32+
gatewayLister gatewaylisters.GatewayLister
33+
gatewayListerSynced cache.InformerSynced
34+
gatewayqueue workqueue.TypedRateLimitingInterface[string]
35+
36+
httprouteLister gatewaylisters.HTTPRouteLister
37+
httprouteListerSynced cache.InformerSynced
38+
httproutequeue workqueue.TypedRateLimitingInterface[string]
39+
40+
grpcrouteLister gatewaylisters.GRPCRouteLister
41+
grpcrouteListerSynced cache.InformerSynced
42+
grpcroutequeue workqueue.TypedRateLimitingInterface[string]
43+
}
44+
45+
func New(
46+
gwClient *gatewayclient.Clientset,
47+
gatewayInformer gatewayinformers.GatewayInformer,
48+
httprouteInformer gatewayinformers.HTTPRouteInformer,
49+
grpcrouteInformer gatewayinformers.GRPCRouteInformer,
50+
) (*Controller, error) {
51+
c := &Controller{
52+
gwClient: gwClient,
53+
}
54+
55+
return c, nil
56+
}
57+
58+
func (c *Controller) processNextGatewayItem() bool {
59+
// Wait until there is a new item in the working queue
60+
key, quit := c.gatewayqueue.Get()
61+
if quit {
62+
return false
63+
}
64+
defer c.gatewayqueue.Done(key)
65+
66+
err := c.syncGateway(key)
67+
68+
c.handleGatewayErr(err, key)
69+
return true
70+
}
71+
72+
// syncToStdout is the business logic of the controller. In this controller it simply prints
73+
// information about the pod to stdout. In case an error happened, it has to simply return the error.
74+
// The retry logic should not be part of the business logic.
75+
func (c *Controller) syncGateway(key string) error {
76+
startTime := time.Now()
77+
defer func() {
78+
klog.V(4).Infof("Finished syncing gateway %q (%v)", key, time.Since(startTime))
79+
}()
80+
81+
namespace, name, err := cache.SplitMetaNamespaceKey(key)
82+
if err != nil {
83+
return err
84+
}
85+
86+
gw, err := c.gatewayLister.Gateways(namespace).Get(name)
87+
if err != nil {
88+
klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
89+
return err
90+
}
91+
92+
if apierrors.IsNotFound(err) {
93+
// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
94+
fmt.Printf("Gateway %s does not exist anymore\n", key)
95+
} else {
96+
// Note that you also have to check the uid if you have a local controlled resource, which
97+
// is dependent on the actual instance, to detect that a Pod was recreated with the same name
98+
fmt.Printf("Sync/Add/Update for Gateway %s\n", gw.GetName())
99+
}
100+
return nil
101+
}
102+
103+
// handleErr checks if an error happened and makes sure we will retry later.
104+
func (c *Controller) handleGatewayErr(err error, key string) {
105+
if err == nil {
106+
c.gatewayqueue.Forget(key)
107+
return
108+
}
109+
110+
if c.gatewayqueue.NumRequeues(key) < maxRetries {
111+
klog.Infof("Error syncing Gateway %v: %v", key, err)
112+
113+
// Re-enqueue the key rate limited. Based on the rate limiter on the
114+
// queue and the re-enqueue history, the key will be processed later again.
115+
c.gatewayqueue.AddRateLimited(key)
116+
return
117+
}
118+
119+
c.gatewayqueue.Forget(key)
120+
// Report to an external entity that, even after several retries, we could not successfully process this key
121+
runtime.HandleError(err)
122+
klog.Infof("Dropping Gateway %q out of the queue: %v", key, err)
123+
}
124+
125+
// Run begins watching and syncing.
126+
func (c *Controller) Run(ctx context.Context) error {
127+
defer runtime.HandleCrashWithContext(ctx)
128+
129+
// Let the workers stop when we are done
130+
defer c.gatewayqueue.ShutDown()
131+
klog.Info("Starting Gateway controller")
132+
133+
// Create GatewayClass if it does not exist
134+
gwClass := gatewayv1.GatewayClass{
135+
ObjectMeta: metav1.ObjectMeta{
136+
Name: gwClassName,
137+
},
138+
Spec: gatewayv1.GatewayClassSpec{
139+
ControllerName: controllerName,
140+
Description: ptr.To("cloud-provider-kind gateway API"),
141+
},
142+
}
143+
144+
if _, err := c.gwClient.GatewayV1().GatewayClasses().Get(ctx, gwClassName, metav1.GetOptions{}); apierrors.IsNotFound(err) {
145+
_, err := c.gwClient.GatewayV1().GatewayClasses().Create(ctx, &gwClass, metav1.CreateOptions{})
146+
if err != nil {
147+
klog.Infof("faile to create cloud-provider-kind GatewayClass: %v")
148+
return err
149+
}
150+
}
151+
152+
// Wait for all involved caches to be synced, before processing items from the queue is started
153+
if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), c.gatewayListerSynced, c.httprouteListerSynced, c.grpcrouteListerSynced) {
154+
return fmt.Errorf("Timed out waiting for caches to sync")
155+
}
156+
157+
for i := 0; i < workers; i++ {
158+
go wait.UntilWithContext(ctx, c.runGatewayWorker, time.Second)
159+
}
160+
161+
<-ctx.Done()
162+
klog.Info("Stopping Gateway controller")
163+
return nil
164+
}
165+
166+
func (c *Controller) runGatewayWorker(ctx context.Context) {
167+
for c.processNextGatewayItem() {
168+
}
169+
}

vendor/modules.txt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,34 @@ k8s.io/utils/net
679679
k8s.io/utils/pointer
680680
k8s.io/utils/ptr
681681
k8s.io/utils/trace
682+
# sigs.k8s.io/gateway-api v1.2.1
683+
## explicit; go 1.22.0
684+
sigs.k8s.io/gateway-api/apis/applyconfiguration/apis/v1
685+
sigs.k8s.io/gateway-api/apis/applyconfiguration/apis/v1alpha2
686+
sigs.k8s.io/gateway-api/apis/applyconfiguration/apis/v1alpha3
687+
sigs.k8s.io/gateway-api/apis/applyconfiguration/apis/v1beta1
688+
sigs.k8s.io/gateway-api/apis/applyconfiguration/internal
689+
sigs.k8s.io/gateway-api/apis/v1
690+
sigs.k8s.io/gateway-api/apis/v1alpha2
691+
sigs.k8s.io/gateway-api/apis/v1alpha3
692+
sigs.k8s.io/gateway-api/apis/v1beta1
693+
sigs.k8s.io/gateway-api/pkg/client/clientset/versioned
694+
sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/scheme
695+
sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/typed/apis/v1
696+
sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/typed/apis/v1alpha2
697+
sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/typed/apis/v1alpha3
698+
sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/typed/apis/v1beta1
699+
sigs.k8s.io/gateway-api/pkg/client/informers/externalversions
700+
sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis
701+
sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis/v1
702+
sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis/v1alpha2
703+
sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis/v1alpha3
704+
sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis/v1beta1
705+
sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/internalinterfaces
706+
sigs.k8s.io/gateway-api/pkg/client/listers/apis/v1
707+
sigs.k8s.io/gateway-api/pkg/client/listers/apis/v1alpha2
708+
sigs.k8s.io/gateway-api/pkg/client/listers/apis/v1alpha3
709+
sigs.k8s.io/gateway-api/pkg/client/listers/apis/v1beta1
682710
# sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8
683711
## explicit; go 1.23
684712
sigs.k8s.io/json

0 commit comments

Comments
 (0)