Skip to content

Commit 4ffb744

Browse files
vlinsert: improve opentelemetry decode perfomance (#720)
Co-authored-by: Aliaksandr Valialkin <[email protected]>
1 parent e214adf commit 4ffb744

File tree

15 files changed

+1211
-2186
lines changed

15 files changed

+1211
-2186
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package opentelemetry
2+
3+
import (
4+
"encoding/base64"
5+
"encoding/hex"
6+
"strconv"
7+
"sync"
8+
9+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
10+
"github.com/valyala/fastjson"
11+
)
12+
13+
type fmtBuffer struct {
14+
buf []byte
15+
}
16+
17+
var fmtBufferPool sync.Pool
18+
19+
func getFmtBuffer() *fmtBuffer {
20+
v := fmtBufferPool.Get()
21+
if v == nil {
22+
return &fmtBuffer{}
23+
}
24+
return v.(*fmtBuffer)
25+
}
26+
27+
func putFmtBuffer(fb *fmtBuffer) {
28+
fb.reset()
29+
fmtBufferPool.Put(fb)
30+
}
31+
32+
func (fb *fmtBuffer) reset() {
33+
fb.buf = fb.buf[:0]
34+
}
35+
36+
func (fb *fmtBuffer) formatInt(v int64) string {
37+
n := len(fb.buf)
38+
fb.buf = strconv.AppendInt(fb.buf, v, 10)
39+
return bytesutil.ToUnsafeString(fb.buf[n:])
40+
}
41+
42+
func (fb *fmtBuffer) formatFloat(v float64) string {
43+
n := len(fb.buf)
44+
fb.buf = strconv.AppendFloat(fb.buf, v, 'f', -1, 64)
45+
return bytesutil.ToUnsafeString(fb.buf[n:])
46+
}
47+
48+
func (fb *fmtBuffer) formatSubFieldName(prefix string, suffix []byte) string {
49+
if prefix == "" {
50+
// There is no prefix, so just return the suffix as is.
51+
return bytesutil.ToUnsafeString(suffix)
52+
}
53+
54+
n := len(fb.buf)
55+
fb.buf = append(fb.buf, prefix...)
56+
fb.buf = append(fb.buf, '.')
57+
fb.buf = append(fb.buf, suffix...)
58+
59+
return bytesutil.ToUnsafeString(fb.buf[n:])
60+
}
61+
62+
func (fb *fmtBuffer) formatHex(src []byte) string {
63+
n := len(fb.buf)
64+
fb.buf = hex.AppendEncode(fb.buf, src)
65+
return bytesutil.ToUnsafeString(fb.buf[n:])
66+
}
67+
68+
func (fb *fmtBuffer) formatBase64(src []byte) string {
69+
n := len(fb.buf)
70+
fb.buf = base64.StdEncoding.AppendEncode(fb.buf, src)
71+
return bytesutil.ToUnsafeString(fb.buf[n:])
72+
}
73+
74+
func (fb *fmtBuffer) encodeJSONValue(v *fastjson.Value) string {
75+
n := len(fb.buf)
76+
fb.buf = v.MarshalTo(fb.buf)
77+
return bytesutil.ToUnsafeString(fb.buf[n:])
78+
}

app/vlinsert/opentelemetry/opentelemetry.go

Lines changed: 9 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
99
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
10-
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
1110
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
1211
"github.com/VictoriaMetrics/metrics"
1312

@@ -76,80 +75,20 @@ var (
7675
)
7776

7877
func pushProtobufRequest(data []byte, lmp insertutil.LogMessageProcessor, msgFields []string, useDefaultStreamFields bool) error {
79-
var req pb.ExportLogsServiceRequest
80-
if err := req.UnmarshalProtobuf(data); err != nil {
81-
errorsTotal.Inc()
82-
return fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(data), err)
83-
}
84-
85-
var commonFields []logstorage.Field
86-
for _, rl := range req.ResourceLogs {
87-
commonFields = commonFields[:0]
88-
commonFields = appendKeyValues(commonFields, rl.Resource.Attributes, "")
89-
commonFieldsLen := len(commonFields)
90-
for _, sc := range rl.ScopeLogs {
91-
commonFields = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp, msgFields, useDefaultStreamFields)
92-
}
93-
}
94-
95-
return nil
96-
}
97-
98-
func pushFieldsFromScopeLogs(sc *pb.ScopeLogs, commonFields []logstorage.Field, lmp insertutil.LogMessageProcessor, msgFields []string, useDefaultStreamFields bool) []logstorage.Field {
99-
fields := commonFields
100-
for _, lr := range sc.LogRecords {
101-
fields = fields[:len(commonFields)]
102-
if lr.Body.KeyValueList != nil {
103-
fields = appendKeyValues(fields, lr.Body.KeyValueList.Values, "")
104-
logstorage.RenameField(fields[len(commonFields):], msgFields, "_msg")
105-
} else {
106-
fields = append(fields, logstorage.Field{
107-
Name: "_msg",
108-
Value: lr.Body.FormatString(true),
109-
})
110-
}
111-
fields = appendKeyValues(fields, lr.Attributes, "")
112-
if len(lr.TraceID) > 0 {
113-
fields = append(fields, logstorage.Field{
114-
Name: "trace_id",
115-
Value: lr.TraceID,
116-
})
117-
}
118-
if len(lr.SpanID) > 0 {
119-
fields = append(fields, logstorage.Field{
120-
Name: "span_id",
121-
Value: lr.SpanID,
122-
})
123-
}
124-
fields = append(fields, logstorage.Field{
125-
Name: "severity",
126-
Value: lr.FormatSeverity(),
127-
})
78+
pushLogs := func(timestamp int64, fields []logstorage.Field, resourceFieldsLen int) {
79+
logstorage.RenameField(fields[resourceFieldsLen:], msgFields, "_msg")
12880

12981
var streamFields []logstorage.Field
13082
if useDefaultStreamFields {
131-
streamFields = commonFields
83+
streamFields = fields[:resourceFieldsLen]
13284
}
133-
lmp.AddRow(lr.ExtractTimestampNano(), fields, streamFields)
134-
}
135-
return fields
136-
}
13785

138-
func appendKeyValues(fields []logstorage.Field, kvs []*pb.KeyValue, parentField string) []logstorage.Field {
139-
for _, attr := range kvs {
140-
fieldName := attr.Key
141-
if parentField != "" {
142-
fieldName = parentField + "." + fieldName
143-
}
86+
lmp.AddRow(timestamp, fields, streamFields)
87+
}
14488

145-
if attr.Value.KeyValueList != nil {
146-
fields = appendKeyValues(fields, attr.Value.KeyValueList.Values, fieldName)
147-
} else {
148-
fields = append(fields, logstorage.Field{
149-
Name: fieldName,
150-
Value: attr.Value.FormatString(true),
151-
})
152-
}
89+
if err := decodeLogsData(data, pushLogs); err != nil {
90+
errorsTotal.Inc()
91+
return fmt.Errorf("cannot decode LogsData request from %d bytes: %w", len(data), err)
15392
}
154-
return fields
93+
return nil
15594
}

0 commit comments

Comments
 (0)