Skip to content

Commit 6c58557

Browse files
committed
Bytes based batching for traces
Signed-off-by: Israel Blancas <[email protected]>
1 parent a8b2be1 commit 6c58557

File tree

6 files changed

+383
-54
lines changed

6 files changed

+383
-54
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
5+
6+
import (
7+
math_bits "math/bits"
8+
9+
"go.opentelemetry.io/collector/pdata/ptrace"
10+
)
11+
12+
// TracesSizer measures traces, and the pieces they are made of, in a unit
// chosen by the implementation. The implementations in this file measure
// either span counts or serialized proto bytes.
type TracesSizer interface {
	// TracesSize returns the size of a whole ptrace.Traces.
	TracesSize(ld ptrace.Traces) int
	// ResourceSpansSize returns the size of a single ptrace.ResourceSpans.
	ResourceSpansSize(rs ptrace.ResourceSpans) int
	// ScopeSpansSize returns the size of a single ptrace.ScopeSpans.
	ScopeSpansSize(ss ptrace.ScopeSpans) int
	// SpanSize returns the size of a single ptrace.Span.
	SpanSize(span ptrace.Span) int
	// DeltaSize returns by how much a parent grows when an item whose own
	// size is newItemSize is appended to it.
	DeltaSize(newItemSize int) int
}
20+
21+
// TracesBytesSizer returns the byte size of serialized protos.
//
// Only DeltaSize is defined on this type directly; the remaining size methods
// are presumably supplied by the embedded ptrace.ProtoMarshaler (verify
// against the pdata package).
type TracesBytesSizer struct {
	ptrace.ProtoMarshaler
}
25+
26+
// DeltaSize() returns the delta size of a proto slice when a new item is added.
27+
// Example:
28+
//
29+
// prevSize := proto1.Size()
30+
// proto1.RepeatedField().AppendEmpty() = proto2
31+
//
32+
// Then currSize of proto1 can be calculated as
33+
//
34+
// currSize := (prevSize + sizer.DeltaSize(proto2.Size()))
35+
//
36+
// This is derived from pdata/internal/data/protogen/trace/v1/trace.pb.go
37+
// which is generated with gogo/protobuf.
38+
func (s *TracesBytesSizer) DeltaSize(newItemSize int) int {
39+
return 1 + newItemSize + sov(uint64(newItemSize)) //nolint:gosec // disable G115
40+
}
41+
42+
// TracesCountSizer returns the number of spans in the traces.
43+
type TracesCountSizer struct{}
44+
45+
func (s *TracesCountSizer) TracesSize(td ptrace.Traces) int {
46+
return td.SpanCount()
47+
}
48+
49+
func (s *TracesCountSizer) ResourceSpansSize(rs ptrace.ResourceSpans) int {
50+
count := 0
51+
for k := 0; k < rs.ScopeSpans().Len(); k++ {
52+
count += rs.ScopeSpans().At(k).Spans().Len()
53+
}
54+
return count
55+
}
56+
57+
func (s *TracesCountSizer) ScopeSpansSize(ss ptrace.ScopeSpans) int {
58+
return ss.Spans().Len()
59+
}
60+
61+
func (s *TracesCountSizer) SpanSize(_ ptrace.Span) int {
62+
return 1
63+
}
64+
65+
func (s *TracesCountSizer) DeltaSize(newItemSize int) int {
66+
return newItemSize
67+
}
68+
69+
// sov returns how many 7-bit groups are needed to hold x, i.e. the number of
// bytes x occupies as a protobuf varint.
func sov(x uint64) int {
	// OR-ing with 1 guarantees at least one significant bit, so zero still
	// takes one byte.
	significantBits := math_bits.Len64(x | 1)
	// Round up to whole 7-bit groups.
	return (significantBits + 6) / 7
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
package sizer
4+
5+
import (
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
"go.opentelemetry.io/collector/pdata/testdata"
11+
)
12+
13+
func TestTracesCountSizer(t *testing.T) {
14+
td := testdata.GenerateTraces(5)
15+
sizer := TracesCountSizer{}
16+
require.Equal(t, 5, sizer.TracesSize(td))
17+
18+
rs := td.ResourceSpans().At(0)
19+
require.Equal(t, 5, sizer.ResourceSpansSize(rs))
20+
21+
ss := rs.ScopeSpans().At(0)
22+
require.Equal(t, 5, sizer.ScopeSpansSize(ss))
23+
24+
require.Equal(t, 1, sizer.SpanSize(ss.Spans().At(0)))
25+
require.Equal(t, 1, sizer.SpanSize(ss.Spans().At(1)))
26+
require.Equal(t, 1, sizer.SpanSize(ss.Spans().At(2)))
27+
require.Equal(t, 1, sizer.SpanSize(ss.Spans().At(3)))
28+
require.Equal(t, 1, sizer.SpanSize(ss.Spans().At(4)))
29+
30+
prevSize := sizer.ScopeSpansSize(ss)
31+
span := ss.Spans().At(2)
32+
span.CopyTo(ss.Spans().AppendEmpty())
33+
require.Equal(t, sizer.ScopeSpansSize(ss), prevSize+sizer.DeltaSize(sizer.SpanSize(span)))
34+
}
35+
36+
func TestTracesBytesSizer(t *testing.T) {
37+
td := testdata.GenerateTraces(2)
38+
sizer := TracesBytesSizer{}
39+
require.Equal(t, 338, sizer.TracesSize(td))
40+
41+
rs := td.ResourceSpans().At(0)
42+
require.Equal(t, 335, sizer.ResourceSpansSize(rs))
43+
44+
ss := rs.ScopeSpans().At(0)
45+
require.Equal(t, 290, sizer.ScopeSpansSize(ss))
46+
47+
require.Equal(t, 187, sizer.SpanSize(ss.Spans().At(0)))
48+
require.Equal(t, 96, sizer.SpanSize(ss.Spans().At(1)))
49+
50+
prevSize := sizer.ScopeSpansSize(ss)
51+
span := ss.Spans().At(1)
52+
spanSize := sizer.SpanSize(span)
53+
span.CopyTo(ss.Spans().AppendEmpty())
54+
ds := sizer.DeltaSize(spanSize)
55+
require.Equal(t, prevSize+ds, sizer.ScopeSpansSize(ss))
56+
}
57+
58+
func TestLogsBytesDeltaSize(t *testing.T) {
59+
sizer := TracesBytesSizer{}
60+
require.Equal(t, 129, sizer.DeltaSize(127))
61+
require.Equal(t, 131, sizer.DeltaSize(128))
62+
require.Equal(t, 242, sizer.DeltaSize(239))
63+
}

exporter/exporterhelper/traces.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/consumer/consumererror"
1515
"go.opentelemetry.io/collector/exporter"
1616
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1718
"go.opentelemetry.io/collector/pdata/ptrace"
1819
"go.opentelemetry.io/collector/pipeline"
1920
)
@@ -24,16 +25,16 @@ var (
2425
)
2526

