refactor(sidecar): encapsulate code better in order to share protocol implementation between different connectors#566
Conversation
- if cache_hit_threshold field is present in completion request, then we perform a decode first flow Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyanokashi <71283892+kyanokashi@users.noreply.github.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
…w decode first flow Signed-off-by: kyano <kyanokashi2@gmail.com>
- decrease verbosity for common log - add cache_hit_threshold attribute Signed-off-by: kyano <kyanokashi2@gmail.com>
…marshal decode response Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
…regardless of cache condition Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
…xecution Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Signed-off-by: kyano <kyanokashi2@gmail.com>
Addressed |
|
Hi. I didn't review the code yet, just a thought on naming. |
I think I'll rename to |
- renaming various components and packages - further encapsulation in submodules to avoid import cycles - improved encapsulation under well defined types Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
Co-authored-by: Shmuel Kallner <kallner@il.ibm.com> Signed-off-by: kyanokashi <71283892+kyanokashi@users.noreply.github.com>
Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
…ilder Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
|
@kyanokashi could you please remove the |
Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
Incorporates main changes (GIE update, models extractor plugin, dependency bumps, decode-first flow, lmcache→shared-storage rename) into the refactored sidecar architecture. Conflicts resolved by keeping refactor branch versions; superseded connector_shared_storage.go and duplicate isHTTPError() removed.
- Add FIXME comments for context.TODO() in allowlist pod watchers - Document dual-mutex acquisition pattern in rebuildAllowlist - Clarify Clone() shared state for data parallel workers Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
- Add PDProxyManager interface to decouple runners from concrete manager - Move manager.go to pd_proxy_manager.go - Add GetDecoderProxy() method to ProxyManager - Update go:generate directive to include PDProxyManager in mocks Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
- Update VLLMRunner and SGLangRunner to use PDProxyManager interface - Rename field from proxyManager to proxyHandler for clarity - Update profile handlers (chat_completions, data_parallel) to use interface - Update tests to use PDProxyManager mock Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
- Export BufferedResponseWriter and ResponseWriterWithBuffer - Export helper methods and constructor - Add documentation comments for all exported items Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
WriteHeader must be called before Write per http.ResponseWriter contract. Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
- Test prefill request modifications (stream=false, max_tokens, etc.) - Test decode request state restoration - Verify original request immutability - Cover both SharedStorage and NIXLV2 builders Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
- Test JSON error response formatting - Verify correct HTTP status codes (400, 502, 500) - Ensure error messages are propagated correctly - Rename http_errors.go to errors.go for consistency Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
- Test BufferedResponseWriter append and status code handling - Test ResponseWriterWithBuffer buffering and direct mode transition - Test SSE event delimiter counting for FirstChunkReady signal - Verify GetStatusCode and Buffered accessors Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
Generated by moq to include PDProxyManager interface mock. Signed-off-by: kyanokashi <kyanokashi2@gmail.com>
c50aa4c to
3c81eef
Compare
elevran
left a comment
There was a problem hiding this comment.
@kyanokashi thank you for this work.
It is a significant refactoring, restructuring the sidecar proxy code to improve modularity, testability, and maintainability.
Left some suggestions and nits. Nothing major from my side - all can go into future/followup PRs.
Leaving lgtm and approval to @shmuelk (he has a changes-requested on the PR)
| sr.proxyHandler.GetDecoderProxy().ServeHTTP(w, decodeReq) | ||
| } | ||
|
|
||
| func cloneWithJSONBody(r *http.Request, body []byte) *http.Request { |
There was a problem hiding this comment.
nit: consider sharing with runner_vllm.go's implementation of cloneRequestWithBody?
| Version: "v1", | ||
| Resource: "pods", | ||
| }).Namespace(av.namespace).List(context.TODO(), options) | ||
| }).Namespace(av.namespace).List(context.TODO(), options) // FIXME: use stored context from Start() to respect shutdown |
There was a problem hiding this comment.
nit: can you please extract TODO/FIXME from here and other files to a cleanup issue for awareness instead of keeping in the code?
| vLLMPort := flag.String("vllm-port", "8001", "the port vLLM is listening on") | ||
| vLLMDataParallelSize := flag.Int("data-parallel-size", 1, "the vLLM DATA-PARALLEL-SIZE value") | ||
| connector := flag.String("connector", proxy.ConnectorNIXLV2, "the P/D connector being used. Supported: "+strings.Join(supportedConnectors, ", ")) | ||
| connectorStr := flag.String("connector", types.ConnectorNIXLV2.String(), "the P/D connector being used. Supported: "+types.AllConnectorStrings()) |
There was a problem hiding this comment.
nit: could benefit from a specialized flag type (e.g., with validation and conversion to the type instead of "raw" string.
| return strings.Join(strs, ", ") | ||
| } | ||
|
|
||
| //go:generate moq -stub -out mock/types.go -pkg mock . ProtocolRunner PDProxyManager |
There was a problem hiding this comment.
Q: will generate run automatically on changes to this file or should the makefile add a dependency of mocks file on this file so it is run on change?
| // SGLangRunner implements P/D disaggregation for SGLang backends. | ||
| // Unlike VLLMRunner, SGLangRunner sends prefill and decode requests concurrently | ||
| // and uses SGLang's built-in bootstrap coordination mechanism. | ||
| type SGLangRunner struct { |
There was a problem hiding this comment.
nit: consider using builders for consistency with vllm runner?
| sglangBootstrapPort = 8998 | ||
|
|
||
| // Override from environment variable if set | ||
| if portStr := os.Getenv("SGLANG_BOOTSTRAP_PORT"); portStr != "" { |
There was a problem hiding this comment.
nit: perhaps use a command line option for this? environment variables are less visible to users (eg. not in --help). You can still the envvar as the default value.
|
|
||
| r.Header.Add(keys.RequestHeaderRequestID, uuidStr) | ||
|
|
||
| // If "cache_hit_threshold" is present in the request, we try to decode first. The decode node must meet the cache hit threshold in order to execute. |
There was a problem hiding this comment.
nit: might also want to keep decode first enablement behind a feature/cli flag.
There was a problem hiding this comment.
nit: we shuold be removing the data_parallel implementation from the sidecar altogether (not in this PR)
| prefillerInsecureSkipVerify bool | ||
|
|
||
| decoderProxy http.Handler // decoder proxy handler | ||
| prefillerProxies *lru.Cache[string, http.Handler] // cached prefiller proxy handlers |
There was a problem hiding this comment.
nit: consider adding note regarding Clone() sharing these as shallow copy (if I'm not mistaken there's a note on the server.Clone, but maybe highlight here)?
| "syscall" | ||
| "time" | ||
|
|
||
| httperrors "github.com/llm-d/llm-d-inference-scheduler/pkg/sidecar/proxy/http_errors" |
There was a problem hiding this comment.
nit: consider placement of all helper functions (e.g., from this file and http_errors?) in http package.
| @@ -1,5 +1,5 @@ | |||
| /* | |||
| Copyright 2025 The llm-d Authors. | |||
| Copyright 2026 The llm-d Authors. | |||
There was a problem hiding this comment.
| Copyright 2026 The llm-d Authors. | |
| Copyright 2025, 2026 The llm-d Authors. |
| sglangBootstrapPort = 8998 | ||
|
|
||
| // Override from environment variable if set | ||
| if portStr := os.Getenv("SGLANG_BOOTSTRAP_PORT"); portStr != "" { |
There was a problem hiding this comment.
This should come from a configuration arameter, just as the vLLM port does.
| func (sr *SGLangRunner) sendSGLangConcurrentRequests(w http.ResponseWriter, r *http.Request, body []byte, prefillHost string, logger logr.Logger) { | ||
| // Create separate requests for prefill and decode | ||
| prefillReq := cloneWithJSONBody(r, body) | ||
| decodeReq := cloneWithJSONBody(r, body) |
| @@ -1,5 +1,5 @@ | |||
| /* | |||
| Copyright 2025 The llm-d Authors. | |||
| Copyright 2026 The llm-d Authors. | |||
There was a problem hiding this comment.
| Copyright 2026 The llm-d Authors. | |
| Copyright 2025, 2026 The llm-d Authors. |
| } | ||
|
|
||
| var connectors = []string{ConnectorSharedStorage, ConnectorNIXLV2} | ||
| var connectorsArr = []types.Connector{types.ConnectorSharedStorage, types.ConnectorNIXLV2} |
There was a problem hiding this comment.
I don't think there is a need to call the variable connectorsArr
| var connectorsArr = []types.Connector{types.ConnectorSharedStorage, types.ConnectorNIXLV2} | |
| var connectors = []types.Connector{types.ConnectorSharedStorage, types.ConnectorNIXLV2} |
|
|
||
| for _, connector := range connectors { | ||
| When(fmt.Sprintf("running with the %s connector", connector), func() { | ||
| for _, connector := range connectorsArr { |
There was a problem hiding this comment.
| for _, connector := range connectorsArr { | |
| for _, connector := range connectors { |
| if vr.hasCacheThresholdFinishReason(response) { | ||
| return true | ||
| } | ||
| } |
There was a problem hiding this comment.
There are Go SSE parsing and writing packages. Any reason why one wasn't used?
There was a problem hiding this comment.
There's this one, but it isn't a standard Go endorsed package. Is this a dependency we'd want to introduce? https://github.com/tmaxmax/go-sse
| @@ -1,5 +1,5 @@ | |||
| /* | |||
| Copyright 2025 The llm-d Authors. | |||
| Copyright 2026 The llm-d Authors. | |||
There was a problem hiding this comment.
| Copyright 2026 The llm-d Authors. | |
| Copyright 2025, 2026 The llm-d Authors. |
| }) | ||
|
|
||
| It("should signal FirstChunkReady after 2 SSE events", func() { | ||
| rw.Write([]byte("data: {\"choices\":[{\"finish_reason\":null}]}\n\n")) //nolint:errcheck |
There was a problem hiding this comment.
There are Go SSE packages which might make this code cleaner.
Signed-off-by: kyano kyanokashi2@gmail.com