Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cache specialized for access tokens #870

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions cache/token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Copyright 2025 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"sync"
"time"
)

// Token is an interface that represents an access token that can be used
// to authenticate with a cloud provider. The only common method is to get the
// duration of the token, because different providers may have different ways to
// represent the token. For example, Azure and GCP use an opaque string token,
// while AWS uses the pair of access key id and secret access key. Consumers of
// this token should know what type to cast this interface to.
type Token interface {
// GetDuration returns the duration for which the token is valid relative to
// approximately time.Now(). This is used to determine when the token should
// be refreshed.
GetDuration() time.Duration
}

// TokenCache is a thread-safe cache specialized in storing and retrieving
// access tokens. It uses an LRU cache as the underlying storage and takes
// care of expiring tokens in a pessimistic way by storing both a timestamp
// with a monotonic clock (the Go default) and an absolute timestamp created
// from the Unix timestamp of when the token was created. The token is
// considered expired when either timestamps are older than the current time.
// This strategy ensures that expired tokens aren't kept in the cache for
// longer than their expiration time. Also, tokens expire on 80% of their
// lifetime, which is the same strategy used by kubelet for rotating
// ServiceAccount tokens.
type TokenCache struct {
cache *LRU[*tokenItem]
mu sync.Mutex
}

type tokenItem struct {
token Token
mono time.Time
unix time.Time
}

func (ti *tokenItem) expired() bool {
now := time.Now()
return ti.mono.Before(now) || ti.unix.Before(now)
}

// NewTokenCache returns a new TokenCache with the given capacity.
func NewTokenCache(capacity int, opts ...Options) *TokenCache {
cache, _ := NewLRU[*tokenItem](capacity, opts...)
return &TokenCache{cache: cache}
}

// Get returns the token for the given key, or nil if the key is not in the cache.
func (c *TokenCache) Get(key string) Token {
c.mu.Lock()
defer c.mu.Unlock()

item, err := c.cache.Get(key)
if err != nil {
return nil
}

if item.expired() {
c.cache.Delete(key)
return nil
}

return item.token
}

// Set adds a token to the cache with the given key.
func (c *TokenCache) Set(key string, token Token) {
item := c.newTokenItem(token)
c.mu.Lock()
c.cache.Set(key, item)
c.mu.Unlock()
}

// RecordCacheEvent records a cache event (cache_miss or cache_hit) with kind,
// name and namespace of the associated object being reconciled.
func (c *TokenCache) RecordCacheEvent(event, kind, name, namespace string) {
c.cache.RecordCacheEvent(event, kind, name, namespace)
}

// DeleteCacheEvent deletes the cache event (cache_miss or cache_hit) metric for
// the associated object being reconciled, given their kind, name and namespace.
func (c *TokenCache) DeleteCacheEvent(event, kind, name, namespace string) {
c.cache.DeleteCacheEvent(event, kind, name, namespace)
}

func (c *TokenCache) newTokenItem(token Token) *tokenItem {
// Kubelet rotates ServiceAccount tokens when 80% of their lifetime has
// passed, so we'll use the same threshold to consider tokens expired.
//
// Ref: https://github.com/kubernetes/kubernetes/blob/4032177faf21ae2f99a2012634167def2376b370/pkg/kubelet/token/token_manager.go#L172-L174
d := (token.GetDuration() * 8) / 10

mono := time.Now().Add(d)
unix := time.Unix(mono.Unix(), 0)

return &tokenItem{
token: token,
mono: mono,
unix: unix,
}
}
106 changes: 106 additions & 0 deletions cache/token_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright 2025 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache_test

import (
"testing"
"time"

. "github.com/onsi/gomega"

"github.com/fluxcd/pkg/cache"
)

type testToken struct {
duration time.Duration
}

func (t *testToken) GetDuration() time.Duration {
return t.duration
}

func TestTokenCache_Lifecycle(t *testing.T) {
g := NewWithT(t)

tc := cache.NewTokenCache(1)

retrieved := tc.Get("test")
g.Expect(retrieved).To(BeNil())

token := &testToken{duration: 100 * time.Second}
tc.Set("test", token)
retrieved = tc.Get("test")
g.Expect(retrieved).To(Equal(token))

token2 := &testToken{duration: 3 * time.Second}
tc.Set("test", token2)
retrieved = tc.Get("test")
g.Expect(retrieved).To(Equal(token2))
g.Expect(retrieved).NotTo(Equal(token))

time.Sleep(3 * time.Second)
retrieved = tc.Get("test")
g.Expect(retrieved).To(BeNil())
}

func TestTokenCache_Expiration(t *testing.T) {
for _, tt := range []struct {
name string
opts []cache.Options
tokenDuration time.Duration
sleepDuration time.Duration
expected bool
}{
{
name: "token does not expire before 80 percent of its duration",
tokenDuration: 5 * time.Second,
sleepDuration: 3 * time.Second,
expected: true,
},
{
name: "token expires after 80 percent of its duration",
tokenDuration: 1 * time.Second,
sleepDuration: 810 * time.Millisecond,
expected: false,
},
{
name: "token with expiration longer than cache max duration expires on max duration",
opts: []cache.Options{cache.WithMaxDuration(1 * time.Second)},
tokenDuration: time.Hour,
sleepDuration: 2 * time.Second,
expected: false,
},
} {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

tc := cache.NewTokenCache(1, tt.opts...)

token := &testToken{duration: tt.tokenDuration}
tc.Set("test", token)

time.Sleep(tt.sleepDuration)

retrieved := tc.Get("test")
if tt.expected {
g.Expect(retrieved).NotTo(BeNil())
} else {
g.Expect(retrieved).To(BeNil())
}
})
}
}
Loading