Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ func BuildFlagSet(flagSet *pflag.FlagSet) error {
return err
}

flagSet.BoolP("enable-metadata-prefetch", "", false, "Enables background prefetching of object metadata when a directory is first opened. This reduces latency for subsequent file lookups by pre-filling the metadata cache.")
flagSet.BoolP("enable-metadata-prefetch", "", true, "Enables background prefetching of object metadata when a directory is first opened. This reduces latency for subsequent file lookups by pre-filling the metadata cache.")

if err := flagSet.MarkHidden("enable-metadata-prefetch"); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion cfg/params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ params:
usage: >-
Enables background prefetching of object metadata when a directory is first opened.
This reduces latency for subsequent file lookups by pre-filling the metadata cache.
default: false
default: true
deprecated: false
hide-flag: true

Expand Down
2 changes: 1 addition & 1 deletion internal/fs/inode/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func NewDirInode(
}
// readObjectsUnlocked is used by the prefetcher so the background worker performs GCS I/O without the lock,
// acquiring d.mu only to update the cache.
typed.prefetcher = NewMetadataPrefetcher(cfg, prefetchSem, typed.readObjectsUnlocked)
typed.prefetcher = NewMetadataPrefetcher(cfg, prefetchSem, cacheClock, typed.readObjectsUnlocked)

var cache metadata.TypeCache
if !cfg.EnableTypeCacheDeprecation {
Expand Down
27 changes: 24 additions & 3 deletions internal/fs/inode/dir_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/googlecloudplatform/gcsfuse/v3/cfg"
"github.com/googlecloudplatform/gcsfuse/v3/internal/logger"
"github.com/jacobsa/timeutil"
"golang.org/x/sync/semaphore"
)

