Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 38 additions & 18 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type Option func(*options)

type options struct {
getSources source.GetSources
invalidateHosts source.InvalidateHosts
resolveHandlers map[string]remote.Handler
metadataStore metadata.Store
overlayOpaqueType layer.OverlayOpaqueType
Expand All @@ -177,6 +178,12 @@ func WithGetSources(s source.GetSources) Option {
}
}

func WithInvalidateHosts(f source.InvalidateHosts) Option {
return func(opts *options) {
opts.invalidateHosts = f
}
}

func WithResolveHandler(name string, handler remote.Handler) Option {
return func(opts *options) {
if opts.resolveHandlers == nil {
Expand Down Expand Up @@ -320,6 +327,7 @@ func NewFilesystem(ctx context.Context, root string, cfg config.FSConfig, opts .
ctx: ctx,
resolver: r,
getSources: getSources,
invalidateHosts: fsOpts.invalidateHosts,
debug: cfg.Debug,
layer: make(map[string]layer.Layer),
disableVerification: cfg.DisableVerification,
Expand Down Expand Up @@ -414,6 +422,7 @@ type filesystem struct {
layerMu sync.Mutex
disableVerification bool
getSources source.GetSources
invalidateHosts source.InvalidateHosts
metricsController *layermetrics.Controller
attrTimeout time.Duration
entryTimeout time.Duration
Expand Down Expand Up @@ -1207,31 +1216,42 @@ func (fs *filesystem) check(ctx context.Context, l layer.Layer, labels map[strin
}
log.G(ctx).WithError(err).Warn("failed to connect to blob")

// Check failed. Try to refresh the connection with fresh source information
// Try to refresh with current (possibly cached) sources.
src, err := fs.refreshLayer(ctx, l, labels)
if err == nil {
return nil
}

// Invalidate cached registry hosts and retry with fresh credentials.
if fs.invalidateHosts != nil {
for _, s := range src {
fs.invalidateHosts(s.Name.String())
}
_, err = fs.refreshLayer(ctx, l, labels)
return err
}

return err
}

// refreshLayer fetches sources and attempts to refresh the layer connection.
// It returns the sources that were tried (for cache invalidation) and any error.
func (fs *filesystem) refreshLayer(ctx context.Context, l layer.Layer, labels map[string]string) ([]source.Source, error) {
src, err := fs.getSources(labels)
if err != nil {
return err
return nil, err
}
var (
retrynum = 1
rErr = fmt.Errorf("failed to refresh connection")
)
for retry := 0; retry < retrynum; retry++ {
log.G(ctx).Warnf("refreshing(%d)...", retry)
for _, s := range src {
err := l.Refresh(ctx, s.Hosts, s.Name, s.Target)
if err == nil {
log.G(ctx).Debug("Successfully refreshed connection")
return nil
}
var errs []error
for _, s := range src {
if err := l.Refresh(ctx, s.Hosts, s.Name, s.Target); err == nil {
return src, nil
} else {
log.G(ctx).WithError(err).Warnf("failed to refresh the layer %q from %q",
s.Target.Digest, s.Name)
rErr = fmt.Errorf("failed(layer:%q, ref:%q): %v: %w",
s.Target.Digest, s.Name, err, rErr)
errs = append(errs, err)
}
}

return rErr
return src, errors.Join(errs...)
}

func isIDMappedDir(mountpoint string) bool {
Expand Down
4 changes: 4 additions & 0 deletions fs/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ const (
// to reduce package dependency
type RegistryHosts func(imgRefSpec reference.Spec) ([]docker.RegistryHost, error)

// InvalidateHosts invalidates cached registry host configurations for a given
// image reference, forcing fresh credentials to be fetched on the next request.
type InvalidateHosts func(ref string)

// FromDefaultLabels returns a function for converting snapshot labels to
// source information based on labels.
func FromDefaultLabels(hosts RegistryHosts) GetSources {
Expand Down
253 changes: 253 additions & 0 deletions integration/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,16 @@ package integration
import (
"bufio"
"bytes"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"math/big"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
Expand All @@ -48,6 +56,7 @@ import (
"github.com/awslabs/soci-snapshotter/config"
"github.com/awslabs/soci-snapshotter/soci/store"
shell "github.com/awslabs/soci-snapshotter/util/dockershell"
"github.com/awslabs/soci-snapshotter/util/dockershell/compose"
"github.com/awslabs/soci-snapshotter/util/testutil"
)

Expand Down Expand Up @@ -875,3 +884,247 @@ func TestRunWithIdMap(t *testing.T) {
}
}
}

// TestCredentialRefreshOnCheckFailure verifies that when registry credentials
// are rotated, the snapshotter recovers by invalidating its cached registry
// hosts and creating a fresh AuthClient.
//
// The test uses Bearer token auth. When the token server password changes,
// the old AuthClient's docker.Authorizer has a cached Bearer handler that
// tries to fetch a new token with the OLD password. The token server rejects
// it. This error occurs inside doBearerAuth/fetchToken — AddResponses is never
// called, so the handler is never deleted. The Authorizer is stuck.
//
// With InvalidateRegistryHosts, the cache is cleared, a fresh Authorizer
// (with empty handlers map) is created, and on the next 401 from the registry,
// AddResponses creates a new handler with fresh credentials from docker config.
func TestCredentialRefreshOnCheckFailure(t *testing.T) {
const containerImage = alpineImage

regConfig := newRegistryConfig()
caCertDir := "/usr/local/share/ca-certificates"

pRoot, err := testutil.GetProjectRoot()
if err != nil {
t.Fatal(err)
}

// Generate RSA cert with both registry host and localhost SANs
// (localhost needed for token server realm URL).
certPEM, keyPEM, err := generateSelfSignedCertMultiSAN(regConfig.host, "localhost")
if err != nil {
t.Fatal(err)
}

hostVolumeMount := t.TempDir()
authDir := filepath.Join(hostVolumeMount, "auth")
if err := os.MkdirAll(authDir, 0777); err != nil {
t.Fatal(err)
}
for name, data := range map[string][]byte{
"domain.crt": certPEM,
"domain.key": keyPEM,
"signing.key": keyPEM, // same RSA key for token signing
"signing.crt": certPEM, // registry verifies JWTs with this cert
"password.txt": []byte(regConfig.pass),
} {
if err := os.WriteFile(filepath.Join(authDir, name), data, 0666); err != nil {
t.Fatalf("failed to write %s: %v", name, err)
}
}

// Build the token server binary for Linux (test containers are Linux).
tokenServerSrc := filepath.Join(pRoot, "integration", "tokenserver")
tokenServerBin := filepath.Join(authDir, "tokenserver")
buildCmd := exec.Command("go", "build", "-o", tokenServerBin, tokenServerSrc)
buildCmd.Env = append(os.Environ(), "CGO_ENABLED=0")
if out, err := buildCmd.CombinedOutput(); err != nil {
t.Fatalf("failed to build token server: %v\n%s", err, out)
}

// Zot config (unused for registry:3.0.0 but needed by compose template infra).
zotDir := filepath.Join(hostVolumeMount, "etc/zot")
if err := os.MkdirAll(zotDir, 0777); err != nil {
t.Fatal(err)
}
zotCfg, err := testutil.ApplyTextTemplate(zotConfigTemplate, zotConfigStruct{Address: regConfig.host})
if err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(zotDir, "config.json"), []byte(zotCfg), 0666); err != nil {
t.Fatal(err)
}

// Custom compose template: registry with Bearer token auth.
// The token realm points to localhost:5001 (token server inside testing container).
composeTemplate := fmt.Sprintf(`
services:
testing:
image: soci_base:soci_test
privileged: true
init: true
entrypoint: [ "/integ_entrypoint.sh" ]
environment:
- NO_PROXY=127.0.0.1,localhost,%s:443
- GOCOVERDIR=/test_coverage
tmpfs:
- /tmp:exec,mode=777
- /var/lib/containerd
- /var/lib/soci-snapshotter-grpc
volumes:
- /dev/fuse:/dev/fuse
- {{.ImageContextDir}}/cov/integration:/test_coverage
- {{.HostVolumeMount}}/auth:/auth
registry:
image: {{.RegistryImageRef}}
container_name: %s
environment:
- REGISTRY_AUTH=token
- REGISTRY_AUTH_TOKEN_REALM=https://localhost:5001/token
- REGISTRY_AUTH_TOKEN_SERVICE=registry
- REGISTRY_AUTH_TOKEN_ISSUER=test-issuer
- REGISTRY_AUTH_TOKEN_ROOTCERTBUNDLE=/auth/signing.crt
- REGISTRY_HTTP_TLS_CERTIFICATE=/auth/domain.crt
- REGISTRY_HTTP_TLS_KEY=/auth/domain.key
- REGISTRY_HTTP_ADDR=%s:443
- REGISTRY_STORAGE_DELETE_ENABLED=true
volumes:
- {{.HostVolumeMount}}/auth:/auth:ro
- {{.HostVolumeMount}}/etc/zot/config.json:/etc/zot/config.json:ro
`, regConfig.host, regConfig.host, regConfig.host)

reporter := testutil.NewTestingReporter(t)
s, err := testutil.ApplyTextTemplate(composeTemplate, dockerComposeYaml{
ImageContextDir: pRoot,
RegistryImageRef: "public.ecr.aws/docker/library/registry:2",
HostVolumeMount: hostVolumeMount,
})
if err != nil {
t.Fatal(err)
}
buildArgs, err := getBuildArgsFromEnv()
if err != nil {
t.Fatal(err)
}
c, err := compose.Up(s,
compose.WithBuildArgs(buildArgs...),
compose.WithStdio(reporter.Stdout(), reporter.Stderr()))
if err != nil {
t.Fatalf("compose up failed: %v", err)
}
defer c.Cleanup()
de, ok := c.Get("testing")
if !ok {
t.Fatal("failed to get testing shell")
}
sh := shell.New(de, reporter)

// Install TLS cert.
crtPath := filepath.Join(caCertDir, "domain.crt")
if err := testutil.WriteFileContents(sh, crtPath, certPEM, 0600); err != nil {
t.Fatal(err)
}
sh.X("trust", "anchor", crtPath)

// Start the token server inside the testing container.
// The binary is built on the host and mounted via the auth volume.
// Use short-lived tokens (3s). After expiry, the cached Bearer handler
// in docker.Authorizer tries to re-fetch a token from the token server
// using the baked-in credentials. If the password has changed, fetchToken
// fails inside doBearerAuth — AddResponses is never called, so the handler
// is never deleted. The Authorizer is stuck with stale credentials.
sh.Gox("/auth/tokenserver",
"--addr", ":5001",
"--cert", "/auth/domain.crt",
"--key", "/auth/domain.key",
"--signing-key", "/auth/signing.key",
"--password-file", "/auth/password.txt",
"--issuer", "test-issuer",
"--token-ttl", "3")
// Wait for token server to start.
sh.Retry(30, "curl", "-sk", "https://localhost:5001/token")

// Login to the registry (fetches Bearer token via token server).
sh.Retry(100, "nerdctl", "login", "-u", regConfig.user, "-p", regConfig.pass, regConfig.host)

// check_always forces check() on every Mounts() call.
withCheckAlways := func(cfg *config.Config) {
cfg.ServiceConfig.FSConfig.BlobConfig.CheckAlways = true
}

rebootContainerd(t, sh, getContainerdConfigToml(t, false), getSnapshotterConfigToml(t, withCheckAlways, withDisableBgFetcher))
copyImage(sh, dockerhub(containerImage), regConfig.mirror(containerImage))
image := regConfig.mirror(containerImage).ref
indexDigest := buildIndex(sh, regConfig.mirror(containerImage), withMinLayerSize(0), withSpanSize(1<<20))
sh.X("soci", "push", "--user", regConfig.creds(), regConfig.mirror(containerImage).ref)

rebootContainerd(t, sh, getContainerdConfigToml(t, false), getSnapshotterConfigToml(t, withCheckAlways, withDisableBgFetcher))
sh.X(append(imagePullCmd, "--soci-index-digest", indexDigest, image)...)

// First run: warms up registryHostMap cache. The docker.Authorizer creates
// a Bearer handler that caches the token and token server credentials.
sh.X(append(runSociCmd, "--name", "warmup", "--rm", image, "echo", "ok")...)

// Wait for the short-lived token to expire (3s TTL + buffer).
sh.X("sleep", "5")

// Rotate the token server password.
newPass := "rotatedpass"
if err := os.WriteFile(filepath.Join(authDir, "password.txt"), []byte(newPass), 0666); err != nil {
t.Fatal(err)
}

// Login with the new password (updates docker config keychain).
sh.X("nerdctl", "login", "-u", regConfig.user, "-p", newPass, regConfig.host)

// Second run: check() fires (check_always=true). The cached AuthClient's
// docker.Authorizer has a Bearer handler whose token has expired.
// doBearerAuth calls fetchToken with the OLD password baked into the
// handler's TokenOptions. The token server rejects it. This error occurs
// inside doBearerAuth — AddResponses is never called, so the handler is
// never deleted. The Authorizer is stuck with stale credentials.
//
// Without InvalidateRegistryHosts: stuck with stale handler → permanent failure.
// With InvalidateRegistryHosts: cache cleared → fresh Authorizer → empty
// handlers map → first request goes unauthenticated → registry returns 401
// Bearer challenge → AddResponses creates new handler with fresh password
// from docker config → fetchToken succeeds → container runs.
_, err = sh.OLog(append(runSociCmd, "--name", "after-rotation", "--rm", image, "echo", "ok")...)
if err != nil {
t.Fatalf("expected container run to succeed after credential rotation, got: %v", err)
}
}

func generateSelfSignedCertMultiSAN(hosts ...string) (crt, key []byte, _ error) {
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 60)
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
if err != nil {
return nil, nil, err
}
template := x509.Certificate{
IsCA: true,
BasicConstraintsValid: true,
SerialNumber: serialNumber,
Subject: pkix.Name{CommonName: hosts[0]},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(1, 0, 0),
KeyUsage: x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
DNSNames: hosts,
}
privatekey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return nil, nil, err
}
cert, err := x509.CreateCertificate(rand.Reader, &template, &template, &privatekey.PublicKey, privatekey)
if err != nil {
return nil, nil, err
}
certPem := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})
privBytes, err := x509.MarshalPKCS8PrivateKey(privatekey)
if err != nil {
return nil, nil, err
}
keyPem := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes})
return certPem, keyPem, nil
}
Loading