Skip to content

Commit af4bce8

Browse files
committed
lux: raise tunnel coverage >90% and add a live production e2e test (spec 18)
- internal/tunnel to 90.4%: cover DefaultURL, NodeID persistence (incl. the no-XDG home path), writeError, the forwarder happy/unreachable/closed- stream paths, getJSON errors, bad-JSON discovery, runSession discover error, Run context-cancel exit, and heartbeatLoop cancel. - prod_e2e_test: an opt-in real e2e (LATERE_LUX_E2E=1 + LATERE_LUX_TOKEN) that runs `latere lux serve` against a live Lux (production by default), waits for the local model to appear in the catalog, and asserts a real completion via /local/v1. Skips without the opt-in / token / local Ollama.
1 parent 51c9859 commit af4bce8

2 files changed

Lines changed: 329 additions & 0 deletions

File tree

internal/tunnel/extra_test.go

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
package tunnel
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"io"
7+
"net"
8+
"net/http"
9+
"net/http/httptest"
10+
"strings"
11+
"testing"
12+
"time"
13+
)
14+
15+
func TestDefaultURL(t *testing.T) {
16+
cases := map[string]string{
17+
RuntimeOllama: "http://localhost:11434",
18+
RuntimeVLLM: "http://localhost:8000",
19+
RuntimeLMStudio: "http://localhost:1234",
20+
RuntimeLlamaCPP: "http://localhost:8080",
21+
RuntimeMLX: "http://localhost:8080",
22+
"unknown": "http://localhost:11434",
23+
}
24+
for runtime, want := range cases {
25+
if got := DefaultURL(runtime); got != want {
26+
t.Errorf("DefaultURL(%q) = %q, want %q", runtime, got, want)
27+
}
28+
}
29+
}
30+
31+
func TestNodeIDStableAndPersisted(t *testing.T) {
32+
dir := t.TempDir()
33+
t.Setenv("XDG_CONFIG_HOME", dir)
34+
first := NodeID()
35+
if !strings.HasPrefix(first, "node-") {
36+
t.Errorf("NodeID = %q, want node- prefix", first)
37+
}
38+
if second := NodeID(); second != first {
39+
t.Errorf("NodeID not stable: %q vs %q", first, second)
40+
}
41+
// Persisted to disk and reused across the helper's path.
42+
if got := nodeIDPath(); got == "" || !strings.Contains(got, "tunnel-node-id") {
43+
t.Errorf("nodeIDPath = %q", got)
44+
}
45+
}
46+
47+
func TestNodeIDPathUsesHomeWhenNoXDG(t *testing.T) {
48+
t.Setenv("XDG_CONFIG_HOME", "")
49+
got := nodeIDPath()
50+
if got == "" || !strings.Contains(got, "latere") {
51+
t.Errorf("nodeIDPath without XDG = %q, want a path under the home config dir", got)
52+
}
53+
}
54+
55+
func TestDiscoverBadJSON(t *testing.T) {
56+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
57+
_, _ = w.Write([]byte("not json"))
58+
}))
59+
defer srv.Close()
60+
if _, err := discover(context.Background(), srv.Client(), RuntimeVLLM, srv.URL, nil); err == nil {
61+
t.Error("discover with bad JSON = nil error, want decode error")
62+
}
63+
}
64+
65+
func TestHandleClosedStream(t *testing.T) {
66+
f := &forwarder{ctx: context.Background(), client: &http.Client{}, upstream: "http://127.0.0.1:1"}
67+
c1, c2 := net.Pipe()
68+
_ = c2.Close() // peer hangs up before sending a request
69+
// handle must return without panicking on the ReadRequest error.
70+
done := make(chan struct{})
71+
go func() { f.handle(c1); close(done) }()
72+
select {
73+
case <-done:
74+
case <-time.After(time.Second):
75+
t.Error("handle did not return on a closed stream")
76+
}
77+
}
78+
79+
func TestWriteErrorProducesBadGateway(t *testing.T) {
80+
c1, c2 := net.Pipe()
81+
go func() {
82+
writeError(c1, io.EOF)
83+
_ = c1.Close()
84+
}()
85+
resp, err := http.ReadResponse(bufio.NewReader(c2), nil)
86+
if err != nil {
87+
t.Fatalf("read response: %v", err)
88+
}
89+
defer resp.Body.Close()
90+
if resp.StatusCode != http.StatusBadGateway {
91+
t.Errorf("status = %d, want 502", resp.StatusCode)
92+
}
93+
body, _ := io.ReadAll(resp.Body)
94+
if !strings.Contains(string(body), "local.unreachable") {
95+
t.Errorf("body = %s, want local.unreachable", body)
96+
}
97+
}
98+
99+
func TestForwarderUnreachableUpstream(t *testing.T) {
100+
f := &forwarder{ctx: context.Background(), client: &http.Client{Timeout: time.Second}, upstream: "http://127.0.0.1:1"}
101+
c1, c2 := net.Pipe()
102+
go f.handle(c1)
103+
104+
req, _ := http.NewRequest(http.MethodPost, "http://x/v1/chat/completions", strings.NewReader(`{}`))
105+
go func() { _ = req.Write(c2) }()
106+
resp, err := http.ReadResponse(bufio.NewReader(c2), req)
107+
if err != nil {
108+
t.Fatalf("read response: %v", err)
109+
}
110+
defer resp.Body.Close()
111+
if resp.StatusCode != http.StatusBadGateway {
112+
t.Errorf("status = %d, want 502 for unreachable upstream", resp.StatusCode)
113+
}
114+
}
115+
116+
func TestForwarderForwardsToUpstream(t *testing.T) {
117+
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
118+
_, _ = w.Write([]byte("ok:" + r.URL.Path))
119+
}))
120+
defer upstream.Close()
121+
122+
f := &forwarder{ctx: context.Background(), client: &http.Client{}, upstream: upstream.URL}
123+
c1, c2 := net.Pipe()
124+
go f.handle(c1)
125+
req, _ := http.NewRequest(http.MethodPost, "http://x/v1/chat/completions", strings.NewReader(`{}`))
126+
go func() { _ = req.Write(c2) }()
127+
resp, err := http.ReadResponse(bufio.NewReader(c2), req)
128+
if err != nil {
129+
t.Fatalf("read: %v", err)
130+
}
131+
defer resp.Body.Close()
132+
body, _ := io.ReadAll(resp.Body)
133+
if string(body) != "ok:/v1/chat/completions" {
134+
t.Errorf("body = %q", body)
135+
}
136+
}
137+
138+
func TestGetJSONErrors(t *testing.T) {
139+
// Non-200 surfaces an error.
140+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
141+
w.WriteHeader(http.StatusInternalServerError)
142+
}))
143+
defer srv.Close()
144+
var dst struct{}
145+
if err := getJSON(context.Background(), srv.Client(), srv.URL, &dst); err == nil {
146+
t.Error("getJSON on 500 = nil error, want error")
147+
}
148+
// Unreachable host surfaces an error.
149+
if err := getJSON(context.Background(), &http.Client{Timeout: time.Second}, "http://127.0.0.1:1/x", &dst); err == nil {
150+
t.Error("getJSON on unreachable = nil error, want error")
151+
}
152+
}
153+
154+
func TestRunSessionDiscoverError(t *testing.T) {
155+
// An unreachable upstream makes discovery fail, so runSession returns
156+
// before any dial.
157+
err := runSession(context.Background(), Options{
158+
LuxURL: "http://127.0.0.1:1",
159+
Bearer: func(context.Context) (string, error) { return "t", nil },
160+
Runtime: RuntimeOllama,
161+
UpstreamURL: "http://127.0.0.1:1",
162+
Out: io.Discard,
163+
})
164+
if err == nil {
165+
t.Error("runSession with unreachable upstream = nil error, want discovery error")
166+
}
167+
}
168+
169+
func TestRunStopsOnContextCancel(t *testing.T) {
170+
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
171+
defer cancel()
172+
// Upstream unreachable so each session fails fast; Run should keep
173+
// retrying then return when the context expires.
174+
err := Run(ctx, Options{
175+
LuxURL: "http://127.0.0.1:1",
176+
Bearer: func(context.Context) (string, error) { return "t", nil },
177+
Runtime: RuntimeOllama,
178+
UpstreamURL: "http://127.0.0.1:1",
179+
HeartbeatInterval: time.Second,
180+
Out: io.Discard,
181+
})
182+
if err == nil {
183+
t.Error("Run = nil error on context expiry, want ctx error")
184+
}
185+
}
186+
187+
func TestHeartbeatLoopStopsOnContextCancel(t *testing.T) {
188+
c1, c2 := net.Pipe()
189+
defer c1.Close()
190+
defer c2.Close()
191+
ctx, cancel := context.WithCancel(context.Background())
192+
done := make(chan struct{})
193+
go func() {
194+
heartbeatLoop(ctx, c1, Options{
195+
HeartbeatInterval: 10 * time.Millisecond,
196+
Bearer: func(context.Context) (string, error) { return "tok", nil },
197+
})
198+
close(done)
199+
}()
200+
// Drain the pipe so heartbeat writes don't block.
201+
go io.Copy(io.Discard, c2)
202+
time.Sleep(40 * time.Millisecond)
203+
cancel()
204+
select {
205+
case <-done:
206+
case <-time.After(time.Second):
207+
t.Error("heartbeatLoop did not stop on context cancel")
208+
}
209+
}

