@@ -12,55 +12,55 @@ import (
 	warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
 )
 
-// ConnectionWorker implements the workerpool.Worker interface for processing connection-level batches
+// ConnectionWorker processes batches of jobs for a specific source-destination connection.
+// It handles the actual upload of data to various destination types (object storage,
+// warehouse, async destinations) and manages job status updates.
 type ConnectionWorker struct {
-	sourceID  string
-	destID    string
-	jobs      []*ConnectionJob
-	jobBuffer *JobBuffer
+	sourceID  string           // Source identifier
+	destID    string           // Destination identifier
+	jobs      []*ConnectionJob // Jobs to process for this connection
+	jobBuffer *JobBuffer       // Reference to the parent job buffer
 }
 
-// Work processes jobs for a specific connection
+// Work processes the current batch of jobs for this connection.
+// It handles different destination types and manages the upload process,
+// including error handling and status updates.
 func (cw *ConnectionWorker) Work() bool {
 	if len(cw.jobs) == 0 {
 		return false
 	}
 
-	// Use the existing upload logic
-	uploadSuccess := true
-	var uploadError error
-
-	// Define a function to update the failing destinations map based on the result
-	updateFailingStatus := func(failed bool, err error) {
-		cw.jobBuffer.brt.failingDestinationsMu.Lock()
-		cw.jobBuffer.brt.failingDestinations[cw.destID] = failed
-		cw.jobBuffer.brt.failingDestinationsMu.Unlock()
-
-		if failed {
-			cw.jobBuffer.brt.logger.Errorf("Upload failed for destination %s: %v", cw.destID, err)
-		}
+	// Prepare the batch for processing
+	batchedJobs, err := cw.prepareBatch()
+	if err != nil {
+		cw.jobBuffer.brt.logger.Errorf("Failed to prepare batch: %v", err)
+		return false
 	}
 
-	// Execute the upload inside a defer context to always update status
-	defer func() {
-		updateFailingStatus(!uploadSuccess, uploadError)
-	}()
+	// Process the batch based on destination type
+	uploadSuccess, uploadError := cw.processBatch(batchedJobs)
 
-	defer cw.jobBuffer.brt.limiter.upload.Begin(cw.destID)()
+	// Update failing destinations status
+	cw.updateFailingStatus(!uploadSuccess, uploadError)
 
-	// Convert jobs to BatchedJobs
+	return true
+}
+
+// prepareBatch converts the worker's jobs into a BatchedJobs structure and validates
+// the source and destination configuration.
+func (cw *ConnectionWorker) prepareBatch() (*BatchedJobs, error) {
+	// Convert jobs to BatchedJobs format
 	jobsToUpload := make([]*jobsdb.JobT, 0, len(cw.jobs))
 	for _, job := range cw.jobs {
 		jobsToUpload = append(jobsToUpload, job.job)
 	}
 
-	// Get destination and source from destinationsMap
+	// Get destination and source configuration
 	cw.jobBuffer.brt.configSubscriberMu.RLock()
 	destWithSources, ok := cw.jobBuffer.brt.destinationsMap[cw.destID]
 	cw.jobBuffer.brt.configSubscriberMu.RUnlock()
 	if !ok {
-		cw.jobBuffer.brt.logger.Errorf("Destination %s not found in destinationsMap", cw.destID)
-		return false
+		return nil, fmt.Errorf("destination %s not found in destinationsMap", cw.destID)
 	}
 
 	// Find matching source
@@ -72,90 +72,121 @@ func (cw *ConnectionWorker) Work() bool {
 		}
 	}
 	if source == nil {
-		cw.jobBuffer.brt.logger.Errorf("Source %s not found in destination %s", cw.sourceID, cw.destID)
-		return false
+		return nil, fmt.Errorf("source %s not found in destination %s", cw.sourceID, cw.destID)
 	}
 
-	batchedJobs := BatchedJobs{
+	return &BatchedJobs{
 		Jobs: jobsToUpload,
 		Connection: &Connection{
 			Destination: destWithSources.Destination,
 			Source:      *source,
 		},
-	}
+	}, nil
+}
+
+// processBatch handles the upload process based on the destination type.
+// Returns upload success status and any error that occurred.
+func (cw *ConnectionWorker) processBatch(batchedJobs *BatchedJobs) (bool, error) {
+	defer cw.jobBuffer.brt.limiter.upload.Begin(cw.destID)()
+
+	destUploadStat := cw.jobBuffer.createStat("batch_router_dest_upload_time", stats.TimerType, cw.sourceID, cw.destID)
+	destUploadStart := time.Now()
+	defer destUploadStat.Since(destUploadStart)
 
-	// Process based on destination type
 	switch {
 	case IsObjectStorageDestination(cw.jobBuffer.brt.destType):
-		destUploadStat := cw.jobBuffer.createStat("batch_router_dest_upload_time", stats.TimerType, cw.sourceID, cw.destID)
-		destUploadStart := time.Now()
-		output := cw.jobBuffer.brt.upload(cw.jobBuffer.brt.destType, &batchedJobs, false)
-		cw.jobBuffer.brt.recordDeliveryStatus(cw.destID, cw.sourceID, output, false)
-		cw.jobBuffer.brt.updateJobStatus(&batchedJobs, false, output.Error, false)
-		misc.RemoveFilePaths(output.LocalFilePaths...)
-		if output.JournalOpID > 0 {
-			cw.jobBuffer.brt.jobsDB.JournalDeleteEntry(output.JournalOpID)
-		}
-		if output.Error == nil {
-			cw.jobBuffer.brt.recordUploadStats(*batchedJobs.Connection, output)
-		} else {
-			uploadSuccess = false
-			uploadError = output.Error
-		}
-		destUploadStat.Since(destUploadStart)
-
+		return cw.handleObjectStorageUpload(batchedJobs)
 	case IsWarehouseDestination(cw.jobBuffer.brt.destType):
-		useRudderStorage := misc.IsConfiguredToUseRudderObjectStorage(batchedJobs.Connection.Destination.Config)
-		objectStorageType := warehouseutils.ObjectStorageType(cw.jobBuffer.brt.destType, batchedJobs.Connection.Destination.Config, useRudderStorage)
-		destUploadStat := cw.jobBuffer.createStat("batch_router_dest_upload_time", stats.TimerType, cw.sourceID, cw.destID)
-		destUploadStart := time.Now()
-		splitBatchJobs := cw.jobBuffer.brt.splitBatchJobsOnTimeWindow(batchedJobs)
-		allSuccess := true
-		for _, splitJob := range splitBatchJobs {
-			output := cw.jobBuffer.brt.upload(objectStorageType, splitJob, true)
-			notifyWarehouseErr := false
-			if output.Error == nil && output.Key != "" {
-				output.Error = cw.jobBuffer.brt.pingWarehouse(splitJob, output)
-				if output.Error != nil {
-					notifyWarehouseErr = true
-					allSuccess = false
-					uploadError = output.Error
-				}
-				warehouseutils.DestStat(stats.CountType, "generate_staging_files", cw.destID).Count(1)
-				warehouseutils.DestStat(stats.CountType, "staging_file_batch_size", cw.destID).Count(len(splitJob.Jobs))
-			} else if output.Error != nil {
-				allSuccess = false
-				uploadError = output.Error
-			}
-			cw.jobBuffer.brt.recordDeliveryStatus(cw.destID, cw.sourceID, output, true)
-			cw.jobBuffer.brt.updateJobStatus(splitJob, true, output.Error, notifyWarehouseErr)
-			misc.RemoveFilePaths(output.LocalFilePaths...)
-		}
-		uploadSuccess = allSuccess
-		destUploadStat.Since(destUploadStart)
-
+		return cw.handleWarehouseUpload(batchedJobs)
 	case asynccommon.IsAsyncDestination(cw.jobBuffer.brt.destType):
-		destUploadStat := cw.jobBuffer.createStat("batch_router_dest_upload_time", stats.TimerType, cw.sourceID, cw.destID)
-		destUploadStart := time.Now()
-		cw.jobBuffer.brt.sendJobsToStorage(batchedJobs)
-		destUploadStat.Since(destUploadStart)
-
+		return cw.handleAsyncDestinationUpload(batchedJobs), nil
 	default:
-		errMsg := fmt.Errorf("unsupported destination type %s for job buffer", cw.jobBuffer.brt.destType)
-		cw.jobBuffer.brt.logger.Errorf("Unsupported destination type %s for job buffer: %v", cw.jobBuffer.brt.destType, errMsg)
-		uploadSuccess = false
-		uploadError = errMsg
+		err := fmt.Errorf("unsupported destination type %s for job buffer", cw.jobBuffer.brt.destType)
+		cw.jobBuffer.brt.logger.Error(err)
+		return false, err
 	}
-
-	return true
 }
 
-// SleepDurations returns the min and max sleep durations for the worker when idle
+// handleObjectStorageUpload processes uploads for object storage destinations
+func (cw *ConnectionWorker) handleObjectStorageUpload(batchedJobs *BatchedJobs) (bool, error) {
+	output := cw.jobBuffer.brt.upload(cw.jobBuffer.brt.destType, batchedJobs, false)
+	defer misc.RemoveFilePaths(output.LocalFilePaths...)
+
+	cw.jobBuffer.brt.recordDeliveryStatus(cw.destID, cw.sourceID, output, false)
+	cw.jobBuffer.brt.updateJobStatus(batchedJobs, false, output.Error, false)
+
+	if output.JournalOpID > 0 {
+		cw.jobBuffer.brt.jobsDB.JournalDeleteEntry(output.JournalOpID)
+	}
+
+	if output.Error == nil {
+		cw.jobBuffer.brt.recordUploadStats(*batchedJobs.Connection, output)
+		return true, nil
+	}
+	return false, output.Error
+}
+
+// handleWarehouseUpload processes uploads for warehouse destinations
+func (cw *ConnectionWorker) handleWarehouseUpload(batchedJobs *BatchedJobs) (bool, error) {
+	useRudderStorage := misc.IsConfiguredToUseRudderObjectStorage(batchedJobs.Connection.Destination.Config)
+	objectStorageType := warehouseutils.ObjectStorageType(cw.jobBuffer.brt.destType, batchedJobs.Connection.Destination.Config, useRudderStorage)
+
+	splitBatchJobs := cw.jobBuffer.brt.splitBatchJobsOnTimeWindow(*batchedJobs)
+	var lastError error
+	allSuccess := true
+
+	for _, splitJob := range splitBatchJobs {
+		output := cw.jobBuffer.brt.upload(objectStorageType, splitJob, true)
+		defer misc.RemoveFilePaths(output.LocalFilePaths...)
+
+		notifyWarehouseErr := false
+		if output.Error == nil && output.Key != "" {
+			output.Error = cw.jobBuffer.brt.pingWarehouse(splitJob, output)
+			if output.Error != nil {
+				notifyWarehouseErr = true
+				allSuccess = false
+				lastError = output.Error
+			}
+			warehouseutils.DestStat(stats.CountType, "generate_staging_files", cw.destID).Count(1)
+			warehouseutils.DestStat(stats.CountType, "staging_file_batch_size", cw.destID).Count(len(splitJob.Jobs))
+		} else if output.Error != nil {
+			allSuccess = false
+			lastError = output.Error
+		}
+		cw.jobBuffer.brt.recordDeliveryStatus(cw.destID, cw.sourceID, output, true)
+		cw.jobBuffer.brt.updateJobStatus(splitJob, true, output.Error, notifyWarehouseErr)
+	}
+
+	return allSuccess, lastError
+}
+
+// handleAsyncDestinationUpload processes uploads for async destinations
+func (cw *ConnectionWorker) handleAsyncDestinationUpload(batchedJobs *BatchedJobs) bool {
+	cw.jobBuffer.brt.sendJobsToStorage(*batchedJobs)
+	return true
+}
+
+// updateFailingStatus updates the failing destinations map with the current status
+func (cw *ConnectionWorker) updateFailingStatus(failed bool, err error) {
+	cw.jobBuffer.brt.failingDestinationsMu.Lock()
+	cw.jobBuffer.brt.failingDestinations[cw.destID] = failed
+	cw.jobBuffer.brt.failingDestinationsMu.Unlock()
+
+	if failed {
+		cw.jobBuffer.brt.logger.Errorf("Upload failed for destination %s: %v", cw.destID, err)
+	}
+}
+
+// SleepDurations returns the min and max sleep durations for the worker when idle.
+// These values determine how long the worker should wait before checking for new work
+// when there are no jobs to process.
 func (cw *ConnectionWorker) SleepDurations() (min, max time.Duration) {
 	return time.Millisecond * 100, time.Second * 5
 }
 
-// Stop is a no-op since this worker doesn't have internal goroutines
+// Stop cleans up any resources used by the worker.
+// Currently a no-op as ConnectionWorker doesn't maintain any persistent resources.
 func (cw *ConnectionWorker) Stop() {
-	// No-op
+	// No cleanup needed
 }
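
A note on the worker contract this diff is written against: `ConnectionWorker` is driven by the batch router's worker pool, which calls `Work` repeatedly and uses `SleepDurations` to back off when there is nothing to do, calling `Stop` on shutdown. The sketch below illustrates that polling contract as a minimal, self-contained program; the `Worker` interface shape, `demoWorker`, and the `runWorker` loop are simplified assumptions for illustration, not the actual `workerpool` implementation in rudder-server.

```go
package main

import (
	"fmt"
	"time"
)

// Worker mirrors the workerpool.Worker contract that ConnectionWorker
// appears to satisfy (assumed shape, for illustration only).
type Worker interface {
	Work() bool                               // true if a batch was processed
	SleepDurations() (min, max time.Duration) // idle backoff bounds
	Stop()                                    // cleanup hook
}

// demoWorker stands in for ConnectionWorker: it "processes" a fixed number
// of batches and then reports idle.
type demoWorker struct{ remaining int }

func (w *demoWorker) Work() bool {
	if w.remaining == 0 {
		return false // nothing to do; the pool should back off
	}
	w.remaining--
	fmt.Println("processed one batch")
	return true
}

func (w *demoWorker) SleepDurations() (min, max time.Duration) {
	return 100 * time.Millisecond, 5 * time.Second
}

func (w *demoWorker) Stop() {}

// runWorker is a hypothetical polling loop: when Work reports no progress,
// it sleeps for the minimum idle duration before retrying (a real pool
// would back off toward max).
func runWorker(w Worker, rounds int) {
	defer w.Stop()
	for i := 0; i < rounds; i++ {
		if !w.Work() {
			minSleep, _ := w.SleepDurations()
			time.Sleep(minSleep)
		}
	}
}

func main() {
	runWorker(&demoWorker{remaining: 3}, 5)
}
```

Note that in the refactored `Work`, a failed upload still returns `true` (work was attempted), so the pool keeps polling at full speed; failure visibility is handled separately through the `failingDestinations` map updated by `updateFailingStatus`.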