Skip to content

Commit 3119c3e

Browse files
committed
materialize-snowflake: additional case of snowpipe streaming incompatibility
An open channel API response with code 400 and Snowflake message code 6 has been observed in certain setups with transient tables, where Snowpipe streaming is apparently not supported. This is somewhat strange as I am able to use Snowpipe streaming with a transient table on our own test account, but that is obviously not always the case, so we need to handle it here to fall back to regular Snowpipe.
1 parent a49c5a1 commit 3119c3e

File tree

2 files changed

+11
-9
lines changed

2 files changed

+11
-9
lines changed

materialize-snowflake/snowflake.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -327,9 +327,9 @@ func (d *transactor) addBinding(ctx context.Context, target sql.Table, streaming
327327
if err := d.streamManager.addBinding(ctx, loc.TableSchema, d.ep.Identifier(loc.TableName), target); err != nil {
328328
var apiError *streamingApiError
329329
var colError *unhandledColError
330-
if errors.As(err, &apiError) && apiError.Code == 55 {
331-
// Streaming API errors with code 55 come from tables that don't
332-
// support streaming at all, so we will fall back to a
330+
if errors.As(err, &apiError) && (apiError.Code == 6 || apiError.Code == 55) {
331+
// Streaming API errors with code 6 or 55 come from tables that
332+
// don't support streaming at all, so we will fall back to a
333333
// non-streaming strategy for them if they are encountered.
334334
} else if errors.As(err, &colError) {
335335
// This column type is something that we haven't yet implemented

materialize-snowflake/stream_http.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func newStreamClient(cfg *config, account string) (*streamClient, error) {
189189
}
190190

191191
return &streamClient{
192-
r: resty.New().SetBaseURL(fmt.Sprintf("https://") + path.Join(cfg.Host, "v1/streaming")),
192+
r: resty.New().SetBaseURL("https://" + path.Join(cfg.Host, "v1/streaming")),
193193
key: key,
194194
user: cfg.Credentials.User,
195195
database: cfg.Database,
@@ -200,7 +200,7 @@ func newStreamClient(cfg *config, account string) (*streamClient, error) {
200200

201201
func (s *streamClient) configure(ctx context.Context) (*streamConfig, error) {
202202
type req struct {
203-
Role *string `json:"role;omitempty"`
203+
Role *string `json:"role,omitempty"`
204204
}
205205

206206
res, err := post[streamConfig](ctx, s, "/client/configure", req{
@@ -235,10 +235,12 @@ func (s *streamClient) openChannel(ctx context.Context, schema, table, name stri
235235
Channel: name,
236236
WriteMode: "CLOUD_STORAGE",
237237
})
238-
if err != nil {
238+
if err != nil && res == nil {
239239
return nil, err
240-
} else if err := getErrorByCode(res.StatusCode); err != nil {
241-
return nil, fmt.Errorf("request was not successful: %w", err)
240+
} else if codeErr := getErrorByCode(res.StatusCode); codeErr != nil {
241+
return nil, fmt.Errorf("request returned error code: %w", codeErr)
242+
} else if err != nil {
243+
return nil, fmt.Errorf("error opening channel with a non-error status code: %w", err)
242244
} else if res.Message != "Success" {
243245
return nil, fmt.Errorf("unexpected response message: %s", res.Message)
244246
}
@@ -392,7 +394,7 @@ func post[T any](ctx context.Context, c *streamClient, path string, body any) (*
392394
return nil, fmt.Errorf("failed to POST %s: %w", path, err)
393395
}
394396
if !got.IsSuccess() {
395-
return nil, fmt.Errorf("failed to POST %s: %s: %w", got.Request.URL, got.Status(), got.Error().(error))
397+
return &res, fmt.Errorf("failed to POST %s: %s: %w", got.Request.URL, got.Status(), got.Error().(error))
396398
}
397399

398400
return &res, nil

0 commit comments

Comments
 (0)