Skip to content

Commit d31b1e4

Browse files
authored
Reenable profiles batching (#14313)
With the introduction of `MergeTo` into profiles, we can now reenable profiles batching! 🥳 Closes #13106
1 parent c216239 commit d31b1e4

File tree

3 files changed

+53
-44
lines changed

3 files changed

+53
-44
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/otlp)
7+
component: pkg/xexporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Reenable batching for profiles
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [14313]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

exporter/exporterhelper/xexporterhelper/profiles_batch.go

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,10 @@ func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt expor
3333
if !ok {
3434
return nil, errors.New("invalid input type")
3535
}
36-
// TODO(13106): handle merging of profiles (and change the indice tables with their new indices)
37-
// req2.mergeTo(req, sz)
38-
39-
// If no limit we can simply merge the new request into the current and return.
40-
if maxSize == 0 {
41-
return []Request{req, req2}, nil
36+
err := req2.mergeTo(req, sz)
37+
if err != nil {
38+
return nil, fmt.Errorf("failed merging profiles; %w", err)
4239
}
43-
44-
sp1, err1 := req.split(maxSize, sz)
45-
sp2, err2 := req2.split(maxSize, sz)
46-
47-
return append(sp1, sp2...), errors.Join(err1, err2)
4840
}
4941

5042
// If no limit we can simply merge the new request into the current and return.
@@ -54,14 +46,13 @@ func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt expor
5446
return req.split(maxSize, sz)
5547
}
5648

57-
// TODO(13106): handle merging of profiles (and change the indice tables with their new indices)
58-
/*func (req *profilesRequest) mergeTo(dst *profilesRequest, sz sizer.ProfilesSizer) {
49+
func (req *profilesRequest) mergeTo(dst *profilesRequest, sz sizer.ProfilesSizer) error {
5950
if sz != nil {
6051
dst.setCachedSize(dst.size(sz) + req.size(sz))
6152
req.setCachedSize(0)
6253
}
63-
req.pd.ResourceProfiles().MoveAndAppendTo(dst.pd.ResourceProfiles())
64-
}*/
54+
return req.pd.MergeTo(dst.pd)
55+
}
6556

