Skip to content

Commit 107cd7a

Browse files
committed
feat: support SGLang KV events in VLLMAdapter
SGLang's BlockStored and BlockRemoved events may carry fewer fields than vLLM's, because trailing optional fields (e.g. medium, lora_name, extra_keys) are omitted when they are None/default. This change pads shorter msgpack arrays with nil values before struct deserialization, allowing VLLMAdapter to handle both vLLM and SGLang events transparently.
1 parent 729df62 commit 107cd7a

File tree

2 files changed

+75
-11
lines changed

2 files changed

+75
-11
lines changed

pkg/kvevents/engineadapter/vllm_adapter.go

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,18 @@ func (v *VLLMAdapter) getHashAsUint64(raw any) (uint64, error) {
118118
}
119119
}
120120

121+
const (
122+
// Expected field counts for msgpack array structs.
123+
// Used to pad shorter arrays from SGLang, which may omit trailing optional fields.
124+
// See SGLang source: sglang/srt/disaggregation/kv_events.py (BlockStored, BlockRemoved classes)
125+
blockStoredFieldCount = 9 // tag + block_hashes + parent + tokens + block_size + lora_id + medium + lora_name + extra_keys
126+
blockRemovedFieldCount = 3 // tag + block_hashes + medium
127+
128+
// Minimum required fields (excluding trailing optional ones).
129+
blockStoredMinFields = 5 // tag + block_hashes + parent + tokens + block_size
130+
blockRemovedMinFields = 2 // tag + block_hashes
131+
)
132+
121133
// vLLM msgpack-specific event structures.
122134
// These structs are designed for msgpack array encoding and match vLLM's format.
123135
type msgpackVLLMEventBatch struct {
@@ -188,10 +200,31 @@ func (v *VLLMAdapter) decodeVLLMEvent(rawEventBytes []byte) (kvevents.GenericEve
188200
return converter(rawEventBytes)
189201
}
190202

191-
// convertBlockStoredEvent decodes and converts a msgpack vLLM BlockStored event to a generic event.
203+
// convertBlockStoredEvent decodes and converts a BlockStored event to a generic event.
204+
// Supports both vLLM and SGLang by padding shorter arrays with nil for trailing optional fields.
205+
// SGLang may omit medium, lora_name, and extra_keys when they are None/default; any field missing after block_size is padded with nil.
206+
// See: sglang/srt/disaggregation/kv_events.py (BlockStored class, array_like=True, omit_defaults=True)
192207
func (v *VLLMAdapter) convertBlockStoredEvent(rawEventBytes []byte) (kvevents.GenericEvent, error) {
208+
var fields []any
209+
if err := msgpack.Unmarshal(rawEventBytes, &fields); err != nil {
210+
return nil, fmt.Errorf("failed to decode BlockStored event: %w", err)
211+
}
212+
213+
if len(fields) < blockStoredMinFields {
214+
return nil, fmt.Errorf("BlockStored event has too few fields: %d (minimum %d)", len(fields), blockStoredMinFields)
215+
}
216+
217+
// Pad to full struct size so msgpack array decode succeeds.
218+
for len(fields) < blockStoredFieldCount {
219+
fields = append(fields, nil)
220+
}
221+
paddedBytes, err := msgpack.Marshal(fields)
222+
if err != nil {
223+
return nil, fmt.Errorf("failed to re-marshal padded BlockStored event: %w", err)
224+
}
225+
193226
var vllmEvent msgpackVLLMBlockStoredEvent
194-
if err := msgpack.Unmarshal(rawEventBytes, &vllmEvent); err != nil {
227+
if err := msgpack.Unmarshal(paddedBytes, &vllmEvent); err != nil {
195228
return nil, fmt.Errorf("failed to decode BlockStored event: %w", err)
196229
}
197230

@@ -244,10 +277,29 @@ func (v *VLLMAdapter) convertBlockStoredEvent(rawEventBytes []byte) (kvevents.Ge
244277
}, nil
245278
}
246279

247-
// convertBlockRemovedEvent decodes and converts a msgpack vLLM BlockRemoved event to a generic event.
280+
// convertBlockRemovedEvent decodes and converts a BlockRemoved event to a generic event.
281+
// Supports both vLLM and SGLang by padding shorter arrays with nil for trailing optional fields.
282+
// See: sglang/srt/disaggregation/kv_events.py (BlockRemoved class, array_like=True, omit_defaults=True)
248283
func (v *VLLMAdapter) convertBlockRemovedEvent(rawEventBytes []byte) (kvevents.GenericEvent, error) {
284+
var fields []any
285+
if err := msgpack.Unmarshal(rawEventBytes, &fields); err != nil {
286+
return nil, fmt.Errorf("failed to decode BlockRemoved event: %w", err)
287+
}
288+
289+
if len(fields) < blockRemovedMinFields {
290+
return nil, fmt.Errorf("BlockRemoved event has too few fields: %d (minimum %d)", len(fields), blockRemovedMinFields)
291+
}
292+
293+
for len(fields) < blockRemovedFieldCount {
294+
fields = append(fields, nil)
295+
}
296+
paddedBytes, err := msgpack.Marshal(fields)
297+
if err != nil {
298+
return nil, fmt.Errorf("failed to re-marshal padded BlockRemoved event: %w", err)
299+
}
300+
249301
var vllmEvent msgpackVLLMBlockRemovedEvent
250-
if err := msgpack.Unmarshal(rawEventBytes, &vllmEvent); err != nil {
302+
if err := msgpack.Unmarshal(paddedBytes, &vllmEvent); err != nil {
251303
return nil, fmt.Errorf("failed to decode BlockRemoved event: %w", err)
252304
}
253305

pkg/kvevents/engineadapter/vllm_adapter_test.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -178,26 +178,38 @@ func TestDecodeVLLMEvent_BlockStoredWithLora(t *testing.T) {
178178
assert.Equal(t, [][]any{{"uuid-A", "salt"}, nil}, blockStored.ExtraKeys)
179179
}
180180

181-
// TestDecodeVLLMEvent_BlockStoredMissingLoraName tests decoding with missing field.
181+
// TestDecodeVLLMEvent_BlockStoredMissingLoraName tests decoding with missing trailing fields.
182+
// This matches SGLang's BlockStored format, which has no lora_name or extra_keys.
182183
func TestDecodeVLLMEvent_BlockStoredMissingLoraName(t *testing.T) {
183184
adapter := NewVLLMAdapter()
184185

185-
vllmEvent := []any{
186+
// SGLang format: 7 fields (tag + 6 data fields, no lora_name, no extra_keys)
187+
sglangEvent := []any{
186188
"BlockStored",
187189
[]any{uint64(300), uint64(301)},
188190
uint64(299),
189191
[]uint32{7, 8, 9},
190192
64,
191-
123,
192-
"gpu",
193+
nil, // lora_id
194+
"GPU", // medium
193195
}
194196

195-
rawBytes, err := msgpack.Marshal(vllmEvent)
197+
rawBytes, err := msgpack.Marshal(sglangEvent)
196198
require.NoError(t, err)
197199

198200
event, err := adapter.decodeVLLMEvent(rawBytes)
199-
assert.Error(t, err)
200-
assert.Nil(t, event)
201+
require.NoError(t, err, "SGLang format without lora_name should decode successfully")
202+
require.NotNil(t, event)
203+
204+
blockStored, ok := event.(*kvevents.BlockStoredEvent)
205+
require.True(t, ok)
206+
assert.Equal(t, []uint64{300, 301}, blockStored.BlockHashes)
207+
assert.Equal(t, uint64(299), blockStored.ParentHash)
208+
assert.Equal(t, []uint32{7, 8, 9}, blockStored.Tokens)
209+
assert.Equal(t, "GPU", blockStored.DeviceTier)
210+
assert.Nil(t, blockStored.LoraID)
211+
assert.Nil(t, blockStored.LoraName, "SGLang does not send lora_name")
212+
assert.Nil(t, blockStored.ExtraKeys, "SGLang does not send extra_keys")
201213
}
202214

203215
// TestDecodeVLLMEvent_BlockStoredInvalidExtraKeys tests invalid extra_keys type.

0 commit comments

Comments
 (0)