Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
b228bb7
feat: implement decode first flow on lmcache connector
kyanokashi Dec 9, 2025
a436b50
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Dec 9, 2025
a6ae771
fix: error handling
kyanokashi Dec 10, 2025
58388eb
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Dec 11, 2025
04b7ffd
chore: add back todo comment
kyanokashi Dec 11, 2025
ce74f50
refactor: reduce code complexity and duplication
kyanokashi Dec 11, 2025
1de6035
refactor: improve header copying
kyanokashi Dec 11, 2025
c0ac69e
chore: add comment explaning the cache_hit_threshold field and the ne…
kyanokashi Dec 15, 2025
7ce5e19
refactor: enhance logging for cache hit threshold in decode flow
kyanokashi Dec 15, 2025
6430a02
refactor: improve error handling and observability when failing to un…
kyanokashi Dec 15, 2025
4c15d95
chore: add deleted informational comments
kyanokashi Dec 15, 2025
cac084f
typo
kyanokashi Dec 15, 2025
91c7a06
refactor: make error logs more descriptive of the failure reason
kyanokashi Dec 15, 2025
69d30b5
feat: add cache hit threshold to prefill request so prefill executes …
kyanokashi Dec 15, 2025
1ed1d89
fix: typo
kyanokashi Dec 17, 2025
515b385
refactor: assign 0 cache_hit_threshold before final decode attempt
kyanokashi Dec 17, 2025
4c8659e
chore: update comment according to feedback
kyanokashi Dec 17, 2025
878585b
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Dec 17, 2025
722b58e
chore: remove istio workaround
kyanokashi Dec 17, 2025
487d333
fix: set cache hit threshold to 0 in prefill request for consistent e…
kyanokashi Dec 18, 2025
cb00b52
refactor: update the log
kyanokashi Dec 22, 2025
88739c6
feat: support online decoding
kyanokashi Dec 22, 2025
9fbb2d1
fix: preserve request body in lmcache connector
kyanokashi Dec 31, 2025
7b18827
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Dec 31, 2025
460b9c8
fix: support sse format for streamed decode
kyanokashi Jan 2, 2026
a722510
chore: add and improve log descriptions
kyanokashi Jan 2, 2026
e2b3380
fix: typo
kyanokashi Jan 4, 2026
548e6c8
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 5, 2026
160fb72
nit: undo capitalization
kyanokashi Jan 5, 2026
9a08dfb
fix: typos
kyanokashi Jan 5, 2026
7508f10
chore: improve error log observability
kyanokashi Jan 5, 2026
bd114fa
refactor: encapsulate http error checking in function and reuse
kyanokashi Jan 5, 2026
5a6a4f6
refactor: encapsulate and reuse code better
kyanokashi Jan 5, 2026
6e6ff8f
fix: lint error
kyanokashi Jan 6, 2026
ea60bf0
refactor: improve code encapsulation and reduce duplication
kyanokashi Jan 6, 2026
0cbc6f9
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 6, 2026
fd43c17
refactor: rename and simplify SSE event signaling logic
kyanokashi Jan 6, 2026
7030e38
refactor: rename lmcache to shared storage protocol
kyanokashi Jan 8, 2026
f84046a
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 16, 2026
2f0e99e
fix: remove unused function
kyanokashi Jan 16, 2026
ea3f1da
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 23, 2026
2e35d85
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 31, 2026
d71b10b
test: e2e tests
kyanokashi Jan 31, 2026
e34600f
chore: claude gitignore
kyanokashi Jan 31, 2026
efd638a
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Feb 2, 2026
5e988e5
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Feb 3, 2026
fbb12fc
fix: sim deployment
kyanokashi Feb 5, 2026
903249f
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Feb 5, 2026
f6024d1
feat: make linter running on new code configurable
kyanokashi Feb 6, 2026
5f042e7
fix: lint errors
kyanokashi Feb 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 88 additions & 16 deletions pkg/sidecar/proxy/connector_lmcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ package proxy

import (
"encoding/json"
"fmt"
"io"
"maps"
"net/http"
"strings"
)

