Skip to content

Commit e90e8b6

Browse files
committed
Handling Group ID in KV events
Signed-off-by: Kapil Jain <kapiljain1989@gmail.com>
1 parent 8bf6f8a commit e90e8b6

File tree

7 files changed

+337
-63
lines changed

7 files changed

+337
-63
lines changed

pkg/kvcache/kvblock/cost_aware_memory.go

Lines changed: 73 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -103,23 +103,73 @@ func (m *CostAwareMemoryIndex) MaxCost() int64 {
103103

104104
// CostPodCache wraps a sync.Map holding one *PodEntry per pod cache key and
// provides cost calculation for memory usage estimation.
105105
type CostPodCache struct {
106-
cache sync.Map // map[PodEntry]struct{}
106+
cache sync.Map // map[string]*PodEntry; keys are built by podCacheKey ("podID@tier", plus a "[speculative]" suffix for speculative entries)
107107
// size tracks the number of entries in cache for O(1) Len().
108108
size atomic.Int64
109109
}
110110

111-
// Add adds a PodEntry to the cache.
111+
// Add adds or updates a PodEntry in the cache, merging StoredGroups if the entry exists.
112112
func (c *CostPodCache) Add(entry PodEntry) {
113-
if _, loaded := c.cache.LoadOrStore(entry, struct{}{}); !loaded {
113+
cacheKey := podCacheKey(entry.PodIdentifier, entry.DeviceTier, entry.Speculative)
114+
115+
// Try to load existing entry
116+
if existingVal, loaded := c.cache.Load(cacheKey); loaded {
117+
if existingEntry, ok := existingVal.(*PodEntry); ok {
118+
// Merge StoredGroups
119+
existingEntry.StoredGroups = mergeGroupsUnique(existingEntry.StoredGroups, entry.StoredGroups)
120+
// Store updated entry
121+
c.cache.Store(cacheKey, existingEntry)
122+
}
123+
} else {
124+
// Create new entry
125+
newEntry := &PodEntry{
126+
PodIdentifier: entry.PodIdentifier,
127+
DeviceTier: entry.DeviceTier,
128+
Speculative: entry.Speculative,
129+
StoredGroups: mergeGroupsUnique(nil, entry.StoredGroups),
130+
}
131+
c.cache.Store(cacheKey, newEntry)
114132
c.size.Add(1)
115133
}
116134
}
117135

118-
// Delete removes a PodEntry from the cache.
136+
// Delete removes a PodEntry from the cache entirely.
119137
func (c *CostPodCache) Delete(entry PodEntry) {
120-
if _, loaded := c.cache.LoadAndDelete(entry); loaded {
138+
cacheKey := podCacheKey(entry.PodIdentifier, entry.DeviceTier, entry.Speculative)
139+
if _, loaded := c.cache.LoadAndDelete(cacheKey); loaded {
140+
c.size.Add(-1)
141+
}
142+
}
143+
144+
// RemoveGroups removes specified groups from a PodEntry's StoredGroups.
145+
// If no groups remain, the entry is deleted.
146+
func (c *CostPodCache) RemoveGroups(entry PodEntry) bool {
147+
cacheKey := podCacheKey(entry.PodIdentifier, entry.DeviceTier, entry.Speculative)
148+
149+
existingVal, loaded := c.cache.Load(cacheKey)
150+
if !loaded {
151+
return false
152+
}
153+
154+
existingEntry, ok := existingVal.(*PodEntry)
155+
if !ok {
156+
return false
157+
}
158+
159+
// Remove specified groups
160+
updatedGroups := removeGroups(existingEntry.StoredGroups, entry.StoredGroups)
161+
162+
if len(updatedGroups) == 0 {
163+
// No groups left, delete the entry
164+
c.cache.Delete(cacheKey)
121165
c.size.Add(-1)
166+
return true
122167
}
168+
169+
// Update with remaining groups
170+
existingEntry.StoredGroups = updatedGroups
171+
c.cache.Store(cacheKey, existingEntry)
172+
return false
123173
}
124174

125175
// Len returns the number of entries in the cache.
@@ -141,16 +191,22 @@ func (c *CostPodCache) CalculateByteSize(keyStr string) int64 {
141191

142192
// Count entries and calculate their size
143193
c.cache.Range(func(key, value interface{}) bool {
144-
entry, ok := key.(PodEntry)
145-
if !ok {
194+
// key is now a string, value is *PodEntry
195+
keyStr, okKey := key.(string)
196+
entry, okEntry := value.(*PodEntry)
197+
if !okKey || !okEntry {
146198
return true
147199
}
148200

149201
entryCount++
150-
totalBytes += int64(len(entry.PodIdentifier)) // PodIdentifier string content
151-
totalBytes += int64(len(entry.DeviceTier)) // DeviceTier string content
152-
totalBytes += 32 // string headers (16 bytes each for 2 strings)
153-
totalBytes += 8 // struct padding/alignment
202+
totalBytes += int64(len(keyStr)) // cache key string
203+
totalBytes += int64(len(entry.PodIdentifier)) // PodIdentifier string content
204+
totalBytes += int64(len(entry.DeviceTier)) // DeviceTier string content
205+
totalBytes += int64(len(entry.StoredGroups) * 8) // StoredGroups slice (8 bytes per int)
206+
totalBytes += 32 // string headers (16 bytes each for 2 strings)
207+
totalBytes += 24 // slice header for StoredGroups
208+
totalBytes += 8 // pointer to PodEntry
209+
totalBytes += 8 // struct padding/alignment
154210
return true
155211
})
156212

@@ -234,17 +290,17 @@ func (m *CostAwareMemoryIndex) Lookup(ctx context.Context, requestKeys []BlockHa
234290
if podIdentifierSet.Len() == 0 {
235291
// If no pod identifiers are provided, return all pods
236292
pods.cache.Range(func(k, value interface{}) bool {
237-
if pod, ok := k.(PodEntry); ok {
238-
podsPerKey[key] = append(podsPerKey[key], pod)
293+
if pod, ok := value.(*PodEntry); ok {
294+
podsPerKey[key] = append(podsPerKey[key], *pod)
239295
}
240296
return true
241297
})
242298
} else {
243299
// Filter pods based on the provided pod identifiers
244300
pods.cache.Range(func(k, value interface{}) bool {
245-
if pod, ok := k.(PodEntry); ok {
301+
if pod, ok := value.(*PodEntry); ok {
246302
if podIdentifierSet.Has(pod.PodIdentifier) {
247-
podsPerKey[key] = append(podsPerKey[key], pod)
303+
podsPerKey[key] = append(podsPerKey[key], *pod)
248304
}
249305
}
250306
return true
@@ -307,7 +363,8 @@ func (m *CostAwareMemoryIndex) Evict(ctx context.Context, key BlockHash, keyType
307363
podCacheLenBefore := podCache.Len()
308364

309365
for _, entry := range entries {
310-
podCache.Delete(entry)
366+
// Remove groups from the entry; if no groups remain, the entry is deleted
367+
podCache.RemoveGroups(entry)
311368
}
312369

313370
if podCache.Len() == 0 {

pkg/kvcache/kvblock/in_memory.go

Lines changed: 108 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ var _ Index = &InMemoryIndex{}
8888

8989
// PodCache represents a cache for pod entries.
9090
type PodCache struct {
91-
// cache is an LRU cache that maps PodEntry to their last access time.
92-
// thread-safe.
93-
cache *lru.Cache[PodEntry, struct{}]
91+
// cache is an LRU cache that maps podCacheKey strings ("podID@tier", plus a
92+
// "[speculative]" suffix for speculative entries) to PodEntry pointers, so StoredGroups can be updated on the stored entry.
93+
cache *lru.Cache[string, *PodEntry]
9494
// mu protects the cache from concurrent access during check-and-set operations.
9595
mu sync.Mutex
9696
}
@@ -126,12 +126,14 @@ func (m *InMemoryIndex) Lookup(ctx context.Context, requestKeys []BlockHash,
126126

127127
if podIdentifierSet.Len() == 0 {
128128
// If no pod identifiers are provided, return all pods
129-
podsPerKey[requestKey] = pods.cache.Keys()
129+
for _, podEntry := range pods.cache.Values() {
130+
podsPerKey[requestKey] = append(podsPerKey[requestKey], *podEntry)
131+
}
130132
} else {
131133
// Filter pods based on the provided pod identifiers
132-
for _, pod := range pods.cache.Keys() {
133-
if podIdentifierSet.Has(pod.PodIdentifier) {
134-
podsPerKey[requestKey] = append(podsPerKey[requestKey], pod)
134+
for _, podEntry := range pods.cache.Values() {
135+
if podIdentifierSet.Has(podEntry.PodIdentifier) {
136+
podsPerKey[requestKey] = append(podsPerKey[requestKey], *podEntry)
135137
}
136138
}
137139
}
@@ -174,7 +176,7 @@ func (m *InMemoryIndex) Add(ctx context.Context, engineKeys, requestKeys []Block
174176
//nolint:nestif // double-checked locking pattern
175177
if !found {
176178
// Create new cache
177-
cache, err := lru.New[PodEntry, struct{}](m.podCacheSize)
179+
cache, err := lru.New[string, *PodEntry](m.podCacheSize)
178180
if err != nil {
179181
return fmt.Errorf("failed to create pod cache for key %s: %w", requestKey.String(), err)
180182
}
@@ -201,11 +203,30 @@ func (m *InMemoryIndex) Add(ctx context.Context, engineKeys, requestKeys []Block
201203

202204
podCache.mu.Lock()
203205
for _, entry := range entries {
204-
podCache.cache.Add(entry, struct{}{})
206+
cacheKey := podCacheKey(entry.PodIdentifier, entry.DeviceTier, entry.Speculative)
207+
208+
// Check if entry already exists
209+
existingEntry, found := podCache.cache.Get(cacheKey)
210+
if found {
211+
// Merge StoredGroups, deduplicating and preserving order
212+
existingEntry.StoredGroups = mergeGroupsUnique(existingEntry.StoredGroups, entry.StoredGroups)
213+
// Re-add to update LRU position
214+
podCache.cache.Add(cacheKey, existingEntry)
215+
traceLogger.Info("updated existing pod entry with merged groups",
216+
"requestKey", requestKey, "pod", existingEntry)
217+
} else {
218+
// Create new entry (copy to avoid mutation)
219+
newEntry := &PodEntry{
220+
PodIdentifier: entry.PodIdentifier,
221+
DeviceTier: entry.DeviceTier,
222+
Speculative: entry.Speculative,
223+
StoredGroups: mergeGroupsUnique(nil, entry.StoredGroups),
224+
}
225+
podCache.cache.Add(cacheKey, newEntry)
226+
traceLogger.Info("added new pod entry", "requestKey", requestKey, "pod", newEntry)
227+
}
205228
}
206229
podCache.mu.Unlock()
207-
208-
traceLogger.Info("added pods to key", "requestKey", requestKey, "pods", entries)
209230
}
210231

211232
return nil
@@ -252,13 +273,36 @@ func (m *InMemoryIndex) Evict(ctx context.Context, key BlockHash, keyType KeyTyp
252273

253274
podCache.mu.Lock()
254275
for _, entry := range entries {
255-
podCache.cache.Remove(entry)
276+
cacheKey := podCacheKey(entry.PodIdentifier, entry.DeviceTier, entry.Speculative)
277+
278+
existingEntry, found := podCache.cache.Get(cacheKey)
279+
if !found {
280+
traceLogger.Info("pod entry not found for eviction, skipping",
281+
"requestKey", requestKey, "podID", entry.PodIdentifier, "tier", entry.DeviceTier)
282+
continue
283+
}
284+
285+
// Remove the specified groups from StoredGroups
286+
updatedGroups := removeGroups(existingEntry.StoredGroups, entry.StoredGroups)
287+
288+
if len(updatedGroups) == 0 {
289+
// No groups left, remove the entire pod entry
290+
podCache.cache.Remove(cacheKey)
291+
traceLogger.Info("removed pod entry (no groups remaining)",
292+
"requestKey", requestKey, "pod", existingEntry)
293+
} else {
294+
// Update with remaining groups
295+
existingEntry.StoredGroups = updatedGroups
296+
podCache.cache.Add(cacheKey, existingEntry)
297+
traceLogger.Info("updated pod entry after group removal",
298+
"requestKey", requestKey, "pod", existingEntry, "remainingGroups", updatedGroups)
299+
}
256300
}
257301

258302
isEmpty := podCache.cache.Len() == 0
259303
podCache.mu.Unlock()
260304

261-
traceLogger.Info("evicted pods from key", "requestKey", requestKey, "key", key, "keyType", keyType, "pods", entries)
305+
traceLogger.Info("processed eviction", "requestKey", requestKey, "key", key, "keyType", keyType, "entries", entries)
262306

263307
// Remove key from main cache if empty.
264308
// Re-fetch and hold the lock through removal to prevent racing with Add.
@@ -294,6 +338,57 @@ func (m *InMemoryIndex) GetRequestKey(ctx context.Context, engineKey BlockHash)
294338
return requestKey, nil
295339
}
296340

341+
// podCacheKey builds the string under which a pod entry is stored.
// Non-speculative entries map to "podIdentifier@deviceTier"; speculative
// entries map to "podIdentifier@deviceTier[speculative]".
func podCacheKey(podIdentifier, deviceTier string, speculative bool) string {
	if !speculative {
		return podIdentifier + "@" + deviceTier
	}
	return podIdentifier + "@" + deviceTier + "[speculative]"
}
350+
351+
// mergeGroupsUnique merges two group lists, removing duplicates and preserving order.
352+
// Elements from 'existing' come first, followed by new elements from 'incoming'.
353+
func mergeGroupsUnique(existing, incoming []int) []int {
354+
// 1. If incoming is empty, return existing as-is
355+
if len(incoming) == 0 {
356+
return existing
357+
}
358+
firstIncoming := incoming[0]
359+
360+
for _, v := range existing {
361+
if v == firstIncoming {
362+
return existing // Already there, nothing to do
363+
}
364+
}
365+
result := make([]int, 0, len(existing)+1)
366+
result = append(result, existing...)
367+
result = append(result, firstIncoming)
368+
return result
369+
}
370+
371+
// removeGroups removes specified groups from the list,
372+
// maintaining order of remaining elements.
373+
func removeGroups(existing, toRemove []int) []int {
374+
if len(toRemove) == 0 || len(existing) == 0 {
375+
return existing
376+
}
377+
target := toRemove[0]
378+
targetIdx := -1
379+
for i, v := range existing {
380+
if v == target {
381+
targetIdx = i
382+
break
383+
}
384+
}
385+
if targetIdx == -1 {
386+
return existing
387+
}
388+
copy(existing[targetIdx:], existing[targetIdx+1:])
389+
return existing[:len(existing)-1]
390+
}
391+
297392
// podsPerKeyPrintHelper formats a map of keys to pod names for printing.
298393
func podsPerKeyPrintHelper(ks map[BlockHash][]PodEntry) string {
299394
var b strings.Builder

pkg/kvcache/kvblock/index.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ type PodEntry struct {
172172
DeviceTier string
173173
// Speculative indicates the entry was added predictively before a KV event confirmed it.
174174
Speculative bool
175+
// StoredGroups tracks the group IDs that have stored this block.
176+
StoredGroups []int
175177
}
176178

177179
// String returns a string representation of the PodEntry.

0 commit comments

Comments
 (0)