Skip to content

Commit ef0a15e

Browse files
authored
fix: httplb client doesn't respect connection limits (#5773)
# Description - respecting `MaxConnsPerHost` & `MaxIdleConnsPerHost` in `httplb` transport - removed `recycled` client as it can be emulated using `httplb` - `httplb` client should only use `WithRoundTripperMaxLifetime` in case `{DEST,USER}_TRANSFORM_URL_IS_HEADLESS=false` or `Transformer.Client.recycle: true`. If none is present, `WithRoundTripperMaxLifetime` will not be used ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 61642c8 commit ef0a15e

File tree

9 files changed

+45
-63
lines changed

9 files changed

+45
-63
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ require (
3535
github.com/apache/pulsar-client-go v0.14.0
3636
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
3737
github.com/aws/aws-sdk-go v1.55.7
38-
github.com/bufbuild/httplb v0.3.1
38+
github.com/bufbuild/httplb v0.4.0
3939
github.com/cenkalti/backoff v2.2.1+incompatible
4040
github.com/cenkalti/backoff/v4 v4.3.0
4141
github.com/confluentinc/confluent-kafka-go/v2 v2.8.0

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,8 @@ github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdb
351351
github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
352352
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
353353
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
354-
github.com/bufbuild/httplb v0.3.1 h1:eY3bDouZyqcyEdUL4/NibxoVh7mXCMKVne1859TDxwQ=
355-
github.com/bufbuild/httplb v0.3.1/go.mod h1:oMeYRvMM4jbtYhwIwWwKnrnWxe2eeXrp2sqEqaLkuJs=
354+
github.com/bufbuild/httplb v0.4.0 h1:f3ebi8Dj1tNelLJPf4V5XlblefH/WqwybHYpGODTtbk=
355+
github.com/bufbuild/httplb v0.4.0/go.mod h1:9XDjl/3UvlkOQUKthLlKn92C1/1SuZ3UCiekxZbenck=
356356
github.com/buger/goterm v1.0.4 h1:Z9YvGmOih81P0FbVtEYTFF6YsSgxSUKEhf/f9bTMXbY=
357357
github.com/buger/goterm v1.0.4/go.mod h1:HiFWV3xnkolgrBV3mY8m0X0Pumt4zg4QhbdOzQtB8tE=
358358
github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=

internal/transformer-client/client.go

+36-21
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
package transformerclient
44

55
import (
6-
"context"
76
"net"
87
"net/http"
98
"time"
@@ -12,8 +11,6 @@ import (
1211
"github.com/bufbuild/httplb/conn"
1312
"github.com/bufbuild/httplb/picker"
1413
"github.com/bufbuild/httplb/resolver"
15-
16-
"github.com/rudderlabs/rudder-server/utils/sysUtils"
1714
)
1815

1916
type ClientConfig struct {
@@ -26,10 +23,9 @@ type ClientConfig struct {
2623

2724
ClientTimeout time.Duration // 600*time.Second
2825
ClientTTL time.Duration // 10*time.Second
29-
30-
ClientType string // stdlib(default), recycled, httplb
31-
32-
PickerType string // power_of_two(default), round_robin, least_loaded_random, least_loaded_round_robin, random
26+
ClientType string // stdlib(default), httplb
27+
PickerType string // power_of_two(default), round_robin, least_loaded_random, least_loaded_round_robin, random
28+
Recycle bool // false
3329
}
3430

3531
type Client interface {
@@ -72,22 +68,24 @@ func NewClient(config *ClientConfig) Client {
7268
}
7369

7470
switch config.ClientType {
75-
case "stdlib":
76-
return client
77-
case "recycled":
78-
return sysUtils.NewRecycledHTTPClient(func() *http.Client {
79-
return client
80-
}, clientTTL)
8171
case "httplb":
82-
return httplb.NewClient(
83-
httplb.WithRootContext(context.TODO()),
72+
tr := &httplbtransport{
73+
MaxConnsPerHost: transport.MaxConnsPerHost,
74+
MaxIdleConnsPerHost: transport.MaxIdleConnsPerHost,
75+
}
76+
options := []httplb.ClientOption{
8477
httplb.WithPicker(getPicker(config.PickerType)),
8578
httplb.WithIdleConnectionTimeout(transport.IdleConnTimeout),
8679
httplb.WithRequestTimeout(client.Timeout),
87-
httplb.WithRoundTripperMaxLifetime(transport.IdleConnTimeout),
88-
httplb.WithIdleTransportTimeout(2*transport.IdleConnTimeout),
8980
httplb.WithResolver(resolver.NewDNSResolver(net.DefaultResolver, resolver.PreferIPv4, clientTTL)),
90-
)
81+
httplb.WithTransport("http", tr),
82+
httplb.WithTransport("https", tr),
83+
}
84+
if config.Recycle {
85+
options = append(options, httplb.WithRoundTripperMaxLifetime(transport.IdleConnTimeout))
86+
}
87+
88+
return httplb.NewClient(options...)
9189
default:
9290
return client
9391
}
@@ -110,10 +108,27 @@ func getPicker(pickerType string) func(prev picker.Picker, allConns conn.Conns)
110108
}
111109
}
112110

113-
type HTTPLBTransport struct {
111+
type httplbtransport struct {
112+
MaxConnsPerHost int
113+
MaxIdleConnsPerHost int
114114
*http.Transport
115115
}
116116

117-
func (t *HTTPLBTransport) NewRoundTripper(scheme, target string, config httplb.TransportConfig) httplb.RoundTripperResult {
118-
return httplb.RoundTripperResult{RoundTripper: t.Transport, Close: t.CloseIdleConnections}
117+
func (s httplbtransport) NewRoundTripper(_, _ string, opts httplb.TransportConfig) httplb.RoundTripperResult {
118+
transport := &http.Transport{
119+
Proxy: opts.ProxyFunc,
120+
GetProxyConnectHeader: opts.ProxyConnectHeadersFunc,
121+
DialContext: opts.DialFunc,
122+
ForceAttemptHTTP2: true,
123+
MaxConnsPerHost: s.MaxConnsPerHost,
124+
MaxIdleConns: s.MaxIdleConnsPerHost,
125+
MaxIdleConnsPerHost: s.MaxIdleConnsPerHost,
126+
IdleConnTimeout: opts.IdleConnTimeout,
127+
TLSHandshakeTimeout: opts.TLSHandshakeTimeout,
128+
TLSClientConfig: opts.TLSClientConfig,
129+
MaxResponseHeaderBytes: opts.MaxResponseHeaderBytes,
130+
ExpectContinueTimeout: 1 * time.Second,
131+
DisableCompression: opts.DisableCompression,
132+
}
133+
return httplb.RoundTripperResult{RoundTripper: transport, Close: transport.CloseIdleConnections}
119134
}

processor/internal/transformer/destination_transformer/destination_transformer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats, opts ...Opt)
6464
handle.conf = conf
6565
handle.log = log
6666
handle.stat = stat
67-
handle.client = transformerclient.NewClient(transformerutils.TransformerClientConfig(conf, "DestinationTransformer"))
67+
handle.client = transformerclient.NewClient(transformerutils.TransformerClientConfig(conf, "DestinationTransformer", conf.GetBoolVar(true, "DEST_TRANSFORM_URL_IS_HEADLESS")))
6868
handle.config.destTransformationURL = handle.conf.GetString("DEST_TRANSFORM_URL", "http://localhost:9090")
6969
handle.config.timeoutDuration = conf.GetDuration("HttpClient.procTransformer.timeout", 600, time.Second)
7070
handle.config.maxRetry = conf.GetReloadableIntVar(30, 1, "Processor.DestinationTransformer.maxRetry", "Processor.maxRetry")

processor/internal/transformer/trackingplan_validation/trackingplan_validation.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats, opts ...Opt)
4242
handle.conf = conf
4343
handle.log = log
4444
handle.stat = stat
45-
handle.client = transformerclient.NewClient(transformerutils.TransformerClientConfig(conf, "TrackingPlanValidation"))
45+
handle.client = transformerclient.NewClient(transformerutils.TransformerClientConfig(conf, "TrackingPlanValidation", conf.GetBoolVar(true, "DEST_TRANSFORM_URL_IS_HEADLESS")))
4646
handle.config.destTransformationURL = handle.conf.GetString("DEST_TRANSFORM_URL", "http://localhost:9090")
4747
handle.config.maxRetry = conf.GetReloadableIntVar(30, 1, "Processor.TrackingPlanValidation.maxRetry", "Processor.maxRetry")
4848
handle.config.timeoutDuration = conf.GetDuration("HttpClient.procTransformer.timeout", 600, time.Second)

processor/internal/transformer/user_transformer/user_transformer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats, opts ...Opt)
4343
handle.conf = conf
4444
handle.log = log.Child("user_transformer")
4545
handle.stat = stat
46-
handle.client = transformerclient.NewClient(transformerutils.TransformerClientConfig(conf, "UserTransformer"))
46+
handle.client = transformerclient.NewClient(transformerutils.TransformerClientConfig(conf, "UserTransformer", conf.GetBoolVar(true, "USER_TRANSFORM_URL_IS_HEADLESS")))
4747
handle.config.userTransformationURL = handle.conf.GetString("USER_TRANSFORM_URL", handle.conf.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"))
4848
handle.config.timeoutDuration = conf.GetDuration("HttpClient.procTransformer.timeout", 600, time.Second)
4949
handle.config.failOnUserTransformTimeout = conf.GetReloadableBoolVar(false, "Processor.UserTransformer.failOnUserTransformTimeout", "Processor.Transformer.failOnUserTransformTimeout")

processor/internal/transformer/utils.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func IsJobTerminated(status int) bool {
2626
return status >= http.StatusOK && status < http.StatusInternalServerError
2727
}
2828

29-
func TransformerClientConfig(conf *config.Config, configPrefix string) *transformerclient.ClientConfig {
29+
func TransformerClientConfig(conf *config.Config, configPrefix string, headless bool) *transformerclient.ClientConfig {
3030
transformerClientConfig := &transformerclient.ClientConfig{
3131
ClientTimeout: conf.GetDurationVar(600, time.Second, fmt.Sprintf("HttpClient.procTransformer.%s.timeout", configPrefix), "HttpClient.procTransformer.timeout"),
3232
ClientTTL: conf.GetDurationVar(10, time.Second, fmt.Sprintf("Transformer.Client.%s.ttl", configPrefix), "Transformer.Client.ttl"),
@@ -37,6 +37,7 @@ func TransformerClientConfig(conf *config.Config, configPrefix string) *transfor
3737
transformerClientConfig.TransportConfig.MaxConnsPerHost = conf.GetIntVar(100, 1, fmt.Sprintf("Transformer.Client.%s.maxHTTPConnections", configPrefix), "Transformer.Client.maxHTTPConnections")
3838
transformerClientConfig.TransportConfig.MaxIdleConnsPerHost = conf.GetIntVar(10, 1, fmt.Sprintf("Transformer.Client.%s.maxHTTPIdleConnections", configPrefix), "Transformer.Client.maxHTTPIdleConnections")
3939
transformerClientConfig.TransportConfig.IdleConnTimeout = conf.GetDurationVar(30, time.Second, fmt.Sprintf("Transformer.Client.%s.maxIdleConnDuration", configPrefix), "Transformer.Client.maxIdleConnDuration")
40+
transformerClientConfig.Recycle = !headless || conf.GetBoolVar(false, fmt.Sprintf("Transformer.Client.%s.recycle", configPrefix), "Transformer.Client.recycle")
4041
return transformerClientConfig
4142
}
4243

router/transformer/transformer.go

+1
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,7 @@ func (trans *handle) transformerClientConfig() *transformerclient.ClientConfig {
620620
transformerClientConfig.TransportConfig.MaxConnsPerHost = config.GetIntVar(100, 1, "Transformer.Client.maxHTTPConnections")
621621
transformerClientConfig.TransportConfig.MaxIdleConnsPerHost = config.GetIntVar(10, 1, "Transformer.Client.maxHTTPIdleConnections")
622622
transformerClientConfig.TransportConfig.IdleConnTimeout = config.GetDurationVar(30, time.Second, "Transformer.Client.maxIdleConnDuration")
623+
transformerClientConfig.Recycle = !config.GetBoolVar(true, "DEST_TRANSFORM_URL_IS_HEADLESS") || config.GetBoolVar(false, "Transformer.Client.DestinationTransformer.recycle", "Transformer.Client.recycle")
623624
return transformerClientConfig
624625
}
625626

utils/sysUtils/httpclient.go

-35
Original file line numberDiff line numberDiff line change
@@ -3,44 +3,9 @@ package sysUtils
33

44
import (
55
"net/http"
6-
"sync"
7-
"time"
86
)
97

108
// HTTPClient interface
119
type HTTPClientI interface {
1210
Do(req *http.Request) (*http.Response, error)
1311
}
14-
15-
type RecycledHTTPClient struct {
16-
client *http.Client
17-
lastRefreshTime time.Time
18-
ttl time.Duration
19-
clientFunc func() *http.Client
20-
lock sync.Mutex
21-
}
22-
23-
func NewRecycledHTTPClient(_clientFunc func() *http.Client, _ttl time.Duration) *RecycledHTTPClient {
24-
return &RecycledHTTPClient{
25-
client: _clientFunc(),
26-
clientFunc: _clientFunc,
27-
ttl: _ttl,
28-
lastRefreshTime: time.Now(),
29-
}
30-
}
31-
32-
func (r *RecycledHTTPClient) GetClient() *http.Client {
33-
r.lock.Lock()
34-
defer r.lock.Unlock()
35-
36-
if r.ttl > 0 && time.Since(r.lastRefreshTime) > r.ttl {
37-
r.client.CloseIdleConnections()
38-
r.client = r.clientFunc()
39-
r.lastRefreshTime = time.Now()
40-
}
41-
return r.client
42-
}
43-
44-
func (r *RecycledHTTPClient) Do(req *http.Request) (*http.Response, error) {
45-
return r.GetClient().Do(req)
46-
}

0 commit comments

Comments
 (0)