Skip to content

Commit fe26e5a

Browse files
authored
fix(cache-proxy): forward origin status, body, and headers verbatim (#470)
* test(cache-proxy): cover origin status passthrough (RED) Five new tests plus an update to the existing TestHandleProxyOriginError, all asserting that the cache proxy must forward upstream status codes, bodies, and response headers verbatim instead of collapsing every non-2xx into a 502. The user case that motivated this: S3 returns 400 with <Code>ExpiredToken</Code> in an XML envelope. The proxy was rewriting that to 502 with a Go-formatted error string, which (a) made DuckDB's httpfs treat a terminal auth failure as transient and retry it indefinitely, and (b) hid the real S3 error class from operators. New cases: - 5xx forwarded verbatim (replaces the old 502-asserting test) - 400 forwarded verbatim, body + Content-Type + X-Amz-Request-Id preserved - 404 forwarded verbatim - 416 forwarded verbatim with Content-Range preserved - error responses are NOT cached (the next request hits origin) - pure network errors (no HTTP response) still get 502 (the only case where 502 is correct, since there's no upstream status to forward) Tests fail with the current implementation; the next commit makes them pass. * fix(cache-proxy): forward origin status, body, and headers verbatim (GREEN) The proxy was previously translating every non-2xx upstream response into a 502 Bad Gateway with a Go-formatted error string. That: 1. Hid the real S3 error class — DuckDB's httpfs treats 5xx as transient and retries, so a terminal 4xx (e.g. an ExpiredToken auth failure) was being retried indefinitely instead of failing fast and surfacing the real cause. 2. Stripped the XML error envelope DuckLake parses, replacing it with a Go error string DuckLake doesn't understand. 3. Dropped headers (Content-Type, X-Amz-Request-Id, Content-Range) that DuckDB and operators rely on. This change introduces `originStatusError`, a typed error returned by fetchOrigin whenever the upstream responds with status >= 400. The caller in HandleProxy detects it via errors.As and forwards the captured status code, body (up to 1 MiB), and headers (minus hop-by-hop) back to the client unchanged. Pure transport errors (DNS, connection refused, TLS, timeout) keep returning 502 — there's no upstream status to forward in that case, and 5xx is what httpfs's transient-retry policy is designed for. Tests in the previous commit covered: - 5xx forwarded verbatim (replaces the old 502-asserting test) - 400 forwarded verbatim with XML body + Content-Type + amz headers - 404 forwarded verbatim - 416 forwarded verbatim with Content-Range preserved - error responses are NOT cached (the next request hits origin) - true network errors still get 502
1 parent f17320d commit fe26e5a

2 files changed

Lines changed: 201 additions & 8 deletions

File tree

