Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,19 @@ jobs:
package:
- cluster
- jobsdb
- integration_test/backendconfigunavailability
- integration_test/docker_test
- integration_test/multi_tenant_test
- integration_test/partitionmigration
- integration_test/reporting_dropped_events
- integration_test/reporting_error_index
- integration_test/warehouse
- integration_test/tracing
- integration_test/backendconfigunavailability
- integration_test/trackedusersreporting
- integration_test/retl_test
- integration_test/snowpipestreaming
- integration_test/srchydration
- integration_test/tracing
- integration_test/trackedusersreporting
- integration_test/transformer_contract
- processor
- regulation-worker
- router
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ workspaceConfig.json
imports/enterprise.go
*.coverprofile
junit*.xml
**/profile.out
**/*profile.out
**/*.test
.idea/*
build/regulation-worker
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ test-teardown:
echo "Tests passed, tearing down..." ;\
rm -f $(TESTFILE) ;\
echo "mode: atomic" > coverage.txt ;\
find . -name "profile.out" | while read file; do grep -v 'mode: atomic' $${file} >> coverage.txt; rm -f $${file}; done ;\
find . -name "*profile.out" | while read file; do grep -Ev 'mode: atomic|mode: set' $${file} >> coverage.txt; rm -f $${file}; done ;\
else \
rm -f coverage.txt coverage.html ; find . -name "profile.out" | xargs rm -f ;\
rm -f coverage.txt coverage.html ; find . -name "*profile.out" | xargs rm -f ;\
echo "Tests failed :-(" ;\
exit 1 ;\
fi
Expand Down
4 changes: 2 additions & 2 deletions app/apphandlers/apphandlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestAppHandlerStartSequence(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("it shouldn't be able to start without setup being called first", func(t *testing.T) {
require.Error(t, appHandler.StartRudderCore(ctx, options))
require.Error(t, appHandler.StartRudderCore(ctx, cancel, options))
})

t.Run("it shouldn't be able to setup if database is down", func(t *testing.T) {
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestAppHandlerStartSequence(t *testing.T) {

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := appHandler.StartRudderCore(ctx, options)
err := appHandler.StartRudderCore(ctx, cancel, options)
if err != nil {
t.Fatalf("rudder-server exited with error: %v", err)
}
Expand Down
97 changes: 58 additions & 39 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (a *embeddedApp) Setup() error {
return nil
}

func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) error {
func (a *embeddedApp) StartRudderCore(ctx context.Context, shutdownFn func(), options *app.Options) error {
config := config.Default
statsFactory := stats.Default

Expand Down Expand Up @@ -158,31 +158,33 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
// This separate gateway db is created just to be used with gateway because in case of degraded mode,
// the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it
// will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode.
gatewayDB := jobsdb.NewForWrite(
gwWOHandle := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
jobsdb.WithNumPartitions(partitionCount),
)
defer gatewayDB.Close()
if err = gatewayDB.Start(); err != nil {
defer gwWOHandle.Close()
if err = gwWOHandle.Start(); err != nil {
return fmt.Errorf("could not start gateway: %w", err)
}
defer gatewayDB.Stop()
defer gwWOHandle.Stop()
var gwWODB jobsdb.JobsDB = gwWOHandle

// This gwDBForProcessor should only be used by processor as this is supposed to be stopped and started with the
// Processor.
gwDBForProcessor := jobsdb.NewForRead(
gwROHandle := jobsdb.NewForRead(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.gwDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
jobsdb.WithNumPartitions(partitionCount),
)
defer gwDBForProcessor.Close()
routerDBHandle := jobsdb.NewForReadWrite(
defer gwROHandle.Close()
var gwRODB jobsdb.JobsDB = gwROHandle

rtRWHandle := jobsdb.NewForReadWrite(
"rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.rtDSLimit),
Expand All @@ -191,10 +193,10 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithDBHandle(dbPool),
jobsdb.WithNumPartitions(partitionCount),
)
defer routerDBHandle.Close()
routerDB := jobsdb.NewPendingEventsJobsDB(routerDBHandle, pendingEventsRegistry)
defer rtRWHandle.Close()
rtRWDB := jobsdb.NewPendingEventsJobsDB(rtRWHandle, pendingEventsRegistry)

batchRouterDBHandle := jobsdb.NewForReadWrite(
brtRWHandle := jobsdb.NewForReadWrite(
"batch_rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.batchrtDSLimit),
Expand All @@ -203,20 +205,20 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithDBHandle(dbPool),
jobsdb.WithNumPartitions(partitionCount),
)
defer batchRouterDBHandle.Close()
batchRouterDB := jobsdb.NewPendingEventsJobsDB(batchRouterDBHandle, pendingEventsRegistry)
defer brtRWHandle.Close()
brtRWDB := jobsdb.NewPendingEventsJobsDB(brtRWHandle, pendingEventsRegistry)

schemaDB := jobsdb.NewForReadWrite(
eschRWDB := jobsdb.NewForReadWrite(
"esch",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.eschDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer schemaDB.Close()
defer eschRWDB.Close()

archivalDB := jobsdb.NewForReadWrite(
arcRWDB := jobsdb.NewForReadWrite(
"arc",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.arcDSLimit),
Expand All @@ -225,7 +227,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithJobMaxAge(config.GetReloadableDurationVar(24, time.Hour, "archival.jobRetention")),
jobsdb.WithDBHandle(dbPool),
)
defer archivalDB.Close()
defer arcRWDB.Close()

var schemaForwarder schema_forwarder.Forwarder
if config.GetBool("EventSchemas2.enabled", false) {
Expand All @@ -234,16 +236,32 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
return err
}
defer client.Close()
schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, schemaDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config, statsFactory)
schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, eschRWDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config, statsFactory)
} else {
schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, schemaDB, logger.NewLogger().Child("jobs_forwarder"), config, statsFactory)
schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, eschRWDB, logger.NewLogger().Child("jobs_forwarder"), config, statsFactory)
}

modeProvider, err := resolveModeProvider(a.log, deploymentType)
if err != nil {
return err
}

// setup partition migrator
ppmSetup, err := setupProcessorPartitionMigrator(ctx, shutdownFn, dbPool,
config, statsFactory,
gwRODB, gwWODB,
rtRWDB, brtRWDB,
modeProvider.EtcdClient,
)
defer ppmSetup.Finally() // always run finally to clean up resources regardless of error
if err != nil {
return fmt.Errorf("setting up partition migrator: %w", err)
}
partitionMigrator := ppmSetup.PartitionMigrator
gwWODB = ppmSetup.GwDB
rtRWDB = ppmSetup.RtDB
brtRWDB = ppmSetup.BrtDB

adaptiveLimit := payload.SetupAdaptiveLimiter(ctx, g)

enrichers, err := setupPipelineEnrichers(config, a.log, statsFactory)
Expand All @@ -260,11 +278,11 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
proc := processor.New(
ctx,
&options.ClearDB,
gwDBForProcessor,
routerDB,
batchRouterDB,
schemaDB,
archivalDB,
gwRODB,
rtRWDB,
brtRWDB,
eschRWDB,
arcRWDB,
reporting,
transientSources,
fileUploaderProvider,
Expand All @@ -288,7 +306,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple routers can share the same cache and not hit the DB every time
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
routerDB,
rtRWDB,
),
TransientSources: transientSources,
RsourcesService: rsourcesService,
Expand All @@ -302,7 +320,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple batch routers can share the same cache and not hit the DB every time
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
batchRouterDB,
brtRWDB,
),
TransientSources: transientSources,
RsourcesService: rsourcesService,
Expand All @@ -312,17 +330,18 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
rt := routerManager.New(rtFactory, brtFactory, backendconfig.DefaultBackendConfig, logger.NewLogger())

dm := cluster.Dynamic{
Provider: modeProvider,
GatewayDB: gwDBForProcessor,
RouterDB: routerDB,
BatchRouterDB: batchRouterDB,
EventSchemaDB: schemaDB,
ArchivalDB: archivalDB,
Processor: proc,
Router: rt,
SchemaForwarder: schemaForwarder,
Provider: modeProvider,
GatewayDB: gwRODB,
RouterDB: rtRWDB,
BatchRouterDB: brtRWDB,
EventSchemaDB: eschRWDB,
ArchivalDB: arcRWDB,
PartitionMigrator: partitionMigrator,
Processor: proc,
Router: rt,
SchemaForwarder: schemaForwarder,
Archiver: archiver.New(
archivalDB,
arcRWDB,
fileUploaderProvider,
config,
statsFactory,
Expand All @@ -348,7 +367,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
streamMsgValidator := stream.NewMessageValidator()
gw := gateway.Handle{}
err = gw.Setup(ctx, config, logger.NewLogger().Child("gateway"), statsFactory, a.app, backendconfig.DefaultBackendConfig,
gatewayDB, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
gwWODB, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
streamMsgValidator, gateway.WithInternalHttpHandlers(
map[string]http.Handler{
"/drain": drainConfigManager.DrainConfigHttpHandler(),
Expand Down
28 changes: 17 additions & 11 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (a *gatewayApp) Setup() error {
return nil
}

func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) error {
func (a *gatewayApp) StartRudderCore(ctx context.Context, _ func(), options *app.Options) error {
config := config.Default
statsFactory := stats.Default
if !a.setupDone {
Expand Down Expand Up @@ -80,32 +80,38 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
}
partitionCount := config.GetIntVar(0, 1, "JobsDB.partitionCount")

gatewayDB := jobsdb.NewForWrite(
gwWOHandle := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
jobsdb.WithNumPartitions(partitionCount),
)
defer gatewayDB.Close()
defer gwWOHandle.Close()
var gwWODB jobsdb.JobsDB = gwWOHandle

if err := gatewayDB.Start(); err != nil {
if err := gwWODB.Start(); err != nil {
return fmt.Errorf("could not start gatewayDB: %w", err)
}
defer gatewayDB.Stop()
defer gwWODB.Stop()

g, ctx := errgroup.WithContext(ctx)

modeProvider, err := resolveModeProvider(a.log, deploymentType)
if err != nil {
return err
return fmt.Errorf("resolving mode provider: %w", err)
}

dm := cluster.Dynamic{
Provider: modeProvider,
GatewayComponent: true,
partitionMigrator, gwWODB, err := setupGatewayPartitionMigrator(ctx, config, gwWODB)
if err != nil {
return fmt.Errorf("setting up partition migrator: %w", err)
}
if err := partitionMigrator.Start(); err != nil {
return fmt.Errorf("starting partition migrator: %w", err)
}
defer partitionMigrator.Stop()

dm := cluster.Dynamic{Provider: modeProvider, GatewayComponent: true}
g.Go(func() error {
return dm.Run(ctx)
})
Expand Down Expand Up @@ -136,7 +142,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
}
streamMsgValidator := stream.NewMessageValidator()
err = gw.Setup(ctx, config, logger.NewLogger().Child("gateway"), statsFactory, a.app, backendconfig.DefaultBackendConfig,
gatewayDB, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
gwWODB, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
streamMsgValidator, gateway.WithInternalHttpHandlers(
map[string]http.Handler{
"/drain": drainConfigHttpHandler,
Expand Down
Loading
Loading