-
Notifications
You must be signed in to change notification settings - Fork 107
Expand file tree
/
Copy pathindex.go
More file actions
186 lines (164 loc) · 6.9 KB
/
index.go
File metadata and controls
186 lines (164 loc) · 6.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/*
Copyright 2025 The llm-d Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kvblock
import (
"context"
"fmt"
"strconv"
"time"
"github.com/llm-d/llm-d-kv-cache/pkg/kvcache/metrics"
"k8s.io/apimachinery/pkg/util/sets"
)
// IndexConfig holds the configuration for the KV-block index.
// It may configure several backends such as listed within the struct.
// If multiple backends are configured, only the first one will be used.
type IndexConfig struct {
// InMemoryConfig holds the configuration for the in-memory index.
InMemoryConfig *InMemoryIndexConfig `json:"inMemoryConfig"`
// RedisConfig holds the configuration for the Redis index.
RedisConfig *RedisIndexConfig `json:"redisConfig"`
// ValkeyConfig holds the configuration for the Valkey index.
ValkeyConfig *RedisIndexConfig `json:"valkeyConfig"`
// CostAwareMemoryConfig holds the configuration for the cost-aware memory index.
CostAwareMemoryConfig *CostAwareMemoryIndexConfig `json:"costAwareMemoryConfig"`
// EnableMetrics toggles whether admissions/evictions/hits/misses are
// recorded.
EnableMetrics bool `json:"enableMetrics"`
// MetricsLoggingInterval defines the interval at which metrics are logged.
// If zero, metrics logging is disabled.
// Requires `EnableMetrics` to be true.
MetricsLoggingInterval time.Duration `json:"metricsLoggingInterval"`
}
// DefaultIndexConfig returns a default configuration for the KV-block index.
func DefaultIndexConfig() *IndexConfig {
return &IndexConfig{
InMemoryConfig: DefaultInMemoryIndexConfig(),
EnableMetrics: false,
}
}
// NewIndex creates a new Index instance.
func NewIndex(ctx context.Context, cfg *IndexConfig) (Index, error) {
if cfg == nil {
cfg = DefaultIndexConfig()
}
var idx Index
var err error
switch {
case cfg.CostAwareMemoryConfig != nil:
idx, err = NewCostAwareMemoryIndex(cfg.CostAwareMemoryConfig)
if err != nil {
return nil, fmt.Errorf("failed to create cost-aware memory index: %w", err)
}
case cfg.ValkeyConfig != nil:
//nolint:contextcheck // NewValkeyIndex does not accept context parameter
idx, err = NewValkeyIndex(cfg.ValkeyConfig)
if err != nil {
return nil, fmt.Errorf("failed to create Valkey index: %w", err)
}
case cfg.RedisConfig != nil:
//nolint:contextcheck // NewRedisIndex does not accept context parameter
idx, err = NewRedisIndex(cfg.RedisConfig)
if err != nil {
return nil, fmt.Errorf("failed to create Redis index: %w", err)
}
case cfg.InMemoryConfig != nil:
idx, err = NewInMemoryIndex(cfg.InMemoryConfig)
if err != nil {
return nil, fmt.Errorf("failed to create in-memory index: %w", err)
}
default:
return nil, fmt.Errorf("no valid index configuration provided")
}
// wrap in metrics only if enabled
if cfg.EnableMetrics {
idx = NewInstrumentedIndex(idx)
metrics.Register()
if cfg.MetricsLoggingInterval > 0 {
// this is non-blocking
metrics.StartMetricsLogging(ctx, cfg.MetricsLoggingInterval)
}
}
return idx, nil
}
// Index defines the interface for a backend that manages KV-block
// indexing.
//
// An index backend is a data store that will aggregate possibly the entire
// global KV cache block index, and will be used to retrieve pod-localities
// for a given set of consecutive keys that constitute a prefix-cache hit.
// The hit may not necessarily be on all keys, but of the longest prefix match.
//
// The index backend allows efficient tracking of which vLLM engines hold which
// KV-blocks, on what device tier, and when they were last updated.
//
// Index operations are thread-safe and can be performed concurrently.
type Index interface {
// Lookup receives a list of keys and a set of pod identifiers,
// and retrieves the filtered pods associated with those keys.
// The filtering is done based on the pod identifiers provided.
// If the podIdentifierSet is empty, all pods are returned.
//
// It returns:
// 1. A map where the keys are those in requestKeys and the values are pod-identifiers.
// 2. An error if any occurred during the operation.
Lookup(ctx context.Context, requestKeys []BlockHash, podIdentifierSet sets.Set[string]) (map[BlockHash][]PodEntry, error)
// Add adds a set of engineKeys/requestKeys and their associated pod entries to the index backend.
// If engineKeys is nil, only requestKey -> PodEntry mappings are created (no engineKey -> requestKey mapping).
// This is used for speculative entries where engine keys are not yet known.
// All implementations must handle nil engineKeys without panicking.
Add(ctx context.Context, engineKeys, requestKeys []BlockHash, entries []PodEntry) error
// Evict removes a key and its associated pod entries from the index backend.
// keyType indicates whether the key is an EngineKey (requires engine→request lookup)
// or a RequestKey (used directly).
Evict(ctx context.Context, key BlockHash, keyType KeyType, entries []PodEntry) error
// GetRequestKey returns the requestKey associated with the given engineKey.
GetRequestKey(ctx context.Context, engineKey BlockHash) (BlockHash, error)
}
// KeyType indicates whether a key passed to Evict is an engine key or a request key.
type KeyType int
const (
// EngineKey means the key is an engine-assigned key that must be resolved
// to a request key via the engineToRequestKeys mapping.
EngineKey KeyType = iota
// RequestKey means the key is a request key and can be used directly.
// This is used for speculative entries that were added without engineKey mapping.
RequestKey
)
// BlockHash struct represents a unique identifier for a KV-cache block.
type BlockHash uint64
// EmptyBlockHash represents an invalid or uninitialized block hash.
// This serves as the "error value".
const EmptyBlockHash BlockHash = 0
// String returns a string representation of the Key.
func (c BlockHash) String() string {
return strconv.FormatUint(uint64(c), 10)
}
// PodEntry struct represents a pod entry in the KV-block index.
type PodEntry struct {
// PodIdentifier is the unique identifier for the pod.
PodIdentifier string
// DeviceTier is the tier of the device where the KV-block is stored.
DeviceTier string
// Speculative indicates the entry was added predictively before a KV event confirmed it.
Speculative bool
// StoredGroups tracks the group IDs that have stored this block.
StoredGroups []int
}
// String returns a string representation of the PodEntry.
func (e *PodEntry) String() string {
suffix := ""
if e.Speculative {
suffix = "[speculative]"
}
return fmt.Sprintf("%s@%s%s", e.PodIdentifier, e.DeviceTier, suffix)
}