Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
k8s.io/utils v0.0.0-20251002143259-bc988d571ff4
sigs.k8s.io/controller-runtime v0.22.5
sigs.k8s.io/gateway-api v1.4.1
sigs.k8s.io/gateway-api-inference-extension v0.0.0-20260128235548-fd30cb97714a
sigs.k8s.io/gateway-api-inference-extension v0.0.0-20260203182229-e1cbdd1f76c5
)

require (
Expand Down Expand Up @@ -107,13 +107,13 @@ require (
github.com/xlab/treeprint v1.2.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 // indirect
go.opentelemetry.io/otel v1.39.0 // indirect
go.opentelemetry.io/otel v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.39.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/metric v1.40.0 // indirect
go.opentelemetry.io/otel/sdk v1.39.0 // indirect
go.opentelemetry.io/otel/trace v1.39.0 // indirect
go.opentelemetry.io/otel/trace v1.40.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -321,22 +321,22 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 h1:ssfIgGNANqpVFCndZvcuyKbl0g+UAVcbBcqGkG28H0Y=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0/go.mod h1:GQ/474YrbE4Jx8gZ4q5I4hrhUzM6UPzyrqJYV2AqPoQ=
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 h1:f0cb2XPmrqn4XMy9PNliTgRKJgS5WcL/u0/WRYGz4t0=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0/go.mod h1:vnakAaFckOMiMtOIhFI2MNH4FYrZzXCYxmb1LlhoGz8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 h1:in9O8ESIOlwJAEGTkkf34DesGRAc/Pn8qJ7k3r/42LM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0/go.mod h1:Rp0EXBm5tfnv0WL+ARyO/PHBEaEAT8UUHQ6AGJcSq6c=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.39.0 h1:8UPA4IbVZxpsD76ihGOQiFml99GPAEZLohDXvqHdi6U=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.39.0/go.mod h1:MZ1T/+51uIVKlRzGw1Fo46KEWThjlCBZKl2LzY5nv4g=
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g=
go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc=
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE=
go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8=
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw=
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A=
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
Expand Down Expand Up @@ -446,8 +446,8 @@ sigs.k8s.io/controller-runtime v0.22.5 h1:v3nfSUMowX/2WMp27J9slwGFyAt7IV0YwBxAkr
sigs.k8s.io/controller-runtime v0.22.5/go.mod h1:pc5SoYWnWI6I+cBHYYdZ7B6YHZVY5xNfll88JB+vniI=
sigs.k8s.io/gateway-api v1.4.1 h1:NPxFutNkKNa8UfLd2CMlEuhIPMQgDQ6DXNKG9sHbJU8=
sigs.k8s.io/gateway-api v1.4.1/go.mod h1:AR5RSqciWP98OPckEjOjh2XJhAe2Na4LHyXD2FUY7Qk=
sigs.k8s.io/gateway-api-inference-extension v0.0.0-20260128235548-fd30cb97714a h1:Ce5CZ0R3c5H475uEuJ92FMgux3j99wDrSsI4ivTBEXQ=
sigs.k8s.io/gateway-api-inference-extension v0.0.0-20260128235548-fd30cb97714a/go.mod h1:lvMpB9a+Lk+xBi5Pk6teUG+NqA16WR8nRpmBNFJbflU=
sigs.k8s.io/gateway-api-inference-extension v0.0.0-20260203182229-e1cbdd1f76c5 h1:JJzE5RL3y4ep3JCYF0e7djWDW6JvYVrO157kHgd1hF4=
sigs.k8s.io/gateway-api-inference-extension v0.0.0-20260203182229-e1cbdd1f76c5/go.mod h1:5aE9jyjAIlAAfUFT22NDVyk9Ru8i2HSCCHMTXgKWNAo=
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5EXP7sU1kvOlxwZh5txg=
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg=
sigs.k8s.io/kustomize/api v0.21.0 h1:I7nry5p8iDJbuRdYS7ez8MUvw7XVNPcIP5GkzzuXIIQ=
Expand Down
2 changes: 1 addition & 1 deletion pkg/sidecar/proxy/chat_completions.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,5 @@ func (s *Server) chatCompletionsHandler(w http.ResponseWriter, r *http.Request)
}

s.logger.V(4).Info("SSRF protection: prefill target allowed", "target", prefillHostPort)
s.runConnectorProtocol(w, r, prefillHostPort)
s.runConnectorProtocol(w, r, prefillHostPort, APITypeChatCompletions)
}
4 changes: 3 additions & 1 deletion pkg/sidecar/proxy/chat_completions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ func TestServer_chatCompletionsHandler(t *testing.T) {
s.prefillSamplerFn = func(n int) int { return i % n }
// verify the hostPort value
var hostPort string
s.runConnectorProtocol = func(_ http.ResponseWriter, _ *http.Request, selectedHostPort string) { hostPort = selectedHostPort }
s.runConnectorProtocol = func(_ http.ResponseWriter, _ *http.Request, selectedHostPort string, _ APIType) {
hostPort = selectedHostPort
}
var passthrough bool
s.decoderProxy = http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
passthrough = true
Expand Down
17 changes: 12 additions & 5 deletions pkg/sidecar/proxy/connector_lmcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ import (
"strings"
)

func (s *Server) runLMCacheProtocol(w http.ResponseWriter, r *http.Request, prefillPodHostPort string) {
s.logger.Info("running LMCache protocol")
// runLMCacheProtocol handles the LMCache protocol for all OpenAI API types.
// The apiType parameter determines which token limit fields to use:
// - Chat Completions: max_tokens, max_completion_tokens
// - Responses/Conversations: max_output_tokens
func (s *Server) runLMCacheProtocol(w http.ResponseWriter, r *http.Request, prefillPodHostPort string, apiType APIType) {
s.logger.Info("running LMCache protocol", "apiType", apiType)

// Read and parse request body
defer r.Body.Close() //nolint:all
Expand All @@ -44,13 +48,16 @@ func (s *Server) runLMCacheProtocol(w http.ResponseWriter, r *http.Request, pref
return
}

// Create prefiller request. Set max_tokens to 1.
// Create prefiller request. Set token limits to 1 based on API type.

ctx := r.Context()
preq := r.Clone(ctx)

completionRequest[requestFieldMaxTokens] = 1
completionRequest[requestFieldMaxCompletionTokens] = 1
// Set token limits to 1 for prefill based on API type
tokenFields := apiType.TokenLimitFields()
for _, field := range tokenFields {
completionRequest[field] = 1
}

pbody, err := json.Marshal(completionRequest)
if err != nil {
Expand Down
47 changes: 34 additions & 13 deletions pkg/sidecar/proxy/connector_nixlv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ import (
"github.com/google/uuid"
)

func (s *Server) runNIXLProtocolV2(w http.ResponseWriter, r *http.Request, prefillPodHostPort string) {
s.logger.V(4).Info("running NIXL protocol V2", "url", prefillPodHostPort)
// runNIXLProtocolV2 handles the NIXL v2 protocol for all OpenAI API types.
// The apiType parameter determines which token limit fields to use:
// - Chat Completions: max_tokens, max_completion_tokens
// - Responses/Conversations: max_output_tokens
func (s *Server) runNIXLProtocolV2(w http.ResponseWriter, r *http.Request, prefillPodHostPort string, apiType APIType) {
s.logger.V(4).Info("running NIXL protocol V2", "url", prefillPodHostPort, "apiType", apiType)

// Read request body
defer r.Body.Close() //nolint:all
Expand Down Expand Up @@ -64,10 +68,23 @@ func (s *Server) runNIXLProtocolV2(w http.ResponseWriter, r *http.Request, prefi

preq.Header.Add(requestHeaderRequestID, uuidStr)

// Save original values based on API type
streamValue, streamOk := completionRequest[requestFieldStream]
streamOptionsValue, streamOptionsOk := completionRequest[requestFieldStreamOptions]
maxTokensValue, maxTokensOk := completionRequest[requestFieldMaxTokens]
maxCompletionTokensValue, maxCompletionTokensOk := completionRequest[requestFieldMaxCompletionTokens]

// Save token limit fields based on API type
tokenFields := apiType.TokenLimitFields()
savedTokenValues := make(map[string]struct {
value any
ok bool
})
for _, field := range tokenFields {
value, ok := completionRequest[field]
savedTokenValues[field] = struct {
value any
ok bool
}{value, ok}
}

completionRequest[requestFieldKVTransferParams] = map[string]any{
requestFieldDoRemoteDecode: true,
Expand All @@ -80,8 +97,11 @@ func (s *Server) runNIXLProtocolV2(w http.ResponseWriter, r *http.Request, prefi

completionRequest[requestFieldStream] = false
delete(completionRequest, requestFieldStreamOptions)
completionRequest[requestFieldMaxTokens] = 1
completionRequest[requestFieldMaxCompletionTokens] = 1

// Set token limits to 1 for prefill based on API type
for _, field := range tokenFields {
completionRequest[field] = 1
}

pbody, err := json.Marshal(completionRequest)
if err != nil {
Expand Down Expand Up @@ -145,14 +165,15 @@ func (s *Server) runNIXLProtocolV2(w http.ResponseWriter, r *http.Request, prefi
if streamOptionsOk {
completionRequest[requestFieldStreamOptions] = streamOptionsValue
}
delete(completionRequest, requestFieldMaxTokens)
if maxTokensOk {
completionRequest[requestFieldMaxTokens] = maxTokensValue
}
delete(completionRequest, requestFieldMaxCompletionTokens)
if maxCompletionTokensOk {
completionRequest[requestFieldMaxCompletionTokens] = maxCompletionTokensValue

// Restore token limit fields based on API type
for _, field := range tokenFields {
delete(completionRequest, field)
if saved := savedTokenValues[field]; saved.ok {
completionRequest[field] = saved.value
}
}

completionRequest[requestFieldKVTransferParams] = pKVTransferParams

dbody, err := json.Marshal(completionRequest)
Expand Down
Loading