Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions materialize-snowflake/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,23 @@ import (
"google.golang.org/api/option"
)

// logCtxKey is an unexported type for this package's context keys. Using a
// distinct type (rather than a bare string) guarantees the key cannot collide
// with context values set by other packages, and satisfies go vet /
// staticcheck SA1029.
type logCtxKey struct{}

// WithLogger returns a copy of ctx that carries logger, retrievable later
// with Logger.
func WithLogger(ctx context.Context, logger log.FieldLogger) context.Context {
	return context.WithValue(ctx, logCtxKey{}, logger)
}

// Logger retrieves the logger stored in ctx by WithLogger, or the standard
// logger if none is present, so callers always get a usable logger.
func Logger(ctx context.Context) log.FieldLogger {
	if logger, ok := ctx.Value(logCtxKey{}).(log.FieldLogger); ok {
		return logger
	}
	return log.StandardLogger()
}

// channelName produces a reasonably readable channel name that is globally
// unique per materialization and shard. Channels are specific to a table, so
// there is no need to include the database or schema in the name, and they can
Expand Down Expand Up @@ -230,16 +247,20 @@ func (sm *streamManager) write(ctx context.Context, blobs []*blobMetadata, decry
return fmt.Errorf("unknown channel %s", channelName)
}

logger := log.WithFields(log.Fields{
"schema": schema,
"table": table,
})
ctx = WithLogger(ctx, logger)

for _, blob := range blobs {
blobToken := blob.Chunks[0].Channels[0].OffsetToken
currentChannelToken := thisChannel.OffsetToken
log.WithFields(log.Fields{
"schema": schema,
"table": table,
logger.WithFields(log.Fields{
"currentChannelToken": thisChannel.OffsetToken,
"blobToken": blobToken,
}).Info("evaluating blob registration")
if shouldWrite, err := shouldWriteNextToken(blobToken, currentChannelToken); err != nil {
if shouldWrite, err := shouldWriteNextToken(ctx, blobToken, currentChannelToken); err != nil {
return fmt.Errorf("shouldWriteNextToken: %w", err)
} else if !shouldWrite {
continue
Expand Down Expand Up @@ -311,7 +332,8 @@ func (sm *streamManager) writeRenamed(ctx context.Context, decryptKey string, ch
}
nextName := sm.getNextFileName(time.Now(), fmt.Sprintf("%s_%d", sm.prefix, sm.deploymentId))

ll := log.WithFields(log.Fields{
logger := Logger(ctx)
ll := logger.WithFields(log.Fields{
"oldName": blob.Path,
"newName": nextName,
"token": blob.Chunks[0].Channels[0].OffsetToken,
Expand Down Expand Up @@ -474,12 +496,13 @@ func blobToken(baseToken string, n int) string {
// written or not, based on the `current` persisted token. Generally this means
// if the `n` value for `next` is exactly one larger than `current` (or there is
// no `current`), the blob should be written.
func shouldWriteNextToken(next string, current *string) (bool, error) {
func shouldWriteNextToken(ctx context.Context, next string, current *string) (bool, error) {
logger := Logger(ctx)
if current == nil {
// Maybe this should be more strict and error out unless `next` is the
// first one in the sequence, but that would block cases where a user
// has manually dropped a table for some reason.
log.WithField("nextToken", next).Info("no current token persisted; blob will be registered")
logger.WithField("nextToken", next).Info("no current token persisted; blob will be registered")
return true, nil
}

Expand All @@ -493,7 +516,7 @@ func shouldWriteNextToken(next string, current *string) (bool, error) {
return false, err
}

ll := log.WithFields(log.Fields{
ll := logger.WithFields(log.Fields{
"nextToken": next,
"currentToken": currentToken,
"currentBase": currentBase,
Expand Down
16 changes: 7 additions & 9 deletions materialize-snowflake/stream_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func (s *streamClient) openChannel(ctx context.Context, schema, table, name stri
}

func (s *streamClient) write(ctx context.Context, blob *blobMetadata) error {
logger := Logger(ctx)
type req struct {
Role *string `json:"role,omitempty"`
Blobs []*blobMetadata `json:"blobs"`
Expand Down Expand Up @@ -279,7 +280,7 @@ func (s *streamClient) write(ctx context.Context, blob *blobMetadata) error {
}
}

log.WithFields(log.Fields{
logger.WithFields(log.Fields{
"path": blob.Path,
"md5": blob.MD5,
"rows": blob.Chunks[0].EPS.Rows,
Expand Down Expand Up @@ -335,6 +336,7 @@ func (s *streamClient) channelStatus(ctx context.Context, clientSeq int, schema,
}

func (s *streamClient) waitForTokenPersisted(ctx context.Context, token string, clientSeq int, schema, table, name string) error {
logger := Logger(ctx)
maxBackoff := 1 * time.Second
backoff := 100 * time.Millisecond
ts := time.Now()
Expand All @@ -350,10 +352,8 @@ func (s *streamClient) waitForTokenPersisted(ctx context.Context, token string,
}

if n > 10 {
log.WithFields(log.Fields{
logger.WithFields(log.Fields{
"attempt": n,
"schema": schema,
"table": table,
"token": token,
}).Info("channel offset token not yet persisted")
}
Expand All @@ -362,11 +362,9 @@ func (s *streamClient) waitForTokenPersisted(ctx context.Context, token string,
backoff = min(backoff*2, maxBackoff)
}

log.WithFields(log.Fields{
"schema": schema,
"table": table,
"token": token,
"took": time.Since(ts).String(),
logger.WithFields(log.Fields{
"token": token,
"took": time.Since(ts).String(),
}).Info("channel offset token persisted")

return nil
Expand Down
2 changes: 1 addition & 1 deletion materialize-snowflake/stream_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestShouldWriteNextToken(t *testing.T) {
},
} {
t.Run(tt.name, func(t *testing.T) {
got, err := shouldWriteNextToken(tt.blobToken, tt.currentToken)
got, err := shouldWriteNextToken(t.Context(), tt.blobToken, tt.currentToken)
if tt.wantErr {
require.Error(t, err)
} else {
Expand Down
Loading