Skip to content

Commit f9172a9

Browse files
mayabarvMaroon
andauthored
Prefix aware scorer initialization (#143)
* read configuration of prefix aware scorer from environment variables Signed-off-by: Maya Barnea <mayab@il.ibm.com> * update prefix aware scorer by environment variables values Signed-off-by: Maya Barnea <mayab@il.ibm.com> * - update prefix scorer related environment variables names - remove PREFIX_SCORER_MAX_BLOCK_CACHE_SIZE, which will be defined internally - update names of configuration variables in prefix scorer configuration Signed-off-by: Maya Barnea <mayab@il.ibm.com> * Update pkg/scheduling/plugins/scorer/prefix_store.go Add explanation for prefix scorer's environment variable by Maroon Co-authored-by: Maroon Ayoub <Maroonay@gmail.com> Signed-off-by: Maya Barnea <mayab@il.ibm.com> * Update pkg/scheduling/plugins/scorer/prefix_store.go Add cache block size variable description (Maroon) Co-authored-by: Maroon Ayoub <Maroonay@gmail.com> Signed-off-by: Maya Barnea <mayab@il.ibm.com> * fix merge problems Signed-off-by: Maya Barnea <mayab@il.ibm.com> * update prefix store environment variables info in documentation Signed-off-by: Maya Barnea <mayab@il.ibm.com> * update comments for prefix scorer default values Signed-off-by: Maya Barnea <mayab@il.ibm.com> * update comments for prefix store default value Signed-off-by: Maya Barnea <mayab@il.ibm.com> --------- Signed-off-by: Maya Barnea <mayab@il.ibm.com> Co-authored-by: Maroon Ayoub <Maroonay@gmail.com>
1 parent f750962 commit f9172a9

File tree

8 files changed

+56
-42
lines changed

8 files changed

+56
-42
lines changed

docs/architecture.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ These components are maintained in the `llm-d-inference-scheduler` repository an
8181
| Scorer | Description | Env Vars |
8282
|------------------|--------------------------------------------|----------|
8383
| Session-aware | Prefers pods from same session | `ENABLE_SESSION_AWARE_SCORER`, `SESSION_AWARE_SCORER_WEIGHT`, `PREFILL_ENABLE_SESSION_AWARE_SCORER`, `PREFILL_SESSION_AWARE_SCORER_WEIGHT` |
84-
| Prefix-aware | Scores based on prompt prefix history;<br>lightweight but may not reflect actual KV-cache state | `ENABLE_PREFIX_AWARE_SCORER`, `PREFIX_AWARE_SCORER_WEIGHT`, `PREFILL_ENABLE_PREFIX_AWARE_SCORER`, `PREFILL_PREFIX_AWARE_SCORER_WEIGHT`, `PREFIX_SCORER_BLOCK_SIZE`|
85-
| KVCache-aware | Scores based on real KV-cache state on vLLM;<br>more accurate but requires extra computation and cycles to track the current cache state | `ENABLE_KVCACHE_AWARE_SCORER`, `KVCACHE_INDEXER_REDIS_ADDR`, `PREFILL_ENABLE_KVCACHE_AWARE_SCORER`, `PREFILL_KVCACHE_INDEXER_REDIS_ADDR`, `HF_TOKEN`, `KVCACHE_INDEXER_REDIS_ADDR` |
84+
| Prefix-aware | Scores based on prompt prefix history;<br>lightweight but may not reflect actual KV-cache state | `ENABLE_PREFIX_AWARE_SCORER`, `PREFIX_AWARE_SCORER_WEIGHT`, `PREFILL_ENABLE_PREFIX_AWARE_SCORER`, `PREFILL_PREFIX_AWARE_SCORER_WEIGHT`, `PREFIX_SCORER_CACHE_CAPACITY`, `PREFIX_SCORER_CACHE_BLOCK_SIZE`|
85+
| KVCache-aware | Scores based on real KV-cache state on vLLM;<br>more accurate but requires extra computation and cycles to track the current cache state | `ENABLE_KVCACHE_AWARE_SCORER`, `KVCACHE_INDEXER_REDIS_ADDR`, `PREFILL_ENABLE_KVCACHE_AWARE_SCORER`, `PREFILL_KVCACHE_INDEXER_REDIS_ADDR`, `HF_TOKEN`, `KVCACHE_INDEXER_REDIS_ADDR` |
8686
| Load-aware | Avoids busy pods | `ENABLE_LOAD_AWARE_SCORER`, `LOAD_AWARE_SCORER_WEIGHT`, `PREFILL_ENABLE_LOAD_AWARE_SCORER`, `PREFILL_LOAD_AWARE_SCORER_WEIGHT` |
8787

8888
### Prefill / Decode Configuration
@@ -92,6 +92,11 @@ In case Disaggregated Prefill is enabled, you should also define the following e
9292
- Toggle P/D mode: `PD_ENABLED=true`
9393
- Threshold: `PD_PROMPT_LEN_THRESHOLD=<value>`
9494

95+
### Prefix Aware Scorer Configuration
96+
97+
- `PREFIX_SCORER_CACHE_CAPACITY` - the cache capacity sets the maximum number of blocks the LRU cache can store. A block maps from a chunk of a prompt to a set of pods that are estimated to have the prefix of the prompt that ends at the keyed chunk.
98+
- `PREFIX_SCORER_CACHE_BLOCK_SIZE` - the cache block size defines the length of the prompt chunk that a block is keyed by.
99+
95100
#### Prefill Scorers:
96101
```bash
97102
export PREFILL_ENABLE_SESSION_AWARE_SCORER=true

pkg/config/config.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,13 @@ const (
5050
pdPromptLenThresholdEnvKey = "PD_PROMPT_LEN_THRESHOLD"
5151
pdPromptLenThresholdDefault = 100
5252

53-
prefixScorerBlockSizeEnvKey = "PREFIX_SCORER_BLOCK_SIZE"
54-
prefixScorerBlockSizeDefault = 256
53+
prefixCacheCapacityEnvKey = "PREFIX_SCORER_CACHE_CAPACITY"
54+
// DefaultPrefixCacheCapacity defines the default value for maximum number of blocks the LRU cache can store.
55+
DefaultPrefixCacheCapacity = 500000
56+
57+
prefixScorerCacheBlockSizeEnvKey = "PREFIX_SCORER_CACHE_BLOCK_SIZE"
58+
// DefaultPrefixCacheBlockSize defines the default number of bytes of prompt text each block covers in the prefix cache.
59+
DefaultPrefixCacheBlockSize = 256
5560
)
5661

5762
// Config contains scheduler configuration, currently configuration is loaded from environment variables
@@ -60,7 +65,8 @@ type Config struct {
6065
PrefillSchedulerPlugins map[string]int
6166
PDEnabled bool
6267
PDThreshold int
63-
PrefixBlockSize int
68+
PrefixCacheBlockSize int
69+
PrefixCacheCapacity int
6470
}
6571

6672
// LoadConfig loads configuration from environment variables and returns a new instance of Config
@@ -77,7 +83,8 @@ func LoadConfig(logger logr.Logger) *Config {
7783
PrefillSchedulerPlugins: loadPluginInfo(logger, true, pluginNames),
7884
PDEnabled: env.GetEnvString(pdEnabledEnvKey, "false", logger) == "true",
7985
PDThreshold: env.GetEnvInt(pdPromptLenThresholdEnvKey, pdPromptLenThresholdDefault, logger),
80-
PrefixBlockSize: env.GetEnvInt(prefixScorerBlockSizeEnvKey, prefixScorerBlockSizeDefault, logger),
86+
PrefixCacheBlockSize: env.GetEnvInt(prefixScorerCacheBlockSizeEnvKey, DefaultPrefixCacheBlockSize, logger),
87+
PrefixCacheCapacity: env.GetEnvInt(prefixCacheCapacityEnvKey, DefaultPrefixCacheCapacity, logger),
8188
}
8289
}
8390

pkg/scheduling/pd/scheduler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ type Datastore interface {
6060
// provided configuration.
6161
func NewScheduler(ctx context.Context, schedulerConfig *config.Config, ds Datastore) (*Scheduler, error) {
6262
prefixConfig := scorer.DefaultPrefixStoreConfig()
63-
prefixConfig.BlockSize = schedulerConfig.PrefixBlockSize
63+
prefixConfig.CacheBlockSize = schedulerConfig.PrefixCacheBlockSize
64+
prefixConfig.CacheCapacity = schedulerConfig.PrefixCacheCapacity
6465

6566
scheduler := &Scheduler{
6667
threshold: schedulerConfig.PDThreshold,

pkg/scheduling/pd/scheduler_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ func TestPDSchedule(t *testing.T) {
141141
PrefillSchedulerPlugins: map[string]int{},
142142
PDEnabled: true,
143143
PDThreshold: 5,
144-
PrefixBlockSize: 256,
144+
PrefixCacheBlockSize: 256,
145+
PrefixCacheCapacity: 50000,
145146
}
146147

147148
for _, test := range tests {

pkg/scheduling/plugins/scorer/prefix_aware.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func (s *PrefixAwareScorer) GetCachedPercentage(pod, prompt string) float64 {
144144
}
145145

146146
intVal, _ := rawVal.(int)
147-
return float64(intVal*s.prefixStore.blockSize) / float64(len(prompt))
147+
return float64(intVal*s.prefixStore.cacheBlockSize) / float64(len(prompt))
148148
}
149149

150150
// cleanup Cleans up hits map

pkg/scheduling/plugins/scorer/prefix_aware_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func TestPrefixAwareScorer(t *testing.T) {
113113
t.Run(tt.name, func(t *testing.T) {
114114
// Reset prefix store for each test
115115
config := scorer.DefaultPrefixStoreConfig()
116-
config.BlockSize = 5 // set small chunking for testing
116+
config.CacheBlockSize = 5 // set small chunking for testing
117117

118118
s := scorer.NewPrefixAwareScorer(ctx, config)
119119

@@ -157,13 +157,13 @@ func TestPrefixAwareScorerProfiling(t *testing.T) {
157157

158158
name2Pod := createPods(nPodsTotal)
159159
config := scorer.DefaultPrefixStoreConfig()
160-
text := generateNonRepeatingText(config.BlockSize * nPodsInStore)
160+
text := generateNonRepeatingText(config.CacheBlockSize * nPodsInStore)
161161
t.Run(testName, func(t *testing.T) {
162162
start := time.Now() // record start time
163163
config := scorer.DefaultPrefixStoreConfig()
164164
s := scorer.NewPrefixAwareScorer(ctx, config)
165165
for i := range nPodsInStore {
166-
prompt := text[0 : (i+1)*config.BlockSize-1]
166+
prompt := text[0 : (i+1)*config.CacheBlockSize-1]
167167
err := s.GetPrefixStore().AddEntry(modelName, prompt, &name2Pod["pod"+strconv.Itoa(i)].NamespacedName)
168168
if err != nil {
169169
t.Errorf("Failed to add entry to prefix store: %v", err)

pkg/scheduling/plugins/scorer/prefix_store.go

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,34 +10,34 @@ import (
1010

1111
"github.com/cespare/xxhash/v2"
1212
lru "github.com/hashicorp/golang-lru/v2"
13+
14+
"github.com/llm-d/llm-d-inference-scheduler/pkg/config"
1315
)
1416

1517
const (
16-
// defaultMaxCacheSize sets the maximum number of blocks the LRU cache can store.
17-
defaultMaxCacheSize = 500000
18-
// defaultBlockSize defines how many runes each block contains in the prefix cache.
19-
defaultBlockSize = 256
20-
// defaultMaxBlockCacheSize sets the maximum number of pods a block can store.
21-
defaultMaxBlockCacheSize = 100
18+
// defaultMaxBlockPods defines the default maximum number of pods a block can store. Currently this value cannot be changed by configuration.
19+
defaultMaxBlockPods = 100
2220
)
2321

2422
// PrefixStoreConfig contains initialization configuration for PrefixStore.
2523
type PrefixStoreConfig struct {
26-
// CacheSize sets the maximum number of blocks the LRU cache can store.
27-
CacheSize int
28-
// BlockSize defines how many runes each block contains in the prefix cache.
29-
BlockSize int
30-
// BlockCacheSize sets the maximum number of pods a block can store.
31-
BlockCacheSize int
24+
// CacheCapacity sets the maximum number of blocks the LRU cache can store.
25+
// A block maps from a chunk of a prompt to a set of pods that are estimated to have
26+
// the prefix of the prompt that ends at the keyed chunk.
27+
CacheCapacity int
28+
// CacheBlockSize defines the length of the prompt chunk that a block is keyed by.
29+
CacheBlockSize int
30+
// MaxBlockPods sets the maximum number of pods a block can store.
31+
MaxBlockPods int
3232
}
3333

3434
// DefaultPrefixStoreConfig returns a PrefixStoreConfig instance with default
3535
// configuration.
3636
func DefaultPrefixStoreConfig() *PrefixStoreConfig {
3737
return &PrefixStoreConfig{
38-
CacheSize: defaultMaxCacheSize,
39-
BlockSize: defaultBlockSize,
40-
BlockCacheSize: defaultMaxBlockCacheSize,
38+
CacheCapacity: config.DefaultPrefixCacheCapacity,
39+
CacheBlockSize: config.DefaultPrefixCacheBlockSize,
40+
MaxBlockPods: defaultMaxBlockPods,
4141
}
4242
}
4343

@@ -51,9 +51,9 @@ type block struct {
5151
type PrefixStore struct {
5252
sync.RWMutex
5353

54-
cacheSize int
55-
blockSize int
56-
blockCacheSize int
54+
cacheCapacity int
55+
cacheBlockSize int
56+
maxBlockPods int
5757

5858
store map[string]*lru.Cache[uint64, *block]
5959
}
@@ -66,16 +66,16 @@ func NewPrefixStore(config *PrefixStoreConfig) *PrefixStore {
6666
}
6767

6868
return &PrefixStore{
69-
cacheSize: config.CacheSize,
70-
blockSize: config.BlockSize,
71-
blockCacheSize: config.BlockCacheSize,
69+
cacheCapacity: config.CacheCapacity,
70+
cacheBlockSize: config.CacheBlockSize,
71+
maxBlockPods: config.MaxBlockPods,
7272
store: make(map[string]*lru.Cache[uint64, *block]),
7373
}
7474
}
7575

7676
// AddEntry adds a new entry to the prefix store.
7777
func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.NamespacedName) error {
78-
if prompt == "" || pod == nil || len(prompt) < s.blockSize /* skip if prompt is too short */ {
78+
if prompt == "" || pod == nil || len(prompt) < s.cacheBlockSize /* skip if prompt is too short */ {
7979
return nil
8080
}
8181

@@ -84,7 +84,7 @@ func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.Names
8484
cache, ok := s.store[modelName]
8585
if !ok {
8686
var err error
87-
cache, err = lru.New[uint64, *block](s.cacheSize)
87+
cache, err = lru.New[uint64, *block](s.cacheCapacity)
8888
if err != nil {
8989
return fmt.Errorf("failed to create LRU cache for model %s: %w", modelName, err)
9090
}
@@ -98,8 +98,8 @@ func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.Names
9898
digest := xxhash.New()
9999

100100
// Chunk the text into blocks and populate the cache
101-
for start := 0; start < len(promptBytes); start += s.blockSize {
102-
end := start + s.blockSize
101+
for start := 0; start < len(promptBytes); start += s.cacheBlockSize {
102+
end := start + s.cacheBlockSize
103103
if end > len(promptBytes) {
104104
break // skip partial blocks
105105
}
@@ -118,7 +118,7 @@ func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.Names
118118

119119
b, ok := cache.Get(blockHash)
120120
if !ok {
121-
pods, err := lru.New[types.NamespacedName, time.Time](s.blockCacheSize)
121+
pods, err := lru.New[types.NamespacedName, time.Time](s.maxBlockPods)
122122
if err != nil {
123123
return fmt.Errorf("failed to create LRU cache for block: %w", err)
124124
}
@@ -136,7 +136,7 @@ func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.Names
136136
// FindMatchingPods finds all pods that match the given prompt and model name.
137137
// It returns a map of pods and the number of blocks they match.
138138
func (s *PrefixStore) FindMatchingPods(prompt, modelName string) map[string]int {
139-
if prompt == "" || modelName == "" || len(prompt) < s.blockSize /* skip if prompt is too short */ {
139+
if prompt == "" || modelName == "" || len(prompt) < s.cacheBlockSize /* skip if prompt is too short */ {
140140
return nil
141141
}
142142

@@ -153,8 +153,8 @@ func (s *PrefixStore) FindMatchingPods(prompt, modelName string) map[string]int
153153
digest := xxhash.New()
154154

155155
matchedPods := make(map[string]int)
156-
for start := 0; start < len(promptBytes); start += s.blockSize {
157-
end := start + s.blockSize
156+
for start := 0; start < len(promptBytes); start += s.cacheBlockSize {
157+
end := start + s.cacheBlockSize
158158
if end > len(promptBytes) {
159159
break // skip partial blocks
160160
}

pkg/scheduling/plugins/scorer/prefix_store_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func TestBasicPrefixOperations(t *testing.T) {
1717
_ = log.IntoContext(ctx, logr.New(log.NullLogSink{}))
1818

1919
config := scorer.DefaultPrefixStoreConfig()
20-
config.BlockSize = 5 // set small chunking for testing
20+
config.CacheBlockSize = 5 // set small chunking for testing
2121
store := scorer.NewPrefixStore(config)
2222

2323
podName := k8stypes.NamespacedName{

0 commit comments

Comments
 (0)