Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
b228bb7
feat: implement decode first flow on lmcache connector
kyanokashi Dec 9, 2025
a436b50
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Dec 9, 2025
a6ae771
fix: error handling
kyanokashi Dec 10, 2025
58388eb
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Dec 11, 2025
04b7ffd
chore: add back todo comment
kyanokashi Dec 11, 2025
ce74f50
refactor: reduce code complexity and duplication
kyanokashi Dec 11, 2025
1de6035
refactor: improve header copying
kyanokashi Dec 11, 2025
c0ac69e
chore: add comment explaning the cache_hit_threshold field and the ne…
kyanokashi Dec 15, 2025
7ce5e19
refactor: enhance logging for cache hit threshold in decode flow
kyanokashi Dec 15, 2025
6430a02
refactor: improve error handling and observability when failing to un…
kyanokashi Dec 15, 2025
4c15d95
chore: add deleted informational comments
kyanokashi Dec 15, 2025
cac084f
typo
kyanokashi Dec 15, 2025
91c7a06
refactor: make error logs more descriptive of the failure reason
kyanokashi Dec 15, 2025
69d30b5
feat: add cache hit threshold to prefill request so prefill executes …
kyanokashi Dec 15, 2025
1ed1d89
fix: typo
kyanokashi Dec 17, 2025
515b385
refactor: assign 0 cache_hit_threshold before final decode attempt
kyanokashi Dec 17, 2025
4c8659e
chore: update comment according to feedback
kyanokashi Dec 17, 2025
878585b
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Dec 17, 2025
722b58e
chore: remove istio workaround
kyanokashi Dec 17, 2025
487d333
fix: set cache hit threshold to 0 in prefill request for consistent e…
kyanokashi Dec 18, 2025
cb00b52
refactor: update the log
kyanokashi Dec 22, 2025
88739c6
feat: support online decoding
kyanokashi Dec 22, 2025
9fbb2d1
fix: preserve request body in lmcache connector
kyanokashi Dec 31, 2025
7b18827
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Dec 31, 2025
460b9c8
fix: support sse format for streamed decode
kyanokashi Jan 2, 2026
a722510
chore: add and improve log descriptions
kyanokashi Jan 2, 2026
e2b3380
fix: typo
kyanokashi Jan 4, 2026
548e6c8
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 5, 2026
160fb72
nit: undo capitalization
kyanokashi Jan 5, 2026
9a08dfb
fix: typos
kyanokashi Jan 5, 2026
7508f10
chore: improve error log observability
kyanokashi Jan 5, 2026
bd114fa
refactor: encapsulate http error checking in function and reuse
kyanokashi Jan 5, 2026
5a6a4f6
refactor: encapsulate and reuse code better
kyanokashi Jan 5, 2026
6e6ff8f
fix: lint error
kyanokashi Jan 6, 2026
ea60bf0
refactor: improve code encapsulation and reduce duplication
kyanokashi Jan 6, 2026
0cbc6f9
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 6, 2026
fd43c17
refactor: rename and simplify SSE event signaling logic
kyanokashi Jan 6, 2026
7030e38
refactor: rename lmcache to shared storage protocol
kyanokashi Jan 8, 2026
f84046a
Merge branch 'main' into feat/sidecar/lmcache-connector/decode-first
kyanokashi Jan 16, 2026
64606e0
refactor: restructure proxy components in order to share protocol imp…
kyanokashi Jan 15, 2026
cb2cae1
fix: make request builder unopinionated over field types
kyanokashi Jan 16, 2026
b90ee02
refactor: use already defined keys
kyanokashi Jan 16, 2026
fbf2c17
Apply suggestions from code review
kyanokashi Jan 16, 2026
b315031
fix: remove unused function
kyanokashi Jan 16, 2026
fb20b7d
fix: return err also when http error response fails
kyanokashi Jan 16, 2026
758fb78
Merge branch 'main' into refactor/sidecar/abstract-connector
kyanokashi Jan 20, 2026
aa6270a
Merge branch 'main' into refactor/sidecar/abstract-connector
kyanokashi Jan 22, 2026
1e71083
refactor: various refactorings
kyanokashi Jan 23, 2026
5fbab01
Apply suggestions from code review
kyanokashi Jan 26, 2026
aef9df1
refactor: rewrite function only used by tests as a test util
kyanokashi Jan 26, 2026
36f9b2e
refactor: rename default request builder to shared storage request bu…
kyanokashi Jan 26, 2026
0b8944a
chore: update copyright comments to year 2026
kyanokashi Jan 26, 2026
050b1d7
Merge branch 'main' into refactor/sidecar/abstract-connector
kyanokashi Jan 26, 2026
2490cf0
chore: remove pd-sidecar binary
kyanokashi Feb 5, 2026
30559f5
Merge branch 'main' into refactor/sidecar/abstract-connector
kyanokashi Feb 6, 2026
db93a8d
docs(sidecar): add FIXME comments and document shared state
kyanokashi Feb 9, 2026
3de2b43
refactor(sidecar): extract PDProxyManager interface
kyanokashi Feb 9, 2026
b19a041
refactor(sidecar): use PDProxyManager interface in runners
kyanokashi Feb 9, 2026
c150ae9
refactor(sidecar): export response writer types for testing
kyanokashi Feb 9, 2026
fab28a1
fix(sidecar): correct Write/WriteHeader order in mock handler
kyanokashi Feb 9, 2026
b580a7e
test(sidecar): add unit tests for request builders
kyanokashi Feb 9, 2026
984b93e
test(sidecar): add unit tests for http_errors package
kyanokashi Feb 9, 2026
081dd66
test(sidecar): add unit tests for response writers
kyanokashi Feb 9, 2026
3c81eef
chore(sidecar): regenerate mocks for PDProxyManager interface
kyanokashi Feb 9, 2026
c854728
Merge branch 'main' into refactor/sidecar/abstract-connector
kyanokashi Feb 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ vendor