internal/tunnel/prod_e2e_test.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package tunnel
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"io"
8+
"net/http"
9+
"os"
10+
"strings"
11+
"testing"
12+
"time"
13+
)
14+
15+
// TestProdE2EServeAndCall is a real end-to-end test of `latere lux serve`
16+
// against a live Lux server (production by default). It is opt-in because
17+
// it exposes the local model to the target Lux and needs a real credential:
18+
// it skips unless LATERE_LUX_E2E=1 and LATERE_LUX_TOKEN are set, and unless
19+
// a local Ollama serving the model is reachable.
20+
//
21+
// Run it with, e.g.:
22+
//
23+
// LATERE_LUX_E2E=1 \
24+
// LATERE_LUX_TOKEN="$(latere lux token)" \
25+
// go test ./internal/tunnel/ -run TestProdE2E -v
26+
//
27+
// Override LUX_API_URL to target staging, and LATERE_LUX_E2E_MODEL to use a
28+
// different local model (default gemma4:latest).
29+
func TestProdE2EServeAndCall(t *testing.T) {
30+
if os.Getenv("LATERE_LUX_E2E") != "1" {
31+
t.Skip("set LATERE_LUX_E2E=1 to run the live Lux e2e (it exposes a local model to the target)")
32+
}
33+
token := os.Getenv("LATERE_LUX_TOKEN")
34+
if token == "" {
35+
t.Skip("LATERE_LUX_E2E set but LATERE_LUX_TOKEN missing; provide a bearer (e.g. `latere lux token`)")
36+
}
37+
luxURL := os.Getenv("LUX_API_URL")
38+
if luxURL == "" {
39+
luxURL = "https://lux.latere.ai"
40+
}
41+
luxURL = strings.TrimRight(luxURL, "/")
42+
model := os.Getenv("LATERE_LUX_E2E_MODEL")
43+
if model == "" {
44+
model = "gemma4:latest"
45+
}
46+
47+
// Require a reachable local Ollama serving the model.
48+
hc := &http.Client{Timeout: 5 * time.Second}
49+
if _, err := discover(context.Background(), hc, RuntimeOllama, DefaultURL(RuntimeOllama), []string{model}); err != nil {
50+
t.Skipf("local Ollama not serving %q: %v", model, err)
51+
}
52+
53+
ctx, cancel := context.WithCancel(context.Background())
54+
defer cancel()
55+
56+
// Start the real serve loop against the live Lux.
57+
serveErr := make(chan error, 1)
58+
go func() {
59+
serveErr <- Run(ctx, Options{
60+
LuxURL: luxURL,
61+
Bearer: func(context.Context) (string, error) { return token, nil },
62+
Runtime: RuntimeOllama,
63+
Models: []string{model},
64+
NodeID: "e2e-prod-" + NodeID(),
65+
HeartbeatInterval: 5 * time.Second,
66+
Out: io.Discard,
67+
})
68+
}()
69+
70+
// Wait until the model shows up as a live tunnel in the catalog,
71+
// confirming the tunnel registered through the live server.
72+
if !waitForCatalog(t, hc, luxURL, token, model, 30*time.Second) {
73+
t.Fatalf("model %q never appeared as local/ in %s/lux/v1/models within 30s "+
74+
"(is the tunnel feature enabled on the target and does the token carry llm.serve?)", model, luxURL)
75+
}
76+
77+
// Call it through Lux and assert a real completion comes back.
78+
body := `{"model":"` + model + `","stream":false,"max_tokens":20,` +
79+
`"messages":[{"role":"user","content":"Reply with exactly: PONG"}]}`
80+
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, luxURL+"/local/v1/chat/completions", strings.NewReader(body))
81+
req.Header.Set("Authorization", "Bearer "+token)
82+
req.Header.Set("Content-Type", "application/json")
83+
resp, err := (&http.Client{Timeout: 120 * time.Second}).Do(req)
84+
if err != nil {
85+
t.Fatalf("call /local: %v", err)
86+
}
87+
defer resp.Body.Close()
88+
respBody, _ := io.ReadAll(resp.Body)
89+
if resp.StatusCode != http.StatusOK {
90+
t.Fatalf("/local status = %d: %s", resp.StatusCode, respBody)
91+
}
92+
var out struct {
93+
Choices []struct {
94+
Message struct{ Content string } `json:"message"`
95+
} `json:"choices"`
96+
}
97+
if err := json.Unmarshal(respBody, &out); err != nil || len(out.Choices) == 0 || out.Choices[0].Message.Content == "" {
98+
t.Fatalf("no completion via prod tunnel: err=%v body=%s", err, respBody)
99+
}
100+
t.Logf("prod e2e: %s replied %q via %s/local/v1", model, out.Choices[0].Message.Content, luxURL)
101+
}
102+
103+
func waitForCatalog(t *testing.T, hc *http.Client, luxURL, token, model string, timeout time.Duration) bool {
104+
t.Helper()
105+
deadline := time.Now().Add(timeout)
106+
for time.Now().Before(deadline) {
107+
req, _ := http.NewRequest(http.MethodGet, luxURL+"/lux/v1/models", nil)
108+
req.Header.Set("Authorization", "Bearer "+token)
109+
resp, err := hc.Do(req)
110+
if err == nil {
111+
b, _ := io.ReadAll(resp.Body)
112+
_ = resp.Body.Close()
113+
if resp.StatusCode == http.StatusOK && bytes.Contains(b, []byte(`"local/`+model+`"`)) {
114+
return true
115+
}
116+
}
117+
time.Sleep(time.Second)
118+
}
119+
return false
120+
}

0 commit comments

Comments
 (0)