Skip to content

Commit 3431d53

Browse files
authored
re-authenticate when elasticsearch returns a 401 error (#72)
* refactor auth * re authenticate on a 401 error from elasticsearch * add trace span to authentication
1 parent 9bb3b77 commit 3431d53

1 file changed

Lines changed: 111 additions & 60 deletions

File tree

elasticsearch/client.go

Lines changed: 111 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ import (
1111
"github.com/elastic/go-elasticsearch/v8"
1212
"github.com/elastic/go-elasticsearch/v8/esapi"
1313
"github.com/hasura/ndc-sdk-go/connector"
14+
"go.opentelemetry.io/otel"
15+
"go.opentelemetry.io/otel/attribute"
16+
"go.opentelemetry.io/otel/codes"
17+
"go.opentelemetry.io/otel/trace"
1418
)
1519

1620
type Client struct {
@@ -19,88 +23,119 @@ type Client struct {
1923

2024
// NewClient creates a new client with configuration from cfg.
2125
func NewClient(ctx context.Context) (*Client, error) {
22-
var config *elasticsearch.Config
23-
var err error
24-
25-
if shouldUseCredentialsProvider() {
26-
config, err = getConfigFromCredentialsProvider(ctx, false)
27-
if err != nil {
28-
return nil, err
29-
}
30-
31-
client, err := initClient(config)
32-
if err != nil {
26+
client := &Client{}
27+
err := client.Authenticate(ctx)
28+
if err != nil {
29+
return nil, fmt.Errorf("error authenticating with elasticsearch: %s", err)
30+
}
31+
return client, nil
32+
}
3333

34-
// the error might be that the credentials have expired
35-
// try with forceRefresh = true to get the credentials again
36-
config, err = getConfigFromCredentialsProvider(ctx, true)
37-
if err != nil {
38-
return nil, err
39-
}
34+
func (e *Client) Authenticate(ctx context.Context) error {
35+
ctx, span := otel.Tracer("es_client").Start(ctx, "authenticate_elasticsearch", trace.WithAttributes(
36+
attribute.String("internal.visibility", "user"), // this attr makes the span visible in the hasura console
37+
))
38+
defer span.End()
4039

41-
client, err = initClient(config)
42-
if err != nil {
43-
return nil, err
44-
}
45-
}
46-
return client, nil
40+
// we'll set all auth related errors as internal errors
41+
// so that we don't expose any sensitive information in the API response.
42+
// actual errors are recorded in the span
43+
esConfig, err := e.accquireAuthConfig(ctx, false)
44+
if err != nil {
45+
span.SetStatus(codes.Error, err.Error())
46+
span.RecordError(err)
47+
return errors.New("internal error")
48+
}
49+
esClient, err := elasticsearch.NewClient(*esConfig)
50+
if err != nil {
51+
span.SetStatus(codes.Error, err.Error())
52+
span.RecordError(err)
53+
return errors.New("internal error")
54+
}
55+
e.client = esClient
56+
// Ping the client to check if the connection is successful
57+
err = e.Ping()
58+
if err == nil {
59+
// authenticated successfully
60+
return nil
4761
}
4862

49-
config, err = getConfigFromEnv()
63+
// if the ping fails, try to authenticate again with force refreshing the credentials
64+
esConfig, err = e.accquireAuthConfig(ctx, true)
5065
if err != nil {
51-
return nil, err
66+
span.SetStatus(codes.Error, err.Error())
67+
span.RecordError(err)
68+
return errors.New("internal error")
69+
}
70+
esClient, err = elasticsearch.NewClient(*esConfig)
71+
if err != nil {
72+
span.SetStatus(codes.Error, err.Error())
73+
span.RecordError(err)
74+
return errors.New("internal error")
75+
}
76+
e.client = esClient
77+
// Ping the client to check if the connection is successful
78+
err = e.Ping()
79+
if err != nil {
80+
span.SetStatus(codes.Error, err.Error())
81+
span.RecordError(err)
82+
return errors.New("internal error")
5283
}
5384

54-
return initClient(config)
85+
return nil
5586
}
5687

57-
func initClient(config *elasticsearch.Config) (*Client, error) {
58-
c, err := elasticsearch.NewClient(*config)
59-
if err != nil {
60-
return nil, err
61-
}
88+
func (e *Client) Reauthenticate(ctx context.Context) error {
89+
return e.Authenticate(ctx)
90+
}
6291

63-
client := &Client{client: c}
64-
err = client.Ping()
65-
if err != nil {
66-
return nil, fmt.Errorf("failed to validate elasticsearch credentials: %v", err)
92+
func (e *Client) accquireAuthConfig(ctx context.Context, forceRefresh bool) (*elasticsearch.Config, error) {
93+
if shouldUseCredentialsProvider() {
94+
esConfig, err := getConfigFromCredentialsProvider(ctx, forceRefresh)
95+
if err != nil {
96+
return nil, err
97+
}
98+
return esConfig, nil
99+
} else {
100+
esConfig, err := getConfigFromEnv()
101+
if err != nil {
102+
return nil, err
103+
}
104+
return esConfig, nil
67105
}
68-
return client, nil
69106
}
70107

71108
// Ping returns whether the Elasticsearch cluster is running.
72109
func (e *Client) Ping() error {
73110
res, err := e.client.Ping()
74111
if err != nil {
75-
return err
112+
return fmt.Errorf("failed to ping elasticsearch: %w", err)
76113
}
77-
78-
defer res.Body.Close()
79-
80-
// Check response status
81114
if res.IsError() {
82-
return errors.New(res.String())
115+
return fmt.Errorf("failed to ping elasticsearch: %s", res.String())
83116
}
84117

118+
defer res.Body.Close()
119+
85120
return nil
86121
}
87122

88123
// Search performs a search operation in elastic search.
89124
func (e *Client) Search(ctx context.Context, index string, body map[string]interface{}) (map[string]interface{}, error) {
90-
es := e.client
91-
92125
var buf bytes.Buffer
93126
if err := json.NewEncoder(&buf).Encode(body); err != nil {
94127
return nil, err
95128
}
96129

97-
// TODO: use search() helper function
98-
req := esapi.SearchRequest{
99-
Index: []string{index},
100-
Body: &buf,
101-
}
130+
search := esapi.Search(func(o ...func(*esapi.SearchRequest)) (*esapi.Response, error) {
131+
return e.search(ctx, o...)
132+
})
102133

103-
res, err := req.Do(ctx, es)
134+
res, err := search(
135+
search.WithContext(ctx),
136+
search.WithIndex(index),
137+
search.WithBody(&buf),
138+
)
104139
if err != nil {
105140
return nil, err
106141
}
@@ -114,31 +149,45 @@ func (e *Client) Search(ctx context.Context, index string, body map[string]inter
114149
}
115150

116151
// search is a helper function to perform a search operation in elastic search.
117-
func (e *Client) search(o ...func(*esapi.SearchRequest)) (*esapi.Response, error) {
152+
func (e *Client) search(ctx context.Context, o ...func(*esapi.SearchRequest)) (*esapi.Response, error) {
118153
req := &esapi.SearchRequest{}
119-
es := e.client
120154

121155
for _, opt := range o {
122156
opt(req)
123157
}
124158

125-
res, err := req.Do(context.TODO(), es)
159+
res, err := req.Do(ctx, e.client)
160+
161+
if res.IsError() {
162+
if res.StatusCode == 401 {
163+
// Unauthorized error, reauthenticate and retry
164+
err = e.Reauthenticate(ctx)
165+
if err != nil {
166+
return nil, fmt.Errorf("error: %s", err)
167+
}
168+
res, err = req.Do(ctx, e.client)
169+
if err != nil {
170+
return nil, fmt.Errorf("error: %s", err)
171+
}
172+
} else {
173+
return nil, fmt.Errorf("error while querying: %s", res.String())
174+
}
175+
}
126176
return res, err
127177
}
128178

129179
// Explain performs a search with explain operation in elastic search.
130180
//
131-
// Since the Explain API requires document ID, we can't use it.
181+
// Since the Explain API requires document ID, we can't use it.
132182
// Explain API: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-explain.html
133183
//
134184
// We instead use the Profile API to implement query explain functionality.
135185
// To use the Profile API, we need to add `profile=true` to the query.
136186
// Profile API: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-profile.html
137187
func (e *Client) ExplainSearch(ctx context.Context, index string, query map[string]interface{}) (map[string]interface{}, error) {
138-
139188
// Add `profile=true` to the query to get profiling query information
140189
query["profile"] = true
141-
190+
142191
var buf bytes.Buffer
143192
if err := json.NewEncoder(&buf).Encode(query); err != nil {
144193
return nil, err
@@ -148,12 +197,14 @@ func (e *Client) ExplainSearch(ctx context.Context, index string, query map[stri
148197
// We can safely remove it from the query map, so that the original var remains unchanged
149198
delete(query, "profile")
150199

151-
search := esapi.Search(e.search)
200+
search := esapi.Search(func(o ...func(*esapi.SearchRequest)) (*esapi.Response, error) {
201+
return e.search(ctx, o...)
202+
})
152203

153204
res, err := search(
154-
search.WithContext(ctx),
155-
search.WithIndex(index),
156-
search.WithBody(&buf),
205+
search.WithContext(ctx),
206+
search.WithIndex(index),
207+
search.WithBody(&buf),
157208
search.WithExplain(true), // set explain to true
158209
)
159210
if err != nil {

0 commit comments

Comments
 (0)