Skip to content
9 changes: 7 additions & 2 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ These components are maintained in the `llm-d-inference-scheduler` repository an
| Scorer | Description | Env Vars |
|------------------|--------------------------------------------|----------|
| Session-aware | Prefers pods from same session | `ENABLE_SESSION_AWARE_SCORER`, `SESSION_AWARE_SCORER_WEIGHT`, `PREFILL_ENABLE_SESSION_AWARE_SCORER`, `PREFILL_SESSION_AWARE_SCORER_WEIGHT` |
| Prefix-aware | Scores based on prompt prefix history;<br>lightweight but may not reflect actual KV-cache state | `ENABLE_PREFIX_AWARE_SCORER`, `PREFIX_AWARE_SCORER_WEIGHT`, `PREFILL_ENABLE_PREFIX_AWARE_SCORER`, `PREFILL_PREFIX_AWARE_SCORER_WEIGHT`, `PREFIX_SCORER_BLOCK_SIZE`|
| KVCache-aware | Scores based on real KV-cache state on vLLM;<br>more accurate but requires extra computation and cycles to track the current cache state | `ENABLE_KVCACHE_AWARE_SCORER`, `KVCACHE_INDEXER_REDIS_ADDR`, `PREFILL_ENABLE_KVCACHE_AWARE_SCORER`, `PREFILL_KVCACHE_INDEXER_REDIS_ADDR`, `HF_TOKEN`, `KVCACHE_INDEXER_REDIS_ADDR` |
| Prefix-aware | Scores based on prompt prefix history;<br>lightweight but may not reflect actual KV-cache state | `ENABLE_PREFIX_AWARE_SCORER`, `PREFIX_AWARE_SCORER_WEIGHT`, `PREFILL_ENABLE_PREFIX_AWARE_SCORER`, `PREFILL_PREFIX_AWARE_SCORER_WEIGHT`, `PREFIX_SCORER_CACHE_CAPACITY`, `PREFIX_SCORER_CACHE_BLOCK_SIZE`|
| KVCache-aware | Scores based on real KV-cache state on vLLM;<br>more accurate but requires extra computation and cycles to track the current cache state | `ENABLE_KVCACHE_AWARE_SCORER`, `KVCACHE_INDEXER_REDIS_ADDR`, `PREFILL_ENABLE_KVCACHE_AWARE_SCORER`, `PREFILL_KVCACHE_INDEXER_REDIS_ADDR`, `HF_TOKEN`, `KVCACHE_INDEXER_REDIS_ADDR` |
| Load-aware | Avoids busy pods | `ENABLE_LOAD_AWARE_SCORER`, `LOAD_AWARE_SCORER_WEIGHT`, `PREFILL_ENABLE_LOAD_AWARE_SCORER`, `PREFILL_LOAD_AWARE_SCORER_WEIGHT` |

### Prefill / Decode Configuration
Expand All @@ -92,6 +92,11 @@ In case Disaggrigated Prefill is enabled, you should also define the following e
- Toggle P/D mode: `PD_ENABLED=true`
- Threshold: `PD_PROMPT_LEN_THRESHOLD=<value>`

### Prefix Aware Scorer Configuration

- `PREFIX_SCORER_CACHE_CAPACITY` - the cache capacity sets the maximum number of blocks the LRU cache can store. A block maps from a chunk of a prompt to a set of pods that are estimated to have the prefix of the prompt that ends at the keyed chunk.
- `PREFIX_SCORER_CACHE_BLOCK_SIZE` - the cache block size defines the length of the prompt chunk that a block is keyed by.

