Skip to content

Commit 0d9e18c

Browse files
committed
lib/logstorage: reduce the probability of improper usage of LogRows.MustAdd() function
The LogRows.MustAdd function was accepting streamFields []Field as the last arg. The VictoriaLogs data model expects that all the stream fields exist in the log fields. The LogRows.MustAdd() function was easy to misuse by passing streamFields that are missing from the fields slice. This could result in a violation of the VictoriaLogs data model - e.g. there could be stream fields that are searchable via {field_name=value} syntax, but aren't searchable via field_name:=value syntax. Fix this by accepting a streamFieldsLen arg instead of the streamFields arg in LogRows.MustAdd. The caller must ensure that the given number of stream fields exist at the beginning of the fields slice.
1 parent 6b7ee92 commit 0d9e18c

File tree

22 files changed

+97
-92
lines changed

22 files changed

+97
-92
lines changed

app/vlagent/kubernetescollector/processor.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,14 @@ type logFileProcessor struct {
3636
lr *logstorage.LogRows
3737
tenantID logstorage.TenantID
3838

39+
// streamFieldsLen is the number of stream fields at the beginning of commonFields.
40+
streamFieldsLen int
41+
42+
// commonFields are common fields for the given log file.
3943
commonFields []logstorage.Field
40-
streamFields []logstorage.Field
44+
45+
// fieldsBuf is used for constructing log fields from commonFields and the actual log line fields before sending them to VictoriaLogs.
46+
fieldsBuf []logstorage.Field
4147

4248
// partialCRIContent accumulates the content of partial CRI log lines.
4349
// Can be truncated if it exceeds maxLineSize.
@@ -47,13 +53,20 @@ type logFileProcessor struct {
4753
}
4854

4955
func newLogFileProcessor(storage insertutil.LogRowsStorage, commonFields []logstorage.Field) *logFileProcessor {
50-
var streamFields []logstorage.Field
56+
// move stream fields to the beginning of commonFields
57+
58+
streamFields := make([]logstorage.Field, 0, len(commonFields))
5159
for _, f := range commonFields {
52-
for _, fieldName := range streamFieldNames {
53-
if f.Name == fieldName {
54-
streamFields = append(streamFields, f)
55-
break
56-
}
60+
if slices.Contains(streamFieldNames, f.Name) {
61+
streamFields = append(streamFields, f)
62+
}
63+
}
64+
streamFieldsLen := len(streamFields)
65+
66+
fields := streamFields
67+
for _, f := range commonFields {
68+
if !slices.Contains(streamFieldNames, f.Name) {
69+
fields = append(fields, f)
5770
}
5871
}
5972

@@ -68,8 +81,8 @@ func newLogFileProcessor(storage insertutil.LogRowsStorage, commonFields []logst
6881
lr: lr,
6982
tenantID: tenantID,
7083

71-
commonFields: commonFields,
72-
streamFields: streamFields,
84+
streamFieldsLen: streamFieldsLen,
85+
commonFields: fields,
7386
}
7487
}
7588

@@ -146,7 +159,7 @@ func (lfp *logFileProcessor) joinPartialLines(criLine criLine) (int64, []byte, b
146159
lfp.partialCRIContentSize += len(criLine.content)
147160
if lfp.partialCRIContentSize > maxLogLineSize {
148161
// Discard the too large log line.
149-
reportLogRowSizeExceeded(lfp.streamFields, lfp.partialCRIContentSize)
162+
reportLogRowSizeExceeded(lfp.commonFields[:lfp.streamFieldsLen], lfp.partialCRIContentSize)
150163

151164
lfp.partialCRIContent.Reset()
152165
lfp.partialCRIContentSize = 0
@@ -189,8 +202,6 @@ func (lfp *logFileProcessor) addLineInternal(timestamp int64, line []byte) {
189202
logstorage.RenameField(parser.Fields, *msgField, "_msg")
190203
}
191204

192-
parser.Fields = append(parser.Fields, lfp.commonFields...)
193-
194205
if len(parser.Fields) > 1000 {
195206
line := logstorage.MarshalFieldsToJSON(nil, parser.Fields)
196207
logger.Warnf("dropping log line with %d fields; %s", parser.Fields, line)
@@ -201,7 +212,11 @@ func (lfp *logFileProcessor) addLineInternal(timestamp int64, line []byte) {
201212
}
202213

203214
func (lfp *logFileProcessor) addRow(timestamp int64, fields []logstorage.Field) {
204-
lfp.lr.MustAdd(lfp.tenantID, timestamp, fields, lfp.streamFields)
215+
clear(lfp.fieldsBuf)
216+
lfp.fieldsBuf = append(lfp.fieldsBuf[:0], lfp.commonFields...)
217+
lfp.fieldsBuf = append(lfp.fieldsBuf, fields...)
218+
219+
lfp.lr.MustAdd(lfp.tenantID, timestamp, lfp.fieldsBuf, lfp.streamFieldsLen)
205220
lfp.storage.MustAddRows(lfp.lr)
206221
lfp.lr.ResetKeepSettings()
207222
}

app/vlinsert/datadog/datadog.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ func readLogsRequest(ts int64, data []byte, lmp insertutil.LogMessageProcessor)
253253
if err != nil {
254254
return err
255255
}
256-
lmp.AddRow(ts, fields, nil)
256+
lmp.AddRow(ts, fields, -1)
257257
fields = fields[:0]
258258
}
259259
return nil

app/vlinsert/elasticsearch/elasticsearch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func readBulkLine(lr *insertutil.LineReader, timeFields, msgFields []string, lmp
207207
ts = time.Now().UnixNano()
208208
}
209209
logstorage.RenameField(p.Fields, msgFields, "_msg")
210-
lmp.AddRow(ts, p.Fields, nil)
210+
lmp.AddRow(ts, p.Fields, -1)
211211

212212
return true, nil
213213
}

app/vlinsert/insertutil/common_params.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,10 @@ func CanWriteData() error {
176176
type LogMessageProcessor interface {
177177
// AddRow must add row to the LogMessageProcessor with the given timestamp and fields.
178178
//
179-
// If streamFields is non-nil, then the given streamFields must be used as log stream fields instead of pre-configured fields.
179+
// If streamFieldsLen >= 0, then the given number of initial fields must be used as log stream fields instead of pre-configured fields.
180180
//
181181
// The LogMessageProcessor implementation cannot hold references to fields, since the caller can reuse them.
182-
AddRow(timestamp int64, fields, streamFields []logstorage.Field)
182+
AddRow(timestamp int64, fields []logstorage.Field, streamFieldsLen int)
183183

184184
// MustClose() must flush all the remaining fields and free up resources occupied by LogMessageProcessor.
185185
MustClose()
@@ -227,8 +227,9 @@ func (lmp *logMessageProcessor) initPeriodicFlush() {
227227

228228
// AddRow adds new log message to lmp with the given timestamp and fields.
229229
//
230-
// If streamFields is non-nil, then it is used as log stream fields instead of the pre-configured stream fields.
231-
func (lmp *logMessageProcessor) AddRow(timestamp int64, fields, streamFields []logstorage.Field) {
230+
// If streamFieldsLen >= 0, then the given number of the initial fields is used as log stream fields
231+
// instead of the pre-configured stream fields.
232+
func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Field, streamFieldsLen int) {
232233
lmp.rowsIngestedTotal.Inc()
233234
n := logstorage.EstimatedJSONRowLen(fields)
234235
lmp.bytesIngestedTotal.Add(n)
@@ -243,7 +244,7 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields, streamFields []l
243244
lmp.mu.Lock()
244245
defer lmp.mu.Unlock()
245246

246-
lmp.lr.MustAdd(lmp.cp.TenantID, timestamp, fields, streamFields)
247+
lmp.lr.MustAdd(lmp.cp.TenantID, timestamp, fields, streamFieldsLen)
247248

248249
if lmp.cp.Debug {
249250
s := lmp.lr.GetRowString(0)

app/vlinsert/insertutil/testutils.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ type TestLogMessageProcessor struct {
1515
}
1616

1717
// AddRow adds row with the given timestamp and fields to tlp
18-
func (tlp *TestLogMessageProcessor) AddRow(timestamp int64, fields, streamFields []logstorage.Field) {
19-
if streamFields != nil {
20-
panic(fmt.Errorf("BUG: streamFields must be nil; got %v", streamFields))
18+
func (tlp *TestLogMessageProcessor) AddRow(timestamp int64, fields []logstorage.Field, streamFieldsLen int) {
19+
if streamFieldsLen >= 0 {
20+
panic(fmt.Errorf("BUG: streamFieldsLen must be negative; got %d", streamFieldsLen))
2121
}
2222
tlp.timestamps = append(tlp.timestamps, timestamp)
2323
tlp.rows = append(tlp.rows, string(logstorage.MarshalFieldsToJSON(nil, fields)))
@@ -48,7 +48,7 @@ func (tlp *TestLogMessageProcessor) Verify(timestampsExpected []int64, resultExp
4848
type BenchmarkLogMessageProcessor struct{}
4949

5050
// AddRow implements LogMessageProcessor interface.
51-
func (blp *BenchmarkLogMessageProcessor) AddRow(_ int64, _, _ []logstorage.Field) {
51+
func (blp *BenchmarkLogMessageProcessor) AddRow(_ int64, _ []logstorage.Field, _ int) {
5252
}
5353

5454
// MustClose implements LogMessageProcessor interface.

app/vlinsert/journald/journald.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ func readJournaldLogEntry(streamName string, lr *insertutil.LineReader, lmp inse
249249
if ts == 0 {
250250
ts = time.Now().UnixNano()
251251
}
252-
lmp.AddRow(ts, fb.fields, nil)
252+
lmp.AddRow(ts, fb.fields, -1)
253253
}
254254
return nil
255255
}

app/vlinsert/jsonline/jsonline.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func readLine(lr *insertutil.LineReader, timeFields, msgFields []string, lmp ins
111111
return true, fmt.Errorf("%s; line contents: %q", err, line)
112112
}
113113
logstorage.RenameField(p.Fields, msgFields, "_msg")
114-
lmp.AddRow(ts, p.Fields, nil)
114+
lmp.AddRow(ts, p.Fields, -1)
115115

116116
return true, nil
117117
}

app/vlinsert/loki/loki_json.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -166,15 +166,16 @@ func parseJSONRequest(data []byte, lmp insertutil.LogMessageProcessor, msgFields
166166
return fmt.Errorf("unexpected log message type for %q; want string", lineA[1])
167167
}
168168
allowMsgRenaming := addMsgField(fieldsTmp, msgParser, bytesutil.ToUnsafeString(msg))
169-
170-
var streamFields []logstorage.Field
171-
if useDefaultStreamFields {
172-
streamFields = fieldsTmp.Fields[:commonFieldsLen]
173-
}
174169
if allowMsgRenaming {
175170
logstorage.RenameField(fieldsTmp.Fields[commonFieldsLen:], msgFields, "_msg")
176171
}
177-
lmp.AddRow(ts, fieldsTmp.Fields, streamFields)
172+
173+
streamFieldsLen := -1
174+
if useDefaultStreamFields {
175+
streamFieldsLen = commonFieldsLen
176+
}
177+
178+
lmp.AddRow(ts, fieldsTmp.Fields, streamFieldsLen)
178179
}
179180
}
180181

app/vlinsert/loki/loki_protobuf.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,15 @@ func parseProtobufRequest(data []byte, lmp insertutil.LogMessageProcessor, msgFi
7272
}
7373

7474
allowMsgRenaming := addMsgField(fs, msgParser, line)
75-
76-
var streamFields []logstorage.Field
77-
if useDefaultStreamFields {
78-
streamFields = fs.Fields[:streamFieldsLen]
79-
}
8075
if allowMsgRenaming {
8176
logstorage.RenameField(fs.Fields[streamFieldsLen:], msgFields, "_msg")
8277
}
8378

84-
lmp.AddRow(timestamp, fs.Fields, streamFields)
79+
if !useDefaultStreamFields {
80+
streamFieldsLen = -1
81+
}
82+
83+
lmp.AddRow(timestamp, fs.Fields, streamFieldsLen)
8584
}
8685

8786
if err := decodePushRequest(data, pushLogs); err != nil {

app/vlinsert/loki/loki_protobuf_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ type testLogMessageProcessor struct {
1414
pr pushRequest
1515
}
1616

17-
func (tlp *testLogMessageProcessor) AddRow(timestamp int64, fields, streamFields []logstorage.Field) {
18-
if streamFields != nil {
19-
panic(fmt.Errorf("unexpected non-nil streamFields: %v", streamFields))
17+
func (tlp *testLogMessageProcessor) AddRow(timestamp int64, fields []logstorage.Field, streamFieldsLen int) {
18+
if streamFieldsLen >= 0 {
19+
panic(fmt.Errorf("unexpected positive streamFieldsLen: %d", streamFieldsLen))
2020
}
2121
msg := ""
2222
for _, f := range fields {

0 commit comments

Comments
 (0)