Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 28 additions & 21 deletions examples/helper/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/llm-d/llm-d-kv-cache/examples/testdata"
"github.com/llm-d/llm-d-kv-cache/pkg/kvcache/kvblock"
"github.com/llm-d/llm-d-kv-cache/pkg/kvevents"
"github.com/llm-d/llm-d-kv-cache/pkg/utils"
"github.com/vmihailenco/msgpack/v5"
"sigs.k8s.io/controller-runtime/pkg/log"
)
Expand All @@ -33,23 +32,27 @@ func SimulateProduceEvent(ctx context.Context, publisher *Publisher) error {
logger := log.FromContext(ctx)
logger.Info("@@@ Simulating vLLM engine publishing BlockStored events...")
medium := "GPU"
blockStoredEvent := kvevents.BlockStored{
BlockHashes: utils.SliceMap(testdata.PromptHashes, func(h uint64) any { return h }),
ParentBlockHash: nil,
TokenIds: []uint32{1, 2, 3},
BlockSize: 256,
LoraID: nil,
Medium: &medium,
LoraName: nil,

// Create event in vLLM msgpack array format: [tag, hashes, parent, tokens, blockSize, loraID, medium, loraName]
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, test events were created using specific event structures and then converted to a tagged union format via ToTaggedUnion(). This tagged union matched the exact format vllm sends to llm-d. The tagged union structure was necessary because of double marshaling: first to extracted the event type tag, and the second for the actual event data. I avoided it so I completely removed the ToTaggedUnion().

blockStoredEvent := []any{
"BlockStored", // Tag
testdata.PromptHashes, // BlockHashes (already []uint64)
nil, // ParentBlockHash
[]uint32{1, 2, 3}, // TokenIds
256, // BlockSize
nil, // LoraID
medium, // Medium
nil, // LoraName
}

//nolint // won't fail
blockStoredPayload, _ := msgpack.Marshal(blockStoredEvent.ToTaggedUnion())
blockStoredPayload, _ := msgpack.Marshal(blockStoredEvent)

eventBatch := kvevents.EventBatch{
TS: float64(time.Now().UnixNano()) / 1e9,
Events: []msgpack.RawMessage{blockStoredPayload},
DataParallelRank: nil,
// Create vLLM msgpack event batch in array format: [timestamp, [event1, event2, ...], data_parallel_rank]
eventBatch := []any{
float64(time.Now().UnixNano()) / 1e9, // Timestamp
[][]byte{blockStoredPayload}, // Events array
nil, // DataParallelRank
}

if err := publisher.PublishEvent(ctx, topic, eventBatch); err != nil {
Expand All @@ -70,17 +73,21 @@ func SimulateProduceEvent(ctx context.Context, publisher *Publisher) error {
func SimulateRemoveEvent(ctx context.Context, publisher *Publisher) error {
logger := log.FromContext(ctx)
logger.Info("@@@ Simulating vLLM engine removing some blocks...")
blockRemovedEvent := kvevents.BlockRemoved{
BlockHashes: []any{testdata.PromptHashes[2], testdata.PromptHashes[3]},

// Create event in vLLM msgpack array format: [tag, hashes]
blockRemovedEvent := []any{
"BlockRemoved",
[]uint64{testdata.PromptHashes[2], testdata.PromptHashes[3]},
}

//nolint // won't fail
blockRemovedPayload, _ := msgpack.Marshal(blockRemovedEvent.ToTaggedUnion())
blockRemovedPayload, _ := msgpack.Marshal(blockRemovedEvent)

removeEventBatch := kvevents.EventBatch{
TS: float64(time.Now().UnixNano()) / 1e9,
Events: []msgpack.RawMessage{blockRemovedPayload},
DataParallelRank: nil,
// Create vLLM msgpack event batch in array format: [timestamp, [event1, event2, ...], data_parallel_rank]
removeEventBatch := []any{
float64(time.Now().UnixNano()) / 1e9,
[][]byte{blockRemovedPayload},
nil,
}

if err := publisher.PublishEvent(ctx, topic, removeEventBatch); err != nil {
Expand Down
16 changes: 12 additions & 4 deletions examples/kv_events/pod_reconciler/pod_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/llm-d/llm-d-kv-cache/pkg/kvevents"
"github.com/llm-d/llm-d-kv-cache/pkg/kvevents/engineadapter"
"github.com/llm-d/llm-d-kv-cache/pkg/utils/logging"
)

Expand All @@ -44,8 +45,10 @@ type PodReconcilerConfig struct {
PodNamespace string
// TopicFilter is the ZMQ subscription filter (e.g., "kv@").
TopicFilter string
// SocketPort is the port where vLLM pods expose ZMQ (default: 5557).
// SocketPort is the port where LLM pods expose ZMQ (default: 5557).
SocketPort string
// EngineType specifies which LLM engine type this reconciler manages.
EngineType string
}

// NewPodReconcilerConfig creates a PodReconcilerConfig from kvevents.PodDiscoveryConfig.
Expand All @@ -71,6 +74,7 @@ func NewPodReconcilerConfig(cfg *kvevents.PodDiscoveryConfig, topicFilter string
PodNamespace: cfg.PodNamespace,
TopicFilter: topicFilter,
SocketPort: fmt.Sprintf("%d", socketPort),
EngineType: cfg.EngineType,
}, nil
}

Expand Down Expand Up @@ -118,13 +122,17 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
podIdentifier := req.String()
endpoint := r.buildEndpoint(&pod)

// Get engine type from config (currently vLLM only)
engineType := engineadapter.EngineType(r.Config.EngineType)

debugLogger.Info("Ensuring subscriber for pod",
"pod", req,
"endpoint", endpoint,
"podIP", pod.Status.PodIP)
"podIP", pod.Status.PodIP,
"engineType", engineType)

if err := r.SubscriberManager.EnsureSubscriber(ctx, podIdentifier, endpoint, r.Config.TopicFilter, true); err != nil {
debugLogger.Error(err, "Failed to ensure subscriber for pod", "pod", req)
if err := r.SubscriberManager.EnsureSubscriber(ctx, podIdentifier, endpoint, r.Config.TopicFilter, engineType, true); err != nil {
debugLogger.Error(err, "Failed to ensure subscriber for pod", "pod", req, "engineType", engineType)
return ctrl.Result{}, err
}

Expand Down
2 changes: 1 addition & 1 deletion examples/kv_events/vllm/vllm_kv_cache_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def create_llm():
disable_hybrid_kv_cache_manager=True,
kv_events_config=kv_events_config,
block_size=16,
prefix_caching_hash_algo="sha256_cbor",
prefix_caching_hash_algo="sha256_cbor_64bit",
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had this error when running the test:
INFO 02-24 02:10:17 [__init__.py:235] Automatically detected platform cuda. usage: vllm serve [model_tag] [options] vllm serve: error: argument --prefix-caching-hash-algo: invalid choice: 'sha256_cbor' (choose from builtin, sha256, sha256_cbor_64bit)

enable_lora=True,
max_model_len=4096,
)
Expand Down
26 changes: 26 additions & 0 deletions pkg/kvevents/decoder/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Copyright 2025 The llm-d Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package decoder

// Decoder defines the interface for encoding and decoding raw bytes.
type Decoder interface {
// Decode unmarshals data into the provided value.
Decode(data []byte, v interface{}) error

// Encode marshals the provided value into bytes.
Encode(v interface{}) ([]byte, error)
}
48 changes: 48 additions & 0 deletions pkg/kvevents/decoder/msgpack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2025 The llm-d Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package decoder

import (
"fmt"

"github.com/vmihailenco/msgpack/v5"
)

// MsgpackDecoder implements Decoder for MessagePack format.
type MsgpackDecoder struct{}

// NewMsgpackDecoder creates a new msgpack decoder.
func NewMsgpackDecoder() *MsgpackDecoder {
return &MsgpackDecoder{}
}

// Decode unmarshals msgpack data into the provided value.
func (m *MsgpackDecoder) Decode(data []byte, v interface{}) error {
if err := msgpack.Unmarshal(data, v); err != nil {
return fmt.Errorf("failed to decode msgpack: %w", err)
}
return nil
}

// Encode marshals the provided value into msgpack bytes.
func (m *MsgpackDecoder) Encode(v interface{}) ([]byte, error) {
data, err := msgpack.Marshal(v)
if err != nil {
return nil, fmt.Errorf("failed to encode msgpack: %w", err)
}
return data, nil
}
75 changes: 75 additions & 0 deletions pkg/kvevents/engineadapter/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
Copyright 2025 The llm-d Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engineadapter

import (
"context"
"fmt"

"github.com/llm-d/llm-d-kv-cache/pkg/kvevents/decoder"
"github.com/llm-d/llm-d-kv-cache/pkg/kvevents/events"
"github.com/llm-d/llm-d-kv-cache/pkg/kvevents/transport"
)

// EngineType represents the type of LLM engine.
type EngineType string

const (
// EngineTypeVLLM represents the vLLM engine.
EngineTypeVLLM EngineType = "vllm"
)

// NewAdapter creates a new engine adapter based on the engine type.
func NewAdapter(engineType EngineType) (EngineAdapter, error) {
// It looks useless right now but we're preparing for future support of other engines ;)
switch engineType {
case EngineTypeVLLM:
return NewVLLMAdapter()
default:
return nil, fmt.Errorf("unknown engine type: %s", engineType)
}
}

// EngineAdapter defines the interface for engine-specific adapters.
// Each inference engine has its own adapter implementation that handles
// engine-specific operations.
type EngineAdapter interface {
// Transport returns the transport layer for receiving messages.
Transport() transport.Transport

// Decoder returns the decoder for parsing message payloads.
Decoder() decoder.Decoder

// getHashAsUint64 converts engine-specific hash formats to uint64.
getHashAsUint64(raw any) (uint64, error)

// ReceiveAndDecode receives a message from the transport, parses it,
// decodes the payload, and returns a batch of generic events.
ReceiveAndDecode(ctx context.Context) (*events.EventBatch, error)

// Connect establishes a connection to a remote endpoint.
Connect(ctx context.Context, endpoint string) error

// Bind listens on a local endpoint for incoming connections.
Bind(ctx context.Context, endpoint string) error

// SubscribeToTopic sets the topic filter for receiving messages.
SubscribeToTopic(topicFilter string) error

// Close closes the adapter and releases all resources.
Close() error
}
Loading
Loading