Skip to content

Commit 01483d5

Browse files
authored
fix: update and extend async event delivery time metrics tests for JobParameters logic (#5975)
# Description This PR updates both the source code and tests related to async event delivery time metrics in the batch router, specifically for the `emitAsyncEventDeliveryTimeMetrics` logic. **Source code changes:** - Refactored `emitAsyncEventDeliveryTimeMetrics` to prefer extracting metric parameters from `JobParameters` on the `JobStatusT` struct, with a fallback to `OriginalJobParameters` if `JobParameters` is missing or invalid. - Improved error logging and robustness when job parameters are missing or malformed. **Test changes:** - Updated all relevant tests to set `JobParameters` on `JobStatusT` to match the value in `OriginalJobParameters`, ensuring the new primary path is exercised. - Added a new test to verify fallback logic when `JobParameters` is empty, confirming that metrics are still emitted using `OriginalJobParameters`. - Ensured that negative and edge cases (such as missing or invalid parameters, or no successful jobs) are still covered and use the correct field. - These changes ensure robust coverage for the new logic and prevent regressions in metric emission for async destinations. ## Linear Ticket https://linear.app/rudderstack/issue/INT-3763/rudderserver-add-event-delivery-metrics-for-async-destinations ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 6fbb5a3 commit 01483d5

File tree

2 files changed

+69
-5
lines changed

2 files changed

+69
-5
lines changed

router/batchrouter/handle_async.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -323,8 +323,8 @@ func (brt *Handle) pollAsyncStatus(ctx context.Context) {
323323
}
324324
statusList, err := brt.updatePollStatusToDB(ctx, destinationID, sourceID, importingJob, pollResp)
325325
if err == nil {
326-
brt.asyncDestinationStruct[destinationID].UploadInProgress = false
327326
brt.recordAsyncDestinationDeliveryStatus(sourceID, destinationID, statusList)
327+
brt.asyncStructCleanUp(destinationID)
328328
}
329329
}
330330
}
@@ -360,17 +360,19 @@ func (brt *Handle) asyncUploadWorker(ctx context.Context) {
360360
brt.asyncDestinationStruct[destinationID].CanUpload = true
361361
brt.asyncDestinationStruct[destinationID].PartFileNumber++
362362
uploadResponse := brt.asyncDestinationStruct[destinationID].Manager.Upload(brt.asyncDestinationStruct[destinationID])
363-
if uploadResponse.ImportingParameters != nil && len(uploadResponse.ImportingJobIDs) > 0 {
364-
brt.asyncDestinationStruct[destinationID].UploadInProgress = true
365-
}
363+
366364
brt.setMultipleJobStatus(setMultipleJobStatusParams{
367365
AsyncOutput: uploadResponse,
368366
Attempted: true,
369367
AttemptNums: brt.asyncDestinationStruct[destinationID].AttemptNums,
370368
FirstAttemptedAts: brt.asyncDestinationStruct[destinationID].FirstAttemptedAts,
371369
OriginalJobParameters: brt.asyncDestinationStruct[destinationID].OriginalJobParameters,
372370
})
373-
brt.asyncStructCleanUp(destinationID)
371+
if uploadResponse.ImportingParameters != nil && len(uploadResponse.ImportingJobIDs) > 0 {
372+
brt.asyncDestinationStruct[destinationID].UploadInProgress = true
373+
} else {
374+
brt.asyncStructCleanUp(destinationID)
375+
}
374376
}
375377
brt.asyncDestinationStruct[destinationID].UploadMutex.Unlock()
376378
}
@@ -412,6 +414,7 @@ func (brt *Handle) asyncStructCleanUp(destinationID string) {
412414
misc.RemoveFilePaths(brt.asyncDestinationStruct[destinationID].FileName)
413415
brt.asyncDestinationStruct[destinationID].ImportingJobIDs = []int64{}
414416
brt.asyncDestinationStruct[destinationID].FailedJobIDs = []int64{}
417+
brt.asyncDestinationStruct[destinationID].UploadInProgress = false
415418
brt.asyncDestinationStruct[destinationID].Size = 0
416419
brt.asyncDestinationStruct[destinationID].Exists = false
417420
brt.asyncDestinationStruct[destinationID].Count = 0

router/batchrouter/handle_async_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,67 @@ func TestAsyncDestinationManager(t *testing.T) {
284284
}()
285285
<-done
286286
})
287+
t.Run("UploadInProgress skips upload", func(t *testing.T) {
288+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
289+
defer cancel()
290+
291+
batchRouter := defaultHandle(destType)
292+
destinationID := "destinationID"
293+
batchRouter.destinationsMap[destinationID] = &routerutils.DestinationWithSources{
294+
Destination: backendconfig.DestinationT{ID: destinationID},
295+
Sources: []backendconfig.SourceT{{ID: "sourceID"}},
296+
}
297+
batchRouter.asyncDestinationStruct[destinationID] = &common.AsyncDestinationStruct{
298+
UploadInProgress: true,
299+
Manager: mockAsyncDestinationManager{
300+
uploadOutput: common.AsyncUploadOutput{ImportingJobIDs: []int64{1}},
301+
},
302+
}
303+
304+
done := make(chan struct{})
305+
go func() {
306+
defer close(done)
307+
batchRouter.asyncUploadWorker(ctx)
308+
}()
309+
<-done
310+
// If no panic, the continue was hit and test passes
311+
})
312+
313+
// Test for line 374: else branch (asyncStructCleanUp)
314+
t.Run("asyncStructCleanUp is called when no ImportingParameters or ImportingJobIDs", func(t *testing.T) {
315+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
316+
defer cancel()
317+
318+
batchRouter := defaultHandle(destType)
319+
destinationID := "destinationID"
320+
batchRouter.destinationsMap[destinationID] = &routerutils.DestinationWithSources{
321+
Destination: backendconfig.DestinationT{ID: destinationID},
322+
Sources: []backendconfig.SourceT{{ID: "sourceID", WorkspaceID: "workspaceID"}},
323+
}
324+
batchRouter.asyncDestinationStruct[destinationID] = &common.AsyncDestinationStruct{
325+
Exists: true,
326+
Manager: mockAsyncDestinationManager{
327+
uploadOutput: common.AsyncUploadOutput{
328+
DestinationID: destinationID,
329+
ImportingParameters: nil,
330+
ImportingJobIDs: []int64{},
331+
},
332+
},
333+
}
334+
batchRouter.uploadIntervalMap[destinationID] = 0
335+
batchRouter.asyncDestinationStruct[destinationID].CanUpload = true
336+
337+
done := make(chan struct{})
338+
go func() {
339+
defer close(done)
340+
// Expected to clean up the asyncDestinationStruct
341+
batchRouter.asyncUploadWorker(ctx)
342+
cancel()
343+
}()
344+
<-done
345+
346+
require.False(t, batchRouter.asyncDestinationStruct[destinationID].Exists)
347+
})
287348
})
288349
t.Run("Poll (StatusCode=StatusBadRequest)", func(t *testing.T) {
289350
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second)

0 commit comments

Comments
 (0)