Commits (40)
1c01dd2
chore: improve batch router
cisse21 Apr 1, 2025
9000924
Merge branch 'master' into chore.improveBRT
cisse21 Apr 1, 2025
4e80e7c
Merge branch 'master' into chore.improveBRT
cisse21 Apr 2, 2025
3758ae6
chore: fix panic
cisse21 Apr 2, 2025
57ec688
Merge branch 'chore.improveBRT' of github.com:rudderlabs/rudder-serve…
cisse21 Apr 2, 2025
8cf6ba8
Merge branch 'master' into chore.improveBRT
achettyiitr Apr 2, 2025
a3235c1
chore:panic if unknown dest
cisse21 Apr 2, 2025
47dcdb2
Merge branch 'chore.improveBRT' of github.com:rudderlabs/rudder-serve…
cisse21 Apr 2, 2025
dc2319f
chore: debug logs
cisse21 Apr 2, 2025
c8c09ad
chore: debug logs
cisse21 Apr 3, 2025
3da0a05
Merge branch 'master' into chore.improveBRT
cisse21 Apr 3, 2025
febaa01
Merge branch 'master' into chore.improveBRT
cisse21 Apr 4, 2025
ab5def2
chore: introduce multiple goroutines to upload
cisse21 Apr 4, 2025
7dccdca
Merge branch 'master' into chore.improveBRT
cisse21 Apr 4, 2025
ef7b680
Merge branch 'master' into chore.improveBRT
cisse21 Apr 7, 2025
d890c52
chore: debug
cisse21 Apr 7, 2025
3aed37a
chore: move drain early in the pipeline
cisse21 Apr 7, 2025
5a59bfc
Merge branch 'master' into chore.improveBRT
cisse21 Apr 8, 2025
0acbc7d
chore: fix tests
cisse21 Apr 8, 2025
79f32b9
chore: use upload freq from config
cisse21 Apr 9, 2025
f12c6eb
Merge branch 'master' into chore.improveBRT
cisse21 Apr 10, 2025
5e71b04
Merge branch 'master' into chore.improveBRT
cisse21 Apr 11, 2025
0364154
Merge branch 'chore.improveBRT' of github.com:rudderlabs/rudder-serve…
cisse21 Apr 11, 2025
98d85dc
chore: improve buffer functionality
cisse21 Apr 11, 2025
4ec4da5
Merge branch 'master' into chore.improveBRT
cisse21 Apr 14, 2025
b4ed2ae
chore: review comments
cisse21 Apr 14, 2025
d3af8db
chore: clean up
cisse21 Apr 14, 2025
ba9d27a
Merge branch 'master' into chore.improveBRT
cisse21 Apr 14, 2025
f2776fc
chore: review comments
cisse21 Apr 14, 2025
cfb8844
chore: combine batch worker and connection worker
cisse21 Apr 14, 2025
d554cb6
chore: introduce consumer worker
cisse21 Apr 14, 2025
8fbc86d
chore: fix lint issues
cisse21 Apr 14, 2025
f76f494
Merge branch 'master' into chore.improveBRT
cisse21 Apr 14, 2025
6d1ceb7
chore: use limiter
cisse21 Apr 14, 2025
6004f1a
chore: fix bug
cisse21 Apr 14, 2025
a6c1a28
chore: fix bug
cisse21 Apr 14, 2025
0f1f12b
chore: decouple consumer worker
cisse21 Apr 14, 2025
f1f2434
chore: refactor 2
cisse21 Apr 15, 2025
ba83d73
chore: refactor 3
cisse21 Apr 15, 2025
e76e8a4
chore: lint fixed
cisse21 Apr 15, 2025
2 changes: 1 addition & 1 deletion .github/workflows/verify.yml
@@ -77,5 +77,5 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v7
with:
- version: v2.0.2
+ version: v2.1.1
args: -v
2 changes: 1 addition & 1 deletion Makefile
@@ -6,7 +6,7 @@ TESTFILE=_testok
MOUNT_PATH=/local

# go tools versions
- GOLANGCI=github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.0.2
+ GOLANGCI=github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.1.1
gofumpt=mvdan.cc/gofumpt@latest
govulncheck=golang.org/x/vuln/cmd/govulncheck@latest
goimports=golang.org/x/tools/cmd/goimports@latest
12 changes: 6 additions & 6 deletions integration_test/docker_test/docker_test.go
@@ -175,12 +175,12 @@ func testCases(t *testing.T) {
eventSql := "select anonymous_id, user_id from dev_integration_test_1.identifies limit 1;"
_ = db.QueryRow(eventSql).Scan(&myEvent.anonymousID, &myEvent.userID)
return myEvent.anonymousID == "anonymousId_1"
- }, time.Minute, 10*time.Millisecond)
+ }, 2*time.Minute, 10*time.Millisecond)
require.Eventually(t, func() bool {
eventSql := "select count(*) from dev_integration_test_1.identifies;"
_ = db.QueryRow(eventSql).Scan(&myEvent.count)
return myEvent.count == "2"
- }, time.Minute, 10*time.Millisecond)
+ }, 2*time.Minute, 10*time.Millisecond)

// Verify User Transformation
eventSql := "select context_myuniqueid,context_id,context_ip from dev_integration_test_1.identifies;"
@@ -194,12 +194,12 @@ func testCases(t *testing.T) {
eventSql := "select anonymous_id, user_id from dev_integration_test_1.users limit 1;"
_ = db.QueryRow(eventSql).Scan(&myEvent.anonymousID, &myEvent.userID)
return myEvent.anonymousID == "anonymousId_1"
- }, time.Minute, 10*time.Millisecond)
+ }, 2*time.Minute, 10*time.Millisecond)
require.Eventually(t, func() bool {
eventSql := "select count(*) from dev_integration_test_1.users;"
_ = db.QueryRow(eventSql).Scan(&myEvent.count)
return myEvent.count == "1"
- }, time.Minute, 10*time.Millisecond)
+ }, 2*time.Minute, 10*time.Millisecond)

// Verify User Transformation
eventSql = "select context_myuniqueid,context_id,context_ip from dev_integration_test_1.users;"
@@ -213,12 +213,12 @@ func testCases(t *testing.T) {
eventSql := "select anonymous_id, user_id from dev_integration_test_1.screens limit 1;"
_ = db.QueryRow(eventSql).Scan(&myEvent.anonymousID, &myEvent.userID)
return myEvent.anonymousID == "anonymousId_1"
- }, time.Minute, 10*time.Millisecond)
+ }, 2*time.Minute, 10*time.Millisecond)
require.Eventually(t, func() bool {
eventSql := "select count(*) from dev_integration_test_1.screens;"
_ = db.QueryRow(eventSql).Scan(&myEvent.count)
return myEvent.count == "1"
}, time.Minute, 10*time.Millisecond)
}, 2*time.Minute, 10*time.Millisecond)

// Verify User Transformation
eventSql = "select prop_key,myuniqueid,ip from dev_integration_test_1.screens;"
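
The timeout bumps in this file (one minute raised to two) all follow the same testify polling idiom: re-run a probe until it returns true or the deadline passes. Below is a minimal illustrative helper capturing that pattern; the helper name, package clause, and query handling are our own sketch, not part of this PR:

package dockertest // package name assumed for illustration

import (
	"database/sql"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// waitForRowValue keeps re-running the query until the scanned value
// matches want, failing the test if the (now doubled) two-minute
// ceiling elapses first.
func waitForRowValue(t *testing.T, db *sql.DB, query, want string) {
	t.Helper()
	require.Eventually(t, func() bool {
		var got string
		// Scan errors are ignored on purpose: the row may not exist yet.
		_ = db.QueryRow(query).Scan(&got)
		return got == want
	}, 2*time.Minute, 10*time.Millisecond)
}
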
24 changes: 9 additions & 15 deletions router/batchrouter/batchrouter_test.go
@@ -302,7 +302,7 @@ var _ = Describe("BatchRouter", func() {

<-batchrouter.backendConfigInitialized
batchrouter.minIdleSleep = config.SingleValueLoader(time.Microsecond)
- batchrouter.uploadFreq = config.SingleValueLoader(time.Microsecond)
+ batchrouter.uploadFreq = config.SingleValueLoader(5 * time.Millisecond)
batchrouter.mainLoopFreq = config.SingleValueLoader(time.Microsecond)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
@@ -409,19 +409,13 @@ var _ = Describe("BatchRouter", func() {
},
).AnyTimes()

- c.mockBatchRouterJobsDB.EXPECT().UpdateJobStatus(gomock.Any(), gomock.Any(), []string{CustomVal["S3"]}, gomock.Any()).Times(1).
- Do(func(ctx context.Context, statuses []*jobsdb.JobStatusT, _, _ interface{}) {
- assertJobStatus(toRetryJobsList[0], statuses[0], jobsdb.Executing.State, `{}`, 130)
- assertJobStatus(toRetryJobsList[1], statuses[1], jobsdb.Executing.State, `{}`, 4)
- }).Return(nil)

c.mockBatchRouterJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) {
_ = f(jobsdb.EmptyUpdateSafeTx())
}).Return(nil)
c.mockBatchRouterJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Any(), []string{CustomVal["S3"]}, gomock.Any()).Times(1).
Do(func(ctx context.Context, _ interface{}, statuses []*jobsdb.JobStatusT, _, _ interface{}) {
- assertJobStatus(toRetryJobsList[0], statuses[0], jobsdb.Aborted.State, fmt.Sprintf(`{"firstAttemptedAt": "%s", "Error": "BRT: Batch destination source not found in config for sourceID: %s"}`, attempt1.Format(misc.RFC3339Milli), SourceIDEnabled+"random"), 130)
- assertJobStatus(toRetryJobsList[1], statuses[1], jobsdb.Aborted.State, fmt.Sprintf(`{"firstAttemptedAt": "%s", "Error": "BRT: Batch destination source not found in config for sourceID: %s"}`, attempt2.Format(misc.RFC3339Milli), SourceIDEnabled+"random"), 4)
+ assertJobStatus(toRetryJobsList[0], statuses[0], jobsdb.Aborted.State, "{\"reason\":\"source_not_found\"}", 130)
+ assertJobStatus(toRetryJobsList[1], statuses[1], jobsdb.Aborted.State, "{\"reason\":\"source_not_found\"}", 4)
}).Return(nil)
c.mockProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1).DoAndReturn(
func(ctx context.Context, _ []*jobsdb.JobT) error {
@@ -432,7 +426,7 @@ var _ = Describe("BatchRouter", func() {

<-batchrouter.backendConfigInitialized
batchrouter.minIdleSleep = config.SingleValueLoader(time.Microsecond)
- batchrouter.uploadFreq = config.SingleValueLoader(time.Microsecond)
+ batchrouter.uploadFreq = config.SingleValueLoader(5 * time.Millisecond)
batchrouter.mainLoopFreq = config.SingleValueLoader(time.Microsecond)
done := make(chan struct{})
go func() {
@@ -620,11 +614,11 @@ func TestBatchRouter(t *testing.T) {
"table": "tracks"
}
}`),
- Parameters: jsonb.RawMessage([]byte(fmt.Sprintf(`{
+ Parameters: []byte(fmt.Sprintf(`{
"source_id": %[1]q,
"destination_id": %[2]q,
"receivedAt": %[3]q
- }`, bc.Sources[0].ID, s3Dest.ID, time.Now().Format(time.RFC3339)))),
+ }`, bc.Sources[0].ID, s3Dest.ID, time.Now().Format(time.RFC3339))),
CustomVal: s3Dest.DestinationDefinition.Name,
CreatedAt: time.Now(),
})
@@ -649,8 +643,8 @@ func TestBatchRouter(t *testing.T) {
)

batchrouter.minIdleSleep = config.SingleValueLoader(time.Microsecond)
- batchrouter.uploadFreq = config.SingleValueLoader(time.Microsecond)
- batchrouter.mainLoopFreq = config.SingleValueLoader(time.Microsecond)
+ batchrouter.uploadFreq = config.SingleValueLoader(5 * time.Millisecond)
+ batchrouter.mainLoopFreq = config.SingleValueLoader(time.Second)

err = routerDB.Store(context.Background(), jobs)
require.NoError(t, err)
@@ -665,7 +659,7 @@ func TestBatchRouter(t *testing.T) {
return false
}
return len(minioContents) == len(bcs)
- }, 5*time.Second, 200*time.Millisecond)
+ }, 2*time.Minute, 200*time.Millisecond)

minioContents, err := minioResource.Contents(context.Background(), "")
require.NoError(t, err)
125 changes: 125 additions & 0 deletions router/batchrouter/consumer_worker.go
@@ -0,0 +1,125 @@
package batchrouter

import (
"context"
"time"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/jobsdb"
)

// ConsumerWorker handles job consumption for a specific source-destination pair.
// It acts as a buffer between job ingestion and batch processing, storing jobs
// in partition queues for the batch worker to process.
type ConsumerWorker struct {
sourceID string
destID string
jobChan chan *jobsdb.JobT
ctx context.Context
logger logger.Logger
callbacks *ConsumerCallbacks
}

// ConsumerCallbacks contains callback functions needed by the consumer worker
type ConsumerCallbacks struct {
GetUploadFrequency func() time.Duration
GetMaxBatchSize func() int
ProcessJobs func(ctx context.Context, jobs []*jobsdb.JobT) error
ReleaseWorker func(sourceID, destID string)
PartitionJobs func(jobs []*jobsdb.JobT) [][]*jobsdb.JobT
}

// NewConsumerWorker creates a new consumer worker for a source-destination pair
func NewConsumerWorker(
sourceID string,
destID string,
jobChan chan *jobsdb.JobT,
ctx context.Context,
logger logger.Logger,
callbacks *ConsumerCallbacks,
) *ConsumerWorker {
return &ConsumerWorker{
sourceID: sourceID,
destID: destID,
jobChan: jobChan,
ctx: ctx,
logger: logger.Child("consumer-worker").With("sourceID", sourceID, "destID", destID),
callbacks: callbacks,
}
}

// Work implements the workerpool.Worker interface.
// It consumes jobs from the channel and processes them in batches.
// When either the batch size threshold or the upload frequency timer is reached,
// it sends the collected jobs for processing using the callbacks.ProcessJobs method.
// This directly processes the jobs without sending them back to the buffer.
// Returns true if any jobs were processed.
func (c *ConsumerWorker) Work() bool {
defer c.callbacks.ReleaseWorker(c.sourceID, c.destID)

uploadFrequency := c.callbacks.GetUploadFrequency()
maxBatchSize := c.callbacks.GetMaxBatchSize()

var jobs []*jobsdb.JobT
timer := time.NewTimer(uploadFrequency)
defer timer.Stop()

jobsProcessed := false

for {
select {
case <-c.ctx.Done():
if len(jobs) > 0 {
partitionedJobs := c.callbacks.PartitionJobs(jobs)
for _, jobBatch := range partitionedJobs {
if err := c.callbacks.ProcessJobs(c.ctx, jobBatch); err != nil {
c.logger.Errorf("Error processing jobs: %v", err)
}
}
jobsProcessed = true
}
return jobsProcessed

case job := <-c.jobChan:
jobs = append(jobs, job)
if len(jobs) >= maxBatchSize {
partitionedJobs := c.callbacks.PartitionJobs(jobs)
for _, jobBatch := range partitionedJobs {
if err := c.callbacks.ProcessJobs(c.ctx, jobBatch); err != nil {
c.logger.Errorf("Error processing jobs: %v", err)
}
}
jobs = nil
timer.Reset(uploadFrequency)
jobsProcessed = true
}

case <-timer.C:
if len(jobs) > 0 {
partitionedJobs := c.callbacks.PartitionJobs(jobs)
for _, jobBatch := range partitionedJobs {
if err := c.callbacks.ProcessJobs(c.ctx, jobBatch); err != nil {
c.logger.Errorf("Error processing jobs: %v", err)
}
}
jobsProcessed = true
}
timer.Reset(uploadFrequency)
return jobsProcessed
}
}
}

// SleepDurations returns the min and max sleep durations for the worker when idle
func (c *ConsumerWorker) SleepDurations() (min, max time.Duration) {
return time.Millisecond * 100, time.Second * 5
}

// Stop implements the workerpool.Worker interface
func (c *ConsumerWorker) Stop() {
// No cleanup needed
}

func (c *ConsumerWorker) IsIdle() bool {
return len(c.jobChan) == 0
}
11 changes: 6 additions & 5 deletions router/batchrouter/handle.go
@@ -109,6 +109,8 @@ type Handle struct {
backgroundCancel context.CancelFunc
backgroundWait func() error

+ jobBuffer *JobBuffer // Added for channel-based job buffering

backendConfigInitializedOnce sync.Once
backendConfigInitialized chan bool

@@ -121,9 +123,8 @@
encounteredMergeRuleMap map[string]map[string]bool

limiter struct {
- read kitsync.Limiter
- process kitsync.Limiter
- upload kitsync.Limiter
+ read kitsync.Limiter
+ upload kitsync.Limiter
}

lastExecTimesMu sync.RWMutex
@@ -190,8 +191,7 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
if brt.skipFetchingJobs(partition) {
return
}

- defer brt.limiter.read.Begin("")()
+ defer brt.limiter.read.Begin(partition)()

brt.configSubscriberMu.RLock()
destinationsMap := brt.destinationsMap
@@ -621,6 +621,7 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err
brt.failingDestinationsMu.Lock()
brt.failingDestinations[batchJobs.Connection.Destination.ID] = batchReqMetric.batchRequestFailed > 0
brt.failingDestinationsMu.Unlock()

var statusList []*jobsdb.JobStatusT

if isWarehouse && notifyWarehouseErr {
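
One behavioral note from this file: getWorkerJobs now passes the partition key to the read limiter instead of the empty string, so limiter slots are accounted per partition rather than globally. Reading the call site, Begin appears to acquire a slot for the key and return its release func; below is a hedged sketch of the deferred-release pattern (the wrapper function is hypothetical, only the defer line mirrors the diff):

// Sketch only, assuming kitsync.Limiter's Begin(key) blocks until a slot
// is free for that key and returns the corresponding release func.
func (brt *Handle) readPartitionSketch(partition string) {
	defer brt.limiter.read.Begin(partition)() // slot released when the read returns
	// ... fetch and buffer jobs for this partition (placeholder) ...
}
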
46 changes: 27 additions & 19 deletions router/batchrouter/handle_lifecycle.go
@@ -141,15 +141,6 @@ func (brt *Handle) Setup(
return time.After(limiterStatsPeriod)
}),
)
- brt.limiter.process = kitsync.NewLimiter(ctx, &limiterGroup, "brt_process",
- getBatchRouterConfigInt("Limiter.process.limit", brt.destType, 20),
- stats.Default,
- kitsync.WithLimiterDynamicPeriod(config.GetDuration("BatchRouter.Limiter.process.dynamicPeriod", 1, time.Second)),
- kitsync.WithLimiterTags(map[string]string{"destType": brt.destType}),
- kitsync.WithLimiterStatsTriggerFunc(func() <-chan time.Time {
- return time.After(limiterStatsPeriod)
- }),
- )
brt.limiter.upload = kitsync.NewLimiter(ctx, &limiterGroup, "brt_upload",
getBatchRouterConfigInt("Limiter.upload.limit", brt.destType, 50),
stats.Default,
@@ -161,9 +152,17 @@ func (brt *Handle) Setup(
)

brt.logger.Infof("BRT: Batch Router started: %s", destType)

brt.crashRecover()

+ if asynccommon.IsAsyncDestination(brt.destType) {
+ brt.startAsyncDestinationManager()
+ }

+ brt.backgroundGroup.Go(crash.Wrapper(func() error {
+ brt.backendConfigSubscriber()
+ return nil
+ }))

// periodically publish a zero counter for ensuring that stuck processing pipeline alert
// can always detect a stuck batch router
brt.backgroundGroup.Go(func() error {
@@ -192,14 +191,8 @@ func (brt *Handle) Setup(
return nil
}))

- if asynccommon.IsAsyncDestination(destType) {
- brt.startAsyncDestinationManager()
- }

- brt.backgroundGroup.Go(crash.Wrapper(func() error {
- brt.backendConfigSubscriber()
- return nil
- }))
+ // Initialize the job buffer
+ brt.jobBuffer = NewJobBuffer(brt)
}

func (brt *Handle) setupReloadableVars() {
@@ -262,8 +255,21 @@ func (brt *Handle) Start() {

// Shutdown stops the batch router
func (brt *Handle) Shutdown() {
+ // Signal all goroutines to stop via context cancellation
+ brt.logger.Info("Initiating batch router shutdown")
brt.backgroundCancel()

+ // Stop all job buffer timers and workerPool
+ if brt.jobBuffer != nil {
+ brt.logger.Debug("Stopping JobBuffer and its resources")

+ // Stop the job buffer (which will also stop the worker pool)
+ brt.jobBuffer.Shutdown()
+ }

+ // Wait for all background goroutines to complete
_ = brt.backgroundWait()
+ brt.logger.Info("Batch router shutdown complete")
}

func (brt *Handle) initAsyncDestinationStruct(destination *backendconfig.DestinationT) {
@@ -399,7 +405,9 @@ func (brt *Handle) backendConfigSubscriber() {
if destination.DestinationDefinition.Name == brt.destType && destination.Enabled {
if _, ok := destinationsMap[destination.ID]; !ok {
destinationsMap[destination.ID] = &routerutils.DestinationWithSources{Destination: destination, Sources: []backendconfig.SourceT{}}
- uploadIntervalMap[destination.ID] = brt.uploadInterval(destination.Config)
+ if asynccommon.IsAsyncDestination(brt.destType) {
+ uploadIntervalMap[destination.ID] = brt.uploadInterval(destination.Config)
+ }
}
destinationsMap[destination.ID].Sources = append(destinationsMap[destination.ID].Sources, source)
brt.refreshDestination(destination)