Skip to content

Commit 104f492

Browse files
Copilotvcastellm
andauthored
Fix HTTP executor memory leak with LRU client pool and proper connection management (#1810)
* Initial plan * Fix HTTP executor memory leak with LRU client pool and proper connection management Co-authored-by: vcastellm <47026+vcastellm@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: vcastellm <47026+vcastellm@users.noreply.github.com>
1 parent 7c57d28 commit 104f492

File tree

2 files changed

+168
-6
lines changed

2 files changed

+168
-6
lines changed

plugin/http/http.go

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package http
22

33
import (
44
"bytes"
5+
"crypto/sha256"
56
"crypto/tls"
67
"crypto/x509"
8+
"encoding/hex"
79
"encoding/json"
810
"errors"
911
"fmt"
@@ -12,13 +14,16 @@ import (
1214
"log"
1315
"net/http"
1416
"regexp"
17+
"sort"
1518
"strconv"
1619
"strings"
20+
"sync"
1721
"time"
1822

1923
"github.com/armon/circbuf"
2024
dkplugin "github.com/distribworks/dkron/v4/plugin"
2125
"github.com/distribworks/dkron/v4/types"
26+
lru "github.com/hashicorp/golang-lru"
2227
)
2328

2429
const (
@@ -28,17 +33,33 @@ const (
2833
// This is to prevent Serf's memory from growing to an enormous
2934
// amount due to a faulty handler.
3035
maxBufSize = 256000
36+
// maxClientPoolSize limits the number of HTTP clients cached
37+
// This prevents unbounded memory growth in the client pool
38+
maxClientPoolSize = 100
3139
)
3240

3341
// HTTP process http request
3442
type HTTP struct {
35-
clientPool map[string]http.Client
43+
clientPool *lru.Cache
44+
mu sync.RWMutex
3645
}
3746

3847
// New
3948
func New() *HTTP {
49+
cache, err := lru.NewWithEvict(maxClientPoolSize, func(key interface{}, value interface{}) {
50+
// Optionally close idle connections when evicting clients
51+
if client, ok := value.(http.Client); ok {
52+
if transport, ok := client.Transport.(*http.Transport); ok {
53+
transport.CloseIdleConnections()
54+
}
55+
}
56+
})
57+
if err != nil {
58+
// Fallback to a smaller cache if creation fails
59+
cache, _ = lru.New(10)
60+
}
4061
return &HTTP{
41-
clientPool: make(map[string]http.Client),
62+
clientPool: cache,
4263
}
4364
}
4465

@@ -111,15 +132,24 @@ func (s *HTTP) ExecuteImpl(args *types.ExecuteRequest) ([]byte, error) {
111132
ok bool
112133
)
113134

114-
cc := args.Config["timeout"] + args.Config["tlsRootCAsFile"] + args.Config["tlsCertificateFile"] + args.Config["tlsCertificateKeyFile"]
135+
clientKey := s.generateClientKey(args.Config)
115136

116-
if client, ok = s.clientPool[cc]; !ok {
137+
s.mu.RLock()
138+
if cachedClient, found := s.clientPool.Get(clientKey); found {
139+
client = cachedClient.(http.Client)
140+
ok = true
141+
}
142+
s.mu.RUnlock()
143+
144+
if !ok {
117145
var warns []error
118146
client, warns = createClient(args.Config)
119147
for _, warn := range warns {
120148
_, _ = output.Write([]byte(fmt.Sprintf("Warning: %s.\n", warn.Error())))
121149
}
122-
s.clientPool[cc] = client
150+
s.mu.Lock()
151+
s.clientPool.Add(clientKey, client)
152+
s.mu.Unlock()
123153
}
124154

125155
// do request
@@ -167,6 +197,35 @@ func (s *HTTP) ExecuteImpl(args *types.ExecuteRequest) ([]byte, error) {
167197
return output.Bytes(), nil
168198
}
169199

200+
// generateClientKey creates a unique key for the client pool based on configuration
201+
// This fixes the key collision issue by using proper hashing instead of string concatenation
202+
func (s *HTTP) generateClientKey(config map[string]string) string {
203+
// Only include configuration that affects the HTTP client behavior
204+
relevantKeys := []string{
205+
"timeout",
206+
"tlsNoVerifyPeer",
207+
"tlsRootCAsFile",
208+
"tlsCertificateFile",
209+
"tlsCertificateKeyFile",
210+
}
211+
212+
// Create a sorted map to ensure consistent key generation
213+
var keyParts []string
214+
for _, key := range relevantKeys {
215+
if value, exists := config[key]; exists && value != "" {
216+
keyParts = append(keyParts, fmt.Sprintf("%s=%s", key, value))
217+
}
218+
}
219+
220+
// Sort to ensure consistent ordering
221+
sort.Strings(keyParts)
222+
223+
// Create hash of the configuration
224+
hasher := sha256.New()
225+
hasher.Write([]byte(strings.Join(keyParts, "|")))
226+
return hex.EncodeToString(hasher.Sum(nil))
227+
}
228+
170229
// createClient always returns a new http client. Any errors returned are
171230
// errors in the configuration.
172231
func createClient(config map[string]string) (http.Client, []error) {
@@ -199,8 +258,20 @@ func createClient(config map[string]string) (http.Client, []error) {
199258
}
200259
}
201260

261+
// Create transport with proper connection pooling settings
262+
transport := &http.Transport{
263+
TLSClientConfig: tlsconf,
264+
// Set reasonable connection pool limits to prevent resource leaks
265+
MaxIdleConns: 10,
266+
MaxIdleConnsPerHost: 5,
267+
MaxConnsPerHost: 20,
268+
IdleConnTimeout: 30 * time.Second,
269+
TLSHandshakeTimeout: 10 * time.Second,
270+
ExpectContinueTimeout: 1 * time.Second,
271+
}
272+
202273
return http.Client{
203-
Transport: &http.Transport{TLSClientConfig: tlsconf},
274+
Transport: transport,
204275
Timeout: time.Duration(_timeout) * time.Second,
205276
}, errs
206277
}

plugin/http/http_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"log"
88
"net/http"
99
"net/http/httptest"
10+
"runtime"
1011
"testing"
1112

1213
"github.com/distribworks/dkron/v4/types"
@@ -158,3 +159,93 @@ func TestRootCA(t *testing.T) {
158159
fmt.Println(output.Error)
159160
assert.Equal(t, "", output.Error)
160161
}
162+
163+
// TestClientPoolMemoryLeak tests that the client pool doesn't grow unbounded
164+
func TestClientPoolMemoryLeak(t *testing.T) {
165+
ts := newTestServer()
166+
defer ts.Close()
167+
168+
httpExecutor := New()
169+
170+
// Get initial memory stats
171+
var m1, m2 runtime.MemStats
172+
runtime.GC()
173+
runtime.ReadMemStats(&m1)
174+
175+
// Simulate many different configurations to fill the client pool
176+
// Use different combinations of TLS configs to create unique client pool keys
177+
for i := 0; i < 1000; i++ {
178+
config := map[string]string{
179+
"method": "GET",
180+
"url": fmt.Sprintf("%s/200", ts.URL),
181+
"expectCode": "200",
182+
"timeout": fmt.Sprintf("%d", 30+i%100), // Different timeout values
183+
"tlsRootCAsFile": fmt.Sprintf("/tmp/fake%d.crt", i%100), // Different fake TLS files
184+
"tlsCertificateFile": fmt.Sprintf("/tmp/cert%d.pem", i%100), // Different fake cert files
185+
"tlsCertificateKeyFile": fmt.Sprintf("/tmp/key%d.pem", i%100), // Different fake key files
186+
}
187+
188+
pa := &types.ExecuteRequest{
189+
JobName: fmt.Sprintf("testJob%d", i),
190+
Config: config,
191+
}
192+
193+
// We expect this to fail due to file not found, but the client should still be cached
194+
httpExecutor.ExecuteImpl(pa)
195+
}
196+
197+
// Check memory usage after creating many clients
198+
runtime.GC()
199+
runtime.ReadMemStats(&m2)
200+
201+
// Check that client pool size is reasonable
202+
poolSize := httpExecutor.clientPool.Len()
203+
t.Logf("Client pool size: %d", poolSize)
204+
assert.LessOrEqual(t, poolSize, maxClientPoolSize, "Client pool should be capped at max size")
205+
assert.Greater(t, poolSize, 10, "Client pool should have many entries for diverse configurations")
206+
}
207+
208+
// TestClientPoolKeyCollisions tests for unintended client sharing due to poor key generation
209+
func TestClientPoolKeyCollisions(t *testing.T) {
210+
ts := newTestServer()
211+
defer ts.Close()
212+
213+
httpExecutor := New()
214+
215+
// These two configurations should create different clients but currently don't
216+
// due to poor key generation (simple string concatenation)
217+
config1 := map[string]string{
218+
"method": "GET",
219+
"url": fmt.Sprintf("%s/200", ts.URL),
220+
"expectCode": "200",
221+
"timeout": "30", // timeout = "30"
222+
"tlsRootCAsFile": "file.crt", // tlsRootCAsFile = "file.crt"
223+
}
224+
225+
config2 := map[string]string{
226+
"method": "GET",
227+
"url": fmt.Sprintf("%s/200", ts.URL),
228+
"expectCode": "200",
229+
"timeout": "3", // timeout = "3"
230+
"tlsRootCAsFile": "0file.crt", // tlsRootCAsFile = "0file.crt"
231+
}
232+
233+
// The concatenated keys would be:
234+
// config1: "30" + "file.crt" + "" + "" = "30file.crt"
235+
// config2: "3" + "0file.crt" + "" + "" = "30file.crt"
236+
// These are identical! This is a collision bug.
237+
238+
pa1 := &types.ExecuteRequest{JobName: "test1", Config: config1}
239+
pa2 := &types.ExecuteRequest{JobName: "test2", Config: config2}
240+
241+
// Execute both
242+
httpExecutor.ExecuteImpl(pa1)
243+
httpExecutor.ExecuteImpl(pa2)
244+
245+
// Due to the collision, only one client should be in the pool
246+
poolSize := httpExecutor.clientPool.Len()
247+
t.Logf("Client pool size after potential collision: %d", poolSize)
248+
249+
// With the fix, different configs should create different clients
250+
assert.Equal(t, 2, poolSize, "Proper key generation should prevent unintended client sharing")
251+
}

0 commit comments

Comments
 (0)