# Build output
/build

28 changes: 6 additions & 22 deletions cmd/pd-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,21 @@ import (
"net/url"
"os"
"strconv"
"strings"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/llm-d/llm-d-inference-scheduler/pkg/sidecar/proxy"
"github.com/llm-d/llm-d-inference-scheduler/pkg/sidecar/proxy/runners/types"
"github.com/llm-d/llm-d-inference-scheduler/pkg/sidecar/version"
)

var (
// supportedConnectors defines all valid P/D connector types
supportedConnectors = []string{
proxy.ConnectorNIXLV2,
proxy.ConnectorSharedStorage,
proxy.ConnectorSGLang,
}
)

func main() {
port := flag.String("port", "8000", "the port the sidecar is listening on")
vLLMPort := flag.String("vllm-port", "8001", "the port vLLM is listening on")
vLLMDataParallelSize := flag.Int("data-parallel-size", 1, "the vLLM DATA-PARALLEL-SIZE value")
connector := flag.String("connector", proxy.ConnectorNIXLV2, "the P/D connector being used. Supported: "+strings.Join(supportedConnectors, ", "))
connectorStr := flag.String("connector", types.ConnectorNIXLV2.String(), "the P/D connector being used. Supported: "+types.AllConnectorStrings())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could benefit from a specialized flag type (e.g., with validation and conversion to the type instead of "raw" string.

prefillerUseTLS := flag.Bool("prefiller-use-tls", false, "whether to use TLS when sending requests to prefillers")
decoderUseTLS := flag.Bool("decoder-use-tls", false, "whether to use TLS when sending requests to the decoder")
prefillerInsecureSkipVerify := flag.Bool("prefiller-tls-insecure-skip-verify", false, "configures the proxy to skip TLS verification for requests to prefiller")
Expand Down Expand Up @@ -72,16 +63,9 @@ func main() {

logger.Info("Proxy starting", "Built on", version.BuildRef, "From Git SHA", version.CommitSHA)

// Validate connector
isValidConnector := false
for _, validConnector := range supportedConnectors {
if *connector == validConnector {
isValidConnector = true
break
}
}
if !isValidConnector {
logger.Info("Error: --connector must be one of: " + strings.Join(supportedConnectors, ", "))
connector := types.Connector(*connectorStr)
if err := connector.Validate(); err != nil {
logger.Error(err, "invalid connector")
return
}
logger.Info("p/d connector validated", "connector", connector)
Expand Down Expand Up @@ -127,7 +111,7 @@ func main() {
}

config := proxy.Config{
Connector: *connector,
Connector: connector,
PrefillerUseTLS: *prefillerUseTLS,
PrefillerInsecureSkipVerify: *prefillerInsecureSkipVerify,
DecoderInsecureSkipVerify: *decoderInsecureSkipVerify,
Expand Down
11 changes: 8 additions & 3 deletions pkg/sidecar/proxy/allowlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,15 +309,15 @@ func (av *AllowlistValidator) createPodInformer(poolName string, selector labels
Group: "",
Version: "v1",
Resource: "pods",
}).Namespace(av.namespace).List(context.TODO(), options)
}).Namespace(av.namespace).List(context.TODO(), options) // FIXME: use stored context from Start() to respect shutdown
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you please extract TODO/FIXME from here and other files to a cleanup issue for awareness instead of keeping in the code?

},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = selector.String()
return av.dynamicClient.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
}).Namespace(av.namespace).Watch(context.TODO(), options)
}).Namespace(av.namespace).Watch(context.TODO(), options) // FIXME: use stored context from Start() to respect shutdown
},
}

