Skip to content

Commit 6f58ad5

Browse files
koladilipatzoum
andauthored
fix: group jobs by dest id to transformer from router (#5598)
# Description OAuth v2 assumes all jobs in batch are related to a single destination ID, and that only happens if isolation mode is destination, but in hosted we have isolation mode set to none so we are grouping jobs by destination before going to transformer. Resolves INT-3394 ## Linear Ticket https://linear.app/rudderstack/issue/INT-3394 ## Security - [ ] The code changed/added as part of this pull request won't create any security issues with how the software is being used. --------- Co-authored-by: Aris Tzoumas <atzoumas@rudderstack.com>
1 parent 456a62c commit 6f58ad5

File tree

6 files changed

+303
-13
lines changed

6 files changed

+303
-13
lines changed

router/handle.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ type Handle struct {
8989
netHandle NetHandle
9090
customDestinationManager customDestinationManager.DestinationManager
9191
transformer transformer.Transformer
92+
isOAuthDestination bool
9293
oauth oauth.Authorizer
9394
destinationsMapMu sync.RWMutex
9495
destinationsMap map[string]*routerutils.DestinationWithSources // destinationID -> destination

router/handle_lifecycle.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ func (rt *Handle) Setup(
6767
rt.jobsDB = jobsDB
6868
rt.errorDB = errorDB
6969
rt.destType = destType
70-
7170
rt.drainer = routerutils.NewDrainer(
7271
config,
7372
func(destinationID string) (*routerutils.DestinationWithSources, bool) {
@@ -125,7 +124,7 @@ func (rt *Handle) Setup(
125124
backendConfig, rt.reloadableConfig.oauthV2Enabled,
126125
rt.reloadableConfig.oauthV2ExpirationTimeDiff,
127126
)
128-
127+
rt.isOAuthDestination = oauth.IsOAuthDestination(destinationDefinition.Config)
129128
rt.oauth = oauth.NewOAuthErrorHandler(backendConfig)
130129

131130
rt.isBackendConfigInitialized = false

router/worker.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func (w *worker) workLoop() {
203203
logger.NewBoolField("oauthV2Enabled", oauthV2Enabled),
204204
obskit.DestinationType(destination.DestinationDefinition.Name),
205205
)
206-
if authType := oauth.GetAuthType(destination.DestinationDefinition.Config); authType == oauth.OAuth && !oauthV2Enabled {
206+
if w.rt.isOAuthDestination && !oauthV2Enabled {
207207
rudderAccountID := oauth.GetAccountId(destination.Config, oauth.DeliveryAccountIdKey)
208208

209209
if routerutils.IsNotEmptyString(rudderAccountID) {
@@ -272,6 +272,29 @@ func (w *worker) workLoop() {
272272
}
273273
}
274274

275+
func (w *worker) transformJobs(routerJobs []types.RouterJobT) []types.DestinationJobT {
276+
w.rt.routerTransformInputCountStat.Count(len(routerJobs))
277+
destinationJobs := w.rt.transformer.Transform(
278+
transformer.ROUTER_TRANSFORM,
279+
&types.TransformMessageT{Data: routerJobs, DestType: strings.ToLower(w.rt.destType)},
280+
)
281+
w.rt.routerTransformOutputCountStat.Count(len(destinationJobs))
282+
w.recordStatsForFailedTransforms("routerTransform", destinationJobs)
283+
return destinationJobs
284+
}
285+
286+
func (w *worker) transformJobsPerDestination(routerJobs []types.RouterJobT) []types.DestinationJobT {
287+
destinationJobs := make([]types.DestinationJobT, 0, len(routerJobs))
288+
destinationIDRouterJobsMap := lo.GroupBy(routerJobs, func(job types.RouterJobT) string {
289+
return job.Destination.ID
290+
})
291+
for _, destinationIDRouterJobs := range destinationIDRouterJobsMap {
292+
destinationJobs = append(destinationJobs, w.transformJobs(destinationIDRouterJobs)...)
293+
}
294+
295+
return destinationJobs
296+
}
297+
275298
func (w *worker) transform(routerJobs []types.RouterJobT) []types.DestinationJobT {
276299
// transform limiter with dynamic priority
277300
start := time.Now()
@@ -306,14 +329,11 @@ func (w *worker) transform(routerJobs []types.RouterJobT) []types.DestinationJob
306329
w.rt.logger.Debugn("traceParent is empty during router transform", logger.NewIntField("jobId", job.JobMetadata.JobID))
307330
}
308331
}
309-
w.rt.routerTransformInputCountStat.Count(len(routerJobs))
310-
destinationJobs := w.rt.transformer.Transform(
311-
transformer.ROUTER_TRANSFORM,
312-
&types.TransformMessageT{Data: routerJobs, DestType: strings.ToLower(w.rt.destType)},
313-
)
314-
w.rt.routerTransformOutputCountStat.Count(len(destinationJobs))
315-
w.recordStatsForFailedTransforms("routerTransform", destinationJobs)
316-
return destinationJobs
332+
333+
if w.rt.isOAuthDestination && w.rt.reloadableConfig.oauthV2Enabled.Load() {
334+
return w.transformJobsPerDestination(routerJobs)
335+
}
336+
return w.transformJobs(routerJobs)
317337
}
318338

319339
func (w *worker) batchTransform(routerJobs []types.RouterJobT) []types.DestinationJobT {
@@ -792,8 +812,7 @@ func (w *worker) proxyRequest(ctx context.Context, destinationJob types.Destinat
792812
proxyRequestResponse := w.rt.transformer.ProxyRequest(ctx, proxyReqparams)
793813
w.routerProxyStat.SendTiming(time.Since(rtlTime))
794814
w.logger.Debugf(`[TransformerProxy] (Dest-%[1]v) {Job - %[2]v} Request ended`, w.rt.destType, jobID)
795-
authType := oauth.GetAuthType(destinationJob.Destination.DestinationDefinition.Config)
796-
if authType != oauth.OAuth {
815+
if !oauth.IsOAuthDestination(destinationJob.Destination.DestinationDefinition.Config) {
797816
return proxyRequestResponse
798817
}
799818
if proxyRequestResponse.ProxyRequestStatusCode != http.StatusOK && !oauthV2Enabled {

router/worker_test.go

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"math"
8+
"sync"
79
"testing"
810

11+
"github.com/samber/lo"
912
"github.com/stretchr/testify/require"
1013
"go.uber.org/mock/gomock"
1114

1215
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
1316
"github.com/rudderlabs/rudder-server/enterprise/reporting"
1417
"github.com/rudderlabs/rudder-server/processor/integrations"
18+
"github.com/rudderlabs/rudder-server/router/internal/partition"
1519
"github.com/rudderlabs/rudder-server/router/throttler"
1620
"github.com/rudderlabs/rudder-server/router/transformer"
1721
"github.com/rudderlabs/rudder-server/router/types"
@@ -22,6 +26,7 @@ import (
2226
"github.com/rudderlabs/rudder-go-kit/config"
2327
"github.com/rudderlabs/rudder-go-kit/logger"
2428
"github.com/rudderlabs/rudder-go-kit/stats"
29+
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
2530
mocksRouter "github.com/rudderlabs/rudder-server/mocks/router"
2631
mocksTransformer "github.com/rudderlabs/rudder-server/mocks/router/transformer"
2732
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
@@ -456,3 +461,240 @@ var _ = Describe("Proxy Request", func() {
456461
})
457462
})
458463
})
464+
465+
func TestTransformForOAuthV2Destination(t *testing.T) {
466+
initRouter()
467+
mockCtrl := gomock.NewController(t)
468+
defer mockCtrl.Finish()
469+
470+
mockTransformer := mocksTransformer.NewMockTransformer(mockCtrl)
471+
472+
worker := &worker{
473+
rt: &Handle{
474+
transformer: mockTransformer,
475+
destType: "some_dest_type",
476+
logger: logger.NOP,
477+
routerTransformInputCountStat: stats.NOP.NewTaggedStat("router_transform_input_count", stats.CountType, stats.Tags{"destType": "some_dest_type"}),
478+
routerTransformOutputCountStat: stats.NOP.NewTaggedStat("router_transform_output_count", stats.CountType, stats.Tags{"destType": "some_dest_type"}),
479+
isOAuthDestination: true,
480+
reloadableConfig: &reloadableConfig{
481+
oauthV2Enabled: config.GetReloadableBoolVar(true),
482+
},
483+
},
484+
}
485+
var limiterWg sync.WaitGroup
486+
ctx, cancel := context.WithCancel(context.Background())
487+
defer limiterWg.Wait()
488+
defer cancel()
489+
worker.rt.limiter.transform = kitsync.NewLimiter(ctx, &limiterWg, "transform", math.MaxInt, stats.Default)
490+
worker.rt.limiter.stats.transform = partition.NewStats()
491+
492+
routerJobs := []types.RouterJobT{
493+
{
494+
Destination: backendconfig.DestinationT{
495+
ID: "d1",
496+
},
497+
Message: json.RawMessage(`{"event": "d1-test1"}`),
498+
JobMetadata: types.JobMetadataT{
499+
JobID: 1,
500+
},
501+
},
502+
{
503+
Destination: backendconfig.DestinationT{
504+
ID: "d2",
505+
},
506+
Message: json.RawMessage(`{"event": "d2-test2"}`),
507+
JobMetadata: types.JobMetadataT{
508+
JobID: 2,
509+
},
510+
},
511+
{
512+
Destination: backendconfig.DestinationT{
513+
ID: "d1",
514+
},
515+
Message: json.RawMessage(`{"event": "d1-test3"}`),
516+
JobMetadata: types.JobMetadataT{
517+
JobID: 3,
518+
},
519+
},
520+
{
521+
Destination: backendconfig.DestinationT{
522+
ID: "d3",
523+
},
524+
Message: json.RawMessage(`{"event": "d3-test4"}`),
525+
JobMetadata: types.JobMetadataT{
526+
JobID: 4,
527+
},
528+
},
529+
{
530+
Destination: backendconfig.DestinationT{
531+
ID: "d1",
532+
},
533+
Message: json.RawMessage(`{"event": "d1-test5"}`),
534+
JobMetadata: types.JobMetadataT{
535+
JobID: 5,
536+
},
537+
},
538+
{
539+
Destination: backendconfig.DestinationT{
540+
ID: "d2",
541+
},
542+
Message: json.RawMessage(`{"event": "d2-test6"}`),
543+
JobMetadata: types.JobMetadataT{
544+
JobID: 6,
545+
},
546+
},
547+
}
548+
mockTransformer.EXPECT().Transform(transformer.ROUTER_TRANSFORM, &types.TransformMessageT{
549+
Data: []types.RouterJobT{routerJobs[0], routerJobs[2], routerJobs[4]},
550+
DestType: worker.rt.destType,
551+
}).Return([]types.DestinationJobT{
552+
{
553+
Destination: backendconfig.DestinationT{
554+
ID: "d1",
555+
},
556+
Message: json.RawMessage(`{"event": ["d1-test1", "d1-test3"]}`),
557+
JobMetadataArray: []types.JobMetadataT{
558+
{
559+
JobID: 1,
560+
},
561+
{
562+
JobID: 3,
563+
},
564+
},
565+
},
566+
{
567+
Destination: backendconfig.DestinationT{
568+
ID: "d1",
569+
},
570+
Message: json.RawMessage(`{"event": [ "d1-test5"]}`),
571+
JobMetadataArray: []types.JobMetadataT{
572+
{
573+
JobID: 5,
574+
},
575+
},
576+
},
577+
})
578+
mockTransformer.EXPECT().Transform(transformer.ROUTER_TRANSFORM, &types.TransformMessageT{
579+
Data: []types.RouterJobT{routerJobs[1], routerJobs[5]},
580+
DestType: worker.rt.destType,
581+
}).Return([]types.DestinationJobT{
582+
{
583+
Destination: backendconfig.DestinationT{
584+
ID: "d2",
585+
},
586+
Message: json.RawMessage(`{"event": ["d2-test2", "d2-test6"]}`),
587+
JobMetadataArray: []types.JobMetadataT{
588+
{
589+
JobID: 2,
590+
},
591+
{
592+
JobID: 6,
593+
},
594+
},
595+
},
596+
})
597+
mockTransformer.EXPECT().Transform(transformer.ROUTER_TRANSFORM, &types.TransformMessageT{
598+
Data: []types.RouterJobT{routerJobs[3]},
599+
DestType: worker.rt.destType,
600+
}).Return([]types.DestinationJobT{
601+
{
602+
Destination: backendconfig.DestinationT{
603+
ID: "d3",
604+
},
605+
Message: json.RawMessage(`{"event": ["d3-test4"]}`),
606+
JobMetadataArray: []types.JobMetadataT{
607+
{
608+
JobID: 4,
609+
},
610+
},
611+
},
612+
})
613+
destinationJobs := worker.transform(routerJobs)
614+
require.Equal(t, 4, len(destinationJobs))
615+
destinationIDJobsMap := lo.GroupBy(destinationJobs, func(job types.DestinationJobT) string {
616+
return job.Destination.ID
617+
})
618+
require.Equal(t, 3, len(destinationIDJobsMap))
619+
require.Equal(t, 2, len(destinationIDJobsMap["d1"]))
620+
require.Equal(t, 1, len(destinationIDJobsMap["d2"]))
621+
require.Equal(t, 1, len(destinationIDJobsMap["d3"]))
622+
}
623+
624+
func TestTransformForNonOAuthDestination(t *testing.T) {
625+
initRouter()
626+
mockCtrl := gomock.NewController(t)
627+
defer mockCtrl.Finish()
628+
629+
mockTransformer := mocksTransformer.NewMockTransformer(mockCtrl)
630+
631+
worker := &worker{
632+
rt: &Handle{
633+
transformer: mockTransformer,
634+
destType: "some_dest_type",
635+
logger: logger.NOP,
636+
routerTransformInputCountStat: stats.NOP.NewTaggedStat("router_transform_input_count", stats.CountType, stats.Tags{"destType": "some_dest_type"}),
637+
routerTransformOutputCountStat: stats.NOP.NewTaggedStat("router_transform_output_count", stats.CountType, stats.Tags{"destType": "some_dest_type"}),
638+
isOAuthDestination: false,
639+
reloadableConfig: &reloadableConfig{
640+
oauthV2Enabled: config.GetReloadableBoolVar(true),
641+
},
642+
},
643+
}
644+
var limiterWg sync.WaitGroup
645+
ctx, cancel := context.WithCancel(context.Background())
646+
defer limiterWg.Wait()
647+
defer cancel()
648+
worker.rt.limiter.transform = kitsync.NewLimiter(ctx, &limiterWg, "transform", math.MaxInt, stats.Default)
649+
worker.rt.limiter.stats.transform = partition.NewStats()
650+
651+
routerJobs := []types.RouterJobT{
652+
{
653+
Destination: backendconfig.DestinationT{
654+
ID: "d1",
655+
},
656+
Message: json.RawMessage(`{"event": "d1-test1"}`),
657+
JobMetadata: types.JobMetadataT{
658+
JobID: 1,
659+
},
660+
},
661+
{
662+
Destination: backendconfig.DestinationT{
663+
ID: "d2",
664+
},
665+
Message: json.RawMessage(`{"event": "d2-test2"}`),
666+
JobMetadata: types.JobMetadataT{
667+
JobID: 2,
668+
},
669+
},
670+
}
671+
mockTransformer.EXPECT().Transform(transformer.ROUTER_TRANSFORM, &types.TransformMessageT{
672+
Data: routerJobs,
673+
DestType: worker.rt.destType,
674+
}).Return([]types.DestinationJobT{
675+
{
676+
Destination: backendconfig.DestinationT{
677+
ID: "d1",
678+
},
679+
Message: json.RawMessage(`{"event": ["d1-test1"]}`),
680+
JobMetadataArray: []types.JobMetadataT{
681+
{
682+
JobID: 1,
683+
},
684+
},
685+
},
686+
{
687+
Destination: backendconfig.DestinationT{
688+
ID: "d2",
689+
},
690+
Message: json.RawMessage(`{"event": [ "d2-test2"]}`),
691+
JobMetadataArray: []types.JobMetadataT{
692+
{
693+
JobID: 2,
694+
},
695+
},
696+
},
697+
})
698+
699+
worker.transform(routerJobs)
700+
}

services/oauth/oauth.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,11 @@ func GetAuthType(config map[string]interface{}) AuthType {
168168
return AuthType(authType)
169169
}
170170

171+
func IsOAuthDestination(config map[string]interface{}) bool {
172+
authType := GetAuthType(config)
173+
return authType == OAuth
174+
}
175+
171176
// This function creates a new OauthErrorResponseHandler
172177
func NewOAuthErrorHandler(provider tokenProvider, options ...func(*OAuthErrResHandler)) *OAuthErrResHandler {
173178
oAuthErrResHandler := &OAuthErrResHandler{

0 commit comments

Comments
 (0)