Skip to content

Commit 98f1bae

Browse files
fix(cloudsql): cache SKUs at startup to prevent OOM on every scrape (#862)
1 parent 7f2b0f7 commit 98f1bae

3 files changed

Lines changed: 188 additions & 10 deletions

File tree

pkg/google/cloudsql/cloudsql.go

Lines changed: 22 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package cloudsql
22

33
import (
44
"context"
5+
"fmt"
56
"log/slog"
67
"strings"
78
"time"
@@ -44,10 +45,30 @@ var (
4445
)
4546
)
4647

47-
func New(config *Config, gcpClient client.Client) (*Collector, error) {
48+
func New(ctx context.Context, config *Config, gcpClient client.Client) (*Collector, error) {
4849
pm := newPricingMap(config.Logger, gcpClient)
4950
projects := strings.Split(config.Projects, ",")
5051
regions := client.RegionsForProjects(gcpClient, projects, config.Logger)
52+
53+
if err := pm.getSKus(ctx); err != nil {
54+
return nil, fmt.Errorf("failed to initialise Cloud SQL pricing: %w", err)
55+
}
56+
57+
go func() {
58+
ticker := time.NewTicker(CostRefreshInterval)
59+
defer ticker.Stop()
60+
for {
61+
select {
62+
case <-ctx.Done():
63+
return
64+
case <-ticker.C:
65+
if err := pm.getSKus(ctx); err != nil {
66+
config.Logger.Error("failed to refresh Cloud SQL pricing SKUs", "error", err)
67+
}
68+
}
69+
}
70+
}()
71+
5172
return &Collector{
5273
gcpClient: gcpClient,
5374
config: config,
@@ -69,11 +90,6 @@ func (c *Collector) Describe(ch chan<- *prometheus.Desc) error {
6990
func (c *Collector) Collect(ctx context.Context, ch chan<- prometheus.Metric) error {
7091
logger := c.logger.With("logger", "cloudsql")
7192

72-
if err := c.pricingMap.getSKus(ctx); err != nil {
73-
logger.Error("failed to load pricing SKUs", "error", err)
74-
return err
75-
}
76-
7793
instances, err := c.getAllCloudSQL(ctx)
7894
if err != nil {
7995
return err

pkg/google/cloudsql/cloudsql_test.go

Lines changed: 165 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ import (
77
"net/http"
88
"net/http/httptest"
99
"os"
10+
"sync"
1011
"testing"
1112

1213
"cloud.google.com/go/billing/apiv1/billingpb"
@@ -19,6 +20,8 @@ import (
1920
"google.golang.org/api/option"
2021
sqladmin "google.golang.org/api/sqladmin/v1beta4"
2122
"google.golang.org/genproto/googleapis/type/money"
23+
"google.golang.org/grpc/codes"
24+
"google.golang.org/grpc/status"
2225
)
2326

2427
func newTestGCPClient(t *testing.T, computeHandlers map[string]any, sqlAdminHandlers map[string]any, skus []*billingpb.Sku) *client.Mock {
@@ -59,6 +62,165 @@ func newTestGCPClient(t *testing.T, computeHandlers map[string]any, sqlAdminHand
5962
return client.NewMock("test-project", 0, nil, nil, catalogClient, computeService, sqlAdminService)
6063
}
6164

65+
type failingCatalogServer struct {
66+
billingpb.UnimplementedCloudCatalogServer
67+
}
68+
69+
func (s *failingCatalogServer) ListServices(_ context.Context, _ *billingpb.ListServicesRequest) (*billingpb.ListServicesResponse, error) {
70+
return nil, status.Error(codes.Internal, "billing API unavailable")
71+
}
72+
73+
type switchableCatalogServer struct {
74+
billingpb.UnimplementedCloudCatalogServer
75+
mu sync.Mutex
76+
disabled bool
77+
skus []*billingpb.Sku
78+
}
79+
80+
func (s *switchableCatalogServer) disable() {
81+
s.mu.Lock()
82+
defer s.mu.Unlock()
83+
s.disabled = true
84+
}
85+
86+
func (s *switchableCatalogServer) ListServices(_ context.Context, _ *billingpb.ListServicesRequest) (*billingpb.ListServicesResponse, error) {
87+
s.mu.Lock()
88+
defer s.mu.Unlock()
89+
if s.disabled {
90+
return nil, status.Error(codes.Unavailable, "billing server disabled")
91+
}
92+
return &billingpb.ListServicesResponse{
93+
Services: []*billingpb.Service{
94+
{Name: "services/cloud-sql", DisplayName: "Cloud SQL"},
95+
},
96+
}, nil
97+
}
98+
99+
func (s *switchableCatalogServer) ListSkus(_ context.Context, _ *billingpb.ListSkusRequest) (*billingpb.ListSkusResponse, error) {
100+
s.mu.Lock()
101+
defer s.mu.Unlock()
102+
if s.disabled {
103+
return nil, status.Error(codes.Unavailable, "billing server disabled")
104+
}
105+
return &billingpb.ListSkusResponse{Skus: s.skus}, nil
106+
}
107+
108+
func TestNew_FailsIfInitialSKUFetchFails(t *testing.T) {
109+
catalogClient := client.NewTestBillingClient(t, &failingCatalogServer{})
110+
111+
computeSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
112+
_ = json.NewEncoder(w).Encode(struct{}{})
113+
}))
114+
t.Cleanup(computeSrv.Close)
115+
116+
sqlAdminSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
117+
_ = json.NewEncoder(w).Encode(struct{}{})
118+
}))
119+
t.Cleanup(sqlAdminSrv.Close)
120+
121+
computeService, err := computev1.NewService(context.Background(), option.WithoutAuthentication(), option.WithEndpoint(computeSrv.URL))
122+
require.NoError(t, err)
123+
124+
sqlAdminService, err := sqladmin.NewService(context.Background(), option.WithoutAuthentication(), option.WithEndpoint(sqlAdminSrv.URL))
125+
require.NoError(t, err)
126+
127+
gcpClient := client.NewMock("test-project", 0, nil, nil, catalogClient, computeService, sqlAdminService)
128+
config := &Config{Projects: "test-project", Logger: slog.New(slog.NewTextHandler(os.Stdout, nil))}
129+
130+
_, err = New(context.Background(), config, gcpClient)
131+
require.Error(t, err)
132+
assert.ErrorContains(t, err, "failed to initialise Cloud SQL pricing")
133+
}
134+
135+
func TestCollect_UsesCachedSKUs(t *testing.T) {
136+
skus := []*billingpb.Sku{
137+
{
138+
SkuId: "test-sku-id",
139+
Category: &billingpb.Category{
140+
ServiceDisplayName: "Cloud SQL",
141+
},
142+
Description: "Cloud SQL: MYSQL db-f1-micro ZONAL instance running in test-region",
143+
GeoTaxonomy: &billingpb.GeoTaxonomy{
144+
Regions: []string{"test-region"},
145+
},
146+
PricingInfo: []*billingpb.PricingInfo{
147+
{
148+
PricingExpression: &billingpb.PricingExpression{
149+
TieredRates: []*billingpb.PricingExpression_TierRate{
150+
{
151+
UnitPrice: &money.Money{
152+
Nanos: 25000000, // $0.025 per hour
153+
},
154+
},
155+
},
156+
},
157+
},
158+
},
159+
},
160+
}
161+
162+
billingSrv := &switchableCatalogServer{skus: skus}
163+
catalogClient := client.NewTestBillingClient(t, billingSrv)
164+
165+
computeSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
166+
if r.URL.Path == "/projects/test-project/regions" {
167+
_ = json.NewEncoder(w).Encode(&computev1.RegionList{
168+
Items: []*computev1.Region{{Name: "test-region"}},
169+
})
170+
return
171+
}
172+
_ = json.NewEncoder(w).Encode(struct{}{})
173+
}))
174+
t.Cleanup(computeSrv.Close)
175+
176+
sqlAdminSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
177+
if r.URL.Path == "/sql/v1beta4/projects/test-project/instances" {
178+
_ = json.NewEncoder(w).Encode(&sqladmin.InstancesListResponse{
179+
Items: []*sqladmin.DatabaseInstance{
180+
{
181+
Name: "test-name",
182+
Region: "test-region",
183+
ConnectionName: "test-project:test-region:test-name",
184+
Settings: &sqladmin.Settings{Tier: "db-f1-micro", AvailabilityType: "ZONAL"},
185+
DatabaseVersion: "MYSQL_8_0",
186+
},
187+
},
188+
})
189+
return
190+
}
191+
_ = json.NewEncoder(w).Encode(struct{}{})
192+
}))
193+
t.Cleanup(sqlAdminSrv.Close)
194+
195+
computeService, err := computev1.NewService(context.Background(), option.WithoutAuthentication(), option.WithEndpoint(computeSrv.URL))
196+
require.NoError(t, err)
197+
198+
sqlAdminService, err := sqladmin.NewService(context.Background(), option.WithoutAuthentication(), option.WithEndpoint(sqlAdminSrv.URL))
199+
require.NoError(t, err)
200+
201+
gcpClient := client.NewMock("test-project", 0, nil, nil, catalogClient, computeService, sqlAdminService)
202+
config := &Config{Projects: "test-project", Logger: slog.New(slog.NewTextHandler(os.Stdout, nil))}
203+
204+
collector, err := New(context.Background(), config, gcpClient)
205+
require.NoError(t, err)
206+
207+
// Disable the billing backend — Collect() must use SKUs cached at init
208+
billingSrv.disable()
209+
210+
ch := make(chan prometheus.Metric, 10)
211+
err = collector.Collect(context.Background(), ch)
212+
require.NoError(t, err)
213+
214+
select {
215+
case metric := <-ch:
216+
result := utils.ReadMetrics(metric)
217+
assert.Equal(t, "test-project:test-region:test-name", result.Labels["instance"])
218+
assert.InDelta(t, 0.025, result.Value, 1e-9)
219+
default:
220+
t.Fatal("expected a metric to be emitted from cached SKUs")
221+
}
222+
}
223+
62224
func TestCollector(t *testing.T) {
63225
tests := []struct {
64226
name string
@@ -202,7 +364,7 @@ func TestCollector(t *testing.T) {
202364
t.Run(tt.name, func(t *testing.T) {
203365
gcpClient := newTestGCPClient(t, tt.regionsHandlers, tt.sqlAdminHandlers, tt.skus)
204366
config := &Config{Projects: "test-project", Logger: slog.New(slog.NewTextHandler(os.Stdout, nil))}
205-
collector, err := New(config, gcpClient)
367+
collector, err := New(context.Background(), config, gcpClient)
206368
require.NoError(t, err)
207369

208370
ch := make(chan prometheus.Metric, 1)
@@ -280,8 +442,8 @@ func TestGetAllCloudSQL(t *testing.T) {
280442
for _, tt := range tests {
281443
t.Run(tt.name, func(t *testing.T) {
282444
gcpClient := newTestGCPClient(t, tt.regionsHandlers, tt.sqlAdminHandlers, nil)
283-
config := &Config{Projects: "test-project"}
284-
collector, err := New(config, gcpClient)
445+
config := &Config{Projects: "test-project", Logger: slog.New(slog.NewTextHandler(os.Stdout, nil))}
446+
collector, err := New(context.Background(), config, gcpClient)
285447
require.NoError(t, err)
286448

287449
instances, err := collector.getAllCloudSQL(context.Background())

pkg/google/gcp.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -141,7 +141,7 @@ func New(ctx context.Context, config *Config) (*GCP, error) {
141141
continue
142142
}
143143
case "SQL":
144-
collector, err = cloudsql.New(&cloudsql.Config{
144+
collector, err = cloudsql.New(ctx, &cloudsql.Config{
145145
Projects: config.Projects,
146146
ScrapeInterval: config.ScrapeInterval,
147147
Logger: config.Logger,

0 commit comments

Comments
 (0)