Skip to content

Commit 1b84ee7

Browse files
committed
enable reverse proxy in server-requesting pod
1 parent b5cd296 commit 1b84ee7

8 files changed

Lines changed: 267 additions & 2 deletions

File tree

cmd/requester/main.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/server/requester/coordination"
2626
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/server/requester/probes"
27+
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/server/requester/proxy"
2728
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
2829

2930
"k8s.io/klog/v2"
@@ -48,6 +49,11 @@ func main() {
4849
spiPort = "8081"
4950
}
5051

52+
proxyPort := os.Getenv("PROXY_PORT")
53+
if proxyPort == "" {
54+
proxyPort = "8082"
55+
}
56+
5157
var ready atomic.Bool
5258

5359
var wg sync.WaitGroup
@@ -72,5 +78,14 @@ func main() {
7278
}
7379
}()
7480

81+
// Start the reverse proxy server
82+
go func() {
83+
defer wg.Done()
84+
err := proxy.Run(ctx, proxyPort)
85+
if err != nil {
86+
logger.Error(err, "failed to start requester proxy server")
87+
}
88+
}()
89+
7590
wg.Wait()
7691
}

docs/dual-pods.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,39 @@ assigned to the server-requesting Pod) for running `vllm serve`. To
125125
swap a model out, the controller issues a request that does not
126126
include those details.
127127

128+
#### Requester Reverse Proxy
129+
130+
The requester container includes a reverse proxy server that forwards
131+
inference requests to the actual vLLM instance running in the
132+
server-providing Pod (typically managed by the launcher). This
133+
abstraction allows clients to send requests to the server-requesting
134+
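Pod without needing to know the actual port vLLM is listening on. The proxy listens on the port given by the requester's `PROXY_PORT` environment variable (default `8082`).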
135+
136+
The reverse proxy operates as follows:
137+
138+
1. **Initialization**: When the dual-pods controller binds a
139+
server-requesting Pod to a server-providing Pod, it sends an HTTP
140+
POST request to the requester's proxy initialization endpoint
141+
(`/v1/proxy/init`). The request body contains the target address
142+
(launcher Pod IP) and the allocated port:
143+
144+
```json
145+
{"address": "10.244.1.5", "port": 8005}
146+
```
147+
148+
2. **Request forwarding**: Once initialized, the reverse proxy
149+
forwards all incoming HTTP requests to the configured vLLM
150+
instance. This includes OpenAI-compatible API endpoints like
151+
`/v1/chat/completions`, `/v1/completions`, etc.
152+
153+
3. **Status checking**: The proxy's initialization status can be
154+
queried via an HTTP GET request to `/v1/proxy/init`.
155+
156+
This design decouples the client-facing endpoint (server-requesting
157+
Pod) from the actual inference server location (server-providing Pod
158+
with dynamic port), enabling flexible resource management and model
159+
swapping without disrupting inference clients.
160+
128161
### Scenarios
129162

130163
The outer product of

pkg/controller/dual-pods/inference-server.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,15 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
574574
return fmt.Errorf("launcher Pod %q has no IP assigned yet", launcherPod.Name), true
575575
}
576576

577+
// Initialize the reverse proxy between the launcher Pod and the requester Pod.
578+
// Requests can be proxied to the launcher Pod from the requester Pod.
579+
url := fmt.Sprintf("http://%s:%s%s", requestingPod.Status.PodIP, adminPort, stubapi.InitProxy)
580+
if err := doPostWithData(url, bytes.NewReader([]byte(fmt.Sprintf("{\"address\":\"%s\",\"port\":%d}",
581+
launcherIP, desiredPort)))); err != nil {
582+
logger.Error(err, "Failed to initialize requester proxy")
583+
return err, true
584+
}
585+
577586
launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherIP, ctlrcommon.LauncherServicePort)
578587
lClient, err := NewLauncherClient(launcherBaseURL)
579588
if err != nil {
@@ -1357,13 +1366,16 @@ func (ctl *controller) ensureReqState(ctx context.Context, requestingPod *corev1
13571366
return err, err != nil
13581367
}
13591368

1360-
// doPost does the HTTP POST request/response to the given URL.
13611369
func doPost(url string) error {
1370+
return doPostWithData(url, nil)
1371+
}
1372+
1373+
func doPostWithData(url string, data io.Reader) error {
13621374
client := &http.Client{
13631375
Timeout: 5 * time.Second,
13641376
}
13651377

1366-
resp, err := client.Post(url, "application/json", nil)
1378+
resp, err := client.Post(url, "application/json", data)
13671379
if err != nil {
13681380
return fmt.Errorf("http post %q: %w", url, err)
13691381
}

pkg/server/requester/coordination/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131

3232
"k8s.io/klog/v2"
3333

34+
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/server/requester/proxy"
3435
stubapi "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/spi"
3536
)
3637

@@ -212,6 +213,7 @@ func RunWithGPUUUIDs(ctx context.Context, port string, ready *atomic.Bool, logWr
212213
mux.HandleFunc("POST "+stubapi.BecomeReadyPath, newSetReadyHandler(logger, ready, true))
213214
mux.HandleFunc("POST "+stubapi.BecomeUnreadyPath, newSetReadyHandler(logger, ready, false))
214215
mux.HandleFunc("POST "+stubapi.SetLogPath, newSetLogHandler(logger, logWriter))
216+
mux.HandleFunc(stubapi.InitProxy, proxy.Initialize)
215217

216218
server := &http.Server{
217219
Addr: ":" + port,
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
Copyright 2025 The llm-d Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package proxy
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"fmt"
23+
"io"
24+
"net"
25+
"net/http"
26+
"net/http/httputil"
27+
"net/url"
28+
"sync"
29+
"sync/atomic"
30+
"time"
31+
32+
"k8s.io/klog/v2"
33+
)
34+
35+
// ConfigRequest is the JSON request body that configures the proxy target.
// Initialize rejects a request whose Address is empty or whose Port is
// outside the range 1-65535.
36+
type ConfigRequest struct {
37+
// Address is the host of the server to proxy to (e.g. an IP address).
Address string `json:"address"`
38+
// Port is the TCP port of the server to proxy to.
Port int `json:"port"`
39+
}
40+
41+
// proxy holds the state of the lazily configured HTTP reverse proxy.
42+
// Until Initialize stores a target, serveProxy answers every request
// with 503 Service Unavailable.
43+
type proxy struct {
44+
// mu serializes configuration performed by Initialize.
mu sync.RWMutex
45+
// targetURL is the upstream base URL; it is nil until configured.
targetURL *url.URL
46+
// proxy forwards requests to targetURL; it is nil until configured.
proxy *httputil.ReverseProxy
47+
// initialized is set to true only after targetURL and proxy are assigned.
initialized atomic.Bool
48+
}
49+
50+
// instance is the package-wide proxy state shared by serveProxy and
// Initialize. It starts out unconfigured (zero-value fields).
51+
var instance = &proxy{}
52+
53+
// Run starts the proxy server on the given port
54+
func Run(ctx context.Context, port string) error {
55+
logger := klog.FromContext(ctx).WithName("proxy-server")
56+
logger.Info("starting proxy server")
57+
58+
mux := http.NewServeMux()
59+
mux.HandleFunc("/", serveProxy)
60+
61+
server := &http.Server{
62+
Addr: fmt.Sprintf(":%s", port),
63+
Handler: mux,
64+
ReadTimeout: 30 * time.Second,
65+
WriteTimeout: 5 * time.Minute, // Long timeout for inference requests
66+
IdleTimeout: 120 * time.Second,
67+
}
68+
69+
go func() {
70+
<-ctx.Done()
71+
logger.Info("shutting down")
72+
73+
ctx, cancelFn := context.WithTimeout(context.Background(), 60*time.Second)
74+
defer cancelFn()
75+
if err := server.Shutdown(ctx); err != nil {
76+
logger.Error(err, "failed to gracefully shutdown")
77+
}
78+
}()
79+
80+
logger.Info("starting server", "port", port)
81+
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
82+
return fmt.Errorf("listen and serve error: %w", err)
83+
}
84+
85+
logger.Info("server stopped")
86+
return nil
87+
}
88+
89+
// serveProxy proxies requests to the target server
90+
func serveProxy(w http.ResponseWriter, r *http.Request) {
91+
if !instance.initialized.Load() {
92+
http.Error(w, "proxy not initialized", http.StatusServiceUnavailable)
93+
return
94+
}
95+
96+
// Proxy the request
97+
instance.proxy.ServeHTTP(w, r)
98+
}
99+
100+
// Initialize handles proxy initialization and configuration
101+
func Initialize(w http.ResponseWriter, r *http.Request) {
102+
// Get proxy status
103+
if r.Method == http.MethodGet {
104+
if instance.initialized.Load() {
105+
targetURL := instance.targetURL
106+
w.WriteHeader(http.StatusOK)
107+
if targetURL != nil {
108+
fmt.Fprintf(w, "proxying to %s", targetURL)
109+
} else {
110+
_, _ = w.Write([]byte("proxy initialized but targetURL is nil"))
111+
}
112+
} else {
113+
w.WriteHeader(http.StatusOK)
114+
_, _ = w.Write([]byte("proxy not initialized"))
115+
}
116+
return
117+
}
118+
119+
if r.Method != http.MethodPost {
120+
http.Error(w, "invalid method", http.StatusMethodNotAllowed)
121+
return
122+
}
123+
124+
// Try initialize server
125+
if instance.initialized.Load() {
126+
http.Error(w, "proxy already initialized", http.StatusConflict)
127+
return
128+
}
129+
130+
// Need to initialize - acquire write lock
131+
instance.mu.Lock()
132+
defer instance.mu.Unlock()
133+
134+
// Double-check after acquiring write lock
135+
if instance.initialized.Load() {
136+
http.Error(w, "proxy already initialized", http.StatusConflict)
137+
return
138+
}
139+
140+
// Parse configuration from request body
141+
body, err := io.ReadAll(r.Body)
142+
if err != nil {
143+
http.Error(w, fmt.Sprintf("failed to read request body: %v", err), http.StatusBadRequest)
144+
return
145+
}
146+
defer r.Body.Close()
147+
148+
var config ConfigRequest
149+
if err := json.Unmarshal(body, &config); err != nil {
150+
http.Error(w, fmt.Sprintf("failed to parse JSON: %v", err), http.StatusBadRequest)
151+
return
152+
}
153+
154+
if config.Address == "" {
155+
http.Error(w, "address is required", http.StatusBadRequest)
156+
return
157+
}
158+
159+
if config.Port <= 0 || config.Port > 65535 {
160+
http.Error(w, "invalid port", http.StatusBadRequest)
161+
return
162+
}
163+
164+
// Create target URL
165+
targetURL := &url.URL{
166+
Scheme: "http",
167+
Host: net.JoinHostPort(config.Address, fmt.Sprintf("%d", config.Port)),
168+
}
169+
170+
// Create the reverse proxy
171+
instance.targetURL = targetURL
172+
instance.proxy = httputil.NewSingleHostReverseProxy(targetURL)
173+
174+
// Customize error handling
175+
originalErrorHandler := instance.proxy.ErrorHandler
176+
instance.proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
177+
if originalErrorHandler != nil {
178+
originalErrorHandler(w, r, err)
179+
} else {
180+
http.Error(w, fmt.Sprintf("proxy error: %v", err), http.StatusBadGateway)
181+
}
182+
}
183+
184+
instance.initialized.Store(true)
185+
w.WriteHeader(http.StatusOK)
186+
fmt.Fprintf(w, "initialized proxy to: %s", targetURL)
187+
}

pkg/spi/interface.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,15 @@ const SetLogPath = "/v1/set-log"
5959
// LogStartPosParam is the name of the query parameter that
6060
// holds that starting position of a log chunk.
6161
const LogStartPosParam = "startPos"
62+
63+
// InitProxy is the path for initializing the HTTP reverse proxy.
64+
// The proxy is used to forward requests from the server-requesting
65+
// pod to the server-providing pod.
66+
// Supports two HTTP methods:
67+
// - GET: retrieves the initialization status of the proxy.
68+
// Returns status info.
69+
// - POST: initializes the proxy with a target address and port.
70+
// The request body should contain a JSON object with "address"
71+
// and "port" fields. After successful initialization,
72+
// the proxy will forward requests to the configured target server.
73+
const InitProxy = "/v1/proxy/init"

test/e2e/mkobjs-openshift.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ spec:
227227
containerPort: 8080
228228
- name: spi
229229
containerPort: 8081
230+
- name: proxy
231+
containerPort: 8082
230232
readinessProbe:
231233
httpGet:
232234
path: /ready

test/e2e/mkobjs.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ spec:
183183
containerPort: 8080
184184
- name: spi
185185
containerPort: 8081
186+
- name: proxy
187+
containerPort: 8082
186188
readinessProbe:
187189
httpGet:
188190
path: /ready

0 commit comments

Comments
 (0)