Skip to content

Commit 527fb18

Browse files
authored
Use rwmutex to make reverseHandles map safe for concurrent access (#146)
1 parent 28bb217 commit 527fb18

2 files changed

Lines changed: 156 additions & 19 deletions

File tree

helpers/cachinghandler.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/binary"
66
"io/fs"
77
"reflect"
8+
"sync"
89

910
"github.com/willscott/go-nfs"
1011

@@ -24,12 +25,11 @@ func NewCachingHandlerWithVerifierLimit(h nfs.Handler, limit int, verifierLimit
2425
nfs.Log.Warnf("Caching handler created with insufficient cache to support directory listing", "size", limit, "verifiers", verifierLimit)
2526
}
2627
cache, _ := lru.New[uuid.UUID, entry](limit)
27-
reverseCache := make(map[string][]uuid.UUID)
2828
verifiers, _ := lru.New[uint64, verifier](verifierLimit)
2929
return &CachingHandler{
3030
Handler: h,
3131
activeHandles: cache,
32-
reverseHandles: reverseCache,
32+
reverseHandles: make(map[string][]uuid.UUID),
3333
activeVerifiers: verifiers,
3434
cacheLimit: limit,
3535
}
@@ -38,10 +38,11 @@ func NewCachingHandlerWithVerifierLimit(h nfs.Handler, limit int, verifierLimit
3838
// CachingHandler implements to/from handle via an LRU cache.
3939
type CachingHandler struct {
4040
nfs.Handler
41-
activeHandles *lru.Cache[uuid.UUID, entry]
42-
reverseHandles map[string][]uuid.UUID
43-
activeVerifiers *lru.Cache[uint64, verifier]
44-
cacheLimit int
41+
activeHandles *lru.Cache[uuid.UUID, entry]
42+
reverseHandles map[string][]uuid.UUID
43+
reverseHandlesMu sync.RWMutex
44+
activeVerifiers *lru.Cache[uint64, verifier]
45+
cacheLimit int
4546
}
4647

4748
type entry struct {
@@ -70,10 +71,7 @@ func (c *CachingHandler) ToHandle(f billy.Filesystem, path []string) []byte {
7071
c.evictReverseCache(rk, evictedKey)
7172
}
7273

73-
if _, ok := c.reverseHandles[joinedPath]; !ok {
74-
c.reverseHandles[joinedPath] = []uuid.UUID{}
75-
}
76-
c.reverseHandles[joinedPath] = append(c.reverseHandles[joinedPath], id)
74+
c.appendReverseHandle(joinedPath, id)
7775
b, _ := id.MarshalBinary()
7876

7977
return b
@@ -103,11 +101,7 @@ func (c *CachingHandler) FromHandle(fh []byte) (billy.Filesystem, []string, erro
103101
}
104102

105103
func (c *CachingHandler) searchReverseCache(f billy.Filesystem, path string) []byte {
106-
uuids, exists := c.reverseHandles[path]
107-
108-
if !exists {
109-
return nil
110-
}
104+
uuids := c.getReverseHandles(path)
111105

112106
for _, id := range uuids {
113107
if candidate, ok := c.activeHandles.Get(id); ok {
@@ -121,20 +115,33 @@ func (c *CachingHandler) searchReverseCache(f billy.Filesystem, path string) []b
121115
}
122116

123117
func (c *CachingHandler) evictReverseCache(path string, handle uuid.UUID) {
124-
uuids, exists := c.reverseHandles[path]
118+
c.reverseHandlesMu.Lock()
119+
defer c.reverseHandlesMu.Unlock()
125120

126-
if !exists {
121+
uuids, ok := c.reverseHandles[path]
122+
if !ok {
127123
return
128124
}
129125
for i, u := range uuids {
130126
if u == handle {
131-
uuids = append(uuids[:i], uuids[i+1:]...)
132-
c.reverseHandles[path] = uuids
127+
c.reverseHandles[path] = append(uuids[:i], uuids[i+1:]...)
133128
return
134129
}
135130
}
136131
}
137132

133+
func (c *CachingHandler) getReverseHandles(path string) []uuid.UUID {
134+
c.reverseHandlesMu.RLock()
135+
defer c.reverseHandlesMu.RUnlock()
136+
return c.reverseHandles[path]
137+
}
138+
139+
func (c *CachingHandler) appendReverseHandle(path string, id uuid.UUID) {
140+
c.reverseHandlesMu.Lock()
141+
defer c.reverseHandlesMu.Unlock()
142+
c.reverseHandles[path] = append(c.reverseHandles[path], id)
143+
}
144+
138145
func (c *CachingHandler) InvalidateHandle(fs billy.Filesystem, handle []byte) error {
139146
//Remove from cache
140147
id, _ := uuid.FromBytes(handle)

helpers/cachinghandler_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package helpers
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"testing"
7+
8+
"github.com/willscott/go-nfs/helpers/memfs"
9+
)
10+
11+
// TestCachingHandlerConcurrentToHandle tests that concurrent calls to ToHandle
12+
// are thread-safe. Run with -race flag to detect data races:
13+
//
14+
// go test -race -run TestCachingHandlerConcurrentToHandle ./helpers/
15+
func TestCachingHandlerConcurrentToHandle(t *testing.T) {
16+
mem := memfs.New()
17+
handler := NewNullAuthHandler(mem)
18+
cacheHandler := NewCachingHandler(handler, 1024).(*CachingHandler)
19+
20+
const numGoroutines = 10
21+
const numOperations = 100
22+
23+
var wg sync.WaitGroup
24+
wg.Add(numGoroutines)
25+
26+
for i := 0; i < numGoroutines; i++ {
27+
go func(id int) {
28+
defer wg.Done()
29+
for j := 0; j < numOperations; j++ {
30+
// Each goroutine creates handles for different paths
31+
// but also accesses some shared paths to maximize contention
32+
path := []string{fmt.Sprintf("file-%d-%d.txt", id, j)}
33+
_ = cacheHandler.ToHandle(mem, path)
34+
35+
// Also access a shared path to increase contention
36+
sharedPath := []string{fmt.Sprintf("shared-%d.txt", j%10)}
37+
_ = cacheHandler.ToHandle(mem, sharedPath)
38+
}
39+
}(i)
40+
}
41+
42+
wg.Wait()
43+
}
44+
45+
// TestCachingHandlerConcurrentToHandleAndFromHandle tests concurrent access
46+
// to both ToHandle and FromHandle methods.
47+
func TestCachingHandlerConcurrentToHandleAndFromHandle(t *testing.T) {
48+
mem := memfs.New()
49+
handler := NewNullAuthHandler(mem)
50+
cacheHandler := NewCachingHandler(handler, 1024).(*CachingHandler)
51+
52+
const numGoroutines = 10
53+
const numOperations = 100
54+
55+
// Pre-create some handles
56+
handles := make([][]byte, 20)
57+
for i := 0; i < 20; i++ {
58+
path := []string{fmt.Sprintf("precreated-%d.txt", i)}
59+
handles[i] = cacheHandler.ToHandle(mem, path)
60+
}
61+
62+
var wg sync.WaitGroup
63+
wg.Add(numGoroutines * 2)
64+
65+
// Writers - create new handles
66+
for i := 0; i < numGoroutines; i++ {
67+
go func(id int) {
68+
defer wg.Done()
69+
for j := 0; j < numOperations; j++ {
70+
path := []string{fmt.Sprintf("new-file-%d-%d.txt", id, j)}
71+
_ = cacheHandler.ToHandle(mem, path)
72+
}
73+
}(i)
74+
}
75+
76+
// Readers - read existing handles
77+
for i := 0; i < numGoroutines; i++ {
78+
go func(id int) {
79+
defer wg.Done()
80+
for j := 0; j < numOperations; j++ {
81+
handle := handles[j%len(handles)]
82+
_, _, _ = cacheHandler.FromHandle(handle)
83+
}
84+
}(i)
85+
}
86+
87+
wg.Wait()
88+
}
89+
90+
// TestCachingHandlerConcurrentInvalidateHandle tests concurrent access
91+
// when handles are being invalidated.
92+
func TestCachingHandlerConcurrentInvalidateHandle(t *testing.T) {
93+
mem := memfs.New()
94+
handler := NewNullAuthHandler(mem)
95+
cacheHandler := NewCachingHandler(handler, 1024).(*CachingHandler)
96+
97+
const numGoroutines = 10
98+
const numOperations = 100
99+
100+
var wg sync.WaitGroup
101+
wg.Add(numGoroutines * 2)
102+
103+
// Create and invalidate handles concurrently
104+
for i := 0; i < numGoroutines; i++ {
105+
go func(id int) {
106+
defer wg.Done()
107+
for j := 0; j < numOperations; j++ {
108+
path := []string{fmt.Sprintf("invalidate-%d-%d.txt", id, j)}
109+
handle := cacheHandler.ToHandle(mem, path)
110+
// Immediately invalidate some handles
111+
if j%3 == 0 {
112+
_ = cacheHandler.InvalidateHandle(mem, handle)
113+
}
114+
}
115+
}(i)
116+
}
117+
118+
// Concurrent ToHandle calls on shared paths
119+
for i := 0; i < numGoroutines; i++ {
120+
go func(id int) {
121+
defer wg.Done()
122+
for j := 0; j < numOperations; j++ {
123+
sharedPath := []string{fmt.Sprintf("shared-invalidate-%d.txt", j%20)}
124+
_ = cacheHandler.ToHandle(mem, sharedPath)
125+
}
126+
}(i)
127+
}
128+
129+
wg.Wait()
130+
}

0 commit comments

Comments
 (0)