Skip to content

Commit 9c6bb10

Browse files
authored
Use batches to call CreateTimeSeries (#617)
* Use batches for call to CreateTimeSeries * Refactor exportTimeSeries * Add tests
1 parent c3f1a10 commit 9c6bb10

2 files changed

Lines changed: 144 additions & 19 deletions

File tree

exporter/metric/metric.go

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ import (
5151
)
5252

5353
const (
54+
// The number of timeserieses to send to GCM in a single request. This
55+
// is a hard limit in the GCM API, so we never want to exceed 200.
56+
sendBatchSize = 200
57+
5458
cloudMonitoringMetricDescriptorNameFormat = "workload.googleapis.com/%s"
5559
)
5660

@@ -221,31 +225,34 @@ func (me *metricExporter) createMetricDescriptorIfNeeded(ctx context.Context, md
221225
// exportTimeSeries create TimeSeries from the records in cps.
222226
// res should be the common resource among all TimeSeries, such as instance id, application name and so on.
223227
func (me *metricExporter) exportTimeSeries(ctx context.Context, rm metricdata.ResourceMetrics) error {
224-
tss := []*monitoringpb.TimeSeries{}
225-
mr := me.resourceToMonitoredResourcepb(rm.Resource)
226-
var aggError error
228+
tss, aggErr := me.recordsToTspbs(rm)
229+
if len(tss) == 0 {
230+
return aggErr
231+
}
227232

228-
extraLabels := me.extraLabelsFromResource(rm.Resource)
229-
for _, scope := range rm.ScopeMetrics {
230-
for _, metrics := range scope.Metrics {
231-
ts, err := me.recordToTspb(metrics, mr, scope.Scope, extraLabels)
232-
aggError = multierr.Append(aggError, err)
233-
tss = append(tss, ts...)
233+
name := fmt.Sprintf("projects/%s", me.o.projectID)
234+
235+
var createErrors []error
236+
for i := 0; i < len(tss); i += sendBatchSize {
237+
j := i + sendBatchSize
238+
if j >= len(tss) {
239+
j = len(tss)
234240
}
235-
}
236241

237-
if len(tss) == 0 {
238-
return aggError
239-
}
242+
// TODO: When this exporter is rewritten, support writing to multiple
243+
// projects based on the "gcp.project.id" resource.
244+
req := &monitoringpb.CreateTimeSeriesRequest{
245+
Name: name,
246+
TimeSeries: tss[i:j],
247+
}
240248

241-
// TODO: When this exporter is rewritten, support writing to multiple
242-
// projects based on the "gcp.project.id" resource.
243-
req := &monitoringpb.CreateTimeSeriesRequest{
244-
Name: fmt.Sprintf("projects/%s", me.o.projectID),
245-
TimeSeries: tss,
249+
err := me.client.CreateTimeSeries(ctx, req)
250+
if err != nil {
251+
createErrors = append(createErrors, err)
252+
}
246253
}
247254

248-
return multierr.Append(aggError, me.client.CreateTimeSeries(ctx, req))
255+
return multierr.Append(aggErr, multierr.Combine(createErrors...))
249256
}
250257

251258
func (me *metricExporter) extraLabelsFromResource(res *resource.Resource) *attribute.Set {
@@ -495,6 +502,28 @@ func (me *metricExporter) recordToTspb(m metricdata.Metrics, mr *monitoredrespb.
495502
return tss, aggErr
496503
}
497504

505+
func (me *metricExporter) recordsToTspbs(rm metricdata.ResourceMetrics) ([]*monitoringpb.TimeSeries, error) {
506+
mr := me.resourceToMonitoredResourcepb(rm.Resource)
507+
extraLabels := me.extraLabelsFromResource(rm.Resource)
508+
509+
var (
510+
tss []*monitoringpb.TimeSeries
511+
errors []error
512+
)
513+
for _, scope := range rm.ScopeMetrics {
514+
for _, metrics := range scope.Metrics {
515+
ts, err := me.recordToTspb(metrics, mr, scope.Scope, extraLabels)
516+
if err != nil {
517+
errors = append(errors, err)
518+
}
519+
520+
tss = append(tss, ts...)
521+
}
522+
}
523+
524+
return tss, multierr.Combine(errors...)
525+
}
526+
498527
func sanitizeUTF8(s string) string {
499528
return strings.ToValidUTF8(s, "�")
500529
}

exporter/metric/metric_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,3 +1073,99 @@ func TestMetricTypeToDisplayName(t *testing.T) {
10731073
})
10741074
}
10751075
}
1076+
1077+
func TestBatchingExport(t *testing.T) {
1078+
setup := func(t *testing.T) (metric.Exporter, *cloudmock.MetricsTestServer) {
1079+
testServer, err := cloudmock.NewMetricTestServer()
1080+
//nolint:errcheck
1081+
go testServer.Serve()
1082+
t.Cleanup(testServer.Shutdown)
1083+
1084+
assert.NoError(t, err)
1085+
1086+
clientOpts := []option.ClientOption{
1087+
option.WithEndpoint(testServer.Endpoint),
1088+
option.WithoutAuthentication(),
1089+
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
1090+
}
1091+
1092+
exporter, err := New(
1093+
WithProjectID("PROJECT_ID_NOT_REAL"),
1094+
WithMonitoringClientOptions(clientOpts...),
1095+
WithMetricDescriptorTypeFormatter(formatter),
1096+
)
1097+
assert.NoError(t, err)
1098+
1099+
t.Cleanup(func() {
1100+
ctx := context.Background()
1101+
err := exporter.Shutdown(ctx)
1102+
assert.NoError(t, err)
1103+
})
1104+
1105+
return exporter, testServer
1106+
}
1107+
1108+
createMetrics := func(n int) []metricdata.Metrics {
1109+
inputMetrics := make([]metricdata.Metrics, n)
1110+
for i := 0; i < n; i++ {
1111+
inputMetrics[i] = metricdata.Metrics{Name: "testing", Data: metricdata.Histogram{
1112+
DataPoints: []metricdata.HistogramDataPoint{
1113+
{},
1114+
},
1115+
}}
1116+
}
1117+
1118+
return inputMetrics
1119+
}
1120+
1121+
for _, tc := range []struct {
1122+
desc string
1123+
numMetrics int
1124+
expectedCreateTSCalls int
1125+
}{
1126+
{desc: "0 metrics"},
1127+
{
1128+
desc: "150 metrics",
1129+
numMetrics: 150,
1130+
expectedCreateTSCalls: 1,
1131+
},
1132+
{
1133+
desc: "200 metrics",
1134+
numMetrics: 200,
1135+
expectedCreateTSCalls: 1,
1136+
},
1137+
{
1138+
desc: "201 metrics",
1139+
numMetrics: 201,
1140+
expectedCreateTSCalls: 2,
1141+
},
1142+
{
1143+
desc: "500 metrics",
1144+
numMetrics: 500,
1145+
expectedCreateTSCalls: 3,
1146+
},
1147+
{
1148+
desc: "1199 metrics",
1149+
numMetrics: 1199,
1150+
expectedCreateTSCalls: 6,
1151+
},
1152+
} {
1153+
t.Run(tc.desc, func(t *testing.T) {
1154+
exporter, testServer := setup(t)
1155+
input := createMetrics(tc.numMetrics)
1156+
ctx := context.Background()
1157+
1158+
err := exporter.Export(ctx, metricdata.ResourceMetrics{
1159+
ScopeMetrics: []metricdata.ScopeMetrics{
1160+
{
1161+
Metrics: input,
1162+
},
1163+
},
1164+
})
1165+
assert.NoError(t, err)
1166+
1167+
gotCalls := testServer.CreateTimeSeriesRequests()
1168+
assert.Equal(t, tc.expectedCreateTSCalls, len(gotCalls))
1169+
})
1170+
}
1171+
}

0 commit comments

Comments
 (0)