6657
func (req *profilesRequest) split(maxSize int, sz sizer.ProfilesSizer) ([]Request, error) {
6758
var res []Request
@@ -83,6 +74,8 @@ func extractProfiles(srcProfiles pprofile.Profiles, capacity int, sz sizer.Profi
8374
destProfiles := pprofile.NewProfiles()
8475
capacityLeft := capacity - sz.ProfilesSize(destProfiles)
8576
removedSize := 0
77+
78+
srcProfiles.Dictionary().CopyTo(destProfiles.Dictionary())
8679
srcProfiles.ResourceProfiles().RemoveIf(func(srcRP pprofile.ResourceProfiles) bool {
8780
// If the no more capacity left just return.
8881
if capacityLeft == 0 {

exporter/exporterhelper/xexporterhelper/profiles_batch_test.go

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ func TestMergeProfiles(t *testing.T) {
2222
pr2 := newProfilesRequest(testdata.GenerateProfiles(3))
2323
res, err := pr1.MergeSplit(context.Background(), 0, exporterhelper.RequestSizerTypeItems, pr2)
2424
require.NoError(t, err)
25-
assert.Len(t, res, 2)
26-
assert.Equal(t, 2, res[0].ItemsCount())
27-
assert.Equal(t, 3, res[1].ItemsCount())
25+
assert.Len(t, res, 1)
26+
assert.Equal(t, 5, res[0].ItemsCount())
2827
}
2928

3029
func TestMergeProfilesInvalidInput(t *testing.T) {
@@ -34,8 +33,6 @@ func TestMergeProfilesInvalidInput(t *testing.T) {
3433
}
3534

3635
func TestMergeSplitProfiles(t *testing.T) {
37-
t.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)")
38-
3936
tests := []struct {
4037
name string
4138
szt exporterhelper.RequestSizerType
@@ -53,12 +50,16 @@ func TestMergeSplitProfiles(t *testing.T) {
5350
expected: []Request{newProfilesRequest(pprofile.NewProfiles())},
5451
},
5552
{
56-
name: "first_request_empty",
57-
szt: exporterhelper.RequestSizerTypeItems,
58-
maxSize: 10,
59-
pr1: newProfilesRequest(pprofile.NewProfiles()),
60-
pr2: newProfilesRequest(testdata.GenerateProfiles(5)),
61-
expected: []Request{newProfilesRequest(testdata.GenerateProfiles(5))},
53+
name: "first_request_empty",
54+
szt: exporterhelper.RequestSizerTypeItems,
55+
maxSize: 10,
56+
pr1: newProfilesRequest(testdata.GenerateProfiles(0)),
57+
pr2: newProfilesRequest(testdata.GenerateProfiles(5)),
58+
expected: []Request{newProfilesRequest(func() pprofile.Profiles {
59+
profiles := testdata.GenerateProfiles(0)
60+
_ = testdata.GenerateProfiles(5).MergeTo(profiles)
61+
return profiles
62+
}())},
6263
},
6364
{
6465
name: "first_empty_second_nil",
@@ -137,8 +138,6 @@ func TestMergeSplitProfiles(t *testing.T) {
137138
}
138139

139140
func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) {
140-
t.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)")
141-
142141
tests := []struct {
143142
name string
144143
szt exporterhelper.RequestSizerType
@@ -253,7 +252,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) {
253252
{
254253
name: "merge_only",
255254
szt: exporterhelper.RequestSizerTypeBytes,
256-
maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(11)),
255+
maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(13)),
257256
pr1: newProfilesRequest(testdata.GenerateProfiles(4)),
258257
pr2: newProfilesRequest(testdata.GenerateProfiles(6)),
259258
expected: []Request{newProfilesRequest(func() pprofile.Profiles {
@@ -266,12 +265,12 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) {
266265
name: "split_only",
267266
szt: exporterhelper.RequestSizerTypeBytes,
268267
maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(4)),
269-
pr1: newProfilesRequest(pprofile.NewProfiles()),
268+
pr1: newProfilesRequest(testdata.GenerateProfiles(0)),
270269
pr2: newProfilesRequest(testdata.GenerateProfiles(10)),
271270
expected: []Request{
272271
newProfilesRequest(testdata.GenerateProfiles(4)),
273-
newProfilesRequest(testdata.GenerateProfiles(4)),
274-
newProfilesRequest(testdata.GenerateProfiles(2)),
272+
newProfilesRequest(testdata.GenerateProfiles(5)),
273+
newProfilesRequest(testdata.GenerateProfiles(1)),
275274
},
276275
},
277276
{
@@ -283,11 +282,11 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) {
283282
expected: []Request{
284283
newProfilesRequest(func() pprofile.Profiles {
285284
profiles := testdata.GenerateProfiles(7)
286-
testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles())
285+
testdata.GenerateProfiles(3).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles())
287286
return profiles
288287
}()),
289-
newProfilesRequest(testdata.GenerateProfiles(10)),
290-
newProfilesRequest(testdata.GenerateProfiles(9)),
288+
newProfilesRequest(testdata.GenerateProfiles(11)),
289+
newProfilesRequest(testdata.GenerateProfiles(7)),
291290
},
292291
},
293292
}
@@ -313,8 +312,6 @@ func TestExtractProfiles(t *testing.T) {
313312
}
314313

315314
func TestMergeSplitManySmallLogs(t *testing.T) {
316-
t.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)")
317-
318315
// All requests merge into a single batch.
319316
merged := []Request{newProfilesRequest(testdata.GenerateProfiles(1))}
320317
for range 1000 {
@@ -326,8 +323,6 @@ func TestMergeSplitManySmallLogs(t *testing.T) {
326323
}
327324

328325
func BenchmarkSplittingBasedOnByteSizeManySmallProfiles(b *testing.B) {
329-
b.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)")
330-
331326
// All requests merge into a single batch.
332327
b.ReportAllocs()
333328
for b.Loop() {
@@ -342,13 +337,11 @@ func BenchmarkSplittingBasedOnByteSizeManySmallProfiles(b *testing.B) {
342337
)
343338
merged = append(merged[0:len(merged)-1], res...)
344339
}
345-
assert.Len(b, merged, 1)
340+
assert.Len(b, merged, 2)
346341
}
347342
}
348343

349344
func BenchmarkSplittingBasedOnByteSizeManyProfilesSlightlyAboveLimit(b *testing.B) {
350-
b.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)")
351-
352345
// Every incoming request results in a split.
353346
b.ReportAllocs()
354347
for b.Loop() {
@@ -369,8 +362,6 @@ func BenchmarkSplittingBasedOnByteSizeManyProfilesSlightlyAboveLimit(b *testing.
369362
}
370363

371364
func BenchmarkSplittingBasedOnByteSizeHugeProfiles(b *testing.B) {
372-
b.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)")
373-
374365
// One request splits into many batches.
375366
b.ReportAllocs()
376367
for b.Loop() {

0 commit comments

Comments
 (0)