Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions internal/kgateway/agentjwksstore/cm_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package agentjwksstore

import (
"context"
"math"
"time"

"golang.org/x/time/rate"
"istio.io/istio/pkg/kube/controllers"
"istio.io/istio/pkg/kube/kclient"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/jwks"
"github.com/kgateway-dev/kgateway/v2/pkg/apiclient"
"github.com/kgateway-dev/kgateway/v2/pkg/logging"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var cmLogger = logging.New("jwks_store_config_map_controller")

const JwksStoreConfigMapName = "jwks-store"

type JwksStoreConfigMapsController struct {
apiClient apiclient.Client
cmClient kclient.Client[*corev1.ConfigMap]
eventQueue controllers.Queue
jwksUpdates chan map[string]string
jwksStore *jwks.JwksStore
deploymentNamespace string
waitForSync []cache.InformerSynced
}

var (
rateLimiter = workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[any](500*time.Millisecond, 10*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
)

func NewJWKSStoreConfigMapsController(apiClient apiclient.Client, deploymentNamespace string, jwksStore *jwks.JwksStore) *JwksStoreConfigMapsController {
cmLogger.Info("creating jwks store ConfigMap controller")
return &JwksStoreConfigMapsController{
apiClient: apiClient,
deploymentNamespace: deploymentNamespace,
jwksStore: jwksStore,
}
}

func (jcm *JwksStoreConfigMapsController) Init(ctx context.Context) {
jcm.cmClient = kclient.NewFiltered[*corev1.ConfigMap](jcm.apiClient,
kclient.Filter{
ObjectFilter: jcm.apiClient.ObjectFilter(),
Namespace: jcm.deploymentNamespace,
LabelSelector: jwks.JwksStoreLabelString})

jcm.waitForSync = []cache.InformerSynced{
jcm.cmClient.HasSynced,
}

jcm.jwksUpdates = jcm.jwksStore.SubscribeToUpdates()
jcm.eventQueue = controllers.NewQueue("JwksStoreController", controllers.WithReconciler(jcm.Reconcile), controllers.WithMaxAttempts(math.MaxInt), controllers.WithRateLimiter(rateLimiter))
}

func (jcm *JwksStoreConfigMapsController) Start(ctx context.Context) error {
cmLogger.Info("waiting for cache to sync")
jcm.apiClient.Core().WaitForCacheSync(
"kube jwks store ConfigMap syncer",
ctx.Done(),
jcm.waitForSync...,
)

cmLogger.Info("starting jwks store ConfigMap controller")
jcm.cmClient.AddEventHandler(
controllers.FromEventHandler(
func(o controllers.Event) {
jcm.eventQueue.AddObject(o.Latest())
}))

go func() {
for {
select {
case u := <-jcm.jwksUpdates:
for uri := range u {
jcm.eventQueue.AddObject(jcm.newJwksStoreConfigMap(jwks.JwksConfigMapName(uri)))
}
case <-ctx.Done():
return
}
}
}()
go jcm.eventQueue.Run(ctx.Done())

<-ctx.Done()
return nil
}

func (jcm *JwksStoreConfigMapsController) Reconcile(req types.NamespacedName) error {
cmLogger.Debug("syncing jwks store to ConfigMap(s)")
ctx := context.Background()

uri, storedJwks, ok := jcm.jwksStore.JwksByConfigMapName(req.Name)
if !ok {
return client.IgnoreNotFound(jcm.apiClient.Kube().CoreV1().ConfigMaps(req.Namespace).Delete(ctx, req.Name, metav1.DeleteOptions{}))
}

existingCm := jcm.cmClient.Get(req.Name, req.Namespace)
if existingCm == nil {
newCm := jcm.newJwksStoreConfigMap(jwks.JwksConfigMapName(uri))
if err := jwks.SetJwksInConfigMap(newCm, uri, storedJwks); err != nil {
cmLogger.Error("error updating ConfigMap", "error", err)
return err // no retries?
}

_, err := jcm.apiClient.Kube().CoreV1().ConfigMaps(req.Namespace).Create(ctx, newCm, metav1.CreateOptions{})
if err != nil {
cmLogger.Error("error creating ConfigMap", "error", err)
return err
}
} else {
if err := jwks.SetJwksInConfigMap(existingCm, uri, storedJwks); err != nil {
cmLogger.Error("error updating ConfigMap", "error", err)
return err // no retries?
}
_, err := jcm.apiClient.Kube().CoreV1().ConfigMaps(req.Namespace).Update(ctx, existingCm, metav1.UpdateOptions{})
if err != nil {
cmLogger.Error("error updating jwks ConfigMap", "error", err)
return err
}
}

return nil
}

// runs on the leader only
func (jcm *JwksStoreConfigMapsController) NeedLeaderElection() bool {
return true
}

func (jcm *JwksStoreConfigMapsController) newJwksStoreConfigMap(name string) *corev1.ConfigMap {
return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: jcm.deploymentNamespace,
Labels: jwks.JwksStoreLabelMap,
},
Data: make(map[string]string),
}
}
95 changes: 0 additions & 95 deletions internal/kgateway/agentjwksstore/jwks_store_controller.go

This file was deleted.

96 changes: 96 additions & 0 deletions internal/kgateway/agentjwksstore/policy_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package agentjwksstore

