
Commit b16b7c1

Merge branch 'master' into chore.cleanup_router_oauthV2
2 parents 5177a8b + dbc830f commit b16b7c1

15 files changed (+809 -267 lines)

router/batchrouter/batchrouter_test.go

Lines changed: 4 additions & 1 deletion

@@ -472,7 +472,7 @@ func TestPostToWarehouse(t *testing.T) {
 			responseBody: "OK",
 			responseCode: http.StatusOK,

-			expectedPayload: `{"WorkspaceID":"test-workspace","Schema":{"tracks":{"id":"string"}},"BatchDestination":{"Source":{"ID":""},"Destination":{"ID":""}},"Location":"","FirstEventAt":"","LastEventAt":"","TotalEvents":1,"TotalBytes":200,"UseRudderStorage":false,"DestinationRevisionID":"","SourceTaskRunID":"","SourceJobID":"","SourceJobRunID":"","TimeWindow":"0001-01-01T00:00:00Z"}`,
+			expectedPayload: `{"WorkspaceID":"test-workspace","Schema":{"tracks":{"id":"string"}},"BatchDestination":{"Source":{"ID":""},"Destination":{"ID":""}},"Location":"","FirstEventAt":"","LastEventAt":"","TotalEvents":1,"TotalBytes":200,"UseRudderStorage":false,"DestinationRevisionID":"","SourceTaskRunID":"","SourceJobID":"","SourceJobRunID":"","TimeWindow":"0001-01-01T00:00:00Z","BytesPerTable": {"tracks": 200}}`,
 		},
 		{
 			name: "should fail to post to warehouse",
@@ -526,6 +526,9 @@ func TestPostToWarehouse(t *testing.T) {
 			err := job.pingWarehouse(&batchJobs, UploadResult{
 				TotalEvents: 1,
 				TotalBytes:  200,
+				BytesPerTable: map[string]int64{
+					"tracks": 200,
+				},
 			})
 			if input.expectedError != nil {
 				require.Equal(t, fmt.Sprintf(input.expectedError.Error(), ts.URL), err.Error())

router/batchrouter/handle.go

Lines changed: 12 additions & 0 deletions

@@ -307,6 +307,8 @@ func (brt *Handle) upload(provider string, batchJobs *BatchedJobs, isWarehouse b
 	warehouseConnIdentifier := brt.connectionWHNamespaceMap[connIdentifier]
 	brt.configSubscriberMu.RUnlock()
 	var totalBytes int
+	bytesPerTable := make(map[string]int64)
+
 	for _, job := range batchJobs.Jobs {
 		// do not add to staging file if the event is a rudder_identity_merge_rules record
 		// and has been previously added to it
@@ -335,12 +337,20 @@ func (brt *Handle) upload(provider string, batchJobs *BatchedJobs, isWarehouse b
 				eventsFound = true
 				line := string(job.EventPayload) + "\n"
 				totalBytes += len(line)
+				if isWarehouse {
+					tableName := gjson.GetBytes(job.EventPayload, "metadata.table").String()
+					bytesPerTable[tableName] += int64(len(line))
+				}
 				_ = gzWriter.WriteGZ(line)
 			}
 		} else {
 			eventsFound = true
 			line := string(job.EventPayload) + "\n"
 			totalBytes += len(line)
+			if isWarehouse {
+				tableName := gjson.GetBytes(job.EventPayload, "metadata.table").String()
+				bytesPerTable[tableName] += int64(len(line))
+			}
 			_ = gzWriter.WriteGZ(line)
 		}
 	}
@@ -490,6 +500,7 @@ func (brt *Handle) upload(provider string, batchJobs *BatchedJobs, isWarehouse b
 		LastEventAt:      lastEventAt,
 		TotalEvents:      len(batchJobs.Jobs) - dedupedIDMergeRuleJobs,
 		TotalBytes:       totalBytes,
+		BytesPerTable:    bytesPerTable,
 		UseRudderStorage: useRudderStorage,
 	}
 }
@@ -513,6 +524,7 @@ func (brt *Handle) pingWarehouse(batchJobs *BatchedJobs, output UploadResult) (e
 		LastEventAt:      output.LastEventAt,
 		TotalEvents:      output.TotalEvents,
 		TotalBytes:       output.TotalBytes,
+		BytesPerTable:    output.BytesPerTable,
 		UseRudderStorage: output.UseRudderStorage,
 		SourceTaskRunID:  sampleParameters.SourceTaskRunID,
 		SourceJobID:      sampleParameters.SourceJobID,
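
Taken in isolation, the accounting added in this file is easy to illustrate. Below is a minimal, self-contained sketch; the sample payloads and table names are made up, but the accumulation mirrors the isWarehouse branch above: the table name is read from metadata.table with gjson, and each event contributes len(payload)+1 bytes because a newline is appended before the line is written to the staging file.

// Minimal sketch of the per-table byte accounting (not the production code path).
// The payloads below are hypothetical.
package main

import (
	"fmt"

	"github.com/tidwall/gjson"
)

func main() {
	payloads := [][]byte{
		[]byte(`{"metadata":{"table":"tracks"},"data":{"id":"1"}}`),
		[]byte(`{"metadata":{"table":"users"},"data":{"id":"2"}}`),
		[]byte(`{"metadata":{"table":"tracks"},"data":{"id":"3"}}`),
	}

	var totalBytes int
	bytesPerTable := make(map[string]int64)

	for _, p := range payloads {
		line := string(p) + "\n" // one event per line in the staging file
		totalBytes += len(line)

		// Per-table accounting, keyed by the event's destination table.
		tableName := gjson.GetBytes(p, "metadata.table").String()
		bytesPerTable[tableName] += int64(len(line))
	}

	fmt.Println("TotalBytes:", totalBytes)       // 149
	fmt.Println("BytesPerTable:", bytesPerTable) // map[tracks:100 users:49]
}

Events whose payload has no metadata.table end up under the empty-string key, which is also what the new TestBytesPerTable test below asserts for the bare `{}` payload.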

router/batchrouter/handle_test.go

Lines changed: 154 additions & 1 deletion

@@ -5,9 +5,17 @@ import (
 	"testing"

 	"github.com/stretchr/testify/require"
+	"go.uber.org/mock/gomock"

-	"github.com/rudderlabs/rudder-go-kit/config"
+	"github.com/rudderlabs/rudder-go-kit/filemanager"
+	"github.com/rudderlabs/rudder-go-kit/filemanager/mock_filemanager"
+	"github.com/rudderlabs/rudder-go-kit/logger"
+	backendconfig "github.com/rudderlabs/rudder-server/backend-config"
 	"github.com/rudderlabs/rudder-server/jobsdb"
+	mocksJobsDB "github.com/rudderlabs/rudder-server/mocks/jobsdb"
+	"github.com/rudderlabs/rudder-server/utils/timeutil"
+
+	"github.com/rudderlabs/rudder-go-kit/config"
 	"github.com/rudderlabs/rudder-server/jsonrs"
 )

@@ -430,3 +438,148 @@ func TestGenerateSchemaMap(t *testing.T) {
 		})
 	}
 }
+
+type testCase struct {
+	name               string
+	jobs               []*jobsdb.JobT
+	expectedTableBytes map[string]int64
+	expectedTotalBytes int
+	isWarehouse        bool
+}
+
+func TestBytesPerTable(t *testing.T) {
+	newHandle := func(isWarehouse bool) *Handle {
+		mockCtrl := gomock.NewController(t)
+		mockFileManager := mock_filemanager.NewMockFileManager(mockCtrl)
+		mockFileManagerFactory := func(settings *filemanager.Settings) (filemanager.FileManager, error) { return mockFileManager, nil }
+		mockFileManager.EXPECT().Prefix().Return("mockPrefix")
+		mockFileObjects := []*filemanager.FileInfo{}
+		mockFileManager.EXPECT().ListFilesWithPrefix(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(filemanager.MockListSession(mockFileObjects, nil))
+		mockFileManager.EXPECT().Upload(gomock.Any(), gomock.Any(), gomock.Any()).Return(filemanager.UploadedFile{Location: "local", ObjectName: "file"}, nil)
+		jobsDB := mocksJobsDB.NewMockJobsDB(mockCtrl)
+		if !isWarehouse {
+			jobsDB.EXPECT().JournalMarkStart(gomock.Any(), gomock.Any()).Times(1).Return(int64(1), nil)
+		}
+		return &Handle{
+			logger:             logger.NewLogger().Child("batchrouter"),
+			fileManagerFactory: mockFileManagerFactory,
+			datePrefixOverride: config.GetReloadableStringVar("", "BatchRouter.datePrefixOverride"),
+			customDatePrefix:   config.GetReloadableStringVar("", "BatchRouter.customDatePrefix"),
+			dateFormatProvider: &storageDateFormatProvider{dateFormatsCache: make(map[string]string)},
+			conf:               config.New(),
+			now:                timeutil.Now,
+			jobsDB:             jobsDB,
+		}
+	}
+
+	tests := []testCase{
+		{
+			name: "single table warehouse upload",
+			jobs: []*jobsdb.JobT{
+				{
+					EventPayload: []byte(`{
+						"metadata": {
+							"table": "users",
+						},
+					}`),
+				},
+				{
+					EventPayload: []byte(`{
+						"metadata": {
+							"table": "users",
+						},
+					}`),
+				},
+			},
+			expectedTableBytes: map[string]int64{
+				"users": 126,
+			},
+			expectedTotalBytes: 126,
+			isWarehouse:        true,
+		},
+		{
+			name: "multiple tables warehouse upload",
+			jobs: []*jobsdb.JobT{
+				{
+					EventPayload: []byte(`{
+						"metadata": {
+							"table": "users1",
+						},
+					}`),
+				},
+				{
+					EventPayload: []byte(`{
+						"metadata": {
+							"table": "users2",
+						},
+					}`),
+				},
+				{
+					EventPayload: []byte(`{
+						"metadata": {
+							"table": "users1",
+						},
+					}`),
+				},
+				{
+					EventPayload: []byte(`{
+						"metadata": {
+							"table": "users3",
+						},
+					}`),
+				},
+				{
+					EventPayload: []byte(`{}`),
+				},
+			},
+			expectedTableBytes: map[string]int64{
+				"users1": 128,
+				"users2": 64,
+				"users3": 64,
+				"":       3,
+			},
+			expectedTotalBytes: 259,
+			isWarehouse:        true,
+		},
+		{
+			name: "non-warehouse upload",
+			jobs: []*jobsdb.JobT{
+				{
+					EventPayload: []byte(`{
+						"metadata": {
+							"table": "users",
+						},
+					}`),
+				},
+			},
+			expectedTotalBytes: 63,
+			isWarehouse:        false,
+		},
+	}

+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			batchedJobs := &BatchedJobs{
+				Jobs: tc.jobs,
+				Connection: &Connection{
+					Source: backendconfig.SourceT{
+						ID: "test-source",
+					},
+					Destination: backendconfig.DestinationT{
+						ID: "test-destination",
+					},
+				},
+			}
+			handle := newHandle(tc.isWarehouse)
+			result := handle.upload("S3", batchedJobs, tc.isWarehouse)
+
+			// Verify results
+			require.Equal(t, tc.expectedTotalBytes, result.TotalBytes, "total bytes mismatch")
+			if tc.isWarehouse {
+				require.Equal(t, tc.expectedTableBytes, result.BytesPerTable, "bytes per table mismatch")
+			} else {
+				require.Len(t, result.BytesPerTable, 0)
+			}
+		})
+	}
+}

router/batchrouter/types.go

Lines changed: 1 addition & 0 deletions

@@ -44,6 +44,7 @@ type UploadResult struct {
 	LastEventAt      string
 	TotalEvents      int
 	TotalBytes       int
+	BytesPerTable    map[string]int64
 	UseRudderStorage bool
 }

services/notifier/notifier.go

Lines changed: 3 additions & 2 deletions

@@ -35,8 +35,9 @@ const (
 type JobType string

 const (
-	JobTypeUpload JobType = "upload"
-	JobTypeAsync  JobType = "async_job"
+	JobTypeUpload   JobType = "upload"
+	JobTypeUploadV2 JobType = "upload_v2"
+	JobTypeAsync    JobType = "async_job"
 )

 type JobStatus string

warehouse/client/warehouse.go

Lines changed: 3 additions & 0 deletions

@@ -30,6 +30,7 @@ type StagingFile struct {
 	LastEventAt           string
 	TotalEvents           int
 	TotalBytes            int
+	BytesPerTable         map[string]int64
 	UseRudderStorage      bool
 	DestinationRevisionID string
 	// cloud sources specific info
@@ -50,6 +51,7 @@ type legacyPayload struct {
 	LastEventAt           string
 	TotalEvents           int
 	TotalBytes            int
+	BytesPerTable         map[string]int64
 	UseRudderStorage      bool
 	DestinationRevisionID string
 	// cloud sources specific info
@@ -105,6 +107,7 @@ func (w *Warehouse) Process(ctx context.Context, stagingFile StagingFile) error
 		LastEventAt:           stagingFile.LastEventAt,
 		TotalEvents:           stagingFile.TotalEvents,
 		TotalBytes:            stagingFile.TotalBytes,
+		BytesPerTable:         stagingFile.BytesPerTable,
 		UseRudderStorage:      stagingFile.UseRudderStorage,
 		DestinationRevisionID: stagingFile.DestinationRevisionID,
 		SourceTaskRunID:       stagingFile.SourceTaskRunID,
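
To make the resulting wire format concrete, here is a rough sketch of how BytesPerTable rides along in the staging-file payload posted to the warehouse service. The stagingFilePayload struct is a trimmed, hypothetical stand-in rather than the client's actual type; the field names follow the diff, and the values match the fixtures in warehouse_test.go and process_request.json below.

// Illustrative only: serializing BytesPerTable alongside TotalBytes.
package main

import (
	"encoding/json"
	"fmt"
)

// stagingFilePayload is a trimmed, hypothetical stand-in for the staging-file payload.
type stagingFilePayload struct {
	WorkspaceID   string
	TotalEvents   int
	TotalBytes    int
	BytesPerTable map[string]int64
}

func main() {
	p := stagingFilePayload{
		WorkspaceID: "test-workspace",
		TotalEvents: 2,
		TotalBytes:  2000,
		BytesPerTable: map[string]int64{
			"product_track": 1000,
			"tracks":        1000,
		},
	}

	b, err := json.MarshalIndent(p, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}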

warehouse/client/warehouse_test.go

Lines changed: 4 additions & 0 deletions

@@ -67,6 +67,10 @@ func TestWarehouse(t *testing.T) {
 		SourceJobID:    "<source-job-id>",
 		SourceJobRunID: "<source-job-run-id>",
 		TimeWindow:     time.Date(1, 1, 1, 0, 40, 0, 0, time.UTC),
+		BytesPerTable: map[string]int64{
+			"product_track": 1000,
+			"tracks":        1000,
+		},
 		Schema: map[string]map[string]string{
 			"product_track": {
 				"context_destination_id": "string",

warehouse/internal/api/http_test.go

Lines changed: 1 addition & 1 deletion

@@ -61,7 +61,7 @@ func filterPayload(text, match string) string {
 }

 func TestAPI_Process(t *testing.T) {
-	body := loadFile(t, "./testdata/process_request_v2.json")
+	body := loadFile(t, "./testdata/process_request.json")
 	expectedStagingFile := model.StagingFileWithSchema{
 		StagingFile: model.StagingFile{
 			ID: 0,

warehouse/internal/api/testdata/process_request.json

Lines changed: 4 additions & 0 deletions

@@ -58,6 +58,10 @@
   "LastEventAt": "2022-11-08T13:23:07Z",
   "TotalEvents": 2,
   "TotalBytes": 2000,
+  "BytesPerTable": {
+    "product_track": 1000,
+    "tracks": 1000
+  },
   "UseRudderStorage": false,
   "DestinationRevisionID": "2H1cLBvL3v0prRBNzpe8D34XTzU",
   "SourceTaskRunID": "<source-task-run-id>",

warehouse/internal/api/testdata/process_request_v2.json

Lines changed: 0 additions & 71 deletions
This file was deleted.