#### Prefill Scorers:
```bash
export PREFILL_ENABLE_SESSION_AWARE_SCORER=true
Expand Down
15 changes: 11 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ const (
pdPromptLenThresholdEnvKey = "PD_PROMPT_LEN_THRESHOLD"
pdPromptLenThresholdDefault = 100

prefixScorerBlockSizeEnvKey = "PREFIX_SCORER_BLOCK_SIZE"
prefixScorerBlockSizeDefault = 256
prefixCacheCapacityEnvKey = "PREFIX_SCORER_CACHE_CAPACITY"
// DefaultPrefixCacheCapacity defines the default value for maximum number of blocks the LRU cache can store.
DefaultPrefixCacheCapacity = 500000

prefixScorerCacheBlockSizeEnvKey = "PREFIX_SCORER_CACHE_BLOCK_SIZE"
// DefaultPrefixCacheBlockSize defines the default value of how many runes each block contains in the prefix cache.
DefaultPrefixCacheBlockSize = 256
)

// Config contains scheduler configuration, currently configuration is loaded from environment variables
Expand All @@ -60,7 +65,8 @@ type Config struct {
PrefillSchedulerPlugins map[string]int
PDEnabled bool
PDThreshold int
PrefixBlockSize int
PrefixCacheBlockSize int
PrefixCacheCapacity int
}

// LoadConfig loads configuration from environment variables and returns a new instance of Config
Expand All @@ -77,7 +83,8 @@ func LoadConfig(logger logr.Logger) *Config {
PrefillSchedulerPlugins: loadPluginInfo(logger, true, pluginNames),
PDEnabled: env.GetEnvString(pdEnabledEnvKey, "false", logger) == "true",
PDThreshold: env.GetEnvInt(pdPromptLenThresholdEnvKey, pdPromptLenThresholdDefault, logger),
PrefixBlockSize: env.GetEnvInt(prefixScorerBlockSizeEnvKey, prefixScorerBlockSizeDefault, logger),
PrefixCacheBlockSize: env.GetEnvInt(prefixScorerCacheBlockSizeEnvKey, DefaultPrefixCacheBlockSize, logger),
PrefixCacheCapacity: env.GetEnvInt(prefixCacheCapacityEnvKey, DefaultPrefixCacheCapacity, logger),
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduling/pd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type Datastore interface {
// provided configuration.
func NewScheduler(ctx context.Context, schedulerConfig *config.Config, ds Datastore) (*Scheduler, error) {
prefixConfig := scorer.DefaultPrefixStoreConfig()
prefixConfig.BlockSize = schedulerConfig.PrefixBlockSize
prefixConfig.CacheBlockSize = schedulerConfig.PrefixCacheBlockSize
prefixConfig.CacheCapacity = schedulerConfig.PrefixCacheCapacity

scheduler := &Scheduler{
threshold: schedulerConfig.PDThreshold,
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduling/pd/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func TestPDSchedule(t *testing.T) {
PrefillSchedulerPlugins: map[string]int{},
PDEnabled: true,
PDThreshold: 5,
PrefixBlockSize: 256,
PrefixCacheBlockSize: 256,
PrefixCacheCapacity: 50000,
}

for _, test := range tests {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduling/plugins/scorer/prefix_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *PrefixAwareScorer) GetCachedPercentage(pod, prompt string) float64 {
}

intVal, _ := rawVal.(int)
return float64(intVal*s.prefixStore.blockSize) / float64(len(prompt))
return float64(intVal*s.prefixStore.cacheBlockSize) / float64(len(prompt))
}

// cleanup Cleans up hits map
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduling/plugins/scorer/prefix_aware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestPrefixAwareScorer(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
// Reset prefix store for each test
config := scorer.DefaultPrefixStoreConfig()
config.BlockSize = 5 // set small chunking for testing
config.CacheBlockSize = 5 // set small chunking for testing

s := scorer.NewPrefixAwareScorer(ctx, config)

Expand Down Expand Up @@ -157,13 +157,13 @@ func TestPrefixAwareScorerProfiling(t *testing.T) {

name2Pod := createPods(nPodsTotal)
config := scorer.DefaultPrefixStoreConfig()
text := generateNonRepeatingText(config.BlockSize * nPodsInStore)
text := generateNonRepeatingText(config.CacheBlockSize * nPodsInStore)
t.Run(testName, func(t *testing.T) {
start := time.Now() // record start time
config := scorer.DefaultPrefixStoreConfig()
s := scorer.NewPrefixAwareScorer(ctx, config)
for i := range nPodsInStore {
prompt := text[0 : (i+1)*config.BlockSize-1]
prompt := text[0 : (i+1)*config.CacheBlockSize-1]
err := s.GetPrefixStore().AddEntry(modelName, prompt, &name2Pod["pod"+strconv.Itoa(i)].NamespacedName)
if err != nil {
t.Errorf("Failed to add entry to prefix store: %v", err)
Expand Down
58 changes: 29 additions & 29 deletions pkg/scheduling/plugins/scorer/prefix_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,34 @@ import (

"github.com/cespare/xxhash/v2"
lru "github.com/hashicorp/golang-lru/v2"

"github.com/llm-d/llm-d-inference-scheduler/pkg/config"
)

const (
// defaultMaxCacheSize sets the maximum number of blocks the LRU cache can store.
defaultMaxCacheSize = 500000
// defaultBlockSize defines how many runes each block contains in the prefix cache.
defaultBlockSize = 256
// defaultMaxBlockCacheSize sets the maximum number of pods a block can store.
defaultMaxBlockCacheSize = 100
// defaultMaxBlockPods defined the default maximum number of pods a block can store. Currently this value cannot be changed by configuration
defaultMaxBlockPods = 100
)

// PrefixStoreConfig contains initialization configuration for PrefixStore.
type PrefixStoreConfig struct {
// CacheSize sets the maximum number of blocks the LRU cache can store.
CacheSize int
// BlockSize defines how many runes each block contains in the prefix cache.
BlockSize int
// BlockCacheSize sets the maximum number of pods a block can store.
BlockCacheSize int
// CacheCapacity sets the maximum number of blocks the LRU cache can store.
// A block maps from a chunk of a prompt to a set of pods that are estimated to have
// the prefix of the prompt that ends at the keyed chunk.
CacheCapacity int
// CacheBlockSize defines the length of the prompt chunk that a block is keyed by.
CacheBlockSize int
// MaxBlockPods sets the maximum number of pods a block can store.
MaxBlockPods int
}

// DefaultPrefixStoreConfig returns an PrefixStoreConfig instance with default
// configuration.
func DefaultPrefixStoreConfig() *PrefixStoreConfig {
return &PrefixStoreConfig{
CacheSize: defaultMaxCacheSize,
BlockSize: defaultBlockSize,
BlockCacheSize: defaultMaxBlockCacheSize,
CacheCapacity: config.DefaultPrefixCacheCapacity,
CacheBlockSize: config.DefaultPrefixCacheBlockSize,
MaxBlockPods: defaultMaxBlockPods,
}
}

Expand All @@ -51,9 +51,9 @@ type block struct {
type PrefixStore struct {
sync.RWMutex

cacheSize int
blockSize int
blockCacheSize int
cacheCapacity int
cacheBlockSize int
maxBlockPods int

store map[string]*lru.Cache[uint64, *block]
}
Expand All @@ -66,16 +66,16 @@ func NewPrefixStore(config *PrefixStoreConfig) *PrefixStore {
}

return &PrefixStore{
cacheSize: config.CacheSize,
blockSize: config.BlockSize,
blockCacheSize: config.BlockCacheSize,
cacheCapacity: config.CacheCapacity,
cacheBlockSize: config.CacheBlockSize,
maxBlockPods: config.MaxBlockPods,
store: make(map[string]*lru.Cache[uint64, *block]),
}
}

// AddEntry adds a new entry to the prefix store.
func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.NamespacedName) error {
if prompt == "" || pod == nil || len(prompt) < s.blockSize /* skip if prompt is too short */ {
if prompt == "" || pod == nil || len(prompt) < s.cacheBlockSize /* skip if prompt is too short */ {
return nil
}

Expand All @@ -84,7 +84,7 @@ func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.Names
cache, ok := s.store[modelName]
if !ok {
var err error
cache, err = lru.New[uint64, *block](s.cacheSize)
cache, err = lru.New[uint64, *block](s.cacheCapacity)
if err != nil {
return fmt.Errorf("failed to create LRU cache for model %s: %w", modelName, err)
}
Expand All @@ -98,8 +98,8 @@ func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.Names
digest := xxhash.New()

// Chunk the text into blocks and populate the cache
for start := 0; start < len(promptBytes); start += s.blockSize {
end := start + s.blockSize
for start := 0; start < len(promptBytes); start += s.cacheBlockSize {
end := start + s.cacheBlockSize
if end > len(promptBytes) {
break // skip partial blocks
}
Expand All @@ -118,7 +118,7 @@ func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.Names

b, ok := cache.Get(blockHash)
if !ok {
pods, err := lru.New[types.NamespacedName, time.Time](s.blockCacheSize)
pods, err := lru.New[types.NamespacedName, time.Time](s.maxBlockPods)
if err != nil {
return fmt.Errorf("failed to create LRU cache for block: %w", err)
}
Expand All @@ -136,7 +136,7 @@ func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.Names
// FindMatchingPods finds all pods that match the given prompt and model name.
// It returns a map of pods and the number of blocks they match.
func (s *PrefixStore) FindMatchingPods(prompt, modelName string) map[string]int {
if prompt == "" || modelName == "" || len(prompt) < s.blockSize /* skip if prompt is too short */ {
if prompt == "" || modelName == "" || len(prompt) < s.cacheBlockSize /* skip if prompt is too short */ {
return nil
}

Expand All @@ -153,8 +153,8 @@ func (s *PrefixStore) FindMatchingPods(prompt, modelName string) map[string]int
digest := xxhash.New()

matchedPods := make(map[string]int)
for start := 0; start < len(promptBytes); start += s.blockSize {
end := start + s.blockSize
for start := 0; start < len(promptBytes); start += s.cacheBlockSize {
end := start + s.cacheBlockSize
if end > len(promptBytes) {
break // skip partial blocks
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduling/plugins/scorer/prefix_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestBasicPrefixOperations(t *testing.T) {
_ = log.IntoContext(ctx, logr.New(log.NullLogSink{}))

config := scorer.DefaultPrefixStoreConfig()
config.BlockSize = 5 // set small chunking for testing
config.CacheBlockSize = 5 // set small chunking for testing
store := scorer.NewPrefixStore(config)

podName := k8stypes.NamespacedName{
Expand Down
Loading