Skip to content

Commit d34dcb3

Browse files
dliubclaudem-Bilal
authored
fix(client): re-encode _search body on 401 retry (empty-body / dropped-payload bug) (#118)
* fix(client): re-encode _search body on 401 retry and check transport error On an HTTP 401 the connector re-authenticates and replays the SAME *esapi.SearchRequest via req.Do. The request body is an io.Reader that the first req.Do fully drains, so the retried _search was sent with an empty body. Elasticsearch interprets an empty _search body as match_all and returns unfiltered results — the intermittent "dropped payload on retry" symptom seen after token expiry/rotation mid-traffic (JPMorgan Chase support ticket #15000). - Capture the encoded body once and rebuild a fresh reader before every attempt so the retried _search carries the identical query body. - Check the error from the first req.Do BEFORE calling res.IsError(), which would panic on a nil response after a transport-level failure. - Add per-attempt request logging of the actual body + target index sent on the wire; the retry line carries the literal "Retry Query" marker. The log reflects what is truly sent, so it shows the empty body under the old bug. - Add hermetic functional tests (httptest fake ES, no real cluster) that reproduce the empty-retry-body bug and verify the fix. The env var ELASTICSEARCH_DISABLE_RETRY_BODY_REBUILD flips between buggy and fixed paths. Co-authored-by: Mohd Bilal <bilal@hasura.io> Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * docs: add pr_result.md summarizing the 401-retry fix, tests and logging Co-authored-by: Mohd Bilal <bilal@hasura.io> Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * refactor(client): remove testing-only env-var toggle for retry body rebuild Address review feedback: drop the ELASTICSEARCH_DISABLE_RETRY_BODY_REBUILD mechanism entirely. The retry now always rebuilds the request body unconditionally before req.Do, and the "Retry Query" log line logs the exact bytes sent (string(body)). - Delete retryBodyRebuildDisabled() and the guard around the rebuild. - Delete retryBodyForLog(); log string(body) which equals the wire body. - Tests no longer reference the env var; the functional test asserts only the fixed behaviour. Reproducing the old bug is documented as commenting out the rebuild line in search(). Primary fix (rebuild body before every attempt), secondary fix (check the first req.Do error before res.IsError()), and per-attempt request logging with the "Retry Query" marker are all retained. Co-authored-by: Mohd Bilal <bilal@hasura.io> Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * chore: remove stray pr_result.md from repo This reporting file was committed by mistake and should not live in the repo. Co-authored-by: Mohd Bilal <bilal@hasura.io> Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> --------- Co-authored-by: dliub <noreply@anthropic.com> Co-authored-by: Mohd Bilal <bilal@hasura.io>
1 parent 8a3eb41 commit d34dcb3

2 files changed

Lines changed: 279 additions & 3 deletions

File tree

elasticsearch/client.go

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9+
"io"
910
"os"
11+
"strings"
1012

1113
"github.com/elastic/go-elasticsearch/v8"
1214
"github.com/elastic/go-elasticsearch/v8/esapi"
@@ -166,21 +168,48 @@ func (e *Client) Search(ctx context.Context, index string, body map[string]inter
166168

167169
// search is a helper function to perform a search operation in elastic search.
168170
func (e *Client) search(ctx context.Context, o ...func(*esapi.SearchRequest)) (*esapi.Response, error) {
171+
logger := connector.GetLogger(ctx)
172+
169173
req := &esapi.SearchRequest{}
170174

171175
for _, opt := range o {
172176
opt(req)
173177
}
174178

179+
// req.Body is an io.Reader that is fully drained by req.Do. If we retry the
180+
// request after a 401 (see below) by calling req.Do again on the same req,
181+
// the already-drained reader sends an empty body. Elasticsearch treats an
182+
// empty _search body as a match_all query and returns unfiltered results.
183+
// To make the request safely repeatable, capture the encoded body once and
184+
// rebuild a fresh reader before every attempt.
185+
body, err := drainBody(req.Body)
186+
if err != nil {
187+
return nil, fmt.Errorf("error reading search request body: %w", err)
188+
}
189+
index := strings.Join(req.Index, ",")
190+
191+
// First attempt.
192+
req.Body = bytes.NewReader(body)
193+
logger.DebugContext(ctx, "Query", "index", index, "body", string(body))
175194
res, err := req.Do(ctx, e.client)
195+
// Check the transport error before touching res: on a transport-level
196+
// failure res is nil and res.IsError() would panic.
197+
if err != nil {
198+
return nil, fmt.Errorf("error while querying: %s", err)
199+
}
176200

177201
if res.IsError() {
178202
if res.StatusCode == 401 {
179-
// Unauthorized error, reauthenticate and retry
180-
err = e.Reauthenticate(ctx)
181-
if err != nil {
203+
// Unauthorized error, reauthenticate and retry.
204+
if err = e.Reauthenticate(ctx); err != nil {
182205
return nil, fmt.Errorf("error: %s", err)
183206
}
207+
// Rebuild the body so the retried request carries the same query
208+
// instead of the already-drained (empty) reader. The retried reader
209+
// holds exactly these bytes, so logging string(body) reflects what is
210+
// actually sent over the wire.
211+
req.Body = bytes.NewReader(body)
212+
logger.DebugContext(ctx, "Retry Query", "index", index, "body", string(body))
184213
res, err = req.Do(ctx, e.client)
185214
if err != nil {
186215
return nil, fmt.Errorf("error: %s", err)
@@ -192,6 +221,15 @@ func (e *Client) search(ctx context.Context, o ...func(*esapi.SearchRequest)) (*
192221
return res, err
193222
}
194223

224+
// drainBody reads an io.Reader fully and returns its bytes, handling a nil
225+
// reader (no body) gracefully.
226+
func drainBody(r io.Reader) ([]byte, error) {
227+
if r == nil {
228+
return nil, nil
229+
}
230+
return io.ReadAll(r)
231+
}
232+
195233
// Explain performs a search with explain operation in elastic search.
196234
//
197235
// Since the Explain API requires document ID, we can't use it.

elasticsearch/client_retry_test.go

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package elasticsearch
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"io"
8+
"log/slog"
9+
"net/http"
10+
"net/http/httptest"
11+
"strings"
12+
"sync"
13+
"testing"
14+
)
15+
16+
// newFakeES returns an httptest server that emulates an Elasticsearch cluster
17+
// which fails the first _search with 401 (token expired mid-traffic) and then
18+
// succeeds. It records the body received on every _search attempt into bodies.
19+
func newFakeES(t *testing.T, bodies *[]string, mu *sync.Mutex) *httptest.Server {
20+
t.Helper()
21+
var attempts int
22+
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
23+
w.Header().Set("X-Elastic-Product", "Elasticsearch")
24+
switch {
25+
case r.Method == http.MethodHead && r.URL.Path == "/":
26+
w.WriteHeader(http.StatusOK)
27+
case r.URL.Path == "/" && r.Method == http.MethodGet:
28+
w.WriteHeader(http.StatusOK)
29+
_, _ = w.Write([]byte(`{"version":{"number":"8.0.0"}}`))
30+
default:
31+
bodyBytes, _ := io.ReadAll(r.Body)
32+
mu.Lock()
33+
attempts++
34+
attempt := attempts
35+
*bodies = append(*bodies, string(bodyBytes))
36+
mu.Unlock()
37+
if attempt == 1 {
38+
w.WriteHeader(http.StatusUnauthorized)
39+
_, _ = w.Write([]byte(`{"error":"unauthorized"}`))
40+
return
41+
}
42+
w.WriteHeader(http.StatusOK)
43+
_, _ = w.Write([]byte(`{"took":1,"hits":{"total":{"value":0},"hits":[]}}`))
44+
}
45+
}))
46+
}
47+
48+
// TestSearchRetryBodyOn401 is a hermetic functional test for the "dropped
49+
// payload on intermittent retry" bug (JPMorgan Chase support ticket #15000).
50+
//
51+
// It stands up a fake Elasticsearch server that returns 401 on the first
52+
// _search call (simulating a token expiring mid-traffic) and 200 afterwards.
53+
// It captures the request body received on EVERY _search attempt and asserts
54+
// that the retry, sent after re-authentication, carries the SAME, non-empty
55+
// query body as the first attempt.
56+
//
57+
// go test -v -run TestSearchRetryBodyOn401 ./elasticsearch/
58+
//
59+
// To manually reproduce the OLD buggy behaviour (retry sends an empty body),
60+
// comment out the body-rebuild line in search() in client.go:
61+
//
62+
// // req.Body = bytes.NewReader(body) // <- comment this out before the retry req.Do
63+
//
64+
// and re-run this test: it will fail with "BUG REPRODUCED", because the retry
65+
// then reuses the already-drained reader and sends an empty body.
66+
func TestSearchRetryBodyOn401(t *testing.T) {
67+
var (
68+
mu sync.Mutex
69+
searchBodies []string
70+
)
71+
72+
server := newFakeES(t, &searchBodies, &mu)
73+
defer server.Close()
74+
75+
// Point the connector at the fake server. These are required by
76+
// getConfigFromEnv so that Reauthenticate succeeds against the fake server.
77+
t.Setenv("ELASTICSEARCH_URL", server.URL)
78+
t.Setenv("ELASTICSEARCH_USERNAME", "elastic")
79+
t.Setenv("ELASTICSEARCH_PASSWORD", "changeme")
80+
81+
ctx := context.Background()
82+
client, err := NewClient(ctx)
83+
if err != nil {
84+
t.Fatalf("failed to create client: %v", err)
85+
}
86+
87+
query := map[string]interface{}{
88+
"query": map[string]interface{}{
89+
"term": map[string]interface{}{"customer_id": "JPMC-42"},
90+
},
91+
"size": 10,
92+
}
93+
expectedBody, err := json.Marshal(query)
94+
if err != nil {
95+
t.Fatalf("failed to marshal query: %v", err)
96+
}
97+
98+
if _, err := client.Search(ctx, "transactions", query); err != nil {
99+
t.Fatalf("Search returned error: %v", err)
100+
}
101+
102+
mu.Lock()
103+
defer mu.Unlock()
104+
105+
if len(searchBodies) != 2 {
106+
t.Fatalf("expected exactly 2 _search attempts (initial + retry), got %d", len(searchBodies))
107+
}
108+
109+
// The first attempt must always carry the full query body.
110+
if !jsonEqual(t, searchBodies[0], string(expectedBody)) {
111+
t.Fatalf("first attempt body mismatch:\n got: %q\n want: %q", searchBodies[0], expectedBody)
112+
}
113+
114+
// The crux of the bug: the retry must carry the SAME, non-empty body.
115+
retryBody := searchBodies[1]
116+
if len(trimWS(retryBody)) == 0 {
117+
t.Fatalf("BUG REPRODUCED: retry sent an EMPTY body (Elasticsearch would treat this as match_all and return unfiltered results). retry body=%q", retryBody)
118+
}
119+
if !jsonEqual(t, retryBody, string(expectedBody)) {
120+
t.Fatalf("retry body mismatch:\n got: %q\n want: %q", retryBody, expectedBody)
121+
}
122+
123+
t.Logf("OK: both attempts sent identical, non-empty query body: %s", expectedBody)
124+
}
125+
126+
// TestSearchPerAttemptLogging verifies the per-attempt request logging: the
127+
// connector logs the actual _search body and target index it sends on EVERY
128+
// attempt, and the retry log line carries the literal "Retry Query" marker so
129+
// retries are easy to grep. The logged body reflects what is ACTUALLY sent, so
130+
// the "Retry Query" line carries the same non-empty body as the first attempt.
131+
//
132+
// go test -v -run TestSearchPerAttemptLogging ./elasticsearch/
133+
func TestSearchPerAttemptLogging(t *testing.T) {
134+
var (
135+
mu sync.Mutex
136+
searchBodies []string
137+
)
138+
server := newFakeES(t, &searchBodies, &mu)
139+
defer server.Close()
140+
141+
t.Setenv("ELASTICSEARCH_URL", server.URL)
142+
t.Setenv("ELASTICSEARCH_USERNAME", "elastic")
143+
t.Setenv("ELASTICSEARCH_PASSWORD", "changeme")
144+
145+
// Capture logs: GetLogger falls back to slog.Default() when no logger is set
146+
// on the context, so redirect the default logger to a debug-level buffer.
147+
var logBuf bytes.Buffer
148+
prev := slog.Default()
149+
slog.SetDefault(slog.New(slog.NewJSONHandler(&logBuf, &slog.HandlerOptions{Level: slog.LevelDebug})))
150+
defer slog.SetDefault(prev)
151+
152+
ctx := context.Background()
153+
client, err := NewClient(ctx)
154+
if err != nil {
155+
t.Fatalf("failed to create client: %v", err)
156+
}
157+
158+
query := map[string]interface{}{
159+
"query": map[string]interface{}{"term": map[string]interface{}{"customer_id": "JPMC-42"}},
160+
}
161+
if _, err := client.Search(ctx, "transactions", query); err != nil {
162+
t.Fatalf("Search returned error: %v", err)
163+
}
164+
165+
logs := logBuf.String()
166+
t.Logf("captured logs:\n%s", logs)
167+
168+
// Per-attempt logging: first attempt logged as "Query".
169+
if !strings.Contains(logs, `"msg":"Query"`) {
170+
t.Errorf(`expected a per-attempt "Query" log line, not found in:\n%s`, logs)
171+
}
172+
// Retry attempt logged with the literal "Retry Query" marker.
173+
if !strings.Contains(logs, `"msg":"Retry Query"`) {
174+
t.Errorf(`expected a "Retry Query" marked log line, not found in:\n%s`, logs)
175+
}
176+
// The log must carry the target index.
177+
if !strings.Contains(logs, `"index":"transactions"`) {
178+
t.Errorf(`expected the target index in the log line, not found in:\n%s`, logs)
179+
}
180+
181+
// The logged retry body must reflect what is ACTUALLY sent on the wire: a
182+
// non-empty body identical to the first attempt.
183+
retryBody := logFieldForMsg(t, logs, "Retry Query", "body")
184+
if trimWS(retryBody) == "" {
185+
t.Fatalf("expected a non-empty logged retry body, got %q", retryBody)
186+
}
187+
if !jsonEqual(t, retryBody, logFieldForMsg(t, logs, "Query", "body")) {
188+
t.Errorf("logged retry body %q does not match first attempt body", retryBody)
189+
}
190+
t.Logf("retry logged the full body: %s", retryBody)
191+
}
192+
193+
// logFieldForMsg returns the value of field for the first JSON log record whose
194+
// "msg" equals msg.
195+
func logFieldForMsg(t *testing.T, logs, msg, field string) string {
196+
t.Helper()
197+
for _, line := range strings.Split(strings.TrimSpace(logs), "\n") {
198+
if line == "" {
199+
continue
200+
}
201+
var rec map[string]interface{}
202+
if err := json.Unmarshal([]byte(line), &rec); err != nil {
203+
continue
204+
}
205+
if rec["msg"] == msg {
206+
if v, ok := rec[field].(string); ok {
207+
return v
208+
}
209+
}
210+
}
211+
return ""
212+
}
213+
214+
// jsonEqual compares two JSON documents for semantic equality (ignoring key
215+
// order and insignificant whitespace).
216+
func jsonEqual(t *testing.T, a, b string) bool {
217+
t.Helper()
218+
var av, bv interface{}
219+
if err := json.Unmarshal([]byte(a), &av); err != nil {
220+
return false
221+
}
222+
if err := json.Unmarshal([]byte(b), &bv); err != nil {
223+
return false
224+
}
225+
ab, _ := json.Marshal(av)
226+
bb, _ := json.Marshal(bv)
227+
return string(ab) == string(bb)
228+
}
229+
230+
func trimWS(s string) string {
231+
out := make([]rune, 0, len(s))
232+
for _, r := range s {
233+
if r != ' ' && r != '\n' && r != '\t' && r != '\r' {
234+
out = append(out, r)
235+
}
236+
}
237+
return string(out)
238+
}

0 commit comments

Comments
 (0)