Skip to content

Commit 26a1aed

Browse files
lsj4401dmathieu
andauthored
[exporterhelper] Fix metric name preservation during request splitting (open-telemetry#13238)
#### Description This PR fixes an issue in exporter/exporterhelper's sending_queue where metric names were lost during metricRequest splitting. Previously, when large batches of metrics were processed, the splitting logic in metric_batch.go could cause the name field of some metrics to disappear. This fix ensures that all metric fields are properly preserved when metricRequest objects are split. #### Link to tracking issue Fixes open-telemetry#13236 --------- Co-authored-by: Damien Mathieu <[email protected]>
1 parent acb60bc commit 26a1aed

File tree

3 files changed

+132
-3
lines changed

3 files changed

+132
-3
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Preserve all metrics metadata when batch splitting.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13236]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Previously, when large batches of metrics were processed, the splitting logic in `metric_batch.go` could
20+
cause the `name` field of some metrics to disappear. This fix ensures that all metric fields are
21+
properly preserved when `metricRequest` objects are split.
22+
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: []

exporter/exporterhelper/metrics_batch.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,13 +177,21 @@ func extractMetricDataPoints(srcMetric pmetric.Metric, capacity int, sz sizer.Me
177177
destMetric, removedSize = extractGaugeDataPoints(srcMetric.Gauge(), capacity, sz)
178178
case pmetric.MetricTypeSum:
179179
destMetric, removedSize = extractSumDataPoints(srcMetric.Sum(), capacity, sz)
180+
destMetric.Sum().SetIsMonotonic(srcMetric.Sum().IsMonotonic())
181+
destMetric.Sum().SetAggregationTemporality(srcMetric.Sum().AggregationTemporality())
180182
case pmetric.MetricTypeHistogram:
181183
destMetric, removedSize = extractHistogramDataPoints(srcMetric.Histogram(), capacity, sz)
184+
destMetric.Histogram().SetAggregationTemporality(srcMetric.Histogram().AggregationTemporality())
182185
case pmetric.MetricTypeExponentialHistogram:
183186
destMetric, removedSize = extractExponentialHistogramDataPoints(srcMetric.ExponentialHistogram(), capacity, sz)
187+
destMetric.ExponentialHistogram().SetAggregationTemporality(srcMetric.ExponentialHistogram().AggregationTemporality())
184188
case pmetric.MetricTypeSummary:
185189
destMetric, removedSize = extractSummaryDataPoints(srcMetric.Summary(), capacity, sz)
186190
}
191+
destMetric.SetName(srcMetric.Name())
192+
destMetric.SetDescription(srcMetric.Description())
193+
destMetric.SetUnit(srcMetric.Unit())
194+
srcMetric.Metadata().CopyTo(destMetric.Metadata())
187195
return destMetric, removedSize
188196
}
189197

exporter/exporterhelper/metrics_batch_test.go

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,99 @@ func TestMergeSplitMetrics(t *testing.T) {
136136
}
137137
}
138138

