
Commit b4ed2ae

chore: review comments
1 parent 4ec4da5 commit b4ed2ae

4 files changed: +294 -190 lines changed


router/batchrouter/batch_worker.go

+66
@@ -0,0 +1,66 @@
+package batchrouter
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/rudderlabs/rudder-server/utils/workerpool"
+)
+
+// BatchWorker implements the workerpool.Worker interface for processing batches of jobs
+type BatchWorker struct {
+	partition string
+	jobs      []*ConnectionJob
+	jobBuffer *JobBuffer
+}
+
+// Work processes the current batch of jobs
+func (bw *BatchWorker) Work() bool {
+	if len(bw.jobs) == 0 {
+		return false
+	}
+
+	// Group jobs by connection (source-destination pair)
+	connectionJobs := make(map[string][]*ConnectionJob)
+	for _, job := range bw.jobs {
+		key := fmt.Sprintf("%s:%s", job.sourceID, job.destID)
+		connectionJobs[key] = append(connectionJobs[key], job)
+	}
+
+	// Reset jobs array for next batch
+	bw.jobs = make([]*ConnectionJob, 0, bw.jobBuffer.brt.maxEventsInABatch)
+
+	// Create a connection worker factory that includes the jobs
+	workerJobs := connectionJobs
+	pool := workerpool.New(context.Background(), func(partition string) workerpool.Worker {
+		parts := strings.Split(partition, ":")
+		sourceID := parts[0]
+		destID := parts[1]
+		return &ConnectionWorker{
+			sourceID:  sourceID,
+			destID:    destID,
+			jobs:      workerJobs[fmt.Sprintf("%s:%s", sourceID, destID)],
+			jobBuffer: bw.jobBuffer,
+		}
+	}, bw.jobBuffer.brt.logger)
+	defer pool.Shutdown()
+
+	// Start workers for each connection
+	for key := range connectionJobs {
+		pool.PingWorker(key)
+	}
+
+	return true
+}
+
+// SleepDurations returns the min and max sleep durations for the worker when idle
+func (bw *BatchWorker) SleepDurations() (min, max time.Duration) {
+	return time.Millisecond * 100, time.Second * 5
+}
+
+// Stop is a no-op since this worker doesn't have internal goroutines
+func (bw *BatchWorker) Stop() {
+	// No-op
+}
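
Both new workers satisfy the same contract consumed by workerpool.New. For reference, a minimal sketch of the interface shape implied by the three methods implemented above; the authoritative definition lives in github.com/rudderlabs/rudder-server/utils/workerpool:

	// Worker is the contract a partition worker must fulfil for workerpool.New.
	type Worker interface {
		// Work runs one unit of work and reports whether anything was processed.
		Work() bool
		// SleepDurations bounds the idle backoff applied between Work calls that return false.
		SleepDurations() (min, max time.Duration)
		// Stop releases any resources held by the worker; a no-op for both workers here.
		Stop()
	}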
+161
@@ -0,0 +1,161 @@
+package batchrouter
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/rudderlabs/rudder-go-kit/stats"
+	backendconfig "github.com/rudderlabs/rudder-server/backend-config"
+	"github.com/rudderlabs/rudder-server/jobsdb"
+	asynccommon "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
+	"github.com/rudderlabs/rudder-server/utils/misc"
+	warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
+)
+
+// ConnectionWorker implements the workerpool.Worker interface for processing connection-level batches
+type ConnectionWorker struct {
+	sourceID  string
+	destID    string
+	jobs      []*ConnectionJob
+	jobBuffer *JobBuffer
+}
+
+// Work processes jobs for a specific connection
+func (cw *ConnectionWorker) Work() bool {
+	if len(cw.jobs) == 0 {
+		return false
+	}
+
+	// Use the existing upload logic
+	uploadSuccess := true
+	var uploadError error
+
+	// Define a function to update the failing destinations map based on the result
+	updateFailingStatus := func(failed bool, err error) {
+		cw.jobBuffer.brt.failingDestinationsMu.Lock()
+		cw.jobBuffer.brt.failingDestinations[cw.destID] = failed
+		cw.jobBuffer.brt.failingDestinationsMu.Unlock()
+
+		if failed {
+			cw.jobBuffer.brt.logger.Errorf("Upload failed for destination %s: %v", cw.destID, err)
+		}
+	}
+
+	// Execute the upload inside a defer context to always update status
+	defer func() {
+		updateFailingStatus(!uploadSuccess, uploadError)
+	}()
+
+	defer cw.jobBuffer.brt.limiter.upload.Begin(cw.destID)()
+
+	// Convert jobs to BatchedJobs
+	jobsToUpload := make([]*jobsdb.JobT, 0, len(cw.jobs))
+	for _, job := range cw.jobs {
+		jobsToUpload = append(jobsToUpload, job.job)
+	}
+
+	// Get destination and source from destinationsMap
+	cw.jobBuffer.brt.configSubscriberMu.RLock()
+	destWithSources, ok := cw.jobBuffer.brt.destinationsMap[cw.destID]
+	cw.jobBuffer.brt.configSubscriberMu.RUnlock()
+	if !ok {
+		cw.jobBuffer.brt.logger.Errorf("Destination %s not found in destinationsMap", cw.destID)
+		return false
+	}
+
+	// Find matching source
+	var source *backendconfig.SourceT
+	for _, s := range destWithSources.Sources {
+		if s.ID == cw.sourceID {
+			source = &s
+			break
+		}
+	}
+	if source == nil {
+		cw.jobBuffer.brt.logger.Errorf("Source %s not found in destination %s", cw.sourceID, cw.destID)
+		return false
+	}
+
+	batchedJobs := BatchedJobs{
+		Jobs: jobsToUpload,
+		Connection: &Connection{
+			Destination: destWithSources.Destination,
+			Source:      *source,
+		},
+	}
+
+	// Process based on destination type
+	switch {
+	case IsObjectStorageDestination(cw.jobBuffer.brt.destType):
+		destUploadStat := cw.jobBuffer.createStat("batch_router_dest_upload_time", stats.TimerType, cw.sourceID, cw.destID)
+		destUploadStart := time.Now()
+		output := cw.jobBuffer.brt.upload(cw.jobBuffer.brt.destType, &batchedJobs, false)
+		cw.jobBuffer.brt.recordDeliveryStatus(cw.destID, cw.sourceID, output, false)
+		cw.jobBuffer.brt.updateJobStatus(&batchedJobs, false, output.Error, false)
+		misc.RemoveFilePaths(output.LocalFilePaths...)
+		if output.JournalOpID > 0 {
+			cw.jobBuffer.brt.jobsDB.JournalDeleteEntry(output.JournalOpID)
+		}
+		if output.Error == nil {
+			cw.jobBuffer.brt.recordUploadStats(*batchedJobs.Connection, output)
+		} else {
+			uploadSuccess = false
+			uploadError = output.Error
+		}
+		destUploadStat.Since(destUploadStart)
+
+	case IsWarehouseDestination(cw.jobBuffer.brt.destType):
+		useRudderStorage := misc.IsConfiguredToUseRudderObjectStorage(batchedJobs.Connection.Destination.Config)
+		objectStorageType := warehouseutils.ObjectStorageType(cw.jobBuffer.brt.destType, batchedJobs.Connection.Destination.Config, useRudderStorage)
+		destUploadStat := cw.jobBuffer.createStat("batch_router_dest_upload_time", stats.TimerType, cw.sourceID, cw.destID)
+		destUploadStart := time.Now()
+		splitBatchJobs := cw.jobBuffer.brt.splitBatchJobsOnTimeWindow(batchedJobs)
+		allSuccess := true
+		for _, splitJob := range splitBatchJobs {
+			output := cw.jobBuffer.brt.upload(objectStorageType, splitJob, true)
+			notifyWarehouseErr := false
+			if output.Error == nil && output.Key != "" {
+				output.Error = cw.jobBuffer.brt.pingWarehouse(splitJob, output)
+				if output.Error != nil {
+					notifyWarehouseErr = true
+					allSuccess = false
+					uploadError = output.Error
+				}
+				warehouseutils.DestStat(stats.CountType, "generate_staging_files", cw.destID).Count(1)
+				warehouseutils.DestStat(stats.CountType, "staging_file_batch_size", cw.destID).Count(len(splitJob.Jobs))
+			} else if output.Error != nil {
+				allSuccess = false
+				uploadError = output.Error
+			}
+			cw.jobBuffer.brt.recordDeliveryStatus(cw.destID, cw.sourceID, output, true)
+			cw.jobBuffer.brt.updateJobStatus(splitJob, true, output.Error, notifyWarehouseErr)
+			misc.RemoveFilePaths(output.LocalFilePaths...)
+		}
+		uploadSuccess = allSuccess
+		destUploadStat.Since(destUploadStart)
+
+	case asynccommon.IsAsyncDestination(cw.jobBuffer.brt.destType):
+		destUploadStat := cw.jobBuffer.createStat("batch_router_dest_upload_time", stats.TimerType, cw.sourceID, cw.destID)
+		destUploadStart := time.Now()
+		cw.jobBuffer.brt.sendJobsToStorage(batchedJobs)
+		destUploadStat.Since(destUploadStart)
+
+	default:
+		errMsg := fmt.Errorf("unsupported destination type %s for job buffer", cw.jobBuffer.brt.destType)
+		cw.jobBuffer.brt.logger.Errorf("Unsupported destination type %s for job buffer: %v", cw.jobBuffer.brt.destType, errMsg)
+		uploadSuccess = false
+		uploadError = errMsg
+	}
+
+	return true
+}
+
+// SleepDurations returns the min and max sleep durations for the worker when idle
+func (cw *ConnectionWorker) SleepDurations() (min, max time.Duration) {
+	return time.Millisecond * 100, time.Second * 5
+}
+
+// Stop is a no-op since this worker doesn't have internal goroutines
+func (cw *ConnectionWorker) Stop() {
+	// No-op
+}
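
BatchWorker and ConnectionWorker agree on a "sourceID:destID" partition key: BatchWorker.Work builds it with fmt.Sprintf and the connection-worker factory splits it back apart. A small standalone illustration of that convention (helper names are illustrative, not part of the commit, and it assumes IDs never contain a colon):

	package main

	import (
		"fmt"
		"strings"
	)

	// connectionKey mirrors the key built in BatchWorker.Work to group jobs per source-destination pair.
	func connectionKey(sourceID, destID string) string {
		return fmt.Sprintf("%s:%s", sourceID, destID)
	}

	// splitConnectionKey mirrors the strings.Split in the connection-worker factory.
	func splitConnectionKey(key string) (sourceID, destID string) {
		parts := strings.Split(key, ":")
		return parts[0], parts[1]
	}

	func main() {
		key := connectionKey("src-123", "dest-456")
		src, dst := splitConnectionKey(key)
		fmt.Printf("key=%s source=%s destination=%s\n", key, src, dst)
	}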

router/batchrouter/handle_lifecycle.go

+3 -16
@@ -40,7 +40,6 @@ import (
 	"github.com/rudderlabs/rudder-server/utils/crash"
 	"github.com/rudderlabs/rudder-server/utils/misc"
 	"github.com/rudderlabs/rudder-server/utils/types"
-	"github.com/rudderlabs/rudder-server/utils/workerpool"
 	"github.com/rudderlabs/rudder-server/warehouse/client"
 	warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
 )
@@ -153,21 +152,6 @@ func (brt *Handle) Setup(
 	)
 
 	brt.logger.Infof("BRT: Batch Router started: %s", destType)
-
-	// Initialize job buffer
-	brt.jobBuffer = &JobBuffer{
-		sourceDestMap: make(map[string]chan *jobsdb.JobT),
-		uploadTimers:  make(map[string]*time.Timer),
-		batchWorkers:  make(map[string]*batchWorker),
-		brt:           brt,
-		workerPool: workerpool.New(context.Background(), func(key string) workerpool.Worker {
-			return &batchWorker{
-				partition: key,
-				jobBuffer: brt.jobBuffer,
-			}
-		}, brt.logger),
-	}
-
 	brt.crashRecover()
 
 	if asynccommon.IsAsyncDestination(brt.destType) {
@@ -206,6 +190,9 @@ func (brt *Handle) Setup(
 		brt.collectMetrics(brt.backgroundCtx)
 		return nil
 	}))
+
+	// Initialize the job buffer
+	brt.jobBuffer = NewJobBuffer(brt)
 }
 
 func (brt *Handle) setupReloadableVars() {
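
The Setup hunks above swap the inline JobBuffer literal for a constructor call. NewJobBuffer itself is not part of this excerpt; a plausible sketch, assuming it simply moves the deleted initialization into a constructor and hands out the exported BatchWorker from batch_worker.go (field names and the pre-sized jobs slice are assumptions taken from the deleted block and the new worker files):

	// NewJobBuffer builds the buffer and its batch-worker pool (hypothetical sketch).
	func NewJobBuffer(brt *Handle) *JobBuffer {
		jb := &JobBuffer{
			sourceDestMap: make(map[string]chan *jobsdb.JobT),
			uploadTimers:  make(map[string]*time.Timer),
			brt:           brt,
		}
		jb.workerPool = workerpool.New(context.Background(), func(partition string) workerpool.Worker {
			// Each partition gets a BatchWorker that appends into an empty, pre-sized job slice.
			return &BatchWorker{
				partition: partition,
				jobs:      make([]*ConnectionJob, 0, brt.maxEventsInABatch),
				jobBuffer: jb,
			}
		}, brt.logger)
		return jb
	}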

0 commit comments
