Skip to content

Commit 9f9adeb

Browse files
committed
wip
🔒 Scanned for secrets using gitleaks 8.30.0
1 parent 2236f81 commit 9f9adeb

File tree

14 files changed

+151
-150
lines changed

14 files changed

+151
-150
lines changed

app/apphandlers/embeddedAppHandler.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -247,12 +247,13 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, shutdownFn func(), op
247247
}
248248

249249
// setup partition migrator
250-
partitionMigrator, gwWODB, rtRWDB, brtRWDB, err := setupProcessorPartitionMigrator(ctx, shutdownFn, dbPool,
250+
partitionMigrator, gwWODB, rtRWDB, brtRWDB, finally, err := setupProcessorPartitionMigrator(ctx, shutdownFn, dbPool,
251251
config, statsFactory,
252252
gwRODB, gwWODB,
253253
rtRWDB, brtRWDB,
254254
modeProvider.EtcdClient,
255255
)
256+
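defer finally() // deferred before the err check so cleanup also runs on setup errors. NOTE(review): setupProcessorPartitionMigrator returns a nil finally on its "partition count > 0" error path; deferring a nil func panics when this function returns, so finally must always be non-nil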
256257
if err != nil {
257258
return fmt.Errorf("setting up partition migrator: %w", err)
258259
}

app/apphandlers/processorAppHandler.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -232,12 +232,13 @@ func (a *processorApp) StartRudderCore(ctx context.Context, shutdownFn func(), o
232232
}
233233

234234
// setup partition migrator
235-
partitionMigrator, gwRODB, rtRWDB, brtRWDB, err := setupProcessorPartitionMigrator(ctx, shutdownFn, dbPool,
235+
partitionMigrator, gwRODB, rtRWDB, brtRWDB, finally, err := setupProcessorPartitionMigrator(ctx, shutdownFn, dbPool,
236236
config, statsFactory,
237237
gwRODB, nil,
238238
rtRWDB, brtRWDB,
239239
modeProvider.EtcdClient,
240240
)
241+
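defer finally() // deferred before the err check so cleanup also runs on setup errors. NOTE(review): setupProcessorPartitionMigrator returns a nil finally on its "partition count > 0" error path; deferring a nil func panics when this function returns, so finally must always be non-nil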
241242
if err != nil {
242243
return fmt.Errorf("setting up partition migrator: %w", err)
243244
}

app/apphandlers/setup_partitionmigration.go

Lines changed: 22 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -33,21 +33,22 @@ func setupProcessorPartitionMigrator(ctx context.Context,
3333
gwRODB, gwWODB,
3434
rtRWDB, brtRWDB jobsdb.JobsDB,
3535
etcdClientProvider func() (etcdclient.Client, error),
36-
) (partitionMigrator PartitionMigrator, gwDB, rtDB, brtDB jobsdb.JobsDB, err error) {
36+
) (partitionMigrator PartitionMigrator, gwDB, rtDB, brtDB jobsdb.JobsDB, finally func(), err error) {
3737
if !config.GetBool("PartitionMigration.enabled", false) {
38+
finally = func() {}
3839
if gwWODB == nil {
3940
// caller expects to get reader gw db back if writer is nil
40-
return &noOpPartitionMigrator{}, gwRODB, rtRWDB, brtRWDB, nil
41+
return &noOpPartitionMigrator{}, gwRODB, rtRWDB, brtRWDB, finally, nil
4142
} else {
4243
// caller expects to get writer gw db back if writer is not nil
43-
return &noOpPartitionMigrator{}, gwWODB, rtRWDB, brtRWDB, nil
44+
return &noOpPartitionMigrator{}, gwWODB, rtRWDB, brtRWDB, finally, nil
4445
}
4546
}
4647

4748
log := logger.NewLogger().Child("partitionmigrator")
4849
partitionCount := config.GetIntVar(0, 1, "JobsDB.partitionCount")
4950
if partitionCount == 0 {
50-
return nil, nil, nil, nil, fmt.Errorf("partition migrator needs partition count > 0")
51+
return nil, nil, nil, nil, nil, fmt.Errorf("partition migrator needs partition count > 0")
5152
}
5253
bufferFlushBatchSize := config.GetReloadableIntVar(20000, 1, "PartitionMigration.bufferFlushBatchSize")
5354
bufferFlushPayloadSize := config.GetReloadableInt64Var(500, bytesize.MB, "PartitionMigration.bufferFlushPayloadSize")
@@ -66,6 +67,12 @@ func setupProcessorPartitionMigrator(ctx context.Context,
6667
jobsdb.WithStats(stats),
6768
jobsdb.WithDBHandle(dbPool),
6869
)
70+
if err := gwBuffRWHandle.Start(); err != nil {
71+
return nil, nil, nil, nil, finally, fmt.Errorf("starting gw buffer jobsdb handle: %w", err)
72+
}
73+
finally = func() {
74+
gwBuffRWHandle.Stop()
75+
}
6976
gwSetupOpt = partitionbuffer.WithSeparateReaderAndWriterPrimaryJobsDBs(gwRODB, gwWODB, gwBuffRWHandle)
7077
} else {
7178
// we have only a reader gw DB, so we create a buffer with reader and
@@ -98,7 +105,7 @@ func setupProcessorPartitionMigrator(ctx context.Context,
98105
partitionbuffer.WithFlushMoveTimeout(bufferFlushMoveTimeout),
99106
)
100107
if err != nil {
101-
return nil, nil, nil, nil, fmt.Errorf("creating gw partition buffer: %w", err)
108+
return nil, nil, nil, nil, finally, fmt.Errorf("creating gw partition buffer: %w", err)
102109
}
103110

104111
// setup partition buffer for router jobsDB
@@ -120,7 +127,7 @@ func setupProcessorPartitionMigrator(ctx context.Context,
120127
partitionbuffer.WithFlushMoveTimeout(bufferFlushMoveTimeout),
121128
)
122129
if err != nil {
123-
return nil, nil, nil, nil, fmt.Errorf("creating rt partition buffer: %w", err)
130+
return nil, nil, nil, nil, finally, fmt.Errorf("creating rt partition buffer: %w", err)
124131
}
125132

126133
// setup partition buffer for batchrouter jobsDB
@@ -142,21 +149,21 @@ func setupProcessorPartitionMigrator(ctx context.Context,
142149
partitionbuffer.WithFlushMoveTimeout(bufferFlushMoveTimeout),
143150
)
144151
if err != nil {
145-
return nil, nil, nil, nil, fmt.Errorf("creating batch rt partition buffer: %w", err)
152+
return nil, nil, nil, nil, finally, fmt.Errorf("creating batch rt partition buffer: %w", err)
146153
}
147154

148155
// setup partition migrator
149156
etcdClient, err := etcdClientProvider()
150157
if err != nil {
151-
return nil, nil, nil, nil, fmt.Errorf("getting etcd client: %w", err)
158+
return nil, nil, nil, nil, finally, fmt.Errorf("getting etcd client: %w", err)
152159
}
153160
nodeIndex := config.GetIntVar(-1, 1, "PROCESSOR_INDEX")
154161
if nodeIndex < 0 {
155-
return nil, nil, nil, nil, fmt.Errorf("got invalid node index from config: %d", nodeIndex)
162+
return nil, nil, nil, nil, finally, fmt.Errorf("got invalid node index from config: %d", nodeIndex)
156163
}
157164
nodeName := config.GetStringVar("", "HOSTNAME")
158165
if nodeName == "" {
159-
return nil, nil, nil, nil, fmt.Errorf("got empty node name from config")
166+
return nil, nil, nil, nil, finally, fmt.Errorf("got empty node name from config")
160167
}
161168

162169
targetURLProvider, err := func() (func(targetNodeIndex int) (string, error), error) {
@@ -177,7 +184,7 @@ func setupProcessorPartitionMigrator(ctx context.Context,
177184
}, nil
178185
}()
179186
if err != nil {
180-
return nil, nil, nil, nil, fmt.Errorf("creating target URL provider: %w", err)
187+
return nil, nil, nil, nil, finally, fmt.Errorf("creating target URL provider: %w", err)
181188
}
182189

183190
sourceMigrator, err := sourcenode.NewMigratorBuilder(nodeIndex, nodeName).
@@ -190,7 +197,7 @@ func setupProcessorPartitionMigrator(ctx context.Context,
190197
WithReaderJobsDBs([]jobsdb.JobsDB{gwRODB, rtRWDB, brtRWDB}).
191198
Build()
192199
if err != nil {
193-
return nil, nil, nil, nil, fmt.Errorf("creating source node migrator: %w", err)
200+
return nil, nil, nil, nil, finally, fmt.Errorf("creating source node migrator: %w", err)
194201
}
195202

196203
targetMigrator, err := targetnode.NewMigratorBuilder(nodeIndex, nodeName).
@@ -205,7 +212,7 @@ func setupProcessorPartitionMigrator(ctx context.Context,
205212
WithUnbufferedJobsDBs([]jobsdb.JobsDB{rtRWDB, brtRWDB, gwWODB}).
206213
Build()
207214
if err != nil {
208-
return nil, nil, nil, nil, fmt.Errorf("creating target node migrator: %w", err)
215+
return nil, nil, nil, nil, finally, fmt.Errorf("creating target node migrator: %w", err)
209216
}
210217
ppm, err := processor.NewProcessorPartitionMigratorBuilder(nodeIndex, nodeName).
211218
WithConfig(config).
@@ -216,9 +223,9 @@ func setupProcessorPartitionMigrator(ctx context.Context,
216223
WithTargetMigrator(targetMigrator).
217224
Build()
218225
if err != nil {
219-
return nil, nil, nil, nil, fmt.Errorf("creating processor partition migrator: %w", err)
226+
return nil, nil, nil, nil, finally, fmt.Errorf("creating processor partition migrator: %w", err)
220227
}
221-
return ppm, gwPartitionBuffer, rtPartitionBuffer, brtPartitionBuffer, nil
228+
return ppm, gwPartitionBuffer, rtPartitionBuffer, brtPartitionBuffer, finally, nil
222229
}
223230

224231
func setupGatewayPartitionMigrator(_ context.Context, _ *config.Config, gwWODB jobsdb.JobsDB,

cluster/migrator/partitionmigration/client/migration_job_executor.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -323,7 +323,7 @@ func (mpe *migrationJobExecutor) Run(ctx context.Context) error {
323323
case <-done:
324324
return nil
325325
case <-time.After(mpe.progressPeriod.Load()):
326-
mpe.logger.Infon("Partition migration in progress",
326+
mpe.logger.Infon("Partition migration job in progress",
327327
logger.NewIntField("sent", totalSent.Load()),
328328
logger.NewIntField("acked", totalAcked.Load()),
329329
)
@@ -335,7 +335,7 @@ func (mpe *migrationJobExecutor) Run(ctx context.Context) error {
335335
defer close(done)
336336
if err := streamGroup.Wait(); err != nil {
337337
if ctx.Err() == nil {
338-
mpe.logger.Errorn("Partition migration failed", obskit.Error(err))
338+
mpe.logger.Errorn("Partition migration job failed", obskit.Error(err))
339339
for index, jobs := range unackedBatches { // no need to lock unackedBatches, we are done with sender and receiver goroutines
340340
if err := mpe.updateJobStatus(ctx, jobs, jobsdb.Failed.State, fmt.Errorf("job migration interrupted: %w", err)); err != nil {
341341
mpe.logger.Warnn("Could not mark non-migrated jobs as failed",
@@ -350,13 +350,13 @@ func (mpe *migrationJobExecutor) Run(ctx context.Context) error {
350350
}
351351
return fmt.Errorf("migrating partitions: %w", err)
352352
}
353-
mpe.logger.Infon("Partition migration completed successfully",
353+
mpe.logger.Infon("Partition migration job completed successfully",
354354
logger.NewIntField("total", totalAcked.Load()),
355355
)
356356
return nil
357357
})
358358
err = g.Wait()
359-
mpe.logger.Infon("Partition migration progress final status",
359+
mpe.logger.Infon("Partition migration job progress final status",
360360
logger.NewIntField("sent", totalSent.Load()),
361361
logger.NewIntField("acked", totalAcked.Load()),
362362
obskit.Error(err),

cluster/migrator/processor/sourcenode/sourcenode_migrator.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -166,7 +166,7 @@ func (m *migrator) waitForNoInProgressJobs(ctx context.Context, sourcePartitions
166166
logger.NewDurationField("elapsed", time.Since(start)),
167167
)
168168
// sleep for a short duration before checking again
169-
if err := misc.SleepCtx(pollCtx, 5*time.Second); err != nil {
169+
if err := misc.SleepCtx(pollCtx, m.c.inProgressPollSleep.Load()); err != nil {
170170
return fmt.Errorf("sleeping while waiting for no in-progress jobs in %q jobsdb: %w", readerJobsDB.Identifier(), err)
171171
}
172172
}
@@ -322,7 +322,7 @@ func (m *migrator) onNewJob(ctx context.Context, key string, job *etcdtypes.Part
322322
delete(m.pendingMigrationJobs, job.JobID)
323323
m.pendingMigrationJobsMu.Unlock()
324324

325-
log.Infon("Partition migration job status marked as moved successfully",
325+
log.Infon("Partition migration job status marked as moved in etcd successfully",
326326
logger.NewIntField("revision", res.Header.Revision),
327327
)
328328
m.stats.NewTaggedStat("partition_mig_src_job", stats.TimerType, m.statsTags()).SendTiming(time.Since(start))

cluster/migrator/processor/sourcenode/sourcenode_migrator_builder.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -116,9 +116,9 @@ func (b *MigratorBuilder) Build() (Migrator, error) {
116116
targetURLProvider: b.targetURLProvider,
117117
pendingMigrationJobs: map[string]struct{}{},
118118
}
119-
m.c.readExcludeSleep = b.config.GetReloadableDurationVar(20, time.Second, "PartitionMigration.Processor.SourceNode.readExcludeSleep")
119+
m.c.readExcludeSleep = b.config.GetReloadableDurationVar(30, time.Second, "PartitionMigration.Processor.SourceNode.readExcludeSleep")
120120
m.c.waitForInProgressTimeout = b.config.GetReloadableDurationVar(5, time.Minute, "PartitionMigration.Processor.SourceNode.waitForInProgressTimeout")
121-
m.c.inProgressPollSleep = b.config.GetReloadableDurationVar(1, time.Second, "PartitionMigration.SourceNode.inProgressPollSleep")
121+
m.c.inProgressPollSleep = b.config.GetReloadableDurationVar(5, time.Second, "PartitionMigration.SourceNode.inProgressPollSleep")
122122

123123
return m, nil
124124
}

cluster/migrator/processor/targetnode/targetnode_migrator.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -231,7 +231,7 @@ func (m *migrator) onNewJob(ctx context.Context, key string, job *etcdtypes.Part
231231
delete(m.pendingMigrationJobs, job.JobID)
232232
m.pendingMigrationJobsMu.Unlock()
233233

234-
log.Infon("Partition migration job status marked as completed successfully",
234+
log.Infon("Partition migration job status marked as completed in etcd successfully",
235235
logger.NewIntField("revision", res.Header.Revision),
236236
)
237237
m.stats.NewTaggedStat("partition_mig_target_job", stats.TimerType, m.statsTags()).SendTiming(time.Since(start))

cluster/partitionbuffer/jobsdb_partition_buffer_factory.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,7 @@ func WithSeparateReaderAndWriterPrimaryJobsDBs(primaryReader, primaryWriter, buf
6969
b.canStore = true
7070
b.canFlush = true
7171
b.differentBufferDBs = false
72-
b.lifecycleJobsDBs = []jobsdb.JobsDB{primaryReader, buffer} // primaryWriter's lifecycle is externally managed
72+
b.lifecycleJobsDBs = []jobsdb.JobsDB{primaryReader} // primaryWriter's and buffer's lifecycles are externally managed
7373
}
7474
}
7575

cluster/partitionbuffer/jobsdb_partition_buffer_flush.go

Lines changed: 7 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,7 @@ func (b *jobsDBPartitionBuffer) FlushBufferedPartitions(ctx context.Context, par
6363
var totalCount int
6464

6565
// move in batches until we stop reaching limits
66-
b.logger.Infon("flushing jobs from buffer to primary jobsdb (move phase)",
66+
b.logger.Infon("Flushing jobs from buffer to primary jobsdb (move phase)",
6767
logger.NewStringField("partitions", strings.Join(partitions, ",")),
6868
logger.NewStringField("prefix", b.Identifier()),
6969
)
@@ -72,7 +72,7 @@ func (b *jobsDBPartitionBuffer) FlushBufferedPartitions(ctx context.Context, par
7272
select {
7373
case <-moveTimeout:
7474
// timeout reached, break out to switchover
75-
b.logger.Warnn("flush move timeout reached, proceeding to switchover",
75+
b.logger.Warnn("Flush move timeout reached, proceeding to switchover",
7676
logger.NewStringField("partitions", fmt.Sprintf("%v", partitions)),
7777
logger.NewDurationField("duration", time.Since(start)),
7878
)
@@ -87,7 +87,7 @@ func (b *jobsDBPartitionBuffer) FlushBufferedPartitions(ctx context.Context, par
8787
}
8888
}
8989
// switchover
90-
b.logger.Infon("flushing jobs from buffer to primary jobsdb (switchover phase)",
90+
b.logger.Infon("Flushing jobs from buffer to primary jobsdb (switchover phase)",
9191
logger.NewStringField("partitions", strings.Join(partitions, ",")),
9292
logger.NewStringField("prefix", b.Identifier()),
9393
)
@@ -96,7 +96,7 @@ func (b *jobsDBPartitionBuffer) FlushBufferedPartitions(ctx context.Context, par
9696
return fmt.Errorf("switchover of buffered partitions: %w", err)
9797
}
9898
totalCount += switchoverCount
99-
b.logger.Infon("completed flush of buffered partitions",
99+
b.logger.Infon("Flushing of buffered partitions completed successfully",
100100
logger.NewStringField("partitions", strings.Join(partitions, ",")),
101101
logger.NewStringField("prefix", b.Identifier()),
102102
logger.NewDurationField("duration", time.Since(start)),
@@ -164,20 +164,16 @@ func (b *jobsDBPartitionBuffer) switchoverBufferedPartitions(ctx context.Context
164164
"prefix": b.Identifier(),
165165
}).RecordDuration()()
166166

167-
lockCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
168-
if !b.bufferedPartitionsMu.TryLockWithContext(lockCtx) {
169-
cancel()
170-
// TODO: get a goroutine dump to /Users/aristzoumas/Desktop/lock.dump
167+
if !b.bufferedPartitionsMu.TryLockWithContext(ctx) {
171168
return 0, fmt.Errorf("acquiring a buffered partitions write lock during switchover: %w", ctx.Err())
172169
}
173-
cancel()
174-
b.logger.Infon("buffered partitions write lock acquired (switchover phase)",
170+
b.logger.Infon("Buffered partitions write lock acquired (switchover phase)",
175171
logger.NewStringField("partitions", strings.Join(partitionIDs, ",")),
176172
logger.NewStringField("prefix", b.Identifier()),
177173
)
178174
defer func() {
179175
b.bufferedPartitionsMu.Unlock()
180-
b.logger.Infon("buffered partitions write lock released (switchover phase)",
176+
b.logger.Infon("Buffered partitions write lock released (switchover phase)",
181177
logger.NewStringField("partitions", strings.Join(partitionIDs, ",")),
182178
logger.NewStringField("prefix", b.Identifier()),
183179
)

integration_test/partitionmigration/client_test.go

Lines changed: 8 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -14,9 +14,10 @@ import (
1414
"time"
1515

1616
"github.com/google/uuid"
17+
"golang.org/x/sync/errgroup"
18+
1719
"github.com/rudderlabs/rudder-go-kit/partmap"
1820
"github.com/rudderlabs/rudder-server/utils/httputil"
19-
"golang.org/x/sync/errgroup"
2021
)
2122

2223
type gatewayClientConfig struct {
@@ -96,7 +97,7 @@ func (gc *gatewayClient) GetTotalSent() int64 {
9697
return gc.totalSent.Load()
9798
}
9899

99-
func (gc *gatewayClient) generateJobs(userID string, startIndex int, jobCount int) (jobs []string) {
100+
func (gc *gatewayClient) generateJobs(userID string, startIndex, jobCount int) (jobs []string) {
100101
for j := startIndex; j < startIndex+jobCount; j++ {
101102
messageID := uuid.New().String()
102103
timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
@@ -114,23 +115,23 @@ func (gc *gatewayClient) generateJobs(userID string, startIndex int, jobCount in
114115
}
115116

116117
func (gc *gatewayClient) getUserIDForPartition(partitionIdx, numPartitions int) string {
117-
const separator = "<<>>"
118118
for i := range 1000000 {
119119
candidate := "user" + strconv.Itoa(i)
120-
idx, _ := partmap.Murmur3Partition32(gwUserID(candidate), uint32(numPartitions))
120+
idx, _ := partmap.Murmur3Partition32(legacyUserID(candidate), uint32(numPartitions))
121121
if idx == uint32(partitionIdx) {
122122
return candidate
123123
}
124124
}
125125
return ""
126126
}
127127

128-
func gwUserID(userID string) string {
128+
// legacyUserID builds the partition key in the gateway's format: the gateway component doesn't hash just the userID, but separator + userID + separator + userID, with "<<>>" as the separator.
129+
func legacyUserID(userID string) string {
129130
const separator = "<<>>"
130131
return separator + userID + separator + userID
131132
}
132133

133-
func (gc *gatewayClient) sendRequest(urlString, writeKey string, partitionKey, payload string) error {
134+
func (gc *gatewayClient) sendRequest(urlString, writeKey, userID, payload string) error {
134135
u, _ := url.Parse(urlString)
135136
u.Path = path.Join(u.Path, "v1", "identify")
136137
requestURL := u.String()
@@ -140,7 +141,7 @@ func (gc *gatewayClient) sendRequest(urlString, writeKey string, partitionKey, p
140141
}
141142
req.Header.Add("Content-Type", "application/json")
142143
req.Header.Add("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(writeKey+":")))
143-
req.Header.Add("X-Partition-Key", gwUserID(partitionKey))
144+
req.Header.Add("X-Partition-Key", legacyUserID(userID))
144145
res, err := http.DefaultClient.Do(req)
145146
defer func() { httputil.CloseResponse(res) }()
146147
if err != nil {

0 commit comments

Comments
 (0)