Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 22 additions & 124 deletions pkg/agentgateway/jwks/config_map_syncer.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

potential follow up: rename this file

Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,42 @@ import (

"istio.io/istio/pkg/kube/kclient"
"istio.io/istio/pkg/kube/krt"
"istio.io/istio/pkg/ptr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kgateway-dev/kgateway/v2/pkg/apiclient"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/krtutil"
)

const jwksStorePrefix = "jwks-store"
const JwksStoreComponent = "app.kubernetes.io/component"
const configMapKey = "jwks-store"
const jwksStoreComponentLabel = "app.kubernetes.io/component"

func JwksStoreLabelSelector(storePrefix string) string {
return jwksStoreComponentLabel + "=" + storePrefix
}

func JwksStoreConfigMapLabel(storePrefix string) map[string]string {
return map[string]string{jwksStoreComponentLabel: storePrefix}
}

// configMapSyncer is used for writing/reading jwks' to/from ConfigMaps.
type configMapSyncer struct {
deploymentNamespace string
storePrefix string
cmAccessor cmAccessor
}

// this is an abstraction over ConfigMap access to facilitate testing
type cmAccessor interface {
Create(context.Context, *corev1.ConfigMap) error
Update(context.Context, *corev1.ConfigMap) error
Delete(context.Context, string) error
List() []*corev1.ConfigMap
Get(string) *corev1.ConfigMap
WaitForCacheSync(ctx context.Context) bool
deploymentNamespace string
cmCollection krt.Collection[*corev1.ConfigMap]
}

func NewConfigMapSyncer(client apiclient.Client, storePrefix, deploymentNamespace string, krtOptions krtutil.KrtOptions) *configMapSyncer {
cmCollection := krt.NewFilteredInformer[*corev1.ConfigMap](client,
kclient.Filter{
ObjectFilter: client.ObjectFilter(),
LabelSelector: JwksStoreComponent + "=" + storePrefix},
LabelSelector: JwksStoreLabelSelector(storePrefix)},
krtOptions.ToOptions("config_map_syncer/ConfigMaps")...)

toret := configMapSyncer{
deploymentNamespace: deploymentNamespace,
storePrefix: storePrefix,
cmAccessor: &defaultCmAccessor{
client: client,
deploymentNamespace: deploymentNamespace,
cmCollection: cmCollection,
},
cmCollection: cmCollection,
}

