Skip to content

Commit 2046dd0

Browse files
committed
materialize-snowflake: use an atomic for bdec filename counter
It seems possible that two files could get the same file count since this is the a shared counter. While this hasn't been seen in practice it makes sense to prevent.
1 parent c553472 commit 2046dd0

File tree

2 files changed

+7
-8
lines changed

2 files changed

+7
-8
lines changed

materialize-snowflake/stream.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"path"
77
"strconv"
88
"strings"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/aws/aws-sdk-go-v2/credentials"
@@ -49,7 +50,7 @@ type streamManager struct {
4950
bucket blob.Bucket
5051
bucketExpiresAt time.Time
5152
bdecWriter *bdecWriter
52-
counter int
53+
counter int64
5354
lastBinding int // bookkeeping for when to flush the writer when a new binding starts writing rows
5455

5556
blobStats map[int][]*blobStatsTracker
@@ -68,7 +69,7 @@ func newStreamManager(cfg *config, materialization string, account string, keyBe
6869
channelName: channelName(materialization, keyBegin),
6970
lastBinding: -1,
7071
blobStats: make(map[int][]*blobStatsTracker),
71-
counter: 0,
72+
counter: -1,
7273
}, nil
7374
}
7475

@@ -448,17 +449,15 @@ func (sm *streamManager) getNextFileName(calendar time.Time, clientPrefix string
448449
"_" +
449450
strconv.Itoa(int(sm.keyBegin)) +
450451
"_" +
451-
strconv.Itoa(sm.getAndIncrementCounter()) +
452+
strconv.FormatInt(sm.getAndIncrementCounter(), 10) +
452453
"." +
453454
BLOB_EXTENSION_TYPE
454455

455456
return blobFileName(year + "/" + month + "/" + day + "/" + hour + "/" + minute + "/" + blobShortName)
456457
}
457458

458-
func (sm *streamManager) getAndIncrementCounter() int {
459-
out := sm.counter
460-
sm.counter++
461-
return out
459+
func (sm *streamManager) getAndIncrementCounter() int64 {
460+
return atomic.AddInt64(&sm.counter, 1)
462461
}
463462

464463
// blobToken encodes `baseToken`, which is per-transaction, and the counter `n`

materialize-snowflake/stream_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ func TestGetNextFileName(t *testing.T) {
631631

632632
clientPrefix := "asdfasdf_1234"
633633

634-
sm := &streamManager{keyBegin: 5678}
634+
sm := &streamManager{keyBegin: 5678, counter: -1}
635635

636636
for idx := range 5 {
637637
require.Equal(t,

0 commit comments

Comments
 (0)