Skip to content

Commit de3e87c

Browse files
authored
feat: client side load balancing for user transformations (#5375)
1 parent e504d75 commit de3e87c

File tree

5 files changed

+705
-619
lines changed

5 files changed

+705
-619
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ require (
3535
github.com/apache/pulsar-client-go v0.14.0
3636
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
3737
github.com/aws/aws-sdk-go v1.55.5
38+
github.com/bufbuild/httplb v0.3.0
3839
github.com/cenkalti/backoff v2.2.1+incompatible
3940
github.com/cenkalti/backoff/v4 v4.3.0
4041
github.com/confluentinc/confluent-kafka-go/v2 v2.6.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,8 @@ github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdb
357357
github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
358358
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
359359
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
360+
github.com/bufbuild/httplb v0.3.0 h1:sCMPD+89ydD3atcVareDsiv/kUT+pLHolENMoCGZJV8=
361+
github.com/bufbuild/httplb v0.3.0/go.mod h1:qDNs7dSFxIhKi/DA/rCCPVzbQfHs1JVxPMl9EvrbL4Q=
360362
github.com/buger/goterm v1.0.4 h1:Z9YvGmOih81P0FbVtEYTFF6YsSgxSUKEhf/f9bTMXbY=
361363
github.com/buger/goterm v1.0.4/go.mod h1:HiFWV3xnkolgrBV3mY8m0X0Pumt4zg4QhbdOzQtB8tE=
362364
github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=

processor/transformer/transformer.go

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"fmt"
99
"io"
10+
"net"
1011
"net/http"
1112
"os"
1213
"runtime/trace"
@@ -15,6 +16,8 @@ import (
1516
"sync"
1617
"time"
1718

19+
"github.com/bufbuild/httplb"
20+
"github.com/bufbuild/httplb/resolver"
1821
"github.com/cenkalti/backoff"
1922
jsoniter "github.com/json-iterator/go"
2023
"github.com/samber/lo"
@@ -26,6 +29,7 @@ import (
2629
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
2730
"github.com/rudderlabs/rudder-server/processor/integrations"
2831
"github.com/rudderlabs/rudder-server/utils/httputil"
32+
"github.com/rudderlabs/rudder-server/utils/sysUtils"
2933
"github.com/rudderlabs/rudder-server/utils/types"
3034
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
3135
)
@@ -136,9 +140,9 @@ type Response struct {
136140

137141
type Opt func(*handle)
138142

139-
func WithClient(client *http.Client) Opt {
143+
func WithClient(client HTTPDoer) Opt {
140144
return func(s *handle) {
141-
s.client = client
145+
s.httpClient = client
142146
}
143147
}
144148

@@ -149,6 +153,10 @@ type Transformer interface {
149153
Validate(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
150154
}
151155

156+
type HTTPDoer interface {
157+
Do(req *http.Request) (*http.Response, error)
158+
}
159+
152160
// handle is the handle for this class
153161
type handle struct {
154162
sentStat stats.Measurement
@@ -159,7 +167,7 @@ type handle struct {
159167
logger logger.Logger
160168
stat stats.Stats
161169

162-
client *http.Client
170+
httpClient HTTPDoer
163171

164172
guardConcurrency chan struct{}
165173

@@ -211,16 +219,41 @@ func NewTransformer(conf *config.Config, log logger.Logger, stat stats.Stats, op
211219

212220
trans.guardConcurrency = make(chan struct{}, trans.config.maxConcurrency)
213221

214-
if trans.client == nil {
215-
trans.client = &http.Client{
216-
Transport: &http.Transport{
217-
DisableKeepAlives: trans.config.disableKeepAlives,
218-
MaxConnsPerHost: trans.config.maxHTTPConnections,
219-
MaxIdleConnsPerHost: trans.config.maxHTTPIdleConnections,
220-
IdleConnTimeout: trans.config.maxIdleConnDuration,
221-
},
222-
Timeout: trans.config.timeoutDuration,
223-
}
222+
clientType := conf.GetString("Transformer.Client.type", "stdlib")
223+
224+
transport := &http.Transport{
225+
DisableKeepAlives: trans.config.disableKeepAlives,
226+
MaxConnsPerHost: trans.config.maxHTTPConnections,
227+
MaxIdleConnsPerHost: trans.config.maxHTTPIdleConnections,
228+
IdleConnTimeout: trans.config.maxIdleConnDuration,
229+
}
230+
client := &http.Client{
231+
Transport: transport,
232+
Timeout: trans.config.timeoutDuration,
233+
}
234+
235+
switch clientType {
236+
case "stdlib":
237+
trans.httpClient = client
238+
case "recycled":
239+
trans.httpClient = sysUtils.NewRecycledHTTPClient(func() *http.Client {
240+
return client
241+
}, config.GetDuration("Transformer.Client.ttl", 120, time.Second))
242+
case "httplb":
243+
trans.httpClient = httplb.NewClient(
244+
httplb.WithTransport("http", &HTTPLBTransport{
245+
Transport: transport,
246+
}),
247+
httplb.WithResolver(
248+
resolver.NewDNSResolver(
249+
net.DefaultResolver,
250+
resolver.PreferIPv6,
251+
config.GetDuration("Transformer.Client.ttl", 120, time.Second), // TTL value
252+
),
253+
),
254+
)
255+
default:
256+
panic(fmt.Sprintf("unknown transformer client type: %s", clientType))
224257
}
225258

226259
for _, opt := range opts {
@@ -245,6 +278,14 @@ func (trans *handle) Validate(ctx context.Context, clientEvents []TransformerEve
245278
return trans.transform(ctx, clientEvents, trans.trackingPlanValidationURL(), batchSize, trackingPlanValidationStage)
246279
}
247280

281+
type HTTPLBTransport struct {
282+
*http.Transport
283+
}
284+
285+
func (t *HTTPLBTransport) NewRoundTripper(scheme, target string, config httplb.TransportConfig) httplb.RoundTripperResult {
286+
return httplb.RoundTripperResult{RoundTripper: t.Transport, Close: t.CloseIdleConnections}
287+
}
288+
248289
func (trans *handle) transform(
249290
ctx context.Context,
250291
clientEvents []TransformerEvent,
@@ -474,7 +515,7 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri
474515
// Header to let transformer know that the client understands event filter code
475516
req.Header.Set("X-Feature-Filter-Code", "?1")
476517

477-
resp, reqErr = trans.client.Do(req)
518+
resp, reqErr = trans.httpClient.Do(req)
478519
})
479520
trans.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, tags).SendTiming(time.Since(requestStartTime))
480521
if reqErr != nil {
@@ -495,7 +536,6 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri
495536
retryCount++
496537
trans.logger.Warnn(
497538
"JS HTTP connection error",
498-
logger.NewStringField("URL", url),
499539
logger.NewErrorField(err),
500540
logger.NewIntField("attempts", int64(retryCount)),
501541
)

0 commit comments

Comments
 (0)