func (s *Server) runLMCacheProtocol(w http.ResponseWriter, r *http.Request, prefillPodHostPort string) {
s.logger.Info("running LMCache protocol")

// Read and parse request body
defer r.Body.Close() //nolint:all
original, err := io.ReadAll(r.Body)
if err != nil {
Expand All @@ -35,7 +36,6 @@ func (s *Server) runLMCacheProtocol(w http.ResponseWriter, r *http.Request, pref
return
}

// Parse completion request
var completionRequest map[string]any
if err := json.Unmarshal(original, &completionRequest); err != nil {
if err := errorJSONInvalid(err, w); err != nil {
Expand All @@ -44,11 +44,85 @@ func (s *Server) runLMCacheProtocol(w http.ResponseWriter, r *http.Request, pref
return
}

// Create prefiller request. Set max_tokens to 1.
if s.forwardDataParallel && s.dataParallelHandler(w, r) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole "if" code block and its contents should be removed. We don't want to start with "prefill". We want to start with "tryDecode" flow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to account for the temporary workaround @shmuelk mentioned he put in place for something related to istio. In that case we would only prefill, therefore, we don't want to call tryDecode.

Copy link

@kfirwolfson kfirwolfson Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that if DP is in use, it's only for the Decode phase, and prefill works normally. @shmuelk please correct me if I am mistaken. Assuming this code will be removed soon, maybe we can avoid having a special handling for s.forwardDataParallel == true

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is for the workaround we have for Istio 1.28.0.

Istio 1.28.1 has a fix for this issue. We will be removing this code in the future.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be when sending to prefill or decode, @shmuelk ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the istio workaround specifically intends to avoid decoding and only prefill for some scenarios.

I guess the question is whether the condition caused from the istio workaround and the cache_hit_threshold being present in the request could exist. Technically it shouldn't because that field is only relevant for decoding correct me if I'm wrong @kfirwolfson

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure anyone will try running the connector_lmcache.py code with s.forwardDataParallel==True, let alone with cache_hit_threshold>0, before the s.forwardDataParallel code will be removed, so I don't think it matters much.
But for now, let's just remove this whole "if" statement. It's inaccurate for when s.forwardDataParallel==True and irrelevant for when s.forwardDataParallel==False.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err := s.prefill(w, r, prefillPodHostPort, completionRequest); err != nil {
s.logger.Error(err, "prefill failed")
}
return
}

// If "cache_hit_threshold" is present in the request, we try to decode first. The decode node must meet the cache hit threshold in order to execute.
// If the decode node is below the threshold, it won't process the request and return a "cache_threshold" finish reason. In that case, we need to prefill.
// For more information refer to the RFC https://github.com/vllm-project/vllm/issues/24256
if _, hasCacheHitThreshold := completionRequest[requestFieldCacheHitThreshold]; hasCacheHitThreshold {
needsPrefill, err := s.tryDecode(w, r)
if err != nil {
return
}
if !needsPrefill {
s.logger.V(4).Info("decode succeeded without prefill")
return
}
s.logger.Info("decode failed due to insufficient cache hit threshold.")
}

if err := s.prefill(w, r, prefillPodHostPort, completionRequest); err != nil {
s.logger.Error(err, "prefill failed")
return
}

s.logger.V(4).Info("forwarding to decoder after prefill")
r.Body = io.NopCloser(strings.NewReader(string(original)))
s.decoderProxy.ServeHTTP(w, r)
}

// tryDecode attempts to decode and returns whether prefill is needed
func (s *Server) tryDecode(w http.ResponseWriter, r *http.Request) (bool, error) {
dw := &bufferedResponseWriter{}
s.decoderProxy.ServeHTTP(dw, r)

// Check for non-success status codes
if dw.statusCode < 200 || dw.statusCode >= 300 {
w.WriteHeader(dw.statusCode)
if dw.buffer.Len() > 0 {
w.Write([]byte(dw.buffer.String())) //nolint:all
}
return false, fmt.Errorf("decode request failed with status code: %d", dw.statusCode)
}

// Parse response to check finish_reason
var response map[string]any
if err := json.Unmarshal([]byte(dw.buffer.String()), &response); err != nil {
w.WriteHeader(dw.statusCode)
w.Write([]byte(dw.buffer.String())) //nolint:all
return false, err
}

// Check for cache_threshold finish reason
if choices, ok := response[responseFieldChoices].([]any); ok && len(choices) > 0 {
if choice, ok := choices[0].(map[string]any); ok {
if finishReason, ok := choice[responseFieldFinishReason].(string); ok {
if finishReason == finishReasonCacheThreshold {
return true, nil
}
}

}
}

// Decode succeeded, write response to client

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how this would work with Streaming (vLLM online inference with partial responses). Would it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm this is tricky.

Previously we were passing a pass through writer to the decode proxy which was responsible for writing responses back to the client.

Now, because we need to parse the response to determine the finish reason, we use a buffered writer so we could read the first choice.

Let me explore some options. I'm thinking of either updating bufferedResponseWriter to support streaming or implementing a new writer type that handles the cache_threshold case specifically

Copy link
Contributor Author

@kyanokashi kyanokashi Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's what I ended up doing 88739c6

It's a bit complex, but couldn't think of a better way to do it.

Still need to test this

@kfirwolfson

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice work.

maps.Copy(w.Header(), dw.headers)
w.Write([]byte(dw.buffer.String())) //nolint:all

return false, nil
}

// prefill routes a request to a preill node
func (s *Server) prefill(w http.ResponseWriter, r *http.Request, prefillPodHostPort string, completionRequest map[string]any) error {
ctx := r.Context()
preq := r.Clone(ctx)

// Prepare prefill request
completionRequest[requestFieldMaxTokens] = 1
completionRequest[requestFieldMaxCompletionTokens] = 1

Expand All @@ -57,35 +131,33 @@ func (s *Server) runLMCacheProtocol(w http.ResponseWriter, r *http.Request, pref
if err := errorJSONInvalid(err, w); err != nil {
s.logger.Error(err, "failed to send error response to client")
}
return
return err
}
preq.Body = io.NopCloser(strings.NewReader(string(pbody)))
preq.ContentLength = int64(len(pbody))

// Forward request to prefiller

prefillHandler, err := s.prefillerProxyHandler(prefillPodHostPort)
if err != nil {
if err := errorBadGateway(err, w); err != nil {
s.logger.Error(err, "failed to send error response to client")
}
return
return err
}

// send prefill request
s.logger.V(4).Info("sending prefill request", "to", prefillPodHostPort)
pw := &bufferedResponseWriter{}
prefillHandler.ServeHTTP(pw, preq)

if pw.statusCode < 200 || pw.statusCode >= 300 {
s.logger.Error(err, "request failed", "code", pw.statusCode)
s.logger.Error(nil, "prefill request failed", "code", pw.statusCode)
w.WriteHeader(pw.statusCode)
return
if pw.buffer.Len() > 0 {
w.Write([]byte(pw.buffer.String())) //nolint:all
}
return err
}

// Forward original request to local decoder

r.Body = io.NopCloser(strings.NewReader(string(original)))
if !s.forwardDataParallel || !s.dataParallelHandler(w, r) {
s.logger.V(4).Info("sending request to decoder", "to", s.decoderURL.Host)
s.decoderProxy.ServeHTTP(w, r)
}
s.logger.V(4).Info("prefill completed successfully")
return nil
}
6 changes: 6 additions & 0 deletions pkg/sidecar/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ const (
requestFieldRemotePort = "remote_port"
requestFieldStream = "stream"
requestFieldStreamOptions = "stream_options"
requestFieldCacheHitThreshold = "cache_hit_threshold"

responseFieldChoices = "choices"
responseFieldFinishReason = "finish_reason"

finishReasonCacheThreshold = "cache_threshold"

// SGLang bootstrap fields
requestFieldBootstrapHost = "bootstrap_host"
Expand Down