-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathclient.go
More file actions
268 lines (218 loc) · 8.28 KB
/
client.go
File metadata and controls
268 lines (218 loc) · 8.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
package client
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"github.com/google/uuid"
"github.com/prefecthq/terraform-provider-prefect/internal/api"
retryablehttp "github.com/hashicorp/go-retryablehttp"
)
// Compile-time check that a *Client value is accepted where an
// api.PrefectClient is expected (presumably an interface declared in the api
// package — confirm there). The blank identifier discards the value, so this
// line has no runtime effect beyond the conversion itself.
var _ = api.PrefectClient(&Client{})
// New builds a Client backed by a retrying HTTP client and then applies the
// given options to it, in the order supplied.
//
// Every option is executed even when an earlier one has failed. If any option
// returned an error, the errors are combined with errors.Join and returned
// together with a nil client.
func New(opts ...Option) (*Client, error) {
	// Retries are delegated to the retryablehttp package. Per that package's
	// documented defaults (see
	// https://github.com/hashicorp/go-retryablehttp/blob/main/client.go#L48-L51):
	//   - max retries: 4
	//   - retry wait minimum seconds: 1
	//   - retry wait maximum seconds: 30
	rc := retryablehttp.NewClient()

	// RateLimitLinearJitterBackoff is used as the backoff strategy because it:
	//   - respects Retry-After headers on 429/503 responses
	//   - falls back to linear jitter backoff otherwise
	//   - helps prevent thundering herd problems
	rc.Backoff = retryablehttp.RateLimitLinearJitterBackoff

	// Out of the box retryablehttp only retries on transient server or
	// networking errors; checkRetryPolicy narrows/extends that decision.
	rc.CheckRetry = checkRetryPolicy

	// Expose the retrying client through the standard `http.Client` type so
	// the rest of the client code never has to mention `retryablehttp.Client`.
	c := &Client{hc: rc.StandardClient()}

	// Run all options, collecting failures so they can be reported at once.
	var failures []error
	for _, apply := range opts {
		if err := apply(c); err != nil {
			failures = append(failures, err)
		}
	}

	if len(failures) != 0 {
		return nil, errors.Join(failures...)
	}

	return c, nil
}
// obtainCsrfToken performs a GET against "<endpoint>/csrf-token" and stores
// the token from the JSON response in c.csrfToken.
//
// It reads c.endpoint, c.csrfClientToken, c.apiKey, c.basicAuthKey and
// c.customHeaders, so those must already be configured when it is called.
// An error is returned when the request cannot be built or sent, when the
// status is not 200 OK, when the body cannot be decoded, or when the decoded
// token is empty.
func (c *Client) obtainCsrfToken() error {
	target := c.endpoint + "/csrf-token?client=" + c.csrfClientToken

	request, err := http.NewRequestWithContext(context.Background(), http.MethodGet, target, http.NoBody)
	if err != nil {
		return fmt.Errorf("error creating CSRF token request: %w", err)
	}

	// Headers for the token request. This function does not set
	// Prefect-Csrf-Token itself: the token is what is being fetched.
	setAuthorizationHeader(request, c.apiKey, c.basicAuthKey)
	request.Header.Set("Accept", "application/json")
	request.Header.Set("Prefect-Csrf-Client", c.csrfClientToken)

	// Custom headers go last, so they win over the values set above.
	for name, val := range c.customHeaders {
		request.Header.Set(name, val)
	}

	response, err := c.hc.Do(request) //nolint:gosec // URL is constructed from provider configuration, not user input
	if err != nil {
		return fmt.Errorf("http error on CSRF token request: %w", err)
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message, so a
		// read failure is deliberately ignored.
		raw, _ := io.ReadAll(response.Body)

		return fmt.Errorf("failed to fetch CSRF token, status: %s, body: %s", response.Status, raw)
	}

	var payload api.CSRFTokenResponse
	if decodeErr := json.NewDecoder(response.Body).Decode(&payload); decodeErr != nil {
		return fmt.Errorf("failed to decode CSRF token response: %w", decodeErr)
	}

	if payload.Token == "" {
		return errors.New("CSRF token not found in response")
	}

	c.csrfToken = payload.Token

	return nil
}
// WithEndpoint returns an Option that points the client at a self-hosted
// Prefect server or Prefect Cloud.
//
// The option fails, leaving the client untouched, when endpoint cannot be
// parsed by url.Parse or when it ends with a "/". Otherwise it stores
// endpoint and host on the client.
func WithEndpoint(endpoint string, host string) Option {
	return func(c *Client) error {
		if _, parseErr := url.Parse(endpoint); parseErr != nil {
			return fmt.Errorf("endpoint is not a valid url: %w", parseErr)
		}

		// Paths are appended to the endpoint elsewhere (e.g. "/csrf-token"),
		// so a trailing slash is rejected up front.
		if strings.HasSuffix(endpoint, "/") {
			return fmt.Errorf("endpoint %q must not include trailing slash", endpoint)
		}

		c.endpoint, c.endpointHost = endpoint, host

		return nil
	}
}
// WithAPIKey returns an Option that stores apiKey on the client as the API
// key used to authenticate to Prefect. The option itself never fails.
func WithAPIKey(apiKey string) Option {
	setKey := func(c *Client) error {
		c.apiKey = apiKey

		return nil
	}

	return setKey
}
// WithBasicAuthKey returns an Option that stores basicAuthKey on the client
// as the basic auth key used to authenticate to Prefect. The option itself
// never fails.
func WithBasicAuthKey(basicAuthKey string) Option {
	setKey := func(c *Client) error {
		c.basicAuthKey = basicAuthKey

		return nil
	}

	return setKey
}
// WithCsrfEnabled returns an Option that turns on CSRF protection.
//
// When csrfEnabled is false the option does nothing. When true it assigns a
// fresh UUID string as the client's CSRF client token and immediately calls
// obtainCsrfToken, which issues a request using the client's current
// configuration — so this option has to be applied after the endpoint and
// auth options.
func WithCsrfEnabled(csrfEnabled bool) Option {
	return func(c *Client) error {
		if !csrfEnabled {
			return nil
		}

		c.csrfClientToken = uuid.NewString()

		if err := c.obtainCsrfToken(); err != nil {
			return fmt.Errorf("failed to obtain CSRF token: %w", err)
		}

		return nil
	}
}
// WithDefaults returns an Option that records the default account and
// workspace IDs on the client.
//
// A workspace ID without an account ID is rejected: the option returns an
// error and leaves the client unchanged when workspaceID is set (not
// uuid.Nil) while accountID is uuid.Nil.
func WithDefaults(accountID uuid.UUID, workspaceID uuid.UUID) Option {
	return func(c *Client) error {
		workspaceWithoutAccount := workspaceID != uuid.Nil && accountID == uuid.Nil
		if workspaceWithoutAccount {
			return fmt.Errorf("an accountID must be set if a workspaceID is set: accountID is %q and workspaceID is %q", accountID, workspaceID)
		}

		c.defaultAccountID, c.defaultWorkspaceID = accountID, workspaceID

		return nil
	}
}
// WithCustomHeaders configures custom HTTP headers to include in all API requests.
//
// Protected headers (User-Agent, Prefect-Csrf-Token, Prefect-Csrf-Client) are
// filtered out and a warning is written to stderr for each one that was
// supplied. The comparison is done on the canonical form of the header name
// (http.CanonicalHeaderKey), so differently-cased spellings such as
// "user-agent" are filtered as well; http.Header.Set canonicalizes keys, so a
// case-sensitive filter would let those spellings override the protected
// header anyway.
//
// A nil headers map leaves the client untouched. Otherwise the client's
// custom headers are replaced by a new map holding the non-protected entries
// under their original keys; the caller's map is never modified.
//
// References:
// - https://docs.prefect.io/v3/advanced/api-client#configure-custom-headers
// - https://docs.prefect.io/v3/advanced/security-settings#custom-client-headers
func WithCustomHeaders(headers map[string]string) Option {
	return func(client *Client) error {
		if headers == nil {
			return nil
		}

		filtered := make(map[string]string, len(headers))

		for key, value := range headers {
			// Headers that cannot be overridden for security reasons, matched
			// on their canonical MIME header form.
			switch http.CanonicalHeaderKey(key) {
			case "User-Agent", "Prefect-Csrf-Token", "Prefect-Csrf-Client":
				// Warn when a protected header is attempted to be overridden.
				// This matches Prefect Python SDK behavior. The write error is
				// ignored on purpose: a failed warning must not fail the option.
				_, _ = fmt.Fprintf(os.Stderr, "WARNING: Cannot override protected header %q, ignoring custom value to maintain security\n", key)

				continue
			}

			filtered[key] = value
		}

		client.customHeaders = filtered

		return nil
	}
}
// checkRetryPolicy decides whether a request should be retried. It is
// installed as the CheckRetry hook of the retryablehttp client.
//
// Decision table:
//   - no response at all: retry, passing err through
//   - 409 Conflict: stop, no error
//   - 403 Forbidden: stop, with an error carrying status code and body
//   - 404 Not Found: stop for DELETE requests, retry otherwise; in both cases
//     an error carrying status code and body is returned
//   - anything else: defer to retryablehttp.ErrorPropagatedRetryPolicy
//
// For 403 and 404 the response body is read here in order to build the error.
//
// NOTE(review): err is formatted with %w even though it is likely nil whenever
// a response is present; fmt renders a nil operand as "%!w(<nil>)" — confirm
// whether callers rely on the exact message before changing it.
func checkRetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
	// Without a response something went wrong with the request itself, so
	// try again.
	if resp == nil {
		return true, err
	}

	switch resp.StatusCode {
	case http.StatusConflict:
		// A conflict is taken to mean the request eventually succeeded, so
		// there is no need to send it again.
		return false, nil

	case http.StatusForbidden:
		// Retrying a forbidden request is pointless; report it and stop.
		payload, _ := io.ReadAll(resp.Body)

		return false, fmt.Errorf("status_code=%d, error=%w, body=%s", resp.StatusCode, err, payload)

	case http.StatusNotFound:
		// The status code is encoded into the error as a workaround for the
		// cases where it is needed after a failed client.Do() call due to
		// exhausted retries: go-retryablehttp does not return the response
		// object on exhausted retries.
		//
		// https://github.com/hashicorp/go-retryablehttp/blob/main/client.go#L811-L825
		payload, _ := io.ReadAll(resp.Body)
		notFound := fmt.Errorf("status_code=%d, error=%w, body=%s", resp.StatusCode, err, payload)

		// Context-aware handling: a 404 on DELETE is final, which prevents
		// timing issues in acceptance tests during post-destroy plans. For
		// every other method (GET, POST, PUT, PATCH) the 404 is retried, which
		// matters for block-related objects that are created asynchronously.
		method, isString := ctx.Value(httpMethodContextKey).(string)
		isDelete := isString && method == http.MethodDelete

		return !isDelete, notFound
	}

	// Every other status code is handled by the library's default policy.
	//nolint:wrapcheck // we've extended this method, no need to wrap error
	return retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, err)
}