Skip to content

Commit ab51e7c

Browse files
committed
chore: improve batch router
1 parent 379fcbd commit ab51e7c

File tree

5 files changed

+418
-232
lines changed

5 files changed

+418
-232
lines changed

router/batchrouter/handle.go

+2
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ type Handle struct {
109109
backgroundCancel context.CancelFunc
110110
backgroundWait func() error
111111

112+
jobBuffer *JobBuffer // Added for channel-based job buffering
113+
112114
backendConfigInitializedOnce sync.Once
113115
backendConfigInitialized chan bool
114116

router/batchrouter/handle_lifecycle.go

+39-10
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,29 @@ func (brt *Handle) Setup(
162162

163163
brt.logger.Infof("BRT: Batch Router started: %s", destType)
164164

165+
// Initialize job buffer
166+
brt.jobBuffer = &JobBuffer{
167+
sourceDestMap: make(map[string]chan *jobsdb.JobT),
168+
uploadTimers: make(map[string]*time.Timer),
169+
brt: brt,
170+
}
171+
165172
brt.crashRecover()
166173

174+
if asynccommon.IsAsyncDestination(brt.destType) {
175+
brt.startAsyncDestinationManager()
176+
}
177+
178+
brt.backgroundGroup.Go(crash.Wrapper(func() error {
179+
brt.backendConfigSubscriber()
180+
return nil
181+
}))
182+
183+
brt.backgroundGroup.Go(crash.Wrapper(func() error {
184+
brt.mainLoop(ctx)
185+
return nil
186+
}))
187+
167188
// periodically publish a zero counter for ensuring that stuck processing pipeline alert
168189
// can always detect a stuck batch router
169190
brt.backgroundGroup.Go(func() error {
@@ -191,15 +212,6 @@ func (brt *Handle) Setup(
191212
brt.collectMetrics(brt.backgroundCtx)
192213
return nil
193214
}))
194-
195-
if asynccommon.IsAsyncDestination(destType) {
196-
brt.startAsyncDestinationManager()
197-
}
198-
199-
brt.backgroundGroup.Go(crash.Wrapper(func() error {
200-
brt.backendConfigSubscriber()
201-
return nil
202-
}))
203215
}
204216

205217
func (brt *Handle) setupReloadableVars() {
@@ -262,8 +274,23 @@ func (brt *Handle) Start() {
262274

263275
// Shutdown stops the batch router
264276
func (brt *Handle) Shutdown() {
277+
// Signal all goroutines to stop via context cancellation
278+
brt.logger.Info("Initiating batch router shutdown")
265279
brt.backgroundCancel()
280+
281+
// Stop all job buffer timers
282+
if brt.jobBuffer != nil {
283+
brt.jobBuffer.mu.Lock()
284+
for key, timer := range brt.jobBuffer.uploadTimers {
285+
timer.Stop()
286+
brt.logger.Debugf("Stopped timer for source-destination: %s", key)
287+
}
288+
brt.jobBuffer.mu.Unlock()
289+
}
290+
291+
// Wait for all background goroutines to complete
266292
_ = brt.backgroundWait()
293+
brt.logger.Info("Batch router shutdown complete")
267294
}
268295

269296
func (brt *Handle) initAsyncDestinationStruct(destination *backendconfig.DestinationT) {
@@ -399,7 +426,9 @@ func (brt *Handle) backendConfigSubscriber() {
399426
if destination.DestinationDefinition.Name == brt.destType && destination.Enabled {
400427
if _, ok := destinationsMap[destination.ID]; !ok {
401428
destinationsMap[destination.ID] = &routerutils.DestinationWithSources{Destination: destination, Sources: []backendconfig.SourceT{}}
402-
uploadIntervalMap[destination.ID] = brt.uploadInterval(destination.Config)
429+
if asynccommon.IsAsyncDestination(brt.destType) {
430+
uploadIntervalMap[destination.ID] = brt.uploadInterval(destination.Config)
431+
}
403432
}
404433
destinationsMap[destination.ID].Sources = append(destinationsMap[destination.ID].Sources, source)
405434
brt.refreshDestination(destination)

router/batchrouter/handle_observability.go

+22-22
Original file line numberDiff line numberDiff line change
@@ -25,32 +25,32 @@ func (brt *Handle) collectMetrics(ctx context.Context) {
2525
}
2626

2727
for {
28-
brt.batchRequestsMetricMu.RLock()
29-
var diagnosisProperties map[string]interface{}
30-
success := 0
31-
failed := 0
32-
for _, batchReqMetric := range brt.batchRequestsMetric {
33-
success = success + batchReqMetric.batchRequestSuccess
34-
failed = failed + batchReqMetric.batchRequestFailed
35-
}
36-
if len(brt.batchRequestsMetric) > 0 {
37-
diagnosisProperties = map[string]interface{}{
38-
brt.destType: map[string]interface{}{
39-
diagnostics.BatchRouterSuccess: success,
40-
diagnostics.BatchRouterFailed: failed,
41-
},
42-
}
43-
44-
brt.Diagnostics.Track(diagnostics.BatchRouterEvents, diagnosisProperties)
45-
}
46-
47-
brt.batchRequestsMetric = nil
48-
brt.batchRequestsMetricMu.RUnlock()
49-
5028
select {
5129
case <-ctx.Done():
5230
return
31+
5332
case <-brt.diagnosisTicker.C:
33+
brt.batchRequestsMetricMu.RLock()
34+
var diagnosisProperties map[string]interface{}
35+
success := 0
36+
failed := 0
37+
for _, batchReqMetric := range brt.batchRequestsMetric {
38+
success = success + batchReqMetric.batchRequestSuccess
39+
failed = failed + batchReqMetric.batchRequestFailed
40+
}
41+
if len(brt.batchRequestsMetric) > 0 {
42+
diagnosisProperties = map[string]interface{}{
43+
brt.destType: map[string]interface{}{
44+
diagnostics.BatchRouterSuccess: success,
45+
diagnostics.BatchRouterFailed: failed,
46+
},
47+
}
48+
49+
brt.Diagnostics.Track(diagnostics.BatchRouterEvents, diagnosisProperties)
50+
}
51+
52+
brt.batchRequestsMetric = nil
53+
brt.batchRequestsMetricMu.RUnlock()
5454
}
5555
}
5656
}

router/batchrouter/job_buffer.go

+246
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
package batchrouter
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
8+
"github.com/rudderlabs/rudder-go-kit/stats"
9+
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
10+
"github.com/rudderlabs/rudder-server/jobsdb"
11+
asynccommon "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
12+
"github.com/rudderlabs/rudder-server/utils/misc"
13+
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
14+
)
15+
16+
// JobBuffer provides buffering capabilities for jobs based on source-destination pairs
17+
type JobBuffer struct {
18+
sourceDestMap map[string]chan *jobsdb.JobT
19+
uploadTimers map[string]*time.Timer
20+
mu sync.RWMutex
21+
brt *Handle
22+
}
23+
24+
// createStat creates a new stat with consistent destination, source, and destination type tags
25+
func (jb *JobBuffer) createStat(name, statType, sourceID, destID string) stats.Measurement {
26+
return stats.Default.NewTaggedStat(name, statType, stats.Tags{
27+
"destType": jb.brt.destType,
28+
"sourceId": sourceID,
29+
"destId": destID,
30+
})
31+
}
32+
33+
// Key format: "sourceID:destinationID"
34+
func getSourceDestKey(sourceID, destID string) string {
35+
return fmt.Sprintf("%s:%s", sourceID, destID)
36+
}
37+
38+
// Initialize or get a job channel for a source-destination pair
39+
func (jb *JobBuffer) getJobChannel(sourceID, destID string) chan *jobsdb.JobT {
40+
key := getSourceDestKey(sourceID, destID)
41+
42+
jb.mu.RLock()
43+
ch, exists := jb.sourceDestMap[key]
44+
jb.mu.RUnlock()
45+
46+
if !exists {
47+
jb.mu.Lock()
48+
// Double-check to avoid race conditions
49+
if ch, exists = jb.sourceDestMap[key]; !exists {
50+
bufferSize := jb.brt.conf.GetIntVar(100000, 1, "BatchRouter."+jb.brt.destType+".channelBufferSize", "BatchRouter.channelBufferSize")
51+
ch = make(chan *jobsdb.JobT, bufferSize)
52+
jb.sourceDestMap[key] = ch
53+
54+
// Start consumer for this channel
55+
go jb.startJobConsumer(sourceID, destID, ch)
56+
}
57+
jb.mu.Unlock()
58+
}
59+
60+
return ch
61+
}
62+
63+
func (jb *JobBuffer) startJobConsumer(sourceID, destID string, jobCh chan *jobsdb.JobT) {
64+
key := getSourceDestKey(sourceID, destID)
65+
jobBatch := make([]*jobsdb.JobT, 0)
66+
67+
// Configure upload frequency from settings
68+
jb.brt.configSubscriberMu.RLock()
69+
uploadFreq := jb.brt.uploadIntervalMap[destID]
70+
jb.brt.configSubscriberMu.RUnlock()
71+
72+
if uploadFreq == 0 {
73+
uploadFreq = jb.brt.uploadFreq.Load()
74+
}
75+
76+
jb.brt.logger.Debugf("Starting job consumer for %s:%s with upload frequency %s", sourceID, destID, uploadFreq)
77+
78+
// Start timer for this source-destination pair
79+
timer := time.NewTimer(uploadFreq)
80+
jb.mu.Lock()
81+
jb.uploadTimers[key] = timer
82+
jb.mu.Unlock()
83+
84+
// Helper function to safely stop and reset the timer
85+
resetTimer := func() {
86+
if !timer.Stop() {
87+
select {
88+
case <-timer.C:
89+
default:
90+
}
91+
}
92+
timer.Reset(uploadFreq)
93+
}
94+
95+
// Helper function to upload batch and clear it
96+
processBatch := func() {
97+
if len(jobBatch) > 0 {
98+
batchSize := len(jobBatch)
99+
jb.brt.logger.Debugf("Processing batch of %d jobs for %s:%s", batchSize, sourceID, destID)
100+
jb.processAndUploadBatch(sourceID, destID, jobBatch)
101+
jobBatch = make([]*jobsdb.JobT, 0)
102+
}
103+
}
104+
105+
metricTicker := time.NewTicker(30 * time.Second)
106+
defer metricTicker.Stop()
107+
108+
for {
109+
select {
110+
case job, ok := <-jobCh:
111+
// Channel closed or context cancelled
112+
if !ok {
113+
jb.brt.logger.Debugf("Channel closed for %s:%s, processing remaining jobs and exiting", sourceID, destID)
114+
processBatch()
115+
return
116+
}
117+
jobBatch = append(jobBatch, job)
118+
119+
// Check if batch size threshold is reached
120+
if len(jobBatch) >= jb.brt.maxEventsInABatch {
121+
jb.brt.logger.Debugf("Batch size threshold reached for %s:%s: %d jobs", sourceID, destID, len(jobBatch))
122+
processBatch()
123+
resetTimer()
124+
}
125+
126+
case <-timer.C:
127+
// Upload on timer expiry if we have jobs
128+
jb.brt.logger.Debugf("Timer expired for %s:%s with %d jobs in batch", sourceID, destID, len(jobBatch))
129+
processBatch()
130+
timer.Reset(uploadFreq)
131+
132+
case <-jb.brt.backgroundCtx.Done():
133+
// Clean shutdown
134+
jb.brt.logger.Debugf("Shutting down job consumer for %s:%s with %d jobs in batch", sourceID, destID, len(jobBatch))
135+
processBatch()
136+
return
137+
}
138+
}
139+
}
140+
141+
func (jb *JobBuffer) processAndUploadBatch(sourceID, destID string, jobs []*jobsdb.JobT) {
142+
if len(jobs) == 0 {
143+
return
144+
}
145+
146+
// Track metrics for batch processing
147+
processedBatchSizeStat := jb.createStat("batch_router_processed_batch_size", stats.GaugeType, sourceID, destID)
148+
processedBatchSizeStat.Gauge(len(jobs))
149+
150+
processingStartTime := time.Now()
151+
defer func() {
152+
jb.createStat("batch_router_buffered_batch_processing_time", stats.TimerType, sourceID, destID).Since(processingStartTime)
153+
}()
154+
155+
// Get destination and source details
156+
jb.brt.configSubscriberMu.RLock()
157+
destWithSources, ok := jb.brt.destinationsMap[destID]
158+
jb.brt.configSubscriberMu.RUnlock()
159+
160+
if !ok {
161+
// Handle destination not found
162+
jb.brt.logger.Errorf("Destination not found for ID: %s", destID)
163+
return
164+
}
165+
166+
// Find the source
167+
var source backendconfig.SourceT
168+
sourceFound := false
169+
for _, s := range destWithSources.Sources {
170+
if s.ID == sourceID {
171+
source = s
172+
sourceFound = true
173+
break
174+
}
175+
}
176+
177+
if !sourceFound {
178+
// Handle source not found
179+
jb.brt.logger.Errorf("Source not found for ID: %s", sourceID)
180+
return
181+
}
182+
183+
batchedJobs := BatchedJobs{
184+
Jobs: jobs,
185+
Connection: &Connection{
186+
Destination: destWithSources.Destination,
187+
Source: source,
188+
},
189+
}
190+
191+
// Use the existing upload logic
192+
defer jb.brt.limiter.upload.Begin("")()
193+
194+
// Helper function for standard object storage upload process
195+
processObjectStorageUpload := func(destType string, isWarehouse bool) {
196+
destUploadStat := jb.createStat("batch_router_dest_upload_time", stats.TimerType, sourceID, destID)
197+
destUploadStart := time.Now()
198+
output := jb.brt.upload(destType, &batchedJobs, isWarehouse)
199+
jb.brt.recordDeliveryStatus(*batchedJobs.Connection, output, isWarehouse)
200+
jb.brt.updateJobStatus(&batchedJobs, isWarehouse, output.Error, false)
201+
misc.RemoveFilePaths(output.LocalFilePaths...)
202+
if output.JournalOpID > 0 {
203+
jb.brt.jobsDB.JournalDeleteEntry(output.JournalOpID)
204+
}
205+
if output.Error == nil {
206+
jb.brt.recordUploadStats(*batchedJobs.Connection, output)
207+
}
208+
destUploadStat.Since(destUploadStart)
209+
}
210+
211+
switch {
212+
case IsObjectStorageDestination(jb.brt.destType):
213+
processObjectStorageUpload(jb.brt.destType, false)
214+
case IsWarehouseDestination(jb.brt.destType):
215+
useRudderStorage := misc.IsConfiguredToUseRudderObjectStorage(batchedJobs.Connection.Destination.Config)
216+
objectStorageType := warehouseutils.ObjectStorageType(jb.brt.destType, batchedJobs.Connection.Destination.Config, useRudderStorage)
217+
destUploadStat := jb.createStat("batch_router_dest_upload_time", stats.TimerType, sourceID, destID)
218+
destUploadStart := time.Now()
219+
splitBatchJobs := jb.brt.splitBatchJobsOnTimeWindow(batchedJobs)
220+
for _, batchJob := range splitBatchJobs {
221+
output := jb.brt.upload(objectStorageType, batchJob, true)
222+
notifyWarehouseErr := false
223+
if output.Error == nil && output.Key != "" {
224+
output.Error = jb.brt.pingWarehouse(batchJob, output)
225+
if output.Error != nil {
226+
notifyWarehouseErr = true
227+
}
228+
warehouseutils.DestStat(stats.CountType, "generate_staging_files", batchJob.Connection.Destination.ID).Count(1)
229+
warehouseutils.DestStat(stats.CountType, "staging_file_batch_size", batchJob.Connection.Destination.ID).Count(len(batchJob.Jobs))
230+
}
231+
jb.brt.recordDeliveryStatus(*batchJob.Connection, output, true)
232+
jb.brt.updateJobStatus(batchJob, true, output.Error, notifyWarehouseErr)
233+
misc.RemoveFilePaths(output.LocalFilePaths...)
234+
}
235+
destUploadStat.Since(destUploadStart)
236+
case asynccommon.IsAsyncDestination(jb.brt.destType):
237+
destUploadStat := jb.createStat("batch_router_dest_upload_time", stats.TimerType, sourceID, destID)
238+
destUploadStart := time.Now()
239+
jb.brt.sendJobsToStorage(batchedJobs)
240+
destUploadStat.Since(destUploadStart)
241+
default:
242+
// Handle any other destination types
243+
jb.brt.logger.Warnf("Unsupported destination type %s for job buffer. Attempting generic processing.", jb.brt.destType)
244+
processObjectStorageUpload(jb.brt.destType, false)
245+
}
246+
}

0 commit comments

Comments
 (0)