139+
func TestSplitMetricsWithDataPointSplit(t *testing.T) {
140+
generateTestMetrics := func(metricType pmetric.MetricType) pmetric.Metrics {
141+
md := pmetric.NewMetrics()
142+
m := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
143+
m.SetName("test_metric")
144+
m.SetDescription("test_description")
145+
m.SetUnit("test_unit")
146+
m.Metadata().PutStr("test_metadata_key", "test_metadata_value")
147+
148+
const numDataPoints = 2
149+
150+
switch metricType {
151+
case pmetric.MetricTypeSum:
152+
sum := m.SetEmptySum()
153+
for i := 0; i < numDataPoints; i++ {
154+
sum.DataPoints().AppendEmpty().SetIntValue(int64(i + 1))
155+
}
156+
case pmetric.MetricTypeGauge:
157+
gauge := m.SetEmptyGauge()
158+
for i := 0; i < numDataPoints; i++ {
159+
gauge.DataPoints().AppendEmpty().SetIntValue(int64(i + 1))
160+
}
161+
case pmetric.MetricTypeHistogram:
162+
hist := m.SetEmptyHistogram()
163+
for i := uint64(0); i < uint64(numDataPoints); i++ {
164+
hist.DataPoints().AppendEmpty().SetCount(i + 1)
165+
}
166+
case pmetric.MetricTypeExponentialHistogram:
167+
expHist := m.SetEmptyExponentialHistogram()
168+
for i := uint64(0); i < uint64(numDataPoints); i++ {
169+
expHist.DataPoints().AppendEmpty().SetCount(i + 1)
170+
}
171+
case pmetric.MetricTypeSummary:
172+
summary := m.SetEmptySummary()
173+
for i := uint64(0); i < uint64(numDataPoints); i++ {
174+
summary.DataPoints().AppendEmpty().SetCount(i + 1)
175+
}
176+
}
177+
return md
178+
}
179+
180+
tests := []struct {
181+
name string
182+
metricType pmetric.MetricType
183+
}{
184+
{
185+
name: "sum",
186+
metricType: pmetric.MetricTypeSum,
187+
},
188+
{
189+
name: "gauge",
190+
metricType: pmetric.MetricTypeGauge,
191+
},
192+
{
193+
name: "histogram",
194+
metricType: pmetric.MetricTypeHistogram,
195+
},
196+
{
197+
name: "exponential_histogram",
198+
metricType: pmetric.MetricTypeExponentialHistogram,
199+
},
200+
{
201+
name: "summary",
202+
metricType: pmetric.MetricTypeSummary,
203+
},
204+
}
205+
for _, tt := range tests {
206+
t.Run(tt.name, func(t *testing.T) {
207+
// Generate metrics with 2 data points.
208+
mr1 := newMetricsRequest(generateTestMetrics(tt.metricType))
209+
210+
// Split by data point, so maxSize is 1.
211+
res, err := mr1.MergeSplit(context.Background(), 1, RequestSizerTypeItems, nil)
212+
require.NoError(t, err)
213+
require.Len(t, res, 2)
214+
215+
for _, req := range res {
216+
actualRequest := req.(*metricsRequest)
217+
// Each split request should contain one data point.
218+
assert.Equal(t, 1, actualRequest.ItemsCount())
219+
m := actualRequest.md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
220+
assert.Equal(t, "test_metric", m.Name())
221+
assert.Equal(t, "test_description", m.Description())
222+
assert.Equal(t, "test_unit", m.Unit())
223+
assert.Equal(t, 1, m.Metadata().Len())
224+
val, ok := m.Metadata().Get("test_metadata_key")
225+
assert.True(t, ok)
226+
assert.Equal(t, "test_metadata_value", val.AsString())
227+
}
228+
})
229+
}
230+
}
231+
139232
func TestMergeSplitMetricsInputNotModifiedIfErrorReturned(t *testing.T) {
140233
r1 := newMetricsRequest(testdata.GenerateMetrics(18)) // 18 metrics, 36 data points
141234
r2 := newLogsRequest(testdata.GenerateLogs(3))
@@ -259,15 +352,15 @@ func TestMergeSplitMetricsBasedOnByteSize(t *testing.T) {
259352
maxSize: s.MetricsSize(testdata.GenerateMetrics(4)),
260353
mr1: newMetricsRequest(pmetric.NewMetrics()),
261354
mr2: newMetricsRequest(testdata.GenerateMetrics(10)),
262-
expectedSizes: []int{706, 504, 625, 378},
355+
expectedSizes: []int{706, 533, 642, 378},
263356
},
264357
{
265358
name: "merge_and_split",
266359
szt: RequestSizerTypeBytes,
267360
maxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(10))/2 + metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(11))/2,
268361
mr1: newMetricsRequest(testdata.GenerateMetrics(8)),
269362
mr2: newMetricsRequest(testdata.GenerateMetrics(20)),
270-
expectedSizes: []int{2107, 2022, 1954, 290},
363+
expectedSizes: []int{2123, 2038, 1983, 290},
271364
},
272365
{
273366
name: "scope_metrics_split",
@@ -281,7 +374,7 @@ func TestMergeSplitMetricsBasedOnByteSize(t *testing.T) {
281374
return md
282375
}()),
283376
mr2: nil,
284-
expectedSizes: []int{706, 700, 85},
377+
expectedSizes: []int{706, 719, 85},
285378
},
286379
}
287380
for _, tt := range tests {

0 commit comments

Comments
 (0)