Skip to content

Commit 8d46579

Browse files
committed
Added changes to fetch dp properties from kvevents for DP-aware routing
Signed-off-by: satyamg1620 <Satyam.Gupta.3@ibm.com>
1 parent 729df62 commit 8d46579

File tree

21 files changed

+303
-82
lines changed

21 files changed

+303
-82
lines changed

api/indexerpb/indexer.pb.go

Lines changed: 23 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/indexerpb/indexer.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,8 @@ message GetPodScoresResponse {
4040
message PodScore {
4141
string pod = 1;
4242
double score = 2;
43+
// Data parallel rank for this pod. -1 (or absent) means non-DP deployment.
44+
// When present, the pod field still contains the base pod identifier (e.g., "pod-1"),
45+
// and the rank is provided separately for structured routing decisions.
46+
optional int32 data_parallel_rank = 3;
4347
}

api/indexerpb/indexer_grpc.pb.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/kv_cache_index/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func runPrompts(ctx context.Context, kvCacheIndexer *kvcache.Indexer) error {
151151
requestKeys := engineKeys
152152

153153
if err := kvCacheIndexer.KVBlockIndex().Add(ctx, engineKeys, requestKeys,
154-
[]kvblock.PodEntry{{PodIdentifier: "pod1", DeviceTier: "gpu"}}); err != nil {
154+
[]kvblock.PodEntry{{PodIdentifier: "pod1", DeviceTier: "gpu", DataParallelRank: kvblock.NoDataParallelRank}}); err != nil {
155155
return err
156156
}
157157

examples/kv_cache_index_service/server/server.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package main
1919
import (
2020
"context"
2121
"fmt"
22+
"strconv"
23+
"strings"
2224

2325
indexerpb "github.com/llm-d/llm-d-kv-cache/api/indexerpb"
2426
"github.com/llm-d/llm-d-kv-cache/examples/testdata"
@@ -53,9 +55,9 @@ func (s *IndexerService) AddSampleDataToIndexer(ctx context.Context) error {
5355

5456
// Sample pod entries simulating different pods with different device tiers
5557
podEntries := []kvblock.PodEntry{
56-
{PodIdentifier: "pod-1", DeviceTier: "gpu"},
57-
{PodIdentifier: "pod-2", DeviceTier: "gpu"},
58-
{PodIdentifier: "pod-3", DeviceTier: "cpu"},
58+
{PodIdentifier: "pod-1", DeviceTier: "gpu", DataParallelRank: kvblock.NoDataParallelRank},
59+
{PodIdentifier: "pod-2", DeviceTier: "gpu", DataParallelRank: kvblock.NoDataParallelRank},
60+
{PodIdentifier: "pod-3", DeviceTier: "cpu", DataParallelRank: kvblock.NoDataParallelRank},
5961
}
6062

6163
// For this example, requestKeys are identical to engineKeys (sampleKeys)
@@ -80,13 +82,23 @@ func (s *IndexerService) GetPodScores(ctx context.Context,
8082
return nil, fmt.Errorf("failed to get pod scores: %w", err)
8183
}
8284

83-
// Convert map[string]int to []*indexerpb.PodScore
85+
// Convert map[string]float64 to []*indexerpb.PodScore
86+
// Scoring keys are "pod-1" (non-DP) or "pod-1@dp0" (DP-aware)
8487
scores := make([]*indexerpb.PodScore, 0, len(podScores))
85-
for pod, score := range podScores {
86-
scores = append(scores, &indexerpb.PodScore{
87-
Pod: pod,
88+
for scoringKey, score := range podScores {
89+
ps := &indexerpb.PodScore{
8890
Score: score,
89-
})
91+
}
92+
if idx := strings.LastIndex(scoringKey, "@dp"); idx >= 0 {
93+
ps.Pod = scoringKey[:idx]
94+
if rank, err := strconv.ParseInt(scoringKey[idx+3:], 10, 32); err == nil {
95+
r := int32(rank)
96+
ps.DataParallelRank = &r
97+
}
98+
} else {
99+
ps.Pod = scoringKey
100+
}
101+
scores = append(scores, ps)
90102
}
91103

92104
return &indexerpb.GetPodScoresResponse{

examples/valkey_example/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ func demonstrateValkeyOperations(ctx context.Context, indexer *kvcache.Indexer)
134134
prompt := testdata.Prompt
135135

136136
podEntries := []kvblock.PodEntry{
137-
{PodIdentifier: "demo-pod-1", DeviceTier: "gpu"},
138-
{PodIdentifier: "demo-pod-2", DeviceTier: "gpu"},
137+
{PodIdentifier: "demo-pod-1", DeviceTier: "gpu", DataParallelRank: kvblock.NoDataParallelRank},
138+
{PodIdentifier: "demo-pod-2", DeviceTier: "gpu", DataParallelRank: kvblock.NoDataParallelRank},
139139
}
140140

141141
logger.Info("Processing testdata prompt", "model", modelName, "promptLength", len(prompt))

pkg/kvcache/kvblock/cost_aware_memory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ func (c *CostPodCache) CalculateByteSize(keyStr string) int64 {
150150
totalBytes += int64(len(entry.PodIdentifier)) // PodIdentifier string content
151151
totalBytes += int64(len(entry.DeviceTier)) // DeviceTier string content
152152
totalBytes += 32 // string headers (16 bytes each for 2 strings)
153+
totalBytes += 8 // DataParallelRank int field
153154
totalBytes += 8 // struct padding/alignment
154155
return true
155156
})

pkg/kvcache/kvblock/cost_aware_memory_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestCostAwareIndexSize(t *testing.T) {
4747
// first key
4848
engineKey1 := BlockHash(32490241)
4949
requestKey1 := BlockHash(18986637)
50-
entry1 := PodEntry{PodIdentifier: "pod1", DeviceTier: "gpu"}
50+
entry1 := PodEntry{PodIdentifier: "pod1", DeviceTier: "gpu", DataParallelRank: NoDataParallelRank}
5151

5252
costPodCache := &CostPodCache{}
5353
costPodCache.Add(entry1)
@@ -66,13 +66,13 @@ func TestCostAwareIndexSize(t *testing.T) {
6666
// Add second key
6767
engineKey2 := BlockHash(48712468)
6868
requestKey2 := BlockHash(87654321)
69-
err = index.Add(ctx, []BlockHash{engineKey2}, []BlockHash{requestKey2}, []PodEntry{{PodIdentifier: "pod2", DeviceTier: "gpu"}})
69+
err = index.Add(ctx, []BlockHash{engineKey2}, []BlockHash{requestKey2}, []PodEntry{{PodIdentifier: "pod2", DeviceTier: "gpu", DataParallelRank: NoDataParallelRank}})
7070
require.NoError(t, err)
7171

7272
// Add third key - should evict the first one due to LRU
7373
engineKey3 := BlockHash(96187092)
7474
requestKey3 := BlockHash(56789012)
75-
err = index.Add(ctx, []BlockHash{engineKey3}, []BlockHash{requestKey3}, []PodEntry{{PodIdentifier: "pod3", DeviceTier: "cpu"}})
75+
err = index.Add(ctx, []BlockHash{engineKey3}, []BlockHash{requestKey3}, []PodEntry{{PodIdentifier: "pod3", DeviceTier: "cpu", DataParallelRank: NoDataParallelRank}})
7676
require.NoError(t, err)
7777

7878
// Lookup should only return the last two keys
@@ -82,7 +82,7 @@ func TestCostAwareIndexSize(t *testing.T) {
8282
assert.Len(t, podsPerKey, 1) // Only requestKey3 should be present
8383
assert.Len(t, podsPerKey[requestKey3], 1)
8484

85-
assert.Contains(t, podsPerKey[requestKey3], PodEntry{PodIdentifier: "pod3", DeviceTier: "cpu"})
85+
assert.Contains(t, podsPerKey[requestKey3], PodEntry{PodIdentifier: "pod3", DeviceTier: "cpu", DataParallelRank: NoDataParallelRank})
8686
}
8787

8888
func TestSizeHumanize(t *testing.T) {

pkg/kvcache/kvblock/in_memory_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,19 +57,19 @@ func TestInMemoryIndexSize(t *testing.T) {
5757
// Add first key
5858
engineKey1 := BlockHash(72735753)
5959
requestKey1 := BlockHash(79215516)
60-
err = index.Add(ctx, []BlockHash{engineKey1}, []BlockHash{requestKey1}, []PodEntry{{PodIdentifier: "pod1", DeviceTier: "gpu"}})
60+
err = index.Add(ctx, []BlockHash{engineKey1}, []BlockHash{requestKey1}, []PodEntry{{PodIdentifier: "pod1", DeviceTier: "gpu", DataParallelRank: NoDataParallelRank}})
6161
require.NoError(t, err)
6262

6363
// Add second key
6464
engineKey2 := BlockHash(41341092)
6565
requestKey2 := BlockHash(12871930)
66-
err = index.Add(ctx, []BlockHash{engineKey2}, []BlockHash{requestKey2}, []PodEntry{{PodIdentifier: "pod2", DeviceTier: "gpu"}})
66+
err = index.Add(ctx, []BlockHash{engineKey2}, []BlockHash{requestKey2}, []PodEntry{{PodIdentifier: "pod2", DeviceTier: "gpu", DataParallelRank: NoDataParallelRank}})
6767
require.NoError(t, err)
6868

6969
// Add third key - should evict the first one due to LRU
7070
engineKey3 := BlockHash(34012886)
7171
requestKey3 := BlockHash(69914638)
72-
err = index.Add(ctx, []BlockHash{engineKey3}, []BlockHash{requestKey3}, []PodEntry{{PodIdentifier: "pod3", DeviceTier: "cpu"}})
72+
err = index.Add(ctx, []BlockHash{engineKey3}, []BlockHash{requestKey3}, []PodEntry{{PodIdentifier: "pod3", DeviceTier: "cpu", DataParallelRank: NoDataParallelRank}})
7373
require.NoError(t, err)
7474

7575
// Lookup should only return the last two keys
@@ -79,8 +79,8 @@ func TestInMemoryIndexSize(t *testing.T) {
7979
assert.Len(t, podsPerKey, 2) // Only key2 and key3 should be present
8080
assert.Len(t, podsPerKey[requestKey2], 1)
8181
assert.Len(t, podsPerKey[requestKey3], 1)
82-
assert.Contains(t, podsPerKey[requestKey2], PodEntry{PodIdentifier: "pod2", DeviceTier: "gpu"})
83-
assert.Contains(t, podsPerKey[requestKey3], PodEntry{PodIdentifier: "pod3", DeviceTier: "cpu"})
82+
assert.Contains(t, podsPerKey[requestKey2], PodEntry{PodIdentifier: "pod2", DeviceTier: "gpu", DataParallelRank: NoDataParallelRank})
83+
assert.Contains(t, podsPerKey[requestKey3], PodEntry{PodIdentifier: "pod3", DeviceTier: "cpu", DataParallelRank: NoDataParallelRank})
8484
}
8585

8686
func TestInMemoryIndexPodCacheSize(t *testing.T) {
@@ -99,9 +99,9 @@ func TestInMemoryIndexPodCacheSize(t *testing.T) {
9999
engineKey := BlockHash(28409753)
100100
requestKey := BlockHash(51374550)
101101
pods := []PodEntry{
102-
{PodIdentifier: "pod1", DeviceTier: "gpu"},
103-
{PodIdentifier: "pod2", DeviceTier: "gpu"},
104-
{PodIdentifier: "pod3", DeviceTier: "cpu"}, // This should evict pod1 due to LRU
102+
{PodIdentifier: "pod1", DeviceTier: "gpu", DataParallelRank: NoDataParallelRank},
103+
{PodIdentifier: "pod2", DeviceTier: "gpu", DataParallelRank: NoDataParallelRank},
104+
{PodIdentifier: "pod3", DeviceTier: "cpu", DataParallelRank: NoDataParallelRank}, // This should evict pod1 due to LRU
105105
}
106106

107107
err = index.Add(ctx, []BlockHash{engineKey}, []BlockHash{requestKey}, pods)
@@ -112,8 +112,8 @@ func TestInMemoryIndexPodCacheSize(t *testing.T) {
112112
require.NoError(t, err)
113113
assert.Len(t, podsPerKey, 1)
114114
assert.Len(t, podsPerKey[requestKey], 2, "Should only have 2 pods due to PodCacheSize limit")
115-
assert.Contains(t, podsPerKey[requestKey], PodEntry{PodIdentifier: "pod2", DeviceTier: "gpu"})
116-
assert.Contains(t, podsPerKey[requestKey], PodEntry{PodIdentifier: "pod3", DeviceTier: "cpu"})
115+
assert.Contains(t, podsPerKey[requestKey], PodEntry{PodIdentifier: "pod2", DeviceTier: "gpu", DataParallelRank: NoDataParallelRank})
116+
assert.Contains(t, podsPerKey[requestKey], PodEntry{PodIdentifier: "pod3", DeviceTier: "cpu", DataParallelRank: NoDataParallelRank})
117117
}
118118

119119
// TestSpeculativeAnnotation tests that speculative and confirmed PodEntries

pkg/kvcache/kvblock/index.go

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ import (
2626
"k8s.io/apimachinery/pkg/util/sets"
2727
)
2828

29+
const (
	// NoDataParallelRank indicates that no data parallel rank is set.
	// This is the default value for non-DP deployments; code compares
	// PodEntry.DataParallelRank against this sentinel (-1) to decide
	// whether a "@dpN" suffix is included in the string form.
	NoDataParallelRank = -1
)
34+
2935
// IndexConfig holds the configuration for the KV-block index.
3036
// It may configure several backends such as listed within the struct.
3137
// If multiple backends are configured, only the first one will be used.
@@ -172,13 +178,98 @@ type PodEntry struct {
172178
DeviceTier string
173179
// Speculative indicates the entry was added predictively before a KV event confirmed it.
174180
Speculative bool
181+
// DataParallelRank is the data parallel rank of the pod.
182+
// A value of NoDataParallelRank (-1) indicates no DP rank is set (non-DP deployment).
183+
DataParallelRank int
184+
}
185+
186+
// NewPodEntry creates a PodEntry, converting a *int DP rank to the int sentinel form.
187+
// A nil dpRank is stored as NoDataParallelRank (-1).
188+
func NewPodEntry(podIdentifier, deviceTier string, dpRank *int) PodEntry {
189+
rank := NoDataParallelRank
190+
if dpRank != nil {
191+
rank = *dpRank
192+
}
193+
return PodEntry{
194+
PodIdentifier: podIdentifier,
195+
DeviceTier: deviceTier,
196+
DataParallelRank: rank,
197+
}
175198
}
176199

177200
// String returns a string representation of the PodEntry.
201+
// Format: "pod@tier" (no DP rank) or "pod@tier@dpN" (with DP rank).
178202
func (e *PodEntry) String() string {
179203
suffix := ""
180204
if e.Speculative {
181205
suffix = "[speculative]"
182206
}
183-
return fmt.Sprintf("%s@%s%s", e.PodIdentifier, e.DeviceTier, suffix)
207+
208+
if e.DataParallelRank == NoDataParallelRank {
209+
return fmt.Sprintf("%s@%s%s", e.PodIdentifier, e.DeviceTier, suffix)
210+
}
211+
return fmt.Sprintf("%s@%s%s@dp%s", e.PodIdentifier, e.DeviceTier, suffix, strconv.Itoa(e.DataParallelRank))
212+
}
213+
214+
// ParsePodEntry parses a PodEntry from its string representation.
215+
// It handles both "pod@tier" and "pod@tier@dpN" formats.
216+
func ParsePodEntry(s string) (PodEntry, error) {
217+
// Try 3-part format first: "pod@tier@dpN"
218+
parts := splitPodEntryString(s)
219+
switch len(parts) {
220+
case 3:
221+
dpStr := parts[2]
222+
if len(dpStr) < 3 || dpStr[:2] != "dp" {
223+
return PodEntry{}, fmt.Errorf("invalid dp rank format: %s", dpStr)
224+
}
225+
rank, err := strconv.Atoi(dpStr[2:])
226+
if err != nil {
227+
return PodEntry{}, fmt.Errorf("invalid dp rank number: %s", dpStr)
228+
}
229+
return PodEntry{
230+
PodIdentifier: parts[0],
231+
DeviceTier: parts[1],
232+
DataParallelRank: rank,
233+
}, nil
234+
case 2:
235+
return PodEntry{
236+
PodIdentifier: parts[0],
237+
DeviceTier: parts[1],
238+
DataParallelRank: NoDataParallelRank,
239+
}, nil
240+
default:
241+
return PodEntry{}, fmt.Errorf("invalid pod entry format: %s", s)
242+
}
243+
}
244+
245+
// splitPodEntryString splits a PodEntry string into its components.
246+
// It splits from the right to handle pod identifiers that may contain '@'.
247+
func splitPodEntryString(s string) []string {
248+
// Check for dp suffix (3-part format)
249+
lastAt := lastIndexByte(s, '@')
250+
if lastAt < 0 {
251+
return []string{s}
252+
}
253+
suffix := s[lastAt+1:]
254+
if len(suffix) >= 3 && suffix[:2] == "dp" {
255+
if _, err := strconv.Atoi(suffix[2:]); err == nil {
256+
// This is "something@dpN" — find the tier separator
257+
rest := s[:lastAt]
258+
secondLastAt := lastIndexByte(rest, '@')
259+
if secondLastAt >= 0 {
260+
return []string{rest[:secondLastAt], rest[secondLastAt+1:], suffix}
261+
}
262+
}
263+
}
264+
// 2-part format: "pod@tier"
265+
return []string{s[:lastAt], s[lastAt+1:]}
266+
}
267+
268+
// lastIndexByte reports the index of the last occurrence of c in s,
// or -1 if c is absent. (Equivalent to strings.LastIndexByte; kept local
// so this file does not need the strings import.)
func lastIndexByte(s string, c byte) int {
	for i := len(s); i > 0; i-- {
		if s[i-1] == c {
			return i - 1
		}
	}
	return -1
}

0 commit comments

Comments
 (0)