import (
"context"

"istio.io/istio/pkg/kube/controllers"
"istio.io/istio/pkg/kube/kclient"
"istio.io/istio/pkg/kube/krt"
"k8s.io/client-go/tools/cache"

"github.com/kgateway-dev/kgateway/v2/api/v1alpha1"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/jwks"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/pkg/agentgateway/plugins"
"github.com/kgateway-dev/kgateway/v2/pkg/apiclient"
"github.com/kgateway-dev/kgateway/v2/pkg/logging"
)

type JwksStorePolicyController struct {
agw *plugins.AgwCollections
apiClient apiclient.Client
jwks krt.Collection[jwks.JwksSource]
jwksChanges chan jwks.JwksSource
waitForSync []cache.InformerSynced
}

var polLogger = logging.New("jwks_store_policy_controller")

func NewJWKSStorePolicyController(apiClient apiclient.Client, agw *plugins.AgwCollections) *JwksStorePolicyController {
polLogger.Info("creating jwks store policy controller")
return &JwksStorePolicyController{
agw: agw,
apiClient: apiClient,
jwksChanges: make(chan jwks.JwksSource),
}
}

func (j *JwksStorePolicyController) Init(ctx context.Context) {
policyCol := krt.WrapClient(kclient.NewFilteredDelayed[*v1alpha1.AgentgatewayPolicy](
j.apiClient,
wellknown.AgentgatewayPolicyGVR,
kclient.Filter{ObjectFilter: j.agw.Client.ObjectFilter()},
), j.agw.KrtOpts.ToOptions("AgentgatewayPolicy")...)
j.jwks = krt.NewManyCollection(policyCol, func(krtctx krt.HandlerContext, p *v1alpha1.AgentgatewayPolicy) []jwks.JwksSource {
if p.Spec.Traffic == nil || p.Spec.Traffic.JWTAuthentication == nil {
return nil
}

toret := make([]jwks.JwksSource, 0)
for _, provider := range p.Spec.Traffic.JWTAuthentication.Providers {
if provider.JWKS.Remote == nil {
continue
}
toret = append(toret, jwks.JwksSource{JwksURL: provider.JWKS.Remote.JwksUri, Ttl: provider.JWKS.Remote.CacheDuration.Duration})
}

return toret
}, j.agw.KrtOpts.ToOptions("JwksSources")...)

j.waitForSync = []cache.InformerSynced{
policyCol.HasSynced,
}
}

func (j *JwksStorePolicyController) Start(ctx context.Context) error {
polLogger.Info("waiting for cache to sync")
j.apiClient.Core().WaitForCacheSync(
"kube AgentgatewayPolicy syncer",
ctx.Done(),
j.waitForSync...,
)

polLogger.Info("staring jwks store policy controller")
j.jwks.Register(func(o krt.Event[jwks.JwksSource]) {
switch o.Event {
case controllers.EventAdd, controllers.EventUpdate:
j.jwksChanges <- *o.New
case controllers.EventDelete:
deleted := *o.Old
deleted.Deleted = true
j.jwksChanges <- deleted
}
})

<-ctx.Done()
return nil
}

// runs on the leader only
func (j *JwksStorePolicyController) NeedLeaderElection() bool {
return true
}

func (j *JwksStorePolicyController) JwksChanges() chan jwks.JwksSource {
return j.jwksChanges
}
16 changes: 11 additions & 5 deletions internal/kgateway/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,17 +238,23 @@ func NewControllerBuilder(ctx context.Context, cfg StartConfig) (*ControllerBuil
return nil, err
}

jwksStoreCtrl := agentjwksstore.NewJWKSStoreController(cfg.Manager, cfg.Client, cfg.AgwCollections)
if err := cfg.Manager.Add(jwksStoreCtrl); err != nil {
setupLog.Error(err, "unable to add agentgateway JwksStoreController runnable")
jwksStorePolicyCtrl := agentjwksstore.NewJWKSStorePolicyController(cfg.Client, cfg.AgwCollections)
jwksStorePolicyCtrl.Init(ctx)
if err := cfg.Manager.Add(jwksStorePolicyCtrl); err != nil {
setupLog.Error(err, "unable to add agentgateway JwksStorePolicyController runnable")
return nil, err
}
jwksStoreCtrl.Init(ctx)
jwksStore := jwks.BuildJwksStore(ctx, cfg.Client, cfg.CommonCollections, jwksStoreCtrl.JwksQueue(), namespaces.GetPodNamespace())
jwksStore := jwks.BuildJwksStore(ctx, cfg.Client, cfg.CommonCollections, jwksStorePolicyCtrl.JwksChanges(), namespaces.GetPodNamespace())
if err := cfg.Manager.Add(jwksStore); err != nil {
setupLog.Error(err, "unable to add agentgateway JwksStore runnable")
return nil, err
}
jwksStoreCMCtrl := agentjwksstore.NewJWKSStoreConfigMapsController(cfg.Client, namespaces.GetPodNamespace(), jwksStore)
jwksStoreCMCtrl.Init(ctx)
if err := cfg.Manager.Add(jwksStoreCMCtrl); err != nil {
setupLog.Error(err, "unable to add agentgateway JwksStoreConfigMapController runnable")
return nil, err
}
}

setupLog.Info("starting controller builder")
Expand Down
Loading
Loading