Skip to content

Commit 939f851

Browse files
committed
fix: Close data race in InMemoryIndex Add/Evict with RWMutex
Signed-off-by: Guangya Liu <gyliu513@gmail.com>
1 parent 8f07127 commit 939f851

File tree

1 file changed

+19
-20
lines changed

1 file changed

+19
-20
lines changed

pkg/kvcache/kvblock/in_memory.go

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ func NewInMemoryIndex(cfg *InMemoryIndexConfig) (*InMemoryIndex, error) {
7676

7777
// InMemoryIndex is an in-memory implementation of the Index interface.
7878
type InMemoryIndex struct {
79+
// mu protects multi-step read-modify-write sequences on data/engineToRequestKeys
80+
// from racing with concurrent Add/Evict operations.
81+
mu sync.RWMutex
7982
// data holds the mapping of requestKeys to sets of pod identifiers.
8083
data *lru.Cache[BlockHash, *PodCache]
8184
// engineToRequestKeys holds the mapping of engineKeys to requestKeys.
@@ -89,10 +92,7 @@ var _ Index = &InMemoryIndex{}
8992
// PodCache represents a cache for pod entries.
9093
type PodCache struct {
9194
// cache is an LRU cache used as a set of PodEntry values (the value type is struct{}).
92-
// thread-safe.
9395
cache *lru.Cache[PodEntry, struct{}]
94-
// mu protects the cache from concurrent access during check-and-set operations.
95-
mu sync.Mutex
9696
}
9797

9898
// Lookup receives a list of requestKeys and a set of pod identifiers,
@@ -112,6 +112,9 @@ func (m *InMemoryIndex) Lookup(ctx context.Context, requestKeys []BlockHash,
112112

113113
traceLogger := log.FromContext(ctx).V(logging.TRACE).WithName("kvblock.InMemoryIndex.Lookup")
114114

115+
m.mu.RLock()
116+
defer m.mu.RUnlock()
117+
115118
podsPerKey := make(map[BlockHash][]PodEntry)
116119
highestHitIdx := 0
117120

@@ -157,6 +160,9 @@ func (m *InMemoryIndex) Add(ctx context.Context, engineKeys, requestKeys []Block
157160

158161
traceLogger := log.FromContext(ctx).V(logging.TRACE).WithName("kvblock.InMemoryIndex.Add")
159162

163+
m.mu.Lock()
164+
defer m.mu.Unlock()
165+
160166
for i, requestKey := range requestKeys {
161167
engineKey := engineKeys[i]
162168

@@ -197,11 +203,9 @@ func (m *InMemoryIndex) Add(ctx context.Context, engineKeys, requestKeys []Block
197203
}
198204
}
199205

200-
podCache.mu.Lock()
201206
for _, entry := range entries {
202207
podCache.cache.Add(entry, struct{}{})
203208
}
204-
podCache.mu.Unlock()
205209

206210
traceLogger.Info("added pods to key", "requestKey", requestKey, "engineKey", engineKey, "pods", entries)
207211
}
@@ -217,6 +221,9 @@ func (m *InMemoryIndex) Evict(ctx context.Context, engineKey BlockHash, entries
217221

218222
traceLogger := log.FromContext(ctx).V(logging.TRACE).WithName("kvblock.InMemoryIndex.Evict")
219223

224+
m.mu.Lock()
225+
defer m.mu.Unlock()
226+
220227
requestKey, found := m.engineToRequestKeys.Get(engineKey)
221228
if !found {
222229
traceLogger.Info("engineKey not found in index, nothing to evict", "engineKey", engineKey)
@@ -230,28 +237,17 @@ func (m *InMemoryIndex) Evict(ctx context.Context, engineKey BlockHash, entries
230237
return nil
231238
}
232239

233-
podCache.mu.Lock()
234240
for _, entry := range entries {
235241
podCache.cache.Remove(entry)
236242
}
237243

238-
isEmpty := podCache.cache.Len() == 0
239-
podCache.mu.Unlock()
240-
241244
traceLogger.Info("evicted pods from key", "requestKey", requestKey, "engineKey", engineKey, "pods", entries)
242245

243246
// Remove key from main cache if empty
244-
if isEmpty {
245-
// Re-fetch and hold the lock through removal to prevent racing with Add
246-
if currentCache, stillExists := m.data.Get(requestKey); stillExists && currentCache != nil {
247-
currentCache.mu.Lock()
248-
if currentCache.cache.Len() == 0 {
249-
m.data.Remove(requestKey)
250-
m.engineToRequestKeys.Remove(engineKey)
251-
traceLogger.Info("removed requestKey from index as no pods remain", "requestKey", requestKey, "engineKey", engineKey)
252-
}
253-
currentCache.mu.Unlock()
254-
}
247+
if podCache.cache.Len() == 0 {
248+
m.data.Remove(requestKey)
249+
m.engineToRequestKeys.Remove(engineKey)
250+
traceLogger.Info("removed requestKey from index as no pods remain", "requestKey", requestKey, "engineKey", engineKey)
255251
}
256252

257253
return nil
@@ -260,6 +256,9 @@ func (m *InMemoryIndex) Evict(ctx context.Context, engineKey BlockHash, entries
260256
// GetRequestKey returns the requestKey associated with the given engineKey.
261257
// Returns an error if the engineKey mapping is missing (e.g., already evicted).
262258
func (m *InMemoryIndex) GetRequestKey(ctx context.Context, engineKey BlockHash) (BlockHash, error) {
259+
m.mu.RLock()
260+
defer m.mu.RUnlock()
261+
263262
requestKey, found := m.engineToRequestKeys.Get(engineKey)
264263
if !found {
265264
return EmptyBlockHash, fmt.Errorf("engine key not found: %s", engineKey.String())

0 commit comments

Comments
 (0)