Expand All @@ -35,10 +36,13 @@ type MetadataPrefetcher struct {
// Variables for metadata prefetching.
enabled bool
metadataCacheTTL time.Duration
statCacheSize int64
state atomic.Uint32 // 0=Ready, 1=InProgress
ctx context.Context
cancel context.CancelFunc
maxPrefetchCount int64
cacheClock timeutil.Clock
lastPrefetchTime atomic.Pointer[time.Time]
// isLargeDir indicates if the directory size exceeds maxPrefetchCount.
// If true, we start prefetching from the looked-up object's offset.
isLargeDir atomic.Bool
Expand All @@ -53,16 +57,19 @@ type MetadataPrefetcher struct {
func NewMetadataPrefetcher(
cfg *cfg.Config,
prefetchSem *semaphore.Weighted, // Shared semaphore across all MetadataPrefetchers.
cacheClock timeutil.Clock,
listFunc func(context.Context, string, string, int) (map[Name]*Core, []string, string, error),
) *MetadataPrefetcher {
// Initialize a new context for metadata prefetch worker so it can run in background.
ctx, cancel := context.WithCancel(context.Background())
return &MetadataPrefetcher{
enabled: cfg.MetadataCache.EnableMetadataPrefetch,
metadataCacheTTL: time.Duration(cfg.MetadataCache.TtlSecs) * time.Second,
statCacheSize: cfg.MetadataCache.StatCacheMaxSizeMb,
ctx: ctx,
cancel: cancel,
maxPrefetchCount: cfg.MetadataCache.MetadataPrefetchEntriesLimit,
cacheClock: cacheClock,
sem: prefetchSem,
listCallFunc: listFunc,
// state is 0 (prefetchReady) by default.
Expand All @@ -75,9 +82,20 @@ func (p *MetadataPrefetcher) Run(fullObjectName string) {
// Do not trigger prefetching if:
// 1. metadata prefetch config is disabled.
// 2. metadata cache ttl is 0 (disabled).
// 3. prefetch state is in progress already.
if !p.enabled || p.metadataCacheTTL == 0 || !p.state.CompareAndSwap(prefetchReady, prefetchInProgress) {
// Another prefetch is already in progress. Abort.
// 3. stat cache size is 0.
if !p.enabled || p.metadataCacheTTL == 0 || p.statCacheSize == 0 {
return
}

// 4. Trigger prefetch only if last prefetch time was before metadata cache ttl expiry.
lastPrefetchTime := p.lastPrefetchTime.Load()
now := p.cacheClock.Now()
if lastPrefetchTime != nil && now.Sub(*lastPrefetchTime) < p.metadataCacheTTL {
return
}

// 5. Ensure only one prefetch runs at a time for this directory.
if !p.state.CompareAndSwap(prefetchReady, prefetchInProgress) {
return
}

Expand Down Expand Up @@ -141,6 +159,9 @@ func (p *MetadataPrefetcher) Run(fullObjectName string) {
}
continuationToken = newTok
}
// Update lastPrefetchTime on successful completion.
now := p.cacheClock.Now()
p.lastPrefetchTime.Store(&now)
}()
}

Expand Down
67 changes: 62 additions & 5 deletions internal/fs/inode/dir_prefetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -56,6 +57,7 @@ func (t *DirPrefetchTest) setup(enablePrefetch bool, ttl time.Duration) (d *dirI
MetadataCache: cfg.MetadataCacheConfig{
EnableMetadataPrefetch: enablePrefetch,
TypeCacheMaxSizeMb: 400,
StatCacheMaxSizeMb: 400,
TtlSecs: 60,
MetadataPrefetchEntriesLimit: 5000,
},
Expand Down Expand Up @@ -262,9 +264,9 @@ func (t *DirPrefetchTest) TestMetadataPrefetcher_ConcurrencyLimit() {
limit := int64(2)
sem := semaphore.NewWeighted(limit)
blockChan := make(chan struct{})
p1 := NewMetadataPrefetcher(t.config, sem, blockingListFunc(blockChan))
p2 := NewMetadataPrefetcher(t.config, sem, blockingListFunc(blockChan))
p3 := NewMetadataPrefetcher(t.config, sem, blockingListFunc(blockChan))
p1 := NewMetadataPrefetcher(t.config, sem, &t.clock, blockingListFunc(blockChan))
p2 := NewMetadataPrefetcher(t.config, sem, &t.clock, blockingListFunc(blockChan))
p3 := NewMetadataPrefetcher(t.config, sem, &t.clock, blockingListFunc(blockChan))
// 1. Run two prefetches to fill up the limit.
p1.Run("dir1/obj1")
p2.Run("dir2/obj2")
Expand Down Expand Up @@ -303,13 +305,13 @@ func (t *DirPrefetchTest) TestMetadataPrefetcher_RespectsMaxParallelPrefetchesCo
<-blockChan
return nil, nil, "", nil
}
p := NewMetadataPrefetcher(t.config, sem, listFunc)
p := NewMetadataPrefetcher(t.config, sem, &t.clock, listFunc)

// Trigger multiple runs on the same prefetcher (simulating different objects in same dir)
// and different prefetchers.
p.Run("a/1")
p.Run("a/2") // Will be skipped by atomic state check anyway
p2 := NewMetadataPrefetcher(t.config, sem, listFunc)
p2 := NewMetadataPrefetcher(t.config, sem, &t.clock, listFunc)
p2.Run("b/1") // Should be skipped by semaphore check

time.Sleep(10 * time.Millisecond)
Expand All @@ -318,3 +320,58 @@ func (t *DirPrefetchTest) TestMetadataPrefetcher_RespectsMaxParallelPrefetchesCo
mu.Unlock()
close(blockChan)
}

func mockListFuncWithCtr() (*atomic.Int32, func(ctx context.Context, tok string, startOffset string, limit int) (map[Name]*Core, []string, string, error)) {
var listCalls atomic.Int32
mockListFunc := func(ctx context.Context, tok string, startOffset string, limit int) (map[Name]*Core, []string, string, error) {
listCalls.Add(1)
return make(map[Name]*Core), nil, "", nil
}
return &listCalls, mockListFunc
}

func (t *DirPrefetchTest) TestMetadataPrefetcher_TTLGuard() {
listCallCtr, mockListFunc := mockListFuncWithCtr()
sem := semaphore.NewWeighted(1)
p := NewMetadataPrefetcher(t.config, sem, &t.clock, mockListFunc)

// 1. Initial Run: Should trigger a prefetch.
p.Run("dir/obj1")
assert.Eventually(t.T(), func() bool {
return listCallCtr.Load() == 1
}, 200*time.Millisecond, 10*time.Millisecond)

// 2. Immediate Run: Should NOT trigger another prefetch because the
// TTL has not passed since the last run.
p.Run("dir/obj2")
// Wait to ensure no new prefetch is triggered.
time.Sleep(20 * time.Millisecond)
assert.Equal(t.T(), int32(1), listCallCtr.Load())

// 3. Run after TTL expiry: Should trigger a new prefetch.
t.clock.AdvanceTime(60 * time.Second)
p.Run("dir/obj3")

assert.Eventually(t.T(), func() bool {
return listCallCtr.Load() == 2
}, 200*time.Millisecond, 10*time.Millisecond)
}

func (t *DirPrefetchTest) TestMetadataPrefetcher_0CacheSize() {
listCallCtr, mockListFunc := mockListFuncWithCtr()
config := &cfg.Config{
MetadataCache: cfg.MetadataCacheConfig{
EnableMetadataPrefetch: true,
StatCacheMaxSizeMb: 0,
TtlSecs: 60,
},
}
p := NewMetadataPrefetcher(config, semaphore.NewWeighted(1), &t.clock, mockListFunc)

p.Run("dir/obj1")

// Allow some time for any potential background work to start.
time.Sleep(20 * time.Millisecond)
// Assert that no list calls were made because stat cache size is 0.
assert.Equal(t.T(), int32(0), listCallCtr.Load())
}
Loading