2627
// tracesRequest bundles a batch of traces with the function used to push them.
type tracesRequest struct {
	// td holds the traces carried by this request.
	td ptrace.Traces
	// pusher is the consumer function stored alongside the traces.
	pusher consumer.ConsumeTracesFunc
	// cachedSize memoizes the result of Size. newTracesRequest sets it to -1,
	// which Size treats as "not computed yet".
	cachedSize int
}
3132

3233
func newTracesRequest(td ptrace.Traces, pusher consumer.ConsumeTracesFunc) Request {
3334
return &tracesRequest{
34-
td: td,
35-
pusher: pusher,
36-
cachedItemsCount: td.SpanCount(),
35+
td: td,
36+
pusher: pusher,
37+
cachedSize: -1,
3738
}
3839
}
3940

@@ -66,11 +67,18 @@ func (req *tracesRequest) Export(ctx context.Context) error {
6667
}
6768

6869
// ItemsCount returns the number of spans currently held by the request, as
// reported by the underlying traces on every call.
func (req *tracesRequest) ItemsCount() int {
	return req.td.SpanCount()
}
7172

72-
func (req *tracesRequest) setCachedItemsCount(count int) {
73-
req.cachedItemsCount = count
73+
func (req *tracesRequest) Size(sizer sizer.TracesSizer) int {
74+
if req.cachedSize == -1 {
75+
req.cachedSize = sizer.TracesSize(req.td)
76+
}
77+
return req.cachedSize
78+
}
79+
80+
// setCachedSize overwrites the memoized size returned by Size. Callers use it
// to keep the cache in step after moving data in or out of the request.
func (req *tracesRequest) setCachedSize(size int) {
	req.cachedSize = size
}
7583

7684
type tracesExporter struct {

exporter/exporterhelper/traces_batch.go

Lines changed: 96 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,106 +8,159 @@ import (
88
"errors"
99

1010
"go.opentelemetry.io/collector/exporter/exporterbatcher"
11+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1112
"go.opentelemetry.io/collector/pdata/ptrace"
1213
)
1314

1415
// MergeSplit splits and/or merges the provided traces request and the current request into one or more requests
1516
// conforming with the MaxSizeConfig.
1617
func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.SizeConfig, r2 Request) ([]Request, error) {
18+
var sz sizer.TracesSizer
19+
switch cfg.Sizer {
20+
case exporterbatcher.SizerTypeItems:
21+
sz = &sizer.TracesCountSizer{}
22+
case exporterbatcher.SizerTypeBytes:
23+
sz = &sizer.TracesBytesSizer{}
24+
default:
25+
return nil, errors.New("unknown sizer type")
26+
}
27+
1728
if r2 != nil {
1829
req2, ok := r2.(*tracesRequest)
1930
if !ok {
2031
return nil, errors.New("invalid input type")
2132
}
22-
req2.mergeTo(req)
33+
req2.mergeTo(req, sz)
2334
}
2435

2536
// If no limit we can simply merge the new request into the current and return.
2637
if cfg.MaxSize == 0 {
2738
return []Request{req}, nil
2839
}
29-
return req.split(cfg)
40+
return req.split(cfg.MaxSize, sz), nil
3041
}
3142

32-
func (req *tracesRequest) mergeTo(dst *tracesRequest) {
33-
dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount())
34-
req.setCachedItemsCount(0)
43+
func (req *tracesRequest) mergeTo(dst *tracesRequest, sz sizer.TracesSizer) {
44+
if sz != nil {
45+
dst.setCachedSize(dst.Size(sz) + req.Size(sz))
46+
req.setCachedSize(0)
47+
}
3548
req.td.ResourceSpans().MoveAndAppendTo(dst.td.ResourceSpans())
3649
}
3750

38-
func (req *tracesRequest) split(cfg exporterbatcher.SizeConfig) ([]Request, error) {
51+
func (req *tracesRequest) split(maxSize int, sz sizer.TracesSizer) []Request {
3952
var res []Request
40-
for req.ItemsCount() > cfg.MaxSize {
41-
td := extractTraces(req.td, cfg.MaxSize)
42-
size := td.SpanCount()
43-
req.setCachedItemsCount(req.ItemsCount() - size)
44-
res = append(res, &tracesRequest{td: td, pusher: req.pusher, cachedItemsCount: size})
53+
for req.Size(sz) > maxSize {
54+
td, rmSize := extractTraces(req.td, maxSize, sz)
55+
req.setCachedSize(req.Size(sz) - rmSize)
56+
res = append(res, newTracesRequest(td, req.pusher))
4557
}
4658
res = append(res, req)
47-
return res, nil
59+
return res
4860
}
4961

5062
// extractTraces moves resource spans out of srcTraces into a new ptrace.Traces
// until capacity, measured in the units of sz, is used up. A resource spans
// entry that does not fit as a whole is split via extractResourceSpans.
//
// It returns the extracted traces and the size by which srcTraces shrank, so
// the caller can adjust a cached size without measuring again.
func extractTraces(srcTraces ptrace.Traces, capacity int, sz sizer.TracesSizer) (ptrace.Traces, int) {
	destTraces := ptrace.NewTraces()
	// Whatever the empty destination already costs is not available for payload.
	capacityLeft := capacity - sz.TracesSize(destTraces)
	removedSize := 0
	srcTraces.ResourceSpans().RemoveIf(func(srcRS ptrace.ResourceSpans) bool {
		// If there is no more capacity left just return.
		if capacityLeft == 0 {
			return false
		}
		// rawRsSize is the entry on its own; rsSize is what it costs once
		// appended to the parent.
		rawRsSize := sz.ResourceSpansSize(srcRS)
		rsSize := sz.DeltaSize(rawRsSize)

		if rsSize > capacityLeft {
			extSrcRS, extRsSize := extractResourceSpans(srcRS, capacityLeft, sz)
			// This cannot make it to exactly 0 for the bytes,
			// force it to be 0 since that is the stopping condition.
			capacityLeft = 0
			removedSize += extRsSize
			// This represents the delta between the delta sizes: the per-entry
			// overhead of srcRS before the extraction minus its overhead after
			// it shrank by extRsSize.
			removedSize += rsSize - rawRsSize - (sz.DeltaSize(rawRsSize-extRsSize) - (rawRsSize - extRsSize))
			// It is possible that for the bytes scenario, the extracted field contains no spans.
			// Do not add it to the destination if that is the case.
			if extSrcRS.ScopeSpans().Len() > 0 {
				extSrcRS.MoveTo(destTraces.ResourceSpans().AppendEmpty())
			}
			// NOTE(review): if MoveTo resets its receiver, this is false after
			// a move as well, so the remainder of srcRS always stays in the
			// source. Confirm against pdata before relying on it.
			return extSrcRS.ScopeSpans().Len() != 0
		}
		capacityLeft -= rsSize
		removedSize += rsSize

		// The whole entry fits: move it over and drop it from the source.
		srcRS.MoveTo(destTraces.ResourceSpans().AppendEmpty())
		return true
	})
	return destTraces, removedSize
}
6798

6899
// extractResourceSpans moves scope spans out of srcRS into a new resource spans
// that carries a copy of the schema URL and resource of srcRS, until capacity,
// measured in the units of sz, is used up. A scope spans entry that does not
// fit as a whole is split via extractScopeSpans.
//
// It returns the new resource spans and the size by which srcRS shrank.
func extractResourceSpans(srcRS ptrace.ResourceSpans, capacity int, sz sizer.TracesSizer) (ptrace.ResourceSpans, int) {
	destRS := ptrace.NewResourceSpans()
	destRS.SetSchemaUrl(srcRS.SchemaUrl())
	srcRS.Resource().CopyTo(destRS.Resource())
	// Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size.
	// The size of the copied schema URL and resource is reserved as well.
	capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.ResourceSpansSize(destRS)
	removedSize := 0
	srcRS.ScopeSpans().RemoveIf(func(srcSS ptrace.ScopeSpans) bool {
		// If there is no more capacity left just return.
		if capacityLeft == 0 {
			return false
		}

		// rawSlSize is the entry on its own; ssSize is what it costs once
		// appended to the parent.
		rawSlSize := sz.ScopeSpansSize(srcSS)
		ssSize := sz.DeltaSize(rawSlSize)
		if ssSize > capacityLeft {
			extSrcSS, extSsSize := extractScopeSpans(srcSS, capacityLeft, sz)
			// This cannot make it to exactly 0 for the bytes,
			// force it to be 0 since that is the stopping condition.
			capacityLeft = 0
			removedSize += extSsSize
			// This represents the delta between the delta sizes: the per-entry
			// overhead of srcSS before the extraction minus its overhead after
			// it shrank by extSsSize.
			removedSize += ssSize - rawSlSize - (sz.DeltaSize(rawSlSize-extSsSize) - (rawSlSize - extSsSize))
			// It is possible that for the bytes scenario, the extracted field contains no spans.
			// Do not add it to the destination if that is the case.
			if extSrcSS.Spans().Len() > 0 {
				extSrcSS.MoveTo(destRS.ScopeSpans().AppendEmpty())
			}
			// NOTE(review): if MoveTo resets its receiver, this is false after
			// a move as well, so the remainder of srcSS always stays in the
			// source. Confirm against pdata before relying on it.
			return extSrcSS.Spans().Len() != 0
		}
		capacityLeft -= ssSize
		removedSize += ssSize

		// The whole entry fits: move it over and drop it from the source.
		srcSS.MoveTo(destRS.ScopeSpans().AppendEmpty())
		return true
	})
	return destRS, removedSize
}
88138

89139
// extractScopeSpans extracts spans and returns a new scope spans with the specified number of spans.
90-
func extractScopeSpans(srcSS ptrace.ScopeSpans, count int) ptrace.ScopeSpans {
140+
func extractScopeSpans(srcSS ptrace.ScopeSpans, capacity int, sz sizer.TracesSizer) (ptrace.ScopeSpans, int) {
91141
destSS := ptrace.NewScopeSpans()
92142
destSS.SetSchemaUrl(srcSS.SchemaUrl())
93143
srcSS.Scope().CopyTo(destSS.Scope())
144+
// Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size.
145+
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.ScopeSpansSize(destSS)
146+
removedSize := 0
94147
srcSS.Spans().RemoveIf(func(srcSpan ptrace.Span) bool {
95-
if count == 0 {
148+
// If the no more capacity left just return.
149+
if capacityLeft == 0 {
150+
return false
151+
}
152+
rsSize := sz.DeltaSize(sz.SpanSize(srcSpan))
153+
if rsSize > capacityLeft {
154+
// This cannot make it to exactly 0 for the bytes,
155+
// force it to be 0 since that is the stopping condition.
156+
capacityLeft = 0
96157
return false
97158
}
159+
160+
capacityLeft -= rsSize
161+
removedSize += rsSize
98162
srcSpan.MoveTo(destSS.Spans().AppendEmpty())
99-
count--
100163
return true
101164
})
102-
return destSS
103-
}
104-
105-
// resourceTracesCount calculates the total number of spans in the pdata.ResourceSpans.
106-
func resourceTracesCount(rs ptrace.ResourceSpans) int {
107-
count := 0
108-
rs.ScopeSpans().RemoveIf(func(ss ptrace.ScopeSpans) bool {
109-
count += ss.Spans().Len()
110-
return false
111-
})
112-
return count
165+
return destSS, removedSize
113166
}

0 commit comments

Comments
 (0)