-
Notifications
You must be signed in to change notification settings - Fork 96
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add cache specialized for access tokens
Signed-off-by: Matheus Pimenta <[email protected]>
- Loading branch information
1 parent
00d1ceb
commit 73243ae
Showing
2 changed files
with
228 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* | ||
Copyright 2025 The Flux authors | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package cache | ||
|
||
import ( | ||
"sync" | ||
"time" | ||
) | ||
|
||
// Token is an interface that represents an access token that can be used | ||
// to authenticate with a cloud provider. The only common method is to get the | ||
// duration of the token, because different providers may have different ways to | ||
// represent the token. For example, Azure and GCP use an opaque string token, | ||
// while AWS uses the pair of access key id and secret access key. Consumers of | ||
// this token should know what type to cast this interface to. | ||
type Token interface { | ||
// GetDuration returns the duration for which the token is valid relative to | ||
// approximately time.Now(). This is used to determine when the token should | ||
// be refreshed. | ||
GetDuration() time.Duration | ||
} | ||
|
||
// TokenCache is a thread-safe cache specialized in storing and retrieving | ||
// access tokens. It uses an LRU cache as the underlying storage and takes | ||
// care of expiring tokens in a pessimistic way by storing both a timestamp | ||
// with a monotonic clock (the Go default) and an absolute timestamp created | ||
// from the Unix timestamp of when the token was created. The token is | ||
// considered expired when either timestamps are older than the current time. | ||
// This strategy ensures that expired tokens aren't kept in the cache for | ||
// longer than their expiration time. Also, tokens expire on 80% of their | ||
// lifetime, which is the same strategy used by kubelet for rotating | ||
// ServiceAccount tokens. | ||
type TokenCache struct { | ||
cache *LRU[*tokenItem] | ||
mu sync.Mutex | ||
} | ||
|
||
type tokenItem struct { | ||
token Token | ||
mono time.Time | ||
unix time.Time | ||
} | ||
|
||
func (ti *tokenItem) expired() bool { | ||
now := time.Now() | ||
return ti.mono.Before(now) || ti.unix.Before(now) | ||
} | ||
|
||
// NewTokenCache returns a new TokenCache with the given capacity. | ||
func NewTokenCache(capacity int, opts ...Options) *TokenCache { | ||
cache, _ := NewLRU[*tokenItem](capacity, opts...) | ||
return &TokenCache{cache: cache} | ||
} | ||
|
||
// Get returns the token for the given key, or nil if the key is not in the cache. | ||
func (c *TokenCache) Get(key string) Token { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
|
||
item, err := c.cache.Get(key) | ||
if err != nil { | ||
return nil | ||
} | ||
|
||
if item.expired() { | ||
c.cache.Delete(key) | ||
return nil | ||
} | ||
|
||
return item.token | ||
} | ||
|
||
// Set adds a token to the cache with the given key. | ||
func (c *TokenCache) Set(key string, token Token) { | ||
item := c.newTokenItem(token) | ||
c.mu.Lock() | ||
c.cache.Set(key, item) | ||
c.mu.Unlock() | ||
} | ||
|
||
// RecordCacheEvent records a cache event (cache_miss or cache_hit) with kind, | ||
// name and namespace of the associated object being reconciled. | ||
func (c *TokenCache) RecordCacheEvent(event, kind, name, namespace string) { | ||
c.cache.RecordCacheEvent(event, kind, name, namespace) | ||
} | ||
|
||
// DeleteCacheEvent deletes the cache event (cache_miss or cache_hit) metric for | ||
// the associated object being reconciled, given their kind, name and namespace. | ||
func (c *TokenCache) DeleteCacheEvent(event, kind, name, namespace string) { | ||
c.cache.DeleteCacheEvent(event, kind, name, namespace) | ||
} | ||
|
||
func (c *TokenCache) newTokenItem(token Token) *tokenItem { | ||
// Kubelet rotates ServiceAccount tokens when 80% of their lifetime has | ||
// passed, so we'll use the same threshold to consider tokens expired. | ||
// | ||
// Ref: https://github.com/kubernetes/kubernetes/blob/4032177faf21ae2f99a2012634167def2376b370/pkg/kubelet/token/token_manager.go#L172-L174 | ||
d := (token.GetDuration() * 8) / 10 | ||
|
||
mono := time.Now().Add(d) | ||
unix := time.Unix(mono.Unix(), 0) | ||
|
||
return &tokenItem{ | ||
token: token, | ||
mono: mono, | ||
unix: unix, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
Copyright 2025 The Flux authors | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package cache_test | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
. "github.com/onsi/gomega" | ||
|
||
"github.com/fluxcd/pkg/cache" | ||
) | ||
|
||
type testToken struct { | ||
duration time.Duration | ||
} | ||
|
||
func (t *testToken) GetDuration() time.Duration { | ||
return t.duration | ||
} | ||
|
||
func TestTokenCache_Lifecycle(t *testing.T) { | ||
g := NewWithT(t) | ||
|
||
tc := cache.NewTokenCache(1) | ||
|
||
retrieved := tc.Get("test") | ||
g.Expect(retrieved).To(BeNil()) | ||
|
||
token := &testToken{duration: 100 * time.Second} | ||
tc.Set("test", token) | ||
retrieved = tc.Get("test") | ||
g.Expect(retrieved).To(Equal(token)) | ||
|
||
token2 := &testToken{duration: 3 * time.Second} | ||
tc.Set("test", token2) | ||
retrieved = tc.Get("test") | ||
g.Expect(retrieved).To(Equal(token2)) | ||
g.Expect(retrieved).NotTo(Equal(token)) | ||
|
||
time.Sleep(3 * time.Second) | ||
retrieved = tc.Get("test") | ||
g.Expect(retrieved).To(BeNil()) | ||
} | ||
|
||
func TestTokenCache_Expiration(t *testing.T) { | ||
for _, tt := range []struct { | ||
name string | ||
opts []cache.Options | ||
tokenDuration time.Duration | ||
sleepDuration time.Duration | ||
expected bool | ||
}{ | ||
{ | ||
name: "token does not expire before 80 percent of its duration", | ||
tokenDuration: 5 * time.Second, | ||
sleepDuration: 3 * time.Second, | ||
expected: true, | ||
}, | ||
{ | ||
name: "token expires after 80 percent of its duration", | ||
tokenDuration: 1 * time.Second, | ||
sleepDuration: 810 * time.Millisecond, | ||
expected: false, | ||
}, | ||
{ | ||
name: "token with expiration longer than cache max duration expires on max duration", | ||
opts: []cache.Options{cache.WithMaxDuration(1 * time.Second)}, | ||
tokenDuration: time.Hour, | ||
sleepDuration: 2 * time.Second, | ||
expected: false, | ||
}, | ||
} { | ||
t.Run(tt.name, func(t *testing.T) { | ||
g := NewWithT(t) | ||
|
||
tc := cache.NewTokenCache(1, tt.opts...) | ||
|
||
token := &testToken{duration: tt.tokenDuration} | ||
tc.Set("test", token) | ||
|
||
time.Sleep(tt.sleepDuration) | ||
|
||
retrieved := tc.Get("test") | ||
if tt.expected { | ||
g.Expect(retrieved).NotTo(BeNil()) | ||
} else { | ||
g.Expect(retrieved).To(BeNil()) | ||
} | ||
}) | ||
} | ||
} |