Skip to content

Commit

Permalink
Add cache specialized for access tokens
Browse files Browse the repository at this point in the history
Signed-off-by: Matheus Pimenta <[email protected]>
  • Loading branch information
matheuscscp committed Feb 23, 2025
1 parent 00d1ceb commit 2089a72
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 0 deletions.
10 changes: 10 additions & 0 deletions cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type storeOptions struct {
interval time.Duration
registerer prometheus.Registerer
metricsPrefix string
maxDuration time.Duration
}

// Options is a function that sets the store options.
Expand Down Expand Up @@ -75,3 +76,12 @@ func WithMetricsPrefix(prefix string) Options {
return nil
}
}

// WithMaxDuration sets the maximum duration for an item to be stored in a timed cache,
// like the TokenCache.
func WithMaxDuration(d time.Duration) Options {
return func(o *storeOptions) error {
o.maxDuration = d
return nil
}
}
131 changes: 131 additions & 0 deletions cache/token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
Copyright 2025 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"sync"
"time"
)

// Token is an interface that represents an access token that can be used
// to authenticate with a cloud provider. The only common method is to get the
// duration of the token, because different providers may have different ways to
// represent the token. For example, Azure and GCP use an opaque string token,
// while AWS uses the pair of access key id and secret access key. Consumers of
// this token should know what type to cast this interface to.
type Token interface {
// GetDuration returns the duration for which the token is valid relative to
// approximately time.Now(). This is used to determine when the token should
// be refreshed.
GetDuration() time.Duration
}

// TokenCache is a thread-safe cache specialized in storing and retrieving
// access tokens. It uses an LRU cache as the underlying storage and takes
// care of expiring tokens in a pessimistic way by storing both a timestamp
// with a monotonic clock (the Go default) and an absolute timestamp created
// from the Unix timestamp of when the token was created. The token is
// considered expired when either timestamps are older than the current time.
// This strategy ensures that expired tokens aren't kept in the cache for
// longer than their expiration time. Also, tokens expire on 80% of their
// lifetime, which is the same strategy used by kubelet for rotating
// ServiceAccount tokens.
type TokenCache struct {
cache *LRU[*tokenItem]
maxDuration time.Duration
mu sync.Mutex
}

type tokenItem struct {
token Token
mono time.Time
unix time.Time
}

func (ti *tokenItem) expired() bool {
now := time.Now()
return ti.mono.Before(now) || ti.unix.Before(now)
}

// NewTokenCache returns a new TokenCache with the given capacity.
func NewTokenCache(capacity int, opts ...Options) *TokenCache {
var options storeOptions
for _, o := range opts {
_ = o(&options)
}
cache, _ := NewLRU[*tokenItem](capacity, opts...)
return &TokenCache{cache: cache, maxDuration: options.maxDuration}
}

// Get returns the token for the given key, or nil if the key is not in the cache.
func (c *TokenCache) Get(key string) Token {
c.mu.Lock()
defer c.mu.Unlock()

item, err := c.cache.Get(key)
if err != nil {
return nil
}

if item.expired() {
c.cache.Delete(key)
return nil
}

return item.token
}

// Set adds a token to the cache with the given key.
func (c *TokenCache) Set(key string, token Token) {
item := c.newTokenItem(token)
c.mu.Lock()
c.cache.Set(key, item)
c.mu.Unlock()
}

// RecordCacheEvent records a cache event (cache_miss or cache_hit) with kind,
// name and namespace of the associated object being reconciled.
func (c *TokenCache) RecordCacheEvent(event, kind, name, namespace string) {
c.cache.RecordCacheEvent(event, kind, name, namespace)
}

// DeleteCacheEvent deletes the cache event (cache_miss or cache_hit) metric for
// the associated object being reconciled, given their kind, name and namespace.
func (c *TokenCache) DeleteCacheEvent(event, kind, name, namespace string) {
c.cache.DeleteCacheEvent(event, kind, name, namespace)
}

func (c *TokenCache) newTokenItem(token Token) *tokenItem {
// Kubelet rotates ServiceAccount tokens when 80% of their lifetime has
// passed, so we'll use the same threshold to consider tokens expired.
//
// Ref: https://github.com/kubernetes/kubernetes/blob/4032177faf21ae2f99a2012634167def2376b370/pkg/kubelet/token/token_manager.go#L172-L174
d := (token.GetDuration() * 8) / 10

if m := c.maxDuration; 0 < m && m < d {
d = m
}

mono := time.Now().Add(d)
unix := time.Unix(mono.Unix(), 0)

return &tokenItem{
token: token,
mono: mono,
unix: unix,
}
}
106 changes: 106 additions & 0 deletions cache/token_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright 2025 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache_test

import (
"testing"
"time"

. "github.com/onsi/gomega"

"github.com/fluxcd/pkg/cache"
)

type testToken struct {
duration time.Duration
}

func (t *testToken) GetDuration() time.Duration {
return t.duration
}

func TestTokenCache_Lifecycle(t *testing.T) {
g := NewWithT(t)

tc := cache.NewTokenCache(1)

retrieved := tc.Get("test")
g.Expect(retrieved).To(BeNil())

token := &testToken{duration: 100 * time.Second}
tc.Set("test", token)
retrieved = tc.Get("test")
g.Expect(retrieved).To(Equal(token))

token2 := &testToken{duration: 3 * time.Second}
tc.Set("test", token2)
retrieved = tc.Get("test")
g.Expect(retrieved).To(Equal(token2))
g.Expect(retrieved).NotTo(Equal(token))

time.Sleep(3 * time.Second)
retrieved = tc.Get("test")
g.Expect(retrieved).To(BeNil())
}

func TestTokenCache_Expiration(t *testing.T) {
for _, tt := range []struct {
name string
opts []cache.Options
tokenDuration time.Duration
sleepDuration time.Duration
expected bool
}{
{
name: "token does not expire before 80 percent of its duration",
tokenDuration: 5 * time.Second,
sleepDuration: 3 * time.Second,
expected: true,
},
{
name: "token expires after 80 percent of its duration",
tokenDuration: 1 * time.Second,
sleepDuration: 810 * time.Millisecond,
expected: false,
},
{
name: "token with expiration longer than cache max duration expires on max duration",
opts: []cache.Options{cache.WithMaxDuration(1 * time.Second)},
tokenDuration: time.Hour,
sleepDuration: 2 * time.Second,
expected: false,
},
} {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

tc := cache.NewTokenCache(1, tt.opts...)

token := &testToken{duration: tt.tokenDuration}
tc.Set("test", token)

time.Sleep(tt.sleepDuration)

retrieved := tc.Get("test")
if tt.expected {
g.Expect(retrieved).NotTo(BeNil())
} else {
g.Expect(retrieved).To(BeNil())
}
})
}
}

0 comments on commit 2089a72

Please sign in to comment.