Skip to content

Commit f1691b7

Browse files
committed
feat:add support to invalidate KV cache via AllBlocksCleared event
1 parent 8d31bc7 commit f1691b7

File tree

15 files changed

+175
-12
lines changed

15 files changed

+175
-12
lines changed

docs/architecture.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ sequenceDiagram
9696
else BlockRemoved
9797
Worker->>Index: Evict(key, podEntry)
9898
else AllBlocksCleared
99-
Note over Worker: No-op
99+
Worker->>Index: Clear()
100100
end
101101
end
102102
```

examples/helper/events.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,31 @@ func SimulateRemoveEvent(ctx context.Context, publisher *Publisher) error {
109109
return nil
110110
}
111111

112+
func SimulateClearAllBlocksEvent(ctx context.Context, publisher *Publisher) error {
113+
logger := log.FromContext(ctx)
114+
logger.Info("@@@ Simulating vLLM engine clear all blocks...")
115+
116+
clearAllBlocksEvent := []any{
117+
"AllBlocksCleared",
118+
}
119+
120+
clearAllBlocksPayload, _ := msgpack.Marshal(clearAllBlocksEvent)
121+
122+
clearAllBlockEventBatch := []any{
123+
float64(time.Now().UnixNano()) / 1e9,
124+
[]msgpack.RawMessage{clearAllBlocksPayload},
125+
nil,
126+
}
127+
128+
if err := publisher.PublishEvent(ctx, topic, clearAllBlockEventBatch); err != nil {
129+
return fmt.Errorf("failed to publish AllBlocksCleared event: %w", err)
130+
}
131+
logger.Info("@@@ Published AllBlocksCleared event", "topic", topic)
132+
133+
time.Sleep(3 * time.Second)
134+
return nil
135+
}
136+
112137
func SetupEventsPool(ctx context.Context, kvBlockIndex kvblock.Index) (*kvevents.Pool, error) {
113138
logger := log.FromContext(ctx)
114139

examples/kv_events/offline/main.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,32 @@ func RunEventsDemo(ctx context.Context, kvCacheIndexer *kvcache.Indexer, publish
201201
}
202202
logger.Info("@@@ Final pod scores after BlockRemoved events", "pods", pods)
203203

204+
// Simulate vLLM engine publishing BlockStored events
205+
err = helper.SimulateProduceEvent(ctx, publisher)
206+
if err != nil {
207+
return err
208+
}
209+
210+
// Query again to see the effect of the events
211+
pods, err = kvCacheIndexer.GetPodScores(ctx, testdata.RenderReq, testdata.Prompt, testdata.ModelName, nil)
212+
if err != nil {
213+
return err
214+
}
215+
logger.Info("@@@ Pod scores after BlockStored events", "pods", pods)
216+
217+
// Simulate vLLM engine publishing AllBlocksCleared event
218+
err = helper.SimulateClearAllBlocksEvent(ctx, publisher)
219+
if err != nil {
220+
return err
221+
}
222+
223+
// Query again to see the effect of the events
224+
pods, err = kvCacheIndexer.GetPodScores(ctx, testdata.RenderReq, testdata.Prompt, testdata.ModelName, nil)
225+
if err != nil {
226+
return err
227+
}
228+
logger.Info("@@@ Pod scores after AllBlocksCleared events", "pods", pods)
229+
204230
logger.Info("Events demo completed. Pool continues listening for more events...")
205231
logger.Info("Press Ctrl+C to shutdown")
206232

examples/valkey_example/main.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,21 @@ func demonstrateValkeyOperations(ctx context.Context, indexer *kvcache.Indexer)
201201

202202
logger.Info("Cache lookup after eviction", "keysFound", len(lookupAfterEvict))
203203

204+
// Clear the cache
205+
logger.Info("Clearing the cache")
206+
err = indexer.KVBlockIndex().Clear(ctx)
207+
if err != nil {
208+
return fmt.Errorf("failed to clear cache: %w", err)
209+
}
210+
211+
// Lookup again after clear
212+
lookupAfterClear, err := indexer.KVBlockIndex().Lookup(ctx, promptKeys, nil)
213+
if err != nil {
214+
return fmt.Errorf("failed to lookup after clear: %w", err)
215+
}
216+
217+
logger.Info("Cache lookup after clear", "keysFound", len(lookupAfterClear))
218+
204219
// Final score check to see the difference
205220
finalScores, err := indexer.GetPodScores(ctx, nil, prompt, modelName, []string{"demo-pod-1", "demo-pod-2"})
206221
if err != nil {

pkg/kvcache/kvblock/cost_aware_memory.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,3 +312,17 @@ func (m *CostAwareMemoryIndex) GetRequestKey(ctx context.Context, engineKey Bloc
312312
}
313313
return requestKey, nil
314314
}
315+
316+
// Clear removes all entries from the index backend.
317+
func (m *CostAwareMemoryIndex) Clear(ctx context.Context) error {
318+
m.mu.Lock()
319+
defer m.mu.Unlock()
320+
traceLogger := log.FromContext(ctx).V(logging.TRACE).WithName("kvblock.CostAwareMemoryIndex.Clear")
321+
322+
m.requestKeys.Purge()
323+
m.data.Clear()
324+
325+
m.data.Wait()
326+
traceLogger.Info("Cleared CostAwareMemoryIndex")
327+
return nil
328+
}

pkg/kvcache/kvblock/in_memory.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,3 +277,12 @@ func podsPerKeyPrintHelper(ks map[BlockHash][]PodEntry) string {
277277
}
278278
return b.String()
279279
}
280+
281+
// Clear removes all entries from the index backend.
282+
func (m *InMemoryIndex) Clear(ctx context.Context) error {
283+
traceLogger := log.FromContext(ctx).V(logging.TRACE).WithName("kvblock.InMemoryIndex.Clear")
284+
m.engineToRequestKeys.Purge()
285+
m.data.Purge()
286+
traceLogger.Info("Cleared InMemoryIndex")
287+
return nil
288+
}

pkg/kvcache/kvblock/index.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ type Index interface {
133133
Evict(ctx context.Context, engineKey BlockHash, entries []PodEntry) error
134134
// GetRequestKey returns the requestKey associated with the given engineKey.
135135
GetRequestKey(ctx context.Context, engineKey BlockHash) (BlockHash, error)
136+
// Clear removes all entries from the index backend.
137+
Clear(ctx context.Context) error
136138
}
137139

138140
// BlockHash struct represents a unique identifier for a KV-cache block.

pkg/kvcache/kvblock/index_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ func testCommonIndexBehavior(t *testing.T, indexFactory func(t *testing.T) Index
6363
index := indexFactory(t)
6464
testConcurrentOperations(t, ctx, index)
6565
})
66+
67+
t.Run("Clear", func(t *testing.T) {
68+
index := indexFactory(t)
69+
testClear(t, ctx, index)
70+
})
6671
}
6772

6873
// testBasicAddAndLookup tests basic Add and Lookup functionality.
@@ -213,6 +218,30 @@ func testEvictBasic(t *testing.T, ctx context.Context, index Index) {
213218
assert.ElementsMatch(t, expected, podsPerKey[requestKey])
214219
}
215220

221+
func testClear(t *testing.T, ctx context.Context, index Index) {
222+
t.Helper()
223+
engineKey := BlockHash(17434655)
224+
requestKey := BlockHash(59244875)
225+
entries := []PodEntry{
226+
{PodIdentifier: "pod1", DeviceTier: "gpu"},
227+
{PodIdentifier: "pod2", DeviceTier: "gpu"},
228+
{PodIdentifier: "pod3", DeviceTier: "cpu"},
229+
}
230+
231+
// Add entries
232+
err := index.Add(ctx, []BlockHash{engineKey}, []BlockHash{requestKey}, entries)
233+
require.NoError(t, err)
234+
235+
// Clear the index
236+
err = index.Clear(ctx)
237+
require.NoError(t, err)
238+
239+
// Verify that the index is empty
240+
podsPerKey, err := index.Lookup(ctx, []BlockHash{requestKey}, sets.Set[string]{})
241+
require.NoError(t, err)
242+
assert.Len(t, podsPerKey, 0)
243+
}
244+
216245
// testConcurrentOperations tests thread safety with concurrent operations.
217246
func testConcurrentOperations(t *testing.T, ctx context.Context, index Index) {
218247
t.Helper()
@@ -245,6 +274,10 @@ func testConcurrentOperations(t *testing.T, ctx context.Context, index Index) {
245274
if err := index.Evict(ctx, engineKey, entries); err != nil {
246275
errChan <- err
247276
}
277+
case 3: // Clear
278+
if err := index.Clear(ctx); err != nil {
279+
errChan <- err
280+
}
248281
}
249282
}
250283
}(goroutineID)

pkg/kvcache/kvblock/instrumented_index.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type instrumentedIndex struct {
2727
}
2828

2929
// NewInstrumentedIndex wraps an Index and emits metrics for Add, Evict, and
30-
// Lookup.
30+
// Lookup, as well as Clear.
3131
func NewInstrumentedIndex(next Index) Index {
3232
return &instrumentedIndex{next: next}
3333
}
@@ -90,3 +90,9 @@ func recordHitMetrics(keyToPods map[BlockHash][]PodEntry) {
9090
metrics.MaxPodHitCount.Add(float64(maxHit))
9191
metrics.LookupHits.Add(float64(maxHit))
9292
}
93+
94+
func (m *instrumentedIndex) Clear(ctx context.Context) error {
95+
err := m.next.Clear(ctx)
96+
metrics.Clear.Add(1)
97+
return err
98+
}

pkg/kvcache/kvblock/redis.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,15 @@ func (r *RedisIndex) GetRequestKey(ctx context.Context, engineKey BlockHash) (Bl
292292
func redisEngineKey(engineKey BlockHash) string {
	// The result is the literal "engine:" prefix followed by the key's
	// String() form.
	const prefix = "engine:"

	return prefix + engineKey.String()
}
295+
296+
// Clear removes all entries from the index backend.
297+
func (r *RedisIndex) Clear(ctx context.Context) error {
298+
logger := log.FromContext(ctx).WithName("kvblock.RedisIndex.Clear")
299+
err := r.RedisClient.FlushDB(ctx).Err()
300+
if err != nil {
301+
return fmt.Errorf("failed to flush %s db: %w", r.BackendType, err)
302+
}
303+
logger.Info("Cleared", "index", r.BackendType)
304+
305+
return nil
306+
}

0 commit comments

Comments
 (0)