Skip to content

Commit c3d4c14

Browse files
sd2kclaude
andauthored
fix: prevent memory leaks in streamable-http mode (#685)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b6e4886 commit c3d4c14

6 files changed

Lines changed: 830 additions & 19 deletions

File tree

client_cache.go

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
package mcpgrafana
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"fmt"
7+
"log/slog"
8+
"net/http"
9+
"net/url"
10+
"sync"
11+
12+
"github.com/grafana/incident-go"
13+
"go.opentelemetry.io/otel"
14+
"go.opentelemetry.io/otel/attribute"
15+
"go.opentelemetry.io/otel/metric"
16+
"golang.org/x/sync/singleflight"
17+
)
18+
19+
const clientCacheMeterName = "mcp-grafana"
20+
21+
// clientCacheKey uniquely identifies a client by its credentials and target.
22+
type clientCacheKey struct {
23+
url string
24+
apiKey string
25+
username string
26+
password string
27+
orgID int64
28+
}
29+
30+
// cacheKeyFromRequest builds a clientCacheKey from request-derived credentials.
31+
func cacheKeyFromRequest(grafanaURL, apiKey string, basicAuth *url.Userinfo, orgID int64) clientCacheKey {
32+
key := clientCacheKey{
33+
url: grafanaURL,
34+
apiKey: apiKey,
35+
orgID: orgID,
36+
}
37+
if basicAuth != nil {
38+
key.username = basicAuth.Username()
39+
key.password, _ = basicAuth.Password()
40+
}
41+
return key
42+
}
43+
44+
// String returns a redacted string representation for logging.
45+
func (k clientCacheKey) String() string {
46+
hasKey := k.apiKey != ""
47+
hasBasic := k.username != ""
48+
return fmt.Sprintf("url=%s apiKey=%t basicAuth=%t orgID=%d", k.url, hasKey, hasBasic, k.orgID)
49+
}
50+
51+
// clientCacheMetrics holds OTel instruments for cache observability.
52+
type clientCacheMetrics struct {
53+
lookups metric.Int64Counter // Total lookups (hits + misses)
54+
hits metric.Int64Counter // Cache hits
55+
misses metric.Int64Counter // Cache misses (new client created)
56+
size metric.Int64Gauge // Current number of cached clients
57+
}
58+
59+
func newClientCacheMetrics() clientCacheMetrics {
60+
meter := otel.GetMeterProvider().Meter(clientCacheMeterName)
61+
62+
lookups, _ := meter.Int64Counter("mcp.client_cache.lookups",
63+
metric.WithDescription("Total number of client cache lookups"),
64+
metric.WithUnit("{lookup}"),
65+
)
66+
hits, _ := meter.Int64Counter("mcp.client_cache.hits",
67+
metric.WithDescription("Number of client cache hits (existing client reused)"),
68+
metric.WithUnit("{hit}"),
69+
)
70+
misses, _ := meter.Int64Counter("mcp.client_cache.misses",
71+
metric.WithDescription("Number of client cache misses (new client created)"),
72+
metric.WithUnit("{miss}"),
73+
)
74+
size, _ := meter.Int64Gauge("mcp.client_cache.size",
75+
metric.WithDescription("Current number of cached clients"),
76+
metric.WithUnit("{client}"),
77+
)
78+
79+
return clientCacheMetrics{
80+
lookups: lookups,
81+
hits: hits,
82+
misses: misses,
83+
size: size,
84+
}
85+
}
86+
87+
var (
88+
attrClientTypeGrafana = attribute.String("client.type", "grafana")
89+
attrClientTypeIncident = attribute.String("client.type", "incident")
90+
)
91+
92+
// ClientCache caches HTTP clients keyed by credentials to avoid creating
93+
// new transports per request. This prevents the memory leak described in
94+
// https://github.com/grafana/mcp-grafana/issues/682.
95+
type ClientCache struct {
96+
mu sync.RWMutex
97+
grafanaClients map[clientCacheKey]*GrafanaClient
98+
incidentClients map[clientCacheKey]*incident.Client
99+
metrics clientCacheMetrics
100+
sfGrafana singleflight.Group
101+
sfIncident singleflight.Group
102+
}
103+
104+
// NewClientCache creates a new client cache.
105+
func NewClientCache() *ClientCache {
106+
return &ClientCache{
107+
grafanaClients: make(map[clientCacheKey]*GrafanaClient),
108+
incidentClients: make(map[clientCacheKey]*incident.Client),
109+
metrics: newClientCacheMetrics(),
110+
}
111+
}
112+
113+
// GetOrCreateGrafanaClient returns a cached Grafana client for the given key,
114+
// or creates one using createFn if no cached client exists.
115+
// The createFn is called outside the cache lock via singleflight to avoid
116+
// blocking concurrent cache reads during slow client creation (e.g. network I/O).
117+
func (c *ClientCache) GetOrCreateGrafanaClient(key clientCacheKey, createFn func() *GrafanaClient) *GrafanaClient {
118+
ctx := context.Background()
119+
typeAttr := metric.WithAttributes(attrClientTypeGrafana)
120+
c.metrics.lookups.Add(ctx, 1, typeAttr)
121+
122+
// Fast path: check with read lock
123+
c.mu.RLock()
124+
if client, ok := c.grafanaClients[key]; ok {
125+
c.mu.RUnlock()
126+
c.metrics.hits.Add(ctx, 1, typeAttr)
127+
return client
128+
}
129+
c.mu.RUnlock()
130+
131+
// Slow path: use singleflight to create outside the lock,
132+
// deduplicating concurrent requests for the same key.
133+
// Use fmt.Sprintf("%v", key) for the singleflight key to include actual
134+
// credential values (the struct fields), not the redacted String() output.
135+
sfKey := fmt.Sprintf("%v", key)
136+
val, _, _ := c.sfGrafana.Do(sfKey, func() (any, error) {
137+
// Double-check after winning the singleflight race
138+
c.mu.RLock()
139+
if client, ok := c.grafanaClients[key]; ok {
140+
c.mu.RUnlock()
141+
return client, nil
142+
}
143+
c.mu.RUnlock()
144+
145+
// Create the client without holding any lock
146+
client := createFn()
147+
148+
// Store the result
149+
c.mu.Lock()
150+
c.grafanaClients[key] = client
151+
c.metrics.misses.Add(ctx, 1, typeAttr)
152+
c.metrics.size.Record(ctx, int64(len(c.grafanaClients)), typeAttr)
153+
slog.Debug("Cached new Grafana client", "key", key, "cache_size", len(c.grafanaClients))
154+
c.mu.Unlock()
155+
156+
return client, nil
157+
})
158+
159+
return val.(*GrafanaClient)
160+
}
161+
162+
// GetOrCreateIncidentClient returns a cached incident client for the given key,
163+
// or creates one using createFn if no cached client exists.
164+
// The createFn is called outside the cache lock via singleflight to avoid
165+
// blocking concurrent cache reads during slow client creation.
166+
func (c *ClientCache) GetOrCreateIncidentClient(key clientCacheKey, createFn func() *incident.Client) *incident.Client {
167+
ctx := context.Background()
168+
typeAttr := metric.WithAttributes(attrClientTypeIncident)
169+
c.metrics.lookups.Add(ctx, 1, typeAttr)
170+
171+
// Fast path: check with read lock
172+
c.mu.RLock()
173+
if client, ok := c.incidentClients[key]; ok {
174+
c.mu.RUnlock()
175+
c.metrics.hits.Add(ctx, 1, typeAttr)
176+
return client
177+
}
178+
c.mu.RUnlock()
179+
180+
// Slow path: use singleflight to create outside the lock
181+
sfKey := fmt.Sprintf("%v", key)
182+
val, _, _ := c.sfIncident.Do(sfKey, func() (any, error) {
183+
c.mu.RLock()
184+
if client, ok := c.incidentClients[key]; ok {
185+
c.mu.RUnlock()
186+
return client, nil
187+
}
188+
c.mu.RUnlock()
189+
190+
client := createFn()
191+
192+
c.mu.Lock()
193+
c.incidentClients[key] = client
194+
c.metrics.misses.Add(ctx, 1, typeAttr)
195+
c.metrics.size.Record(ctx, int64(len(c.incidentClients)), typeAttr)
196+
slog.Debug("Cached new incident client", "key", key, "cache_size", len(c.incidentClients))
197+
c.mu.Unlock()
198+
199+
return client, nil
200+
})
201+
202+
return val.(*incident.Client)
203+
}
204+
205+
// Close cleans up cached clients. For incident clients, idle connections
206+
// are closed via the underlying HTTP transport. Grafana clients use a
207+
// go-openapi runtime whose transport is set via reflection, so we clear
208+
// the map and let the GC reclaim resources.
209+
func (c *ClientCache) Close() {
210+
c.mu.Lock()
211+
defer c.mu.Unlock()
212+
213+
for key, client := range c.incidentClients {
214+
if client.HTTPClient != nil {
215+
client.HTTPClient.CloseIdleConnections()
216+
}
217+
delete(c.incidentClients, key)
218+
}
219+
for key := range c.grafanaClients {
220+
delete(c.grafanaClients, key)
221+
}
222+
223+
ctx := context.Background()
224+
c.metrics.size.Record(ctx, 0, metric.WithAttributes(attrClientTypeGrafana))
225+
c.metrics.size.Record(ctx, 0, metric.WithAttributes(attrClientTypeIncident))
226+
slog.Debug("Client cache closed")
227+
}
228+
229+
// Size returns the number of cached clients (for testing/metrics).
230+
func (c *ClientCache) Size() (grafana, incident int) {
231+
c.mu.RLock()
232+
defer c.mu.RUnlock()
233+
return len(c.grafanaClients), len(c.incidentClients)
234+
}
235+
236+
// hashAPIKey returns a short hash of the API key for use in logging.
237+
// This avoids logging the full key.
238+
func hashAPIKey(key string) string {
239+
if key == "" {
240+
return ""
241+
}
242+
h := sha256.Sum256([]byte(key))
243+
return fmt.Sprintf("%x", h[:4])
244+
}
245+
246+
// extractGrafanaClientCached creates an httpContextFunc that uses the cache.
247+
func extractGrafanaClientCached(cache *ClientCache) httpContextFunc {
248+
return func(ctx context.Context, req *http.Request) context.Context {
249+
config := GrafanaConfigFromContext(ctx)
250+
if config.OrgID == 0 {
251+
slog.Warn("No org ID found in request headers or environment variables, using default org. Set GRAFANA_ORG_ID or pass X-Grafana-Org-Id header to target a specific org.")
252+
}
253+
254+
u, apiKey, basicAuth, _ := extractKeyGrafanaInfoFromReq(req)
255+
key := cacheKeyFromRequest(u, apiKey, basicAuth, config.OrgID)
256+
257+
grafanaClient := cache.GetOrCreateGrafanaClient(key, func() *GrafanaClient {
258+
slog.Debug("Creating new Grafana client (cache miss)", "url", u, "api_key_hash", hashAPIKey(apiKey))
259+
return NewGrafanaClient(ctx, u, apiKey, basicAuth)
260+
})
261+
262+
return WithGrafanaClient(ctx, grafanaClient)
263+
}
264+
}
265+
266+
// extractIncidentClientCached creates an httpContextFunc that uses the cache.
267+
func extractIncidentClientCached(cache *ClientCache) httpContextFunc {
268+
return func(ctx context.Context, req *http.Request) context.Context {
269+
grafanaURL, apiKey, _, orgID := extractKeyGrafanaInfoFromReq(req)
270+
key := cacheKeyFromRequest(grafanaURL, apiKey, nil, orgID)
271+
272+
incidentClient := cache.GetOrCreateIncidentClient(key, func() *incident.Client {
273+
incidentURL := fmt.Sprintf("%s/api/plugins/grafana-irm-app/resources/api/v1/", grafanaURL)
274+
slog.Debug("Creating new incident client (cache miss)", "url", incidentURL)
275+
client := incident.NewClient(incidentURL, apiKey)
276+
277+
config := GrafanaConfigFromContext(ctx)
278+
transport, err := BuildTransport(&config, nil)
279+
if err != nil {
280+
slog.Error("Failed to create custom transport for incident client, using default", "error", err)
281+
} else {
282+
orgIDWrapped := NewOrgIDRoundTripper(transport, orgID)
283+
client.HTTPClient.Transport = wrapWithUserAgent(orgIDWrapped)
284+
}
285+
286+
return client
287+
})
288+
289+
return WithIncidentClient(ctx, incidentClient)
290+
}
291+
}

0 commit comments

Comments
 (0)