return &toret
Expand All @@ -65,7 +55,7 @@ func NewConfigMapSyncer(client apiclient.Client, storePrefix, deploymentNamespac
// Load jwks from a ConfigMap.
// Returns a map of jwks-uri -> jwks (currently one jwks-uri per ConfigMap).
func JwksFromConfigMap(cm *corev1.ConfigMap) (map[string]string, error) {
jwksStore := cm.Data[jwksStorePrefix]
jwksStore := cm.Data[configMapKey]
jwks := make(map[string]string)
err := json.Unmarshal(([]byte)(jwksStore), &jwks)
if err != nil {
Expand All @@ -74,70 +64,27 @@ func JwksFromConfigMap(cm *corev1.ConfigMap) (map[string]string, error) {
return jwks, nil
}

func (cs *configMapSyncer) WaitForCacheSync(ctx context.Context) bool {
return cs.cmAccessor.WaitForCacheSync(ctx)
}

// Generates ConfigMap name based on jwks uri. Resulting name is a concatenation of "jwks-store-" prefix and an MD5 hash of the jwks uri.
// The length of the name is a constant 32 chars (hash) + legth of the prefix.
func JwksConfigMapName(storePrefix, jwksUri string) string {
hash := md5.Sum([]byte(jwksUri)) //nolint:gosec
return fmt.Sprintf("%s-%s", storePrefix, hex.EncodeToString(hash[:]))
}

// Write out jwks' in updates to ConfigMaps, one jwks uri per ConfigMap. updates contains a map of jwks-uri to serialized jwks.
// Each ConfigMap is labelled with "app.kubernetes.io/component":"jwks-store" to support bulk loading of jwks' handled by LoadJwksFromConfigMaps().
func (cs *configMapSyncer) WriteJwksToConfigMaps(ctx context.Context, updates map[string]string) error {
log := log.FromContext(ctx)
errs := make([]error, 0)

for uri, jwks := range updates {
switch jwks {
case "": // empty jwks == remove the underlying ConfigMap
err := cs.cmAccessor.Delete(ctx, JwksConfigMapName(cs.storePrefix, uri))
if client.IgnoreNotFound(err) != nil {
log.Error(err, "error deleting jwks ConfigMap")
errs = append(errs, err)
}
default:
cmData, err := json.Marshal(map[string]string{uri: jwks})
if err != nil {
log.Error(err, "error serialiazing jwks")
errs = append(errs, err)
continue
}

existing := cs.cmAccessor.Get(JwksConfigMapName(cs.storePrefix, uri))
if existing == nil {
cm := cs.newJwksStoreConfigMap(JwksConfigMapName(cs.storePrefix, uri))
cm.Data[jwksStorePrefix] = string(cmData)

err := cs.cmAccessor.Create(ctx, cm)
if err != nil {
log.Error(err, "error persisting jwks to ConfigMap")
errs = append(errs, err)
continue
}
} else {
existing.Data[jwksStorePrefix] = string(cmData)
err = cs.cmAccessor.Update(ctx, existing)
if err != nil {
log.Error(err, "error updating jwks ConfigMap")
errs = append(errs, err)
continue
}
}
}
func SetJwksInConfigMap(cm *corev1.ConfigMap, uri, jwks string) error {
b, err := json.Marshal(map[string]string{uri: jwks})
if err != nil {
return err
}

return errors.Join(errs...)
cm.Data[configMapKey] = string(b)
return nil
}

// Loads all jwks persisted in ConfigMaps. The result is a map of jwks-uri to serialized jwks.
func (cs *configMapSyncer) LoadJwksFromConfigMaps(ctx context.Context) (map[string]string, error) {
log := log.FromContext(ctx)

allPersistedJwks := cs.cmAccessor.List()
allPersistedJwks := cs.cmCollection.List()

if len(allPersistedJwks) == 0 {
return nil, nil
Expand All @@ -158,52 +105,3 @@ func (cs *configMapSyncer) LoadJwksFromConfigMaps(ctx context.Context) (map[stri

return toret, errors.Join(errs...)
}

func (cs *configMapSyncer) newJwksStoreConfigMap(name string) *corev1.ConfigMap {
return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: cs.deploymentNamespace,
Labels: map[string]string{JwksStoreComponent: cs.storePrefix},
},
Data: make(map[string]string),
}
}

type defaultCmAccessor struct {
cmCollection krt.Collection[*corev1.ConfigMap]
client apiclient.Client
deploymentNamespace string
}

var _ cmAccessor = &defaultCmAccessor{}

func (cma *defaultCmAccessor) Create(ctx context.Context, newCm *corev1.ConfigMap) error {
_, err := cma.client.Kube().CoreV1().ConfigMaps(cma.deploymentNamespace).Create(ctx, newCm, metav1.CreateOptions{})
return err
}

func (cma *defaultCmAccessor) Update(ctx context.Context, existingCm *corev1.ConfigMap) error {
_, err := cma.client.Kube().CoreV1().ConfigMaps(cma.deploymentNamespace).Update(ctx, existingCm, metav1.UpdateOptions{})
return err
}

func (cma *defaultCmAccessor) Delete(ctx context.Context, cmName string) error {
return cma.client.Kube().CoreV1().ConfigMaps(cma.deploymentNamespace).Delete(ctx, cmName, metav1.DeleteOptions{})
}

func (cma *defaultCmAccessor) Get(cmName string) *corev1.ConfigMap {
cmPtr := cma.cmCollection.GetKey(types.NamespacedName{Namespace: cma.deploymentNamespace, Name: cmName}.String())
if cmPtr == nil {
return nil
}
return ptr.Flatten(cmPtr)
}

func (cma *defaultCmAccessor) List() []*corev1.ConfigMap {
return cma.cmCollection.List()
}

func (cma *defaultCmAccessor) WaitForCacheSync(ctx context.Context) bool {
return cma.client.Core().WaitForCacheSync("config_map_syncer/ConfigMaps", ctx.Done(), cma.cmCollection.HasSynced)
}
25 changes: 18 additions & 7 deletions pkg/agentgateway/jwks/jwks_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package jwks
import (
"encoding/json"
"errors"
"sync"

"github.com/go-jose/go-jose/v4"
)

type jwksCache struct {
l sync.Mutex
jwks map[string]string // jwks uri -> jwks
}

Expand All @@ -29,32 +31,41 @@ func (c *jwksCache) LoadJwksFromStores(storedJwks map[string]string) error {
errs = append(errs, err)
continue
}
newCache.compareAndAddJwks(uri, jwks)
newCache.addJwks(uri, jwks)
}

c.l.Lock()
c.jwks = newCache.jwks
c.l.Unlock()
return errors.Join(errs...)
}

func (c *jwksCache) GetJwks(uri string) (string, bool) {
c.l.Lock()
defer c.l.Unlock()

jwks, ok := c.jwks[uri]
return jwks, ok
}

// Add a jwks to cache. If an exact same jwks is already present in the cache, the result is a nop.
// TODO (dmitri-d) check for max size
func (c *jwksCache) compareAndAddJwks(uri string, jwks jose.JSONWebKeySet) (string, error) {
func (c *jwksCache) addJwks(uri string, jwks jose.JSONWebKeySet) (string, error) {
serializedJwks, err := json.Marshal(jwks)
if err != nil {
return "", err
}

if j, ok := c.jwks[uri]; ok {
if j == string(serializedJwks) {
return "", nil
}
}
c.l.Lock()
defer c.l.Unlock()

c.jwks[uri] = string(serializedJwks)
return c.jwks[uri], nil
}

// Remove jwks from cache.
func (c *jwksCache) deleteJwks(uri string) {
c.l.Lock()
delete(c.jwks, uri)
c.l.Unlock()
}
Loading