diff --git a/materialize-snowflake/stream.go b/materialize-snowflake/stream.go index f2ebbc5236..1251ed0e53 100644 --- a/materialize-snowflake/stream.go +++ b/materialize-snowflake/stream.go @@ -248,41 +248,35 @@ func (sm *streamManager) write(ctx context.Context, blobs []*blobMetadata, decry blob.Chunks[0].Channels[0].ClientSequencer = thisChannel.ClientSequencer blob.Chunks[0].Channels[0].RowSequencer = thisChannel.RowSequencer + 1 - if recovery { - // Handle the case where decryptKey was not in the checkpoint because - // the checkpoint was created before it was added. We will guess that - // its the same as the current key, if its not then the decrypted - // data will not have the parquet magic header. - // - // This check can be removed once all tasks have written a checkpoint. - if decryptKey == "" { - log.WithFields(log.Fields{ - "schema": schema, - "table": table, - }).Warn("unknown encryption key for rewrite; using current") - decryptKey = thisChannel.EncryptionKey - } - - // Always use the rename and re-upload strategy during recovery - // commits. This prevents issues where blobs could be re-registered - // if the underlying channel has changed somehow, perhaps by being - // dropped out-of-band, or its last persisted token changed in some - // other way. - // - // It is also possible the channel encryption key has changed and - // the blob will need to be rewritten with the new encryption key. - // - // This also address the "blob has wrong format or extension" error - // which occurs if the blob was written not-so-recently; apparently - // anything older than an hour or so is rejected by what seems to - // be a server-side check the examines the name of the file, which - // contains the timestamp it was written. In this case, which - // may arise from re-enabling a disabled binding / materialization, - // or extended outages, we have to download the file and re-upload - // it with an up-to-date name. 
- // - // This should be a relatively infrequent occurrence, so the - // performance impact + increased data transfer should be tolerable. + // Always use the rename and re-upload strategy during recovery + // commits. + // + // An exception is for checkpoints that do not yet have a decryptKey, + // created by earlier versions of the connector. If we don't know the + // old decryption key we can't rename them, so our only choice is to + // attempt to register them with the encryption key ID they were + // originally written with. + // + // Renaming and re-uploading prevents issues where blobs could be re-registered + // if the underlying channel has changed somehow, perhaps by being + // dropped out-of-band, or its last persisted token changed in some + // other way. + // + // It is also possible the channel encryption key has changed and + // the blob will need to be rewritten with the new encryption key. + // + // This also addresses the "blob has wrong format or extension" error + // which occurs if the blob was written not-so-recently; apparently + // anything older than an hour or so is rejected by what seems to + // be a server-side check that examines the name of the file, which + // contains the timestamp it was written. In this case, which + // may arise from re-enabling a disabled binding / materialization, + // or extended outages, we have to download the file and re-upload + // it with an up-to-date name. + // + // This should be a relatively infrequent occurrence, so the + // performance impact + increased data transfer should be tolerable. + if recovery && decryptKey != "" { if err := sm.writeRenamed(ctx, decryptKey, thisChannel, blob); err != nil { return fmt.Errorf("writeRenamed: %w", err) }