Merged
64 changes: 29 additions & 35 deletions materialize-snowflake/stream.go
@@ -248,41 +248,35 @@ func (sm *streamManager) write(ctx context.Context, blobs []*blobMetadata, decry
blob.Chunks[0].Channels[0].ClientSequencer = thisChannel.ClientSequencer
blob.Chunks[0].Channels[0].RowSequencer = thisChannel.RowSequencer + 1

if recovery {
// Handle the case where decryptKey was not in the checkpoint because
// the checkpoint was created before it was added. We will guess that
// it's the same as the current key; if it's not, then the decrypted
// data will not have the parquet magic header.
//
// This check can be removed once all tasks have written a checkpoint.
if decryptKey == "" {
log.WithFields(log.Fields{
"schema": schema,
"table": table,
}).Warn("unknown encryption key for rewrite; using current")
decryptKey = thisChannel.EncryptionKey
}

// Always use the rename and re-upload strategy during recovery
// commits. This prevents issues where blobs could be re-registered
// if the underlying channel has changed somehow, perhaps by being
// dropped out-of-band, or its last persisted token changed in some
// other way.
//
// It is also possible the channel encryption key has changed and
// the blob will need to be rewritten with the new encryption key.
//
// This also addresses the "blob has wrong format or extension" error
// which occurs if the blob was written not-so-recently; apparently
// anything older than an hour or so is rejected by what seems to
// be a server-side check that examines the name of the file, which
// contains the timestamp it was written. In this case, which
// may arise from re-enabling a disabled binding / materialization,
// or extended outages, we have to download the file and re-upload
// it with an up-to-date name.
//
// This should be a relatively infrequent occurrence, so the
// performance impact + increased data transfer should be tolerable.
// Always use the rename and re-upload strategy during recovery
// commits.
//
// An exception is for checkpoints that do not yet have a decryptKey,
// created by earlier versions of the connector. If we don't know the
// old decryption key we can't rename them, so our only choice is to
// attempt to register them with the encryption key ID they were
// originally written with.
//
// The rename and re-upload strategy prevents issues where blobs could
// be re-registered if the underlying channel has changed somehow,
// perhaps by being dropped out-of-band, or its last persisted token
// changed in some other way.
//
// It is also possible the channel encryption key has changed and
// the blob will need to be rewritten with the new encryption key.
//
// This also addresses the "blob has wrong format or extension" error
// which occurs if the blob was written not-so-recently; apparently
// anything older than an hour or so is rejected by what seems to
// be a server-side check that examines the name of the file, which
// contains the timestamp it was written. In this case, which
// may arise from re-enabling a disabled binding / materialization,
// or extended outages, we have to download the file and re-upload
// it with an up-to-date name.
//
// This should be a relatively infrequent occurrence, so the
// performance impact + increased data transfer should be tolerable.
if recovery && decryptKey != "" {
if err := sm.writeRenamed(ctx, decryptKey, thisChannel, blob); err != nil {
return fmt.Errorf("writeRenamed: %w", err)
}
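
The decision described in the comment above can be sketched in isolation. This is a minimal illustration, not code from the connector: the `strategy` type, its two constants, and `chooseStrategy` are hypothetical names introduced here, and only the condition `recovery && decryptKey != ""` comes from the diff.

```go
package main

import "fmt"

// strategy is a hypothetical label for how a blob gets committed.
// It does not exist in the connector; it only names the two paths
// that the comment in the diff describes.
type strategy int

const (
	// registerAsWritten: register the blob under its original name and
	// the encryption key ID it was originally written with.
	registerAsWritten strategy = iota
	// renameAndReupload: download the blob, then re-upload it under an
	// up-to-date name (the path taken via writeRenamed in the diff).
	renameAndReupload
)

func (s strategy) String() string {
	if s == renameAndReupload {
		return "rename and re-upload"
	}
	return "register as written"
}

// chooseStrategy mirrors the condition `recovery && decryptKey != ""`.
// A recovery commit whose checkpoint has no decryptKey (written by an
// earlier connector version) cannot be renamed, so it falls back to
// registering the blob as it was written.
func chooseStrategy(recovery bool, decryptKey string) strategy {
	if recovery && decryptKey != "" {
		return renameAndReupload
	}
	return registerAsWritten
}

func main() {
	cases := []struct {
		recovery   bool
		decryptKey string
	}{
		{false, "some-key"}, // normal commit
		{true, "some-key"},  // recovery with a known key
		{true, ""},          // recovery from an old checkpoint
	}
	for _, c := range cases {
		fmt.Printf("recovery=%t hasKey=%t -> %s\n",
			c.recovery, c.decryptKey != "", chooseStrategy(c.recovery, c.decryptKey))
	}
}
```

The notable change relative to the removed code is the third case: instead of guessing that the old key equals the current channel key, a recovery commit without a `decryptKey` skips the rename entirely.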