cmd/cache-proxy/proxy.go

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"log/slog"
@@ -185,6 +186,22 @@ func (p *CacheProxy) HandleProxy(w http.ResponseWriter, r *http.Request) {
185186

186187
data, contentType, source, err := p.fetchDedup(cacheKey, r, rangeHeader)
187188
if err != nil {
189+
// An origin that responded with a non-2xx (e.g. S3 returning a 400 with
190+
// <Code>ExpiredToken</Code> in an XML envelope) is forwarded back to
191+
// DuckDB verbatim — same status code, same body, same headers minus
192+
// hop-by-hop. This preserves the error class so httpfs can distinguish
193+
// transient (5xx) from terminal (4xx) failures, and gives DuckLake the
194+
// raw S3 error body it knows how to parse.
195+
var oe *originStatusError
196+
if errors.As(err, &oe) {
197+
slog.Warn("Origin returned non-2xx; forwarding verbatim.",
198+
"url", r.URL.String(), "range", rangeHeader, "status", oe.status, "body_preview", previewBody(oe.body))
199+
oe.writeTo(w)
200+
return
201+
}
202+
// True transport-level failure (DNS, connection refused, TLS, timeout):
203+
// no upstream status exists, so 502 Bad Gateway is the right answer
204+
// here — and it's also the one DuckDB's httpfs treats as transient.
188205
slog.Error("Failed to fetch.", "url", r.URL.String(), "range", rangeHeader, "error", err)
189206
http.Error(w, err.Error(), http.StatusBadGateway)
190207
return
@@ -251,8 +268,15 @@ func (p *CacheProxy) fetchOrigin(r *http.Request) ([]byte, string, error) {
251268
defer func() { _ = resp.Body.Close() }()
252269

253270
if resp.StatusCode >= 400 {
254-
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2048))
255-
return nil, "", fmt.Errorf("origin %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
271+
// Capture the body up to a generous cap. S3 error envelopes are
272+
// typically <1 KiB; the cap is just a guard against a misbehaving
273+
// origin streaming forever. The 60s context above also protects us.
274+
body, _ := io.ReadAll(io.LimitReader(resp.Body, originErrorBodyCap))
275+
return nil, "", &originStatusError{
276+
status: resp.StatusCode,
277+
headers: resp.Header.Clone(),
278+
body: body,
279+
}
256280
}
257281

258282
data, err := io.ReadAll(resp.Body)
@@ -262,6 +286,53 @@ func (p *CacheProxy) fetchOrigin(r *http.Request) ([]byte, string, error) {
262286
return data, resp.Header.Get("Content-Type"), nil
263287
}
264288

289+
// originErrorBodyCap is the maximum number of bytes we'll buffer from a
290+
// non-2xx origin response. S3 XML error envelopes are tiny; this is just a
291+
// safety net.
292+
const originErrorBodyCap = 1 << 20 // 1 MiB
293+
294+
// originStatusError captures a non-2xx response from the origin so the
295+
// proxy can forward it back to the client verbatim. The status code, body,
296+
// and response headers are all preserved (minus hop-by-hop) so DuckDB sees
297+
// exactly what S3 said — including the XML error envelope DuckLake may
298+
// inspect.
299+
type originStatusError struct {
300+
status int
301+
headers http.Header
302+
body []byte
303+
}
304+
305+
func (e *originStatusError) Error() string {
306+
return fmt.Sprintf("origin %d: %s", e.status, strings.TrimSpace(string(e.body)))
307+
}
308+
309+
// writeTo replays the captured origin response onto w. Any header the
310+
// origin set that isn't a hop-by-hop is forwarded; status code and body
311+
// follow.
312+
func (e *originStatusError) writeTo(w http.ResponseWriter) {
313+
for k, vv := range e.headers {
314+
if hopByHop[strings.ToLower(k)] {
315+
continue
316+
}
317+
for _, v := range vv {
318+
w.Header().Add(k, v)
319+
}
320+
}
321+
w.WriteHeader(e.status)
322+
_, _ = w.Write(e.body)
323+
}
324+
325+
// previewBody returns up to 256 bytes of the body for log lines so we don't
326+
// spam multi-KiB XML envelopes into structured logs while still keeping the
327+
// useful prefix (S3 puts the <Code>...</Code> first).
328+
func previewBody(body []byte) string {
329+
const max = 256
330+
if len(body) <= max {
331+
return string(body)
332+
}
333+
return string(body[:max]) + "...(truncated)"
334+
}
335+
265336
// serveBody writes cached data back to the client, reconstructing 206 Partial
266337
// Content semantics when the original request had a Range header.
267338
func (p *CacheProxy) serveBody(w http.ResponseWriter, data []byte, rangeHeader, contentType string) {

cmd/cache-proxy/proxy_test.go

Lines changed: 128 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,136 @@ func TestHandleProxyRejectsNonAbsoluteURL(t *testing.T) {
118118
}
119119
}
120120

121-
func TestHandleProxyOriginError(t *testing.T) {
121+
// TestHandleProxyForwardsOrigin5xxVerbatim: any non-2xx the origin returns
122+
// must be passed back to DuckDB unchanged. Translating a 500 into a 502
123+
// (the old behaviour) made DuckDB's httpfs retry transient-class errors that
124+
// were really terminal, and hid the real status from logs and the client.
125+
func TestHandleProxyForwardsOrigin5xxVerbatim(t *testing.T) {
122126
proxy := newTestProxy(t)
123127
_, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
124128
http.Error(w, "boom", http.StatusInternalServerError)
125129
})
126130
rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/broken", http.Header{"Range": []string{"bytes=0-1"}})
131+
if rec.Code != http.StatusInternalServerError {
132+
t.Fatalf("status = %d, want 500 forwarded verbatim", rec.Code)
133+
}
134+
if !strings.Contains(rec.Body.String(), "boom") {
135+
t.Errorf("body = %q, want it to contain origin body 'boom'", rec.Body.String())
136+
}
137+
}
138+
139+
// TestHandleProxyForwardsOrigin400Verbatim is the case the user actually
140+
// hit: S3 returns 400 with an XML envelope (<Code>ExpiredToken</Code>) and
141+
// DuckDB needs to see *that* body and *that* status, not a generic 502 with
142+
// a Go-formatted error string. Without verbatim passthrough the error class
143+
// (4xx terminal vs 5xx retriable) is lost and httpfs retries non-retriable
144+
// auth failures.
145+
func TestHandleProxyForwardsOrigin400Verbatim(t *testing.T) {
146+
proxy := newTestProxy(t)
147+
const errBody = `<?xml version="1.0" encoding="UTF-8"?>
148+
<Error><Code>ExpiredToken</Code><Message>The provided token has expired.</Message></Error>`
149+
_, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
150+
w.Header().Set("Content-Type", "application/xml")
151+
w.Header().Set("X-Amz-Request-Id", "TESTREQID123")
152+
w.WriteHeader(http.StatusBadRequest)
153+
_, _ = w.Write([]byte(errBody))
154+
})
155+
rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/expired.parquet", http.Header{"Range": []string{"bytes=0-1023"}})
156+
157+
if rec.Code != http.StatusBadRequest {
158+
t.Fatalf("status = %d, want 400 forwarded verbatim", rec.Code)
159+
}
160+
if got := rec.Body.String(); got != errBody {
161+
t.Errorf("body mismatch:\n got = %q\nwant = %q", got, errBody)
162+
}
163+
if ct := rec.Header().Get("Content-Type"); ct != "application/xml" {
164+
t.Errorf("Content-Type = %q, want application/xml so DuckLake parses it as an S3 error envelope", ct)
165+
}
166+
if rid := rec.Header().Get("X-Amz-Request-Id"); rid != "TESTREQID123" {
167+
t.Errorf("X-Amz-Request-Id = %q, want TESTREQID123 (preserved for debugging)", rid)
168+
}
169+
}
170+
171+
// TestHandleProxyForwardsOrigin404Verbatim: a 404 must stay a 404 so
172+
// DuckDB / DuckLake can distinguish "object missing" (terminal) from
173+
// "transient gateway error" (retriable).
174+
func TestHandleProxyForwardsOrigin404Verbatim(t *testing.T) {
175+
proxy := newTestProxy(t)
176+
_, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
177+
w.WriteHeader(http.StatusNotFound)
178+
_, _ = w.Write([]byte("missing"))
179+
})
180+
rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/gone.parquet", http.Header{"Range": []string{"bytes=0-1"}})
181+
if rec.Code != http.StatusNotFound {
182+
t.Fatalf("status = %d, want 404 forwarded verbatim", rec.Code)
183+
}
184+
}
185+
186+
// TestHandleProxyForwardsOrigin416Verbatim: Range Not Satisfiable carries
187+
// semantically important metadata for DuckLake; collapsing to 502 made it
188+
// look like a network error.
189+
func TestHandleProxyForwardsOrigin416Verbatim(t *testing.T) {
190+
proxy := newTestProxy(t)
191+
_, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
192+
w.Header().Set("Content-Range", "bytes */1024")
193+
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
194+
})
195+
rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/short.parquet", http.Header{"Range": []string{"bytes=999999-1000000"}})
196+
if rec.Code != http.StatusRequestedRangeNotSatisfiable {
197+
t.Fatalf("status = %d, want 416 forwarded verbatim", rec.Code)
198+
}
199+
if cr := rec.Header().Get("Content-Range"); cr != "bytes */1024" {
200+
t.Errorf("Content-Range = %q, want 'bytes */1024' (DuckDB uses this to learn the actual file size)", cr)
201+
}
202+
}
203+
204+
// TestHandleProxyDoesNotCacheErrorResponses: a transient origin error
205+
// must not poison the cache. The next request for the same key has to hit
206+
// the origin again — otherwise an ExpiredToken error would persist past
207+
// the credential refresh that fixes it.
208+
func TestHandleProxyDoesNotCacheErrorResponses(t *testing.T) {
209+
proxy := newTestProxy(t)
210+
var calls int32
211+
_, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
212+
n := atomic.AddInt32(&calls, 1)
213+
if n == 1 {
214+
w.WriteHeader(http.StatusBadRequest)
215+
_, _ = w.Write([]byte("first call fails"))
216+
return
217+
}
218+
w.Header().Set("Content-Type", "application/octet-stream")
219+
w.WriteHeader(http.StatusOK)
220+
_, _ = w.Write([]byte("ok-now"))
221+
})
222+
223+
rec := doForwardProxyRequest(proxy, "GET", originURL+"/bucket/flaky.parquet", http.Header{"Range": []string{"bytes=0-5"}})
224+
if rec.Code != http.StatusBadRequest {
225+
t.Fatalf("first call: status = %d, want 400", rec.Code)
226+
}
227+
228+
rec = doForwardProxyRequest(proxy, "GET", originURL+"/bucket/flaky.parquet", http.Header{"Range": []string{"bytes=0-5"}})
229+
if rec.Code != http.StatusPartialContent {
230+
t.Fatalf("retry: status = %d, want 206 (cache must not have stored the prior error)", rec.Code)
231+
}
232+
if atomic.LoadInt32(&calls) != 2 {
233+
t.Errorf("origin calls = %d, want 2 (the cache must not serve a previously-failed request from cache)", calls)
234+
}
235+
}
236+
237+
// TestHandleProxyNetworkErrorStill502: when the origin is fully
238+
// unreachable (no HTTP response at all), 502 is still the right answer —
239+
// there was no upstream status to forward, and httpfs's "retry on 5xx"
240+
// behaviour is appropriate here.
241+
func TestHandleProxyNetworkErrorStill502(t *testing.T) {
242+
proxy := newTestProxy(t)
243+
// Construct a URL that points at no listener: srv.Close before use.
244+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
245+
dead := srv.URL
246+
srv.Close()
247+
248+
rec := doForwardProxyRequest(proxy, "GET", dead+"/bucket/anything", http.Header{"Range": []string{"bytes=0-1"}})
127249
if rec.Code != http.StatusBadGateway {
128-
t.Errorf("status = %d, want 502 on origin error", rec.Code)
250+
t.Fatalf("status = %d, want 502 when origin is unreachable", rec.Code)
129251
}
130252
}
131253

@@ -241,10 +363,10 @@ func TestFetchOriginPreservesSignedHeaders(t *testing.T) {
241363
})
242364

243365
h := http.Header{
244-
"Range": []string{"bytes=0-1"},
245-
"Authorization": []string{"AWS4-HMAC-SHA256 Credential=AKIATEST/20260101/us-east-1/s3/aws4_request, SignedHeaders=host, Signature=abcdef"},
246-
"X-Amz-Date": []string{"20260101T000000Z"},
247-
"X-Amz-Content-Sha256": []string{"UNSIGNED-PAYLOAD"},
366+
"Range": []string{"bytes=0-1"},
367+
"Authorization": []string{"AWS4-HMAC-SHA256 Credential=AKIATEST/20260101/us-east-1/s3/aws4_request, SignedHeaders=host, Signature=abcdef"},
368+
"X-Amz-Date": []string{"20260101T000000Z"},
369+
"X-Amz-Content-Sha256": []string{"UNSIGNED-PAYLOAD"},
248370
// Hop-by-hop header must NOT be forwarded.
249371
"Proxy-Connection": []string{"Keep-Alive"},
250372
}

0 commit comments

Comments
 (0)