Skip to content
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
b228bb7
feat: implement decode first flow on lmcache connector
kyanokashi Dec 9, 2025
a436b50
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Dec 9, 2025
a6ae771
fix: error handling
kyanokashi Dec 10, 2025
58388eb
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Dec 11, 2025
04b7ffd
chore: add back todo comment
kyanokashi Dec 11, 2025
ce74f50
refactor: reduce code complexity and duplication
kyanokashi Dec 11, 2025
1de6035
refactor: improve header copying
kyanokashi Dec 11, 2025
c0ac69e
chore: add comment explaining the cache_hit_threshold field and the ne…
kyanokashi Dec 15, 2025
7ce5e19
refactor: enhance logging for cache hit threshold in decode flow
kyanokashi Dec 15, 2025
6430a02
refactor: improve error handling and observability when failing to un…
kyanokashi Dec 15, 2025
4c15d95
chore: add deleted informational comments
kyanokashi Dec 15, 2025
cac084f
typo
kyanokashi Dec 15, 2025
91c7a06
refactor: make error logs more descriptive of the failure reason
kyanokashi Dec 15, 2025
69d30b5
feat: add cache hit threshold to prefill request so prefill executes …
kyanokashi Dec 15, 2025
1ed1d89
fix: typo
kyanokashi Dec 17, 2025
515b385
refactor: assign 0 cache_hit_threshold before final decode attempt
kyanokashi Dec 17, 2025
4c8659e
chore: update comment according to feedback
kyanokashi Dec 17, 2025
878585b
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Dec 17, 2025
722b58e
chore: remove istio workaround
kyanokashi Dec 17, 2025
487d333
fix: set cache hit threshold to 0 in prefill request for consistent e…
kyanokashi Dec 18, 2025
cb00b52
refactor: update the log
kyanokashi Dec 22, 2025
88739c6
feat: support online decoding
kyanokashi Dec 22, 2025
9fbb2d1
fix: preserve request body in lmcache connector
kyanokashi Dec 31, 2025
7b18827
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Dec 31, 2025
460b9c8
fix: support sse format for streamed decode
kyanokashi Jan 2, 2026
a722510
chore: add and improve log descriptions
kyanokashi Jan 2, 2026
e2b3380
fix: typo
kyanokashi Jan 4, 2026
548e6c8
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 5, 2026
160fb72
nit: undo capitalization
kyanokashi Jan 5, 2026
9a08dfb
fix: typos
kyanokashi Jan 5, 2026
7508f10
chore: improve error log observability
kyanokashi Jan 5, 2026
bd114fa
refactor: encapsulate http error checking in function and reuse
kyanokashi Jan 5, 2026
5a6a4f6
refactor: encapsulate and reuse code better
kyanokashi Jan 5, 2026
6e6ff8f
fix: lint error
kyanokashi Jan 6, 2026
ea60bf0
refactor: improve code encapsulation and reduce duplication
kyanokashi Jan 6, 2026
0cbc6f9
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 6, 2026
fd43c17
refactor: rename and simplify SSE event signaling logic
kyanokashi Jan 6, 2026
7030e38
refactor: rename lmcache to shared storage protocol
kyanokashi Jan 8, 2026
f84046a
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 16, 2026
2f0e99e
fix: remove unused function
kyanokashi Jan 16, 2026
ea3f1da
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 23, 2026
2e35d85
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 31, 2026
d71b10b
test: e2e tests
kyanokashi Jan 31, 2026
e34600f
chore: claude gitignore
kyanokashi Jan 31, 2026
efd638a
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Feb 2, 2026
5e988e5
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Feb 3, 2026
fbb12fc
fix: sim deployment
kyanokashi Feb 5, 2026
903249f
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Feb 5, 2026
f6024d1
feat: make linter running on new code configurable
kyanokashi Feb 6, 2026
5f042e7
fix: lint errors
kyanokashi Feb 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/pd-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (
// supportedConnectors defines all valid P/D connector types
supportedConnectors = []string{
proxy.ConnectorNIXLV2,
proxy.ConnectorLMCache,
proxy.ConnectorSharedStorage,
proxy.ConnectorSGLang,
}
)
Expand Down
91 changes: 0 additions & 91 deletions pkg/sidecar/proxy/connector_lmcache.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/sidecar/proxy/connector_nixlv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *Server) runNIXLProtocolV2(w http.ResponseWriter, r *http.Request, prefi
pw := &bufferedResponseWriter{}
prefillHandler.ServeHTTP(pw, preq)

if pw.statusCode < 200 || pw.statusCode >= 300 {
if isHTTPError(pw.statusCode) {
s.logger.Error(err, "request failed", "code", pw.statusCode)
w.WriteHeader(pw.statusCode)
return
Expand Down
276 changes: 276 additions & 0 deletions pkg/sidecar/proxy/connector_shared_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
/*
Copyright 2025 The llm-d Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"maps"
"net/http"
"strings"
)

// runSharedStorageProtocol routes a completion request through the shared
// storage (KV-cache) P/D disaggregation flow. When the request carries a
// cache_hit_threshold it first attempts a decode-only pass; if the decode
// node rejects the request for failing the threshold, it falls back to the
// classic prefill-then-decode sequence.
func (s *Server) runSharedStorageProtocol(w http.ResponseWriter, r *http.Request, prefillPodHostPort string) {
	s.logger.V(4).Info("running Shared Storage protocol", "url", prefillPodHostPort)

	// Read and parse request body. The raw bytes are kept so the request can
	// be re-sent to the decode/prefill backends without re-serializing.
	defer r.Body.Close() //nolint:all
	original, err := io.ReadAll(r.Body)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest) // TODO: check FastAPI error code when failing to read body
		w.Write([]byte(err.Error())) //nolint:all
		return
	}

	// Parse completion request
	var completionRequest map[string]any
	if err := json.Unmarshal(original, &completionRequest); err != nil {
		if err := errorJSONInvalid(err, w); err != nil {
			s.logger.Error(err, "failed to send Invalid JSON error response to client")
		}
		return
	}

	// If "cache_hit_threshold" is present in the request, we try to decode first. The decode node must meet the cache hit threshold in order to execute.
	// If the decode node is below the threshold, it won't process the request and return a "cache_threshold" finish reason. In that case,
	// we fall back to P/D disaggregation: perform prefill and then decode.
	// For more information refer to the RFC https://github.com/vllm-project/vllm/issues/24256
	if cacheHitThreshold, hasCacheHitThreshold := completionRequest[requestFieldCacheHitThreshold]; hasCacheHitThreshold {
		s.logger.V(4).Info("cache_hit_threshold field found in the request, trying to decode first", requestFieldCacheHitThreshold, cacheHitThreshold)
		decodeReq := cloneRequestWithBody(r, original)
		needsPrefill, err := s.tryDecode(w, decodeReq, completionRequest)
		if err != nil {
			// tryDecode already reported the failure to the client.
			return
		}
		if !needsPrefill {
			// The decode node met the threshold and the response has already
			// been written to the client.
			s.logger.V(4).Info("decode succeeded without prefill")
			return
		}
		s.logger.V(4).Info("decode failed due to failing to meet the cache hit threshold", requestFieldCacheHitThreshold, cacheHitThreshold)
	}

	// we clone the completion request to avoid modifying the original request
	prefillRequest := maps.Clone(completionRequest)
	if err := s.prefill(w, r, prefillPodHostPort, prefillRequest); err != nil {
		// prefill already wrote an error response to the client.
		s.logger.Error(err, "prefill failed")
		return
	}

	s.logger.V(4).Info("forwarding to decoder after prefill")
	// After prefill the KV cache is guaranteed to be populated, so force the
	// threshold to 0 to make the decode node accept the request unconditionally.
	completionRequest[requestFieldCacheHitThreshold] = 0
	decodeRequestBody, err := json.Marshal(completionRequest)
	if err != nil {
		if err := errorJSONInvalid(err, w); err != nil {
			s.logger.Error(err, "failed to send Invalid JSON error response to client")
		}
		return
	}

	decodeReq := cloneRequestWithBody(r, decodeRequestBody)
	s.decoderProxy.ServeHTTP(w, decodeReq)
}

// tryDecode attempts a decode-first pass and reports whether a prefill
// fallback is required. Streaming requests on a flush-capable writer go
// through the streaming path; everything else is fully buffered.
func (s *Server) tryDecode(w http.ResponseWriter, r *http.Request, completionRequest map[string]any) (bool, error) {
	streaming, _ := completionRequest[requestFieldStream].(bool)
	flusher, flushable := w.(flushableResponseWriter)
	if !streaming || !flushable {
		return s.tryDecodeBuffered(w, r)
	}
	return s.tryDecodeStreaming(newResponseWriterWithBuffer(flusher), r)
}

// tryDecodeBuffered handles non-streaming decode attempts.
// It buffers the entire response before inspecting it and returns
// (needsPrefill, err): needsPrefill is true when the decode node refused the
// request with a cache_threshold finish reason; on error the failure has
// already been reported to the client.
func (s *Server) tryDecodeBuffered(w http.ResponseWriter, r *http.Request) (bool, error) {
	dw := &bufferedResponseWriter{}
	s.decoderProxy.ServeHTTP(dw, r)

	// Snapshot the body once — each buffer.String() call copies the bytes.
	body := dw.buffer.String()

	if isHTTPError(dw.statusCode) {
		// Relay the decoder's error status (and payload, if any) to the client.
		w.WriteHeader(dw.statusCode)
		if len(body) > 0 {
			w.Write([]byte(body)) //nolint:all
		}

		err := errors.New("decode request failed")
		s.logger.Error(err, "unexpected status code", "code", dw.statusCode)

		return false, err
	}

	// Parse response to check finish_reason
	var response map[string]any
	if err := json.Unmarshal([]byte(body), &response); err != nil {
		s.logger.Error(err, "failed to unmarshal decode response", "response", body)

		if err := errorInternalServerError(err, w); err != nil {
			s.logger.Error(err, "failed to send error response to client")
		}
		return false, err
	}

	// Check for cache_threshold finish reason
	if s.hasCacheThresholdFinishReason(response) {
		return true, nil
	}

	// Decode succeeded, write response to client
	maps.Copy(w.Header(), dw.headers)
	w.Write([]byte(body)) //nolint:all

	return false, nil
}

// tryDecodeStreaming handles streaming decode attempts.
// It buffers the initial response to check for cache_threshold, then switches
// to direct streaming mode if decode succeeds. Returns (needsPrefill, err);
// when needsPrefill is true nothing has been written to the client yet.
func (s *Server) tryDecodeStreaming(w *responseWriterWithBuffer, r *http.Request) (bool, error) {
	// Run ServeHTTP in a goroutine so we can inspect the initial choice to determine if we need to prefill.
	done := make(chan struct{})
	go func() {
		defer close(done)
		s.decoderProxy.ServeHTTP(w, r)
	}()

	// Wait for either:
	// - firstChunkReady(): first body data is available in buffer
	// - done: request completed (possibly with no body, e.g., error response)
	select {
	case <-w.firstChunkReady():
	case <-done:
		s.logger.V(4).Info("request completed without body data")
	}

	statusCode := w.getStatusCode()
	if isHTTPError(statusCode) {
		// Forward whatever error payload was buffered straight to the client.
		// NOTE(review): this path returns without waiting on <-done, so the
		// proxy goroutine may still be writing to w after the handler exits —
		// confirm responseWriterWithBuffer tolerates writes past that point.
		if err := w.flushBufferAndGoDirect(); err != nil {
			s.logger.Error(err, "failed to flush buffer to client")
			return false, err
		}
		return false, fmt.Errorf("decode request failed with status code: %d", statusCode)
	}

	// Check buffered SSE content for cache_threshold finish reason.
	if s.checkBufferedResponseForCacheThreshold(w.buffered()) {
		// The caller performs prefill and re-issues the decode; the buffered
		// rejection is intentionally discarded.
		s.logger.V(4).Info("finish reason cache_threshold detected, needs prefill")
		return true, nil
	}

	// No cache_threshold finish reason found, flush buffer and switch to direct mode
	// to let the rest of the response stream through.
	s.logger.V(4).Info("first response for request shows success without cache_threshold finish reason")
	if err := w.flushBufferAndGoDirect(); err != nil {
		s.logger.Error(err, "failed to flush buffer to client and switch to direct mode")
		return false, err
	}
	// Block until the proxy finishes streaming the remainder of the response.
	<-done
	return false, nil
}

// hasCacheThresholdFinishReason reports whether the parsed completion
// response carries finish_reason == "cache_threshold" on its first choice.
// Malformed or empty responses are treated as "no".
func (s *Server) hasCacheThresholdFinishReason(response map[string]any) bool {
	rawChoices, ok := response[responseFieldChoices].([]any)
	if !ok || len(rawChoices) == 0 {
		return false
	}

	firstChoice, ok := rawChoices[0].(map[string]any)
	if !ok {
		return false
	}

	reason, ok := firstChoice[responseFieldFinishReason].(string)
	if !ok {
		return false
	}
	return reason == finishReasonCacheThreshold
}

// checkBufferedResponseForCacheThreshold scans a buffered SSE response for a
// chunk whose finish reason is cache_threshold. This is only called for
// streaming responses, so the data is always in SSE format
// ("data: {...json...}\n\ndata: {...json...}\n\n").
func (s *Server) checkBufferedResponseForCacheThreshold(data string) bool {
	for _, raw := range strings.Split(data, "\n") {
		trimmed := strings.TrimSpace(raw)

		// Only JSON-bearing "data: " lines are of interest; blank lines,
		// non-data lines, and the terminal sentinel are skipped.
		payload, isData := strings.CutPrefix(trimmed, "data: ")
		if !isData || payload == "[DONE]" {
			continue
		}

		var chunk map[string]any
		if err := json.Unmarshal([]byte(payload), &chunk); err != nil {
			s.logger.V(4).Info("skipping malformed SSE chunk", "chunk", payload)
			continue
		}

		if s.hasCacheThresholdFinishReason(chunk) {
			return true
		}
	}
	return false
}

// prefill routes a request to a prefill node. The supplied completionRequest
// is mutated to produce a minimal prefill-only request (1 token, threshold 0)
// so that the prefill node populates the KV cache without generating output.
// On failure the error has already been written to the client.
func (s *Server) prefill(w http.ResponseWriter, r *http.Request, prefillPodHostPort string, completionRequest map[string]any) error {
	// Prepare prefill request: one token is enough to force a full prefill,
	// and a zero threshold ensures the node executes unconditionally.
	completionRequest[requestFieldMaxTokens] = 1
	completionRequest[requestFieldMaxCompletionTokens] = 1
	completionRequest[requestFieldCacheHitThreshold] = 0

	pbody, err := json.Marshal(completionRequest)
	if err != nil {
		if err := errorJSONInvalid(err, w); err != nil {
			s.logger.Error(err, "failed to send Invalid JSON error response to client")
		}
		return err
	}
	preq := cloneRequestWithBody(r, pbody)

	prefillHandler, err := s.prefillerProxyHandler(prefillPodHostPort)
	if err != nil {
		if err := errorBadGateway(err, w); err != nil {
			s.logger.Error(err, "failed to send Bad Gateway error response to client")
		}
		return err
	}

	// send prefill request
	s.logger.V(4).Info("sending prefill request", "to", prefillPodHostPort)
	pw := &bufferedResponseWriter{}
	prefillHandler.ServeHTTP(pw, preq)

	if isHTTPError(pw.statusCode) {
		// Build the error once and log it, matching the error-then-log style
		// used by tryDecodeBuffered (avoids logging a nil error).
		err := fmt.Errorf("prefill request failed with status code: %d", pw.statusCode)
		s.logger.Error(err, "prefill request failed", "code", pw.statusCode)
		w.WriteHeader(pw.statusCode)
		if pw.buffer.Len() > 0 {
			w.Write([]byte(pw.buffer.String())) //nolint:all
		}
		return err
	}

	s.logger.V(4).Info("prefill completed successfully")
	return nil
}

func cloneRequestWithBody(r *http.Request, body []byte) *http.Request {
cloned := r.Clone(r.Context())
cloned.Body = io.NopCloser(bytes.NewReader(body))
cloned.ContentLength = int64(len(body))
return cloned
}
2 changes: 1 addition & 1 deletion pkg/sidecar/proxy/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type sidecarTestInfo struct {
proxy *Server
}

var connectors = []string{ConnectorLMCache, ConnectorNIXLV2}
var connectors = []string{ConnectorSharedStorage, ConnectorNIXLV2}

var _ = Describe("Common Connector tests", func() {

Expand Down
Loading