Skip to content

Commit 8d77a20

Browse files
committed
app/vlinsert/opentelemetry: parse ScopeLogs.scope field and store its contents as log fields with scope. prefix
See #826
1 parent f7e403c commit 8d77a20

File tree

3 files changed

+135
-12
lines changed

3 files changed

+135
-12
lines changed

app/vlinsert/opentelemetry/opentelemetry_test.go

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,30 @@ func TestPushProtobufRequest(t *testing.T) {
5454
resultsExpected := `{"_msg":"log-line-message","severity":"Trace"}`
5555
f(data, timestampsExpected, resultsExpected)
5656

57+
// single line with scope attributes
58+
data = `[{
59+
"scopeLogs": [{
60+
"scope": {
61+
"name": "foo",
62+
"version": "v1.234.5",
63+
"attributes": [
64+
{"key":"abc","value":{"stringValue":"de"}},
65+
{"key":"x","value":{"stringValue":"aaa"}}
66+
]
67+
},
68+
"logRecords": [{
69+
"timeUnixNano": 1234,
70+
"severityNumber": 1,
71+
"body": {
72+
"stringValue": "log-line-message"
73+
}
74+
}]
75+
}]
76+
}]`
77+
timestampsExpected = []int64{1234}
78+
resultsExpected = `{"scope.name":"foo","scope.version":"v1.234.5","scope.attributes.abc":"de","scope.attributes.x":"aaa","_msg":"log-line-message","severity":"Trace"}`
79+
f(data, timestampsExpected, resultsExpected)
80+
5781
// severities mapping
5882
data = `[{
5983
"scopeLogs": [{
@@ -110,6 +134,11 @@ func TestPushProtobufRequest(t *testing.T) {
110134
]
111135
},
112136
"scopeLogs": [{
137+
"scope": {
138+
"attributes": [
139+
{"key":"abc","value":{"stringValue":"de"}}
140+
]
141+
},
113142
"logRecords": [
114143
{"timeUnixNano":1234,"severityNumber":1,"body":{"stringValue":"log-line-message"}},
115144
{"timeUnixNano":1235,"severityNumber":5,"body":{"stringValue":"log-line-message-msg-2"}}
@@ -133,8 +162,8 @@ func TestPushProtobufRequest(t *testing.T) {
133162
]
134163
}]`
135164
timestampsExpected = []int64{1234, 1235, 2345, 2346, 2347, 2348, 3333, 432}
136-
resultsExpected = `{"logger":"context","instance_id":"10","node_taints.role":"dev","node_taints.cluster_load_percent":"0.55","_msg":"log-line-message","severity":"Trace"}
137-
{"logger":"context","instance_id":"10","node_taints.role":"dev","node_taints.cluster_load_percent":"0.55","_msg":"log-line-message-msg-2","severity":"Debug"}
165+
resultsExpected = `{"logger":"context","instance_id":"10","node_taints.role":"dev","node_taints.cluster_load_percent":"0.55","scope.name":"unknown","scope.version":"unknown","scope.attributes.abc":"de","_msg":"log-line-message","severity":"Trace"}
166+
{"logger":"context","instance_id":"10","node_taints.role":"dev","node_taints.cluster_load_percent":"0.55","scope.name":"unknown","scope.version":"unknown","scope.attributes.abc":"de","_msg":"log-line-message-msg-2","severity":"Debug"}
138167
{"_msg":"log-line-resource-scope-1-0-0","severity":"Info2"}
139168
{"_msg":"log-line-resource-scope-1-0-1","severity":"Info2"}
140169
{"_msg":"log-line-resource-scope-1-1-0","severity":"Info4"}
@@ -437,15 +466,39 @@ func (kvl *keyValueList) marshalProtobuf(mm *easyproto.MessageMarshaler) {
437466

438467
// scopeLogs represents the corresponding OTEL protobuf message
439468
type scopeLogs struct {
440-
LogRecords []logRecord `json:"logRecords,omitzero"`
469+
Scope *instrumentationScope `json:"scope,omitzero"`
470+
LogRecords []logRecord `json:"logRecords,omitzero"`
441471
}
442472

443473
func (sl *scopeLogs) marshalProtobuf(mm *easyproto.MessageMarshaler) {
474+
if sl.Scope != nil {
475+
sl.Scope.marshalProtobuf(mm.AppendMessage(1))
476+
}
444477
for _, m := range sl.LogRecords {
445478
m.marshalProtobuf(mm.AppendMessage(2))
446479
}
447480
}
448481

482+
// instrumentationScope represents the corresponding OTEL protobuf message.
483+
// See https://github.com/open-telemetry/opentelemetry-proto/blob/a5f0eac5b802f7ae51dfe41e5116fe5548955e64/opentelemetry/proto/common/v1/common.proto#L76
484+
type instrumentationScope struct {
485+
Name string `json:"name,omitzero"`
486+
Version string `json:"version,omitzero"`
487+
Attributes []*keyValue `json:"attributes,omitzero"`
488+
}
489+
490+
func (s *instrumentationScope) marshalProtobuf(mm *easyproto.MessageMarshaler) {
491+
if s.Name != "" {
492+
mm.AppendString(1, s.Name)
493+
}
494+
if s.Version != "" {
495+
mm.AppendString(2, s.Version)
496+
}
497+
for _, m := range s.Attributes {
498+
m.marshalProtobuf(mm.AppendMessage(3))
499+
}
500+
}
501+
449502
// logRecord represents the corresponding OTEL protobuf message.
450503
// See https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md
451504
type logRecord struct {

app/vlinsert/opentelemetry/pb.go

Lines changed: 77 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"strconv"
66
"time"
77

8+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
9+
810
"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
911
"github.com/VictoriaMetrics/easyproto"
1012
)
@@ -57,12 +59,12 @@ func decodeResourceLogs(src []byte, pushLogs pushLogsHandler) (err error) {
5759
defer logstorage.PutFields(fs)
5860

5961
// Decode resource
60-
data, ok, err := easyproto.GetMessageData(src, 1)
62+
resourceData, ok, err := easyproto.GetMessageData(src, 1)
6163
if err != nil {
6264
return fmt.Errorf("cannot find Resource: %w", err)
6365
}
6466
if ok {
65-
if err = decodeResource(data, fs, fb); err != nil {
67+
if err = decodeResource(resourceData, fs, fb); err != nil {
6668
return fmt.Errorf("cannot decode Resource: %w", err)
6769
}
6870
}
@@ -120,6 +122,7 @@ func decodeResource(src []byte, fs *logstorage.Fields, fb *fmtBuffer) (err error
120122

121123
func decodeScopeLogs(src []byte, fs *logstorage.Fields, pushLogs pushLogsHandler) (err error) {
122124
// message ScopeLogs {
125+
// InstrumentationScope scope = 1;
123126
// repeated LogRecord log_records = 2;
124127
// }
125128

@@ -128,6 +131,18 @@ func decodeScopeLogs(src []byte, fs *logstorage.Fields, pushLogs pushLogsHandler
128131

129132
streamFieldsLen := len(fs.Fields)
130133

134+
scopeData, ok, err := easyproto.GetMessageData(src, 1)
135+
if err != nil {
136+
return fmt.Errorf("cannot read InstrumentationScope: %w", err)
137+
}
138+
if ok {
139+
if err := decodeInstrumentationScope(scopeData, fs, fb); err != nil {
140+
return fmt.Errorf("cannot decode InstrumentationScope: %w", err)
141+
}
142+
}
143+
144+
commonFieldsLen := len(fs.Fields)
145+
131146
var fc easyproto.FieldContext
132147
for len(src) > 0 {
133148
src, err = fc.NextField(src)
@@ -142,7 +157,7 @@ func decodeScopeLogs(src []byte, fs *logstorage.Fields, pushLogs pushLogsHandler
142157
}
143158

144159
fb.reset()
145-
fs.Fields = fs.Fields[:streamFieldsLen]
160+
fs.Fields = fs.Fields[:commonFieldsLen]
146161

147162
eventName, timestamp, err := decodeLogRecord(data, fs, fb)
148163
if err != nil {
@@ -159,6 +174,9 @@ func decodeScopeLogs(src []byte, fs *logstorage.Fields, pushLogs pushLogsHandler
159174
f.Value = eventName
160175

161176
pushLogs(timestamp, fs.Fields, streamFieldsLen+1)
177+
178+
// Return back common fields to their places before the next iteration
179+
fs.Fields = append(fs.Fields[:streamFieldsLen], fs.Fields[streamFieldsLen+1:commonFieldsLen+1]...)
162180
} else {
163181
pushLogs(timestamp, fs.Fields, streamFieldsLen)
164182
}
@@ -167,6 +185,56 @@ func decodeScopeLogs(src []byte, fs *logstorage.Fields, pushLogs pushLogsHandler
167185
return nil
168186
}
169187

188+
func decodeInstrumentationScope(src []byte, fs *logstorage.Fields, fb *fmtBuffer) error {
189+
// See https://github.com/open-telemetry/opentelemetry-proto/blob/a5f0eac5b802f7ae51dfe41e5116fe5548955e64/opentelemetry/proto/common/v1/common.proto#L76
190+
//
191+
// message InstrumentationScope {
192+
// string name = 1;
193+
// string version = 2;
194+
// repeated KeyValue attributes = 3;
195+
// }
196+
197+
nameData, ok, err := easyproto.GetMessageData(src, 1)
198+
if err != nil {
199+
return fmt.Errorf("cannot read name: %w", err)
200+
}
201+
name := "unknown"
202+
if ok {
203+
name = bytesutil.ToUnsafeString(nameData)
204+
}
205+
fs.Add("scope.name", name)
206+
207+
versionData, ok, err := easyproto.GetMessageData(src, 2)
208+
if err != nil {
209+
return fmt.Errorf("cannot read version: %w", err)
210+
}
211+
version := "unknown"
212+
if ok {
213+
version = bytesutil.ToUnsafeString(versionData)
214+
}
215+
fs.Add("scope.version", version)
216+
217+
var fc easyproto.FieldContext
218+
for len(src) > 0 {
219+
src, err = fc.NextField(src)
220+
if err != nil {
221+
return fmt.Errorf("cannot read the next field: %w", err)
222+
}
223+
switch fc.FieldNum {
224+
case 3:
225+
attributesData, ok := fc.MessageData()
226+
if !ok {
227+
return fmt.Errorf("cannot read Attributes data")
228+
}
229+
if err := decodeKeyValue(attributesData, fs, fb, "scope.attributes"); err != nil {
230+
return fmt.Errorf("cannot decode Attributes: %w", err)
231+
}
232+
}
233+
}
234+
235+
return nil
236+
}
237+
170238
func decodeLogRecord(src []byte, fs *logstorage.Fields, fb *fmtBuffer) (string, int64, error) {
171239
// See https://github.com/open-telemetry/opentelemetry-proto/blob/a5f0eac5b802f7ae51dfe41e5116fe5548955e64/opentelemetry/proto/logs/v1/logs.proto#L136
172240
//
@@ -228,11 +296,11 @@ func decodeLogRecord(src []byte, fs *logstorage.Fields, fb *fmtBuffer) (string,
228296
return "", 0, fmt.Errorf("cannot decode Body: %w", err)
229297
}
230298
case 6:
231-
data, ok := fc.MessageData()
299+
attributesData, ok := fc.MessageData()
232300
if !ok {
233301
return "", 0, fmt.Errorf("cannot read Attributes data")
234302
}
235-
if err := decodeKeyValue(data, fs, fb, ""); err != nil {
303+
if err := decodeKeyValue(attributesData, fs, fb, ""); err != nil {
236304
return "", 0, fmt.Errorf("cannot decode Attributes: %w", err)
237305
}
238306
case 9:
@@ -282,17 +350,17 @@ func decodeKeyValue(src []byte, fs *logstorage.Fields, fb *fmtBuffer, fieldNameP
282350
// }
283351

284352
// Decode key
285-
data, ok, err := easyproto.GetMessageData(src, 1)
353+
keyData, ok, err := easyproto.GetMessageData(src, 1)
286354
if err != nil {
287355
return fmt.Errorf("cannot find Key in KeyValue: %w", err)
288356
}
289357
if !ok {
290358
return fmt.Errorf("key is missing in KeyValue")
291359
}
292-
fieldName := fb.formatSubFieldName(fieldNamePrefix, data)
360+
fieldName := fb.formatSubFieldName(fieldNamePrefix, keyData)
293361

294362
// Decode value
295-
data, ok, err = easyproto.GetMessageData(src, 2)
363+
valueData, ok, err := easyproto.GetMessageData(src, 2)
296364
if err != nil {
297365
return fmt.Errorf("cannot find Value in KeyValue: %w", err)
298366
}
@@ -301,7 +369,7 @@ func decodeKeyValue(src []byte, fs *logstorage.Fields, fb *fmtBuffer, fieldNameP
301369
return nil
302370
}
303371

304-
if err := decodeAnyValue(data, fs, fb, fieldName); err != nil {
372+
if err := decodeAnyValue(valueData, fs, fb, fieldName); err != nil {
305373
return fmt.Errorf("cannot decode AnyValue: %w", err)
306374
}
307375

docs/victorialogs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ according to the follosing docs:
2121

2222
## tip
2323

24+
* FEATURE: [OpenTelemetry data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/opentelemetry/): parse [`scope` inside ScopeLogs](https://github.com/open-telemetry/opentelemetry-proto/blob/a5f0eac5b802f7ae51dfe41e5116fe5548955e64/opentelemetry/proto/logs/v1/logs.proto#L72) and store it into [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) starting with `scope.` prefix. See [#826](https://github.com/VictoriaMetrics/VictoriaLogs/issues/826).
25+
2426
## [v1.40.0](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.40.0)
2527

2628
Released at 2025-12-05

0 commit comments

Comments
 (0)