Skip to content

Commit b5ee878

Browse files
authored
feat: integrate processor partition migrator in processor and embedded apps (#6622)
🔒 Scanned for secrets using gitleaks 8.30.0 # Description Integrating the processor partition migrator in the embedded and processor apps & introducing an integration test for the embedded scenario: - Propagating the `cancel` function from `main` to the `runner` so that the source migrator is able to shut down the server in case of a timeout while waiting for in-flight jobs to settle. - Introducing `clustertest.PartitionRoutingProxy`, a simple HTTP reverse proxy which routes requests to the proper server backend based on user ID. - Introducing `clustertest.PartitionMigrationExecutor`, a component which is able to orchestrate a single partition migration through etcd. - Adding optional support for the router's network handle to include an `X-Rudder-Instance-Id` header in its requests. - Panicking in jobsdb in case `readerCapacity` & `writerCapacity` are uninitialized. - Creating the `partition_id` index in jobsdb regardless of the `numPartitions` configuration option, since buffer JobsDBs are going to have `numPartitions: 0`, but we still need to be able to get jobs out of them for specific partitions. - Including integration tests in CI that were previously not running. - Gathering coverage data from the rudder-server binary when running as a separate process. ## Linear Ticket resolves PIPE-2700 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 1cffea7 commit b5ee878

File tree

76 files changed

+1610
-356
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

76 files changed

+1610
-356
lines changed

.github/workflows/tests.yaml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,16 +175,19 @@ jobs:
175175
package:
176176
- cluster
177177
- jobsdb
178+
- integration_test/backendconfigunavailability
178179
- integration_test/docker_test
179180
- integration_test/multi_tenant_test
181+
- integration_test/partitionmigration
180182
- integration_test/reporting_dropped_events
181183
- integration_test/reporting_error_index
182184
- integration_test/warehouse
183-
- integration_test/tracing
184-
- integration_test/backendconfigunavailability
185-
- integration_test/trackedusersreporting
185+
- integration_test/retl_test
186186
- integration_test/snowpipestreaming
187187
- integration_test/srchydration
188+
- integration_test/tracing
189+
- integration_test/trackedusersreporting
190+
- integration_test/transformer_contract
188191
- processor
189192
- regulation-worker
190193
- router

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ workspaceConfig.json
1010
imports/enterprise.go
1111
*.coverprofile
1212
junit*.xml
13-
**/profile.out
13+
**/*profile.out
1414
**/*.test
1515
.idea/*
1616
build/regulation-worker

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ test-teardown:
6565
echo "Tests passed, tearing down..." ;\
6666
rm -f $(TESTFILE) ;\
6767
echo "mode: atomic" > coverage.txt ;\
68-
find . -name "profile.out" | while read file; do grep -v 'mode: atomic' $${file} >> coverage.txt; rm -f $${file}; done ;\
68+
find . -name "*profile.out" | while read file; do grep -Ev 'mode: atomic|mode: set' $${file} >> coverage.txt; rm -f $${file}; done ;\
6969
else \
70-
rm -f coverage.txt coverage.html ; find . -name "profile.out" | xargs rm -f ;\
70+
rm -f coverage.txt coverage.html ; find . -name "*profile.out" | xargs rm -f ;\
7171
echo "Tests failed :-(" ;\
7272
exit 1 ;\
7373
fi

app/apphandlers/apphandlers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestAppHandlerStartSequence(t *testing.T) {
4040
ctx, cancel := context.WithCancel(context.Background())
4141
defer cancel()
4242
t.Run("it shouldn't be able to start without setup being called first", func(t *testing.T) {
43-
require.Error(t, appHandler.StartRudderCore(ctx, options))
43+
require.Error(t, appHandler.StartRudderCore(ctx, cancel, options))
4444
})
4545

4646
t.Run("it shouldn't be able to setup if database is down", func(t *testing.T) {
@@ -167,7 +167,7 @@ func TestAppHandlerStartSequence(t *testing.T) {
167167

168168
wg, ctx := errgroup.WithContext(ctx)
169169
wg.Go(func() error {
170-
err := appHandler.StartRudderCore(ctx, options)
170+
err := appHandler.StartRudderCore(ctx, cancel, options)
171171
if err != nil {
172172
t.Fatalf("rudder-server exited with error: %v", err)
173173
}

app/apphandlers/embeddedAppHandler.go

Lines changed: 58 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (a *embeddedApp) Setup() error {
7777
return nil
7878
}
7979

80-
func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) error {
80+
func (a *embeddedApp) StartRudderCore(ctx context.Context, shutdownFn func(), options *app.Options) error {
8181
config := config.Default
8282
statsFactory := stats.Default
8383

@@ -158,31 +158,33 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
158158
// This separate gateway db is created just to be used with gateway because in case of degraded mode,
159159
// the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it
160160
// will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode.
161-
gatewayDB := jobsdb.NewForWrite(
161+
gwWOHandle := jobsdb.NewForWrite(
162162
"gw",
163163
jobsdb.WithClearDB(options.ClearDB),
164164
jobsdb.WithStats(statsFactory),
165165
jobsdb.WithDBHandle(dbPool),
166166
jobsdb.WithNumPartitions(partitionCount),
167167
)
168-
defer gatewayDB.Close()
169-
if err = gatewayDB.Start(); err != nil {
168+
defer gwWOHandle.Close()
169+
if err = gwWOHandle.Start(); err != nil {
170170
return fmt.Errorf("could not start gateway: %w", err)
171171
}
172-
defer gatewayDB.Stop()
172+
defer gwWOHandle.Stop()
173+
var gwWODB jobsdb.JobsDB = gwWOHandle
173174

174-
// This gwDBForProcessor should only be used by processor as this is supposed to be stopped and started with the
175-
// Processor.
176-
gwDBForProcessor := jobsdb.NewForRead(
175+
gwROHandle := jobsdb.NewForRead(
177176
"gw",
178177
jobsdb.WithClearDB(options.ClearDB),
179178
jobsdb.WithDSLimit(a.config.gwDSLimit),
180179
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
181180
jobsdb.WithStats(statsFactory),
182181
jobsdb.WithDBHandle(dbPool),
182+
jobsdb.WithNumPartitions(partitionCount),
183183
)
184-
defer gwDBForProcessor.Close()
185-
routerDBHandle := jobsdb.NewForReadWrite(
184+
defer gwROHandle.Close()
185+
var gwRODB jobsdb.JobsDB = gwROHandle
186+
187+
rtRWHandle := jobsdb.NewForReadWrite(
186188
"rt",
187189
jobsdb.WithClearDB(options.ClearDB),
188190
jobsdb.WithDSLimit(a.config.rtDSLimit),
@@ -191,10 +193,10 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
191193
jobsdb.WithDBHandle(dbPool),
192194
jobsdb.WithNumPartitions(partitionCount),
193195
)
194-
defer routerDBHandle.Close()
195-
routerDB := jobsdb.NewPendingEventsJobsDB(routerDBHandle, pendingEventsRegistry)
196+
defer rtRWHandle.Close()
197+
rtRWDB := jobsdb.NewPendingEventsJobsDB(rtRWHandle, pendingEventsRegistry)
196198

197-
batchRouterDBHandle := jobsdb.NewForReadWrite(
199+
brtRWHandle := jobsdb.NewForReadWrite(
198200
"batch_rt",
199201
jobsdb.WithClearDB(options.ClearDB),
200202
jobsdb.WithDSLimit(a.config.batchrtDSLimit),
@@ -203,20 +205,20 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
203205
jobsdb.WithDBHandle(dbPool),
204206
jobsdb.WithNumPartitions(partitionCount),
205207
)
206-
defer batchRouterDBHandle.Close()
207-
batchRouterDB := jobsdb.NewPendingEventsJobsDB(batchRouterDBHandle, pendingEventsRegistry)
208+
defer brtRWHandle.Close()
209+
brtRWDB := jobsdb.NewPendingEventsJobsDB(brtRWHandle, pendingEventsRegistry)
208210

209-
schemaDB := jobsdb.NewForReadWrite(
211+
eschRWDB := jobsdb.NewForReadWrite(
210212
"esch",
211213
jobsdb.WithClearDB(options.ClearDB),
212214
jobsdb.WithDSLimit(a.config.eschDSLimit),
213215
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
214216
jobsdb.WithStats(statsFactory),
215217
jobsdb.WithDBHandle(dbPool),
216218
)
217-
defer schemaDB.Close()
219+
defer eschRWDB.Close()
218220

219-
archivalDB := jobsdb.NewForReadWrite(
221+
arcRWDB := jobsdb.NewForReadWrite(
220222
"arc",
221223
jobsdb.WithClearDB(options.ClearDB),
222224
jobsdb.WithDSLimit(a.config.arcDSLimit),
@@ -225,7 +227,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
225227
jobsdb.WithJobMaxAge(config.GetReloadableDurationVar(24, time.Hour, "archival.jobRetention")),
226228
jobsdb.WithDBHandle(dbPool),
227229
)
228-
defer archivalDB.Close()
230+
defer arcRWDB.Close()
229231

230232
var schemaForwarder schema_forwarder.Forwarder
231233
if config.GetBool("EventSchemas2.enabled", false) {
@@ -234,16 +236,32 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
234236
return err
235237
}
236238
defer client.Close()
237-
schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, schemaDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config, statsFactory)
239+
schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, eschRWDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config, statsFactory)
238240
} else {
239-
schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, schemaDB, logger.NewLogger().Child("jobs_forwarder"), config, statsFactory)
241+
schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, eschRWDB, logger.NewLogger().Child("jobs_forwarder"), config, statsFactory)
240242
}
241243

242244
modeProvider, err := resolveModeProvider(a.log, deploymentType)
243245
if err != nil {
244246
return err
245247
}
246248

249+
// setup partition migrator
250+
ppmSetup, err := setupProcessorPartitionMigrator(ctx, shutdownFn, dbPool,
251+
config, statsFactory,
252+
gwRODB, gwWODB,
253+
rtRWDB, brtRWDB,
254+
modeProvider.EtcdClient,
255+
)
256+
defer ppmSetup.Finally() // always run finally to clean up resources regardless of error
257+
if err != nil {
258+
return fmt.Errorf("setting up partition migrator: %w", err)
259+
}
260+
partitionMigrator := ppmSetup.PartitionMigrator
261+
gwWODB = ppmSetup.GwDB
262+
rtRWDB = ppmSetup.RtDB
263+
brtRWDB = ppmSetup.BrtDB
264+
247265
adaptiveLimit := payload.SetupAdaptiveLimiter(ctx, g)
248266

249267
enrichers, err := setupPipelineEnrichers(config, a.log, statsFactory)
@@ -260,11 +278,11 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
260278
proc := processor.New(
261279
ctx,
262280
&options.ClearDB,
263-
gwDBForProcessor,
264-
routerDB,
265-
batchRouterDB,
266-
schemaDB,
267-
archivalDB,
281+
gwRODB,
282+
rtRWDB,
283+
brtRWDB,
284+
eschRWDB,
285+
arcRWDB,
268286
reporting,
269287
transientSources,
270288
fileUploaderProvider,
@@ -288,7 +306,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
288306
BackendConfig: backendconfig.DefaultBackendConfig,
289307
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple routers can share the same cache and not hit the DB every time
290308
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
291-
routerDB,
309+
rtRWDB,
292310
),
293311
TransientSources: transientSources,
294312
RsourcesService: rsourcesService,
@@ -302,7 +320,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
302320
BackendConfig: backendconfig.DefaultBackendConfig,
303321
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple batch routers can share the same cache and not hit the DB every time
304322
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
305-
batchRouterDB,
323+
brtRWDB,
306324
),
307325
TransientSources: transientSources,
308326
RsourcesService: rsourcesService,
@@ -312,17 +330,18 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
312330
rt := routerManager.New(rtFactory, brtFactory, backendconfig.DefaultBackendConfig, logger.NewLogger())
313331

314332
dm := cluster.Dynamic{
315-
Provider: modeProvider,
316-
GatewayDB: gwDBForProcessor,
317-
RouterDB: routerDB,
318-
BatchRouterDB: batchRouterDB,
319-
EventSchemaDB: schemaDB,
320-
ArchivalDB: archivalDB,
321-
Processor: proc,
322-
Router: rt,
323-
SchemaForwarder: schemaForwarder,
333+
Provider: modeProvider,
334+
GatewayDB: gwRODB,
335+
RouterDB: rtRWDB,
336+
BatchRouterDB: brtRWDB,
337+
EventSchemaDB: eschRWDB,
338+
ArchivalDB: arcRWDB,
339+
PartitionMigrator: partitionMigrator,
340+
Processor: proc,
341+
Router: rt,
342+
SchemaForwarder: schemaForwarder,
324343
Archiver: archiver.New(
325-
archivalDB,
344+
arcRWDB,
326345
fileUploaderProvider,
327346
config,
328347
statsFactory,
@@ -348,7 +367,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
348367
streamMsgValidator := stream.NewMessageValidator()
349368
gw := gateway.Handle{}
350369
err = gw.Setup(ctx, config, logger.NewLogger().Child("gateway"), statsFactory, a.app, backendconfig.DefaultBackendConfig,
351-
gatewayDB, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
370+
gwWODB, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
352371
streamMsgValidator, gateway.WithInternalHttpHandlers(
353372
map[string]http.Handler{
354373
"/drain": drainConfigManager.DrainConfigHttpHandler(),

app/apphandlers/gatewayAppHandler.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (a *gatewayApp) Setup() error {
4848
return nil
4949
}
5050

51-
func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) error {
51+
func (a *gatewayApp) StartRudderCore(ctx context.Context, _ func(), options *app.Options) error {
5252
config := config.Default
5353
statsFactory := stats.Default
5454
if !a.setupDone {
@@ -80,32 +80,38 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
8080
}
8181
partitionCount := config.GetIntVar(0, 1, "JobsDB.partitionCount")
8282

83-
gatewayDB := jobsdb.NewForWrite(
83+
gwWOHandle := jobsdb.NewForWrite(
8484
"gw",
8585
jobsdb.WithClearDB(options.ClearDB),
8686
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
8787
jobsdb.WithStats(statsFactory),
8888
jobsdb.WithDBHandle(dbPool),
8989
jobsdb.WithNumPartitions(partitionCount),
9090
)
91-
defer gatewayDB.Close()
91+
defer gwWOHandle.Close()
92+
var gwWODB jobsdb.JobsDB = gwWOHandle
9293

93-
if err := gatewayDB.Start(); err != nil {
94+
if err := gwWODB.Start(); err != nil {
9495
return fmt.Errorf("could not start gatewayDB: %w", err)
9596
}
96-
defer gatewayDB.Stop()
97+
defer gwWODB.Stop()
9798

9899
g, ctx := errgroup.WithContext(ctx)
99100

100101
modeProvider, err := resolveModeProvider(a.log, deploymentType)
101102
if err != nil {
102-
return err
103+
return fmt.Errorf("resolving mode provider: %w", err)
103104
}
104-
105-
dm := cluster.Dynamic{
106-
Provider: modeProvider,
107-
GatewayComponent: true,
105+
partitionMigrator, gwWODB, err := setupGatewayPartitionMigrator(ctx, config, gwWODB)
106+
if err != nil {
107+
return fmt.Errorf("setting up partition migrator: %w", err)
108108
}
109+
if err := partitionMigrator.Start(); err != nil {
110+
return fmt.Errorf("starting partition migrator: %w", err)
111+
}
112+
defer partitionMigrator.Stop()
113+
114+
dm := cluster.Dynamic{Provider: modeProvider, GatewayComponent: true}
109115
g.Go(func() error {
110116
return dm.Run(ctx)
111117
})
@@ -136,7 +142,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
136142
}
137143
streamMsgValidator := stream.NewMessageValidator()
138144
err = gw.Setup(ctx, config, logger.NewLogger().Child("gateway"), statsFactory, a.app, backendconfig.DefaultBackendConfig,
139-
gatewayDB, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
145+
gwWODB, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
140146
streamMsgValidator, gateway.WithInternalHttpHandlers(
141147
map[string]http.Handler{
142148
"/drain": drainConfigHttpHandler,

0 commit comments

Comments (0)