Expand Down Expand Up @@ -363,7 +363,12 @@ func (av *AllowlistValidator) onPodDelete(obj interface{}) {
av.rebuildAllowlist()
}

// rebuildAllowlist rebuilds the entire allowlist from current pod state
// rebuildAllowlist rebuilds the entire allowlist from current pod state.
// NOTE: This method acquires allowedTargetsMu then podInformersMu (RLock).
// There is a brief window where allowedTargets may be inconsistent with
// podInformers if a pod informer is being added/removed concurrently.
// This is acceptable because the allowlist is rebuilt on every pod event
// and will converge quickly.
func (av *AllowlistValidator) rebuildAllowlist() {
av.allowedTargetsMu.Lock()
defer av.allowedTargetsMu.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions pkg/sidecar/proxy/chat_completions.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *Server) chatCompletionsHandler(w http.ResponseWriter, r *http.Request)
s.logger.V(4).Info("skip disaggregated prefill")

if !s.forwardDataParallel || !s.dataParallelHandler(w, r) {
s.decoderProxy.ServeHTTP(w, r)
s.pdProxyManager.decoderProxy.ServeHTTP(w, r)
}
return
}
Expand All @@ -75,5 +75,5 @@ func (s *Server) chatCompletionsHandler(w http.ResponseWriter, r *http.Request)
}

s.logger.V(4).Info("SSRF protection: prefill target allowed", "target", prefillHostPort)
s.runConnectorProtocol(w, r, prefillHostPort)
s.protocolRunner.Run(w, r, prefillHostPort, s.logger)
}
11 changes: 8 additions & 3 deletions pkg/sidecar/proxy/chat_completions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"net/http/httptest"
"testing"

"github.com/go-logr/logr"
"github.com/llm-d/llm-d-inference-scheduler/pkg/common"
"github.com/llm-d/llm-d-inference-scheduler/pkg/sidecar/proxy/runners/types/mock"
)

func TestServer_chatCompletionsHandler(t *testing.T) {
Expand Down Expand Up @@ -119,12 +121,15 @@ func TestServer_chatCompletionsHandler(t *testing.T) {
s.prefillSamplerFn = func(n int) int { return i % n }
// verify the hostPort value
var hostPort string
s.runConnectorProtocol = func(_ http.ResponseWriter, _ *http.Request, selectedHostPort string) { hostPort = selectedHostPort }
s.protocolRunner = &mock.ProtocolRunnerMock{
RunFunc: func(_ http.ResponseWriter, _ *http.Request, prefillPodHost string, _ logr.Logger) {
hostPort = prefillPodHost
},
}
var passthrough bool
s.decoderProxy = http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
s.pdProxyManager.decoderProxy = http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
passthrough = true
})
s.dataParallelProxies = make(map[string]http.Handler)
recorder := httptest.NewRecorder()
recorder.Code = 0
s.chatCompletionsHandler(recorder, tt.r)
Expand Down
175 changes: 0 additions & 175 deletions pkg/sidecar/proxy/connector_nixlv2.go

This file was deleted.

Loading