Skip to content

Commit 988a41b

Browse files
issue-91: allow reading strings with arbitrary length when parsing response in stream mode (#95)
--------- Co-authored-by: hagen1778 <[email protected]>
1 parent 9e6db28 commit 988a41b

17 files changed

+108
-25
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## tip
44

5+
* BUGFIX: allow reading strings with arbitrary length when parsing response in stream mode. See [this issue](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/91).
6+
57
## v0.6.1
68

79
* BUGFIX: fixed healthcheck

pkg/plugin/response.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bufio"
55
"bytes"
66
"encoding/json"
7+
"errors"
78
"fmt"
89
"io"
910
"time"
@@ -41,10 +42,30 @@ func parseStreamResponse(reader io.Reader) backend.DataResponse {
4142
lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
4243
lineField.Name = gLineField
4344

44-
scanner := bufio.NewScanner(reader)
45+
br := bufio.NewReaderSize(reader, 64*1024)
46+
var parser fastjson.Parser
47+
var finishedReading bool
48+
for n := 0; !finishedReading; n++ {
49+
b, err := br.ReadBytes('\n')
50+
if err != nil {
51+
if errors.Is(err, bufio.ErrBufferFull) {
52+
backend.Logger.Info("skipping line number #%d: line too long", n)
53+
continue
54+
}
55+
if errors.Is(err, io.EOF) {
56+
// b can be != nil when EOF is returned, so we need to process it
57+
finishedReading = true
58+
} else {
59+
return newResponseError(fmt.Errorf("cannot read line in response: %s", err), backend.StatusInternal)
60+
}
61+
}
62+
63+
if len(b) == 0 {
64+
continue
65+
}
4566

46-
for scanner.Scan() {
47-
value, err := fastjson.ParseBytes(scanner.Bytes())
67+
b = bytes.Trim(b, "\n")
68+
value, err := parser.ParseBytes(b)
4869
if err != nil {
4970
return newResponseError(fmt.Errorf("error decode response: %s", err), backend.StatusInternal)
5071
}

pkg/plugin/response_test.go

Lines changed: 65 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"bytes"
55
"fmt"
66
"io"
7+
"os"
78
"reflect"
9+
"strings"
810
"testing"
911
"time"
1012

@@ -16,12 +18,12 @@ import (
1618
func Test_parseStreamResponse(t *testing.T) {
1719
tests := []struct {
1820
name string
19-
response string
21+
filename string
2022
want func() backend.DataResponse
2123
}{
2224
{
2325
name: "empty response",
24-
response: "",
26+
filename: "test-data/empty",
2527
want: func() backend.DataResponse {
2628
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
2729
labelsField.Name = gLabelsField
@@ -43,28 +45,28 @@ func Test_parseStreamResponse(t *testing.T) {
4345
},
4446
{
4547
name: "incorrect response",
46-
response: "abcd",
48+
filename: "test-data/incorrect_response",
4749
want: func() backend.DataResponse {
4850
return newResponseError(fmt.Errorf("error decode response: cannot parse JSON: cannot parse number: unexpected char: \"a\"; unparsed tail: \"abcd\""), backend.StatusInternal)
4951
},
5052
},
5153
{
5254
name: "incorrect time in the response",
53-
response: `{"_time":"acdf"}`,
55+
filename: "test-data/incorrect_time",
5456
want: func() backend.DataResponse {
5557
return newResponseError(fmt.Errorf("error parse time from _time field: cannot parse acdf: cannot parse duration \"acdf\""), backend.StatusInternal)
5658
},
5759
},
5860
{
5961
name: "invalid stream in the response",
60-
response: `{"_time":"2024-02-20", "_stream":"{application=\"logs-benchmark-Apache.log-1708437847\",hostname=}"}`,
62+
filename: "test-data/invalid_stream",
6163
want: func() backend.DataResponse {
6264
return newResponseError(fmt.Errorf("StringExpr: unexpected token \"}\"; want \"string\"; unparsed data: \"}\""), backend.StatusInternal)
6365
},
6466
},
6567
{
6668
name: "correct response line",
67-
response: `{"_msg":"123","_stream":"{application=\"logs-benchmark-Apache.log-1708437847\",hostname=\"e28a622d7792\"}","_time":"2024-02-20T14:04:27Z"}`,
69+
filename: "test-data/correct_response",
6870
want: func() backend.DataResponse {
6971
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
7072
labelsField.Name = gLabelsField
@@ -98,7 +100,7 @@ func Test_parseStreamResponse(t *testing.T) {
98100
},
99101
{
100102
name: "response with different labels",
101-
response: `{"_msg":"123","_stream":"{application=\"logs-benchmark-Apache.log-1708437847\",hostname=\"e28a622d7792\"}","_time":"2024-02-20T14:04:27Z", "job": "vlogs"}`,
103+
filename: "test-data/different_labels",
102104
want: func() backend.DataResponse {
103105
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
104106
labelsField.Name = gLabelsField
@@ -132,9 +134,8 @@ func Test_parseStreamResponse(t *testing.T) {
132134
},
133135
},
134136
{
135-
name: "response with different labels and without standard fields",
136-
response: `{"stream":"stderr","count(*)":"394"}
137-
{"stream":"stdout","count(*)":"21"}`,
137+
name: "response with different labels and without standard fields",
138+
filename: "test-data/no_standard_fields",
138139
want: func() backend.DataResponse {
139140
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
140141
labelsField.Name = gLabelsField
@@ -173,7 +174,7 @@ func Test_parseStreamResponse(t *testing.T) {
173174
},
174175
{
175176
name: "response with different labels only one label",
176-
response: `{"level":""}`,
177+
filename: "test-data/only_one_label",
177178
want: func() backend.DataResponse {
178179
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
179180
labelsField.Name = gLabelsField
@@ -203,9 +204,8 @@ func Test_parseStreamResponse(t *testing.T) {
203204
},
204205
},
205206
{
206-
name: "response when one stream field is defined and other is free fields",
207-
response: `{"_time":"2024-06-26T13:00:00Z","logs":"1400"}
208-
{"_time":"2024-06-26T14:00:00Z","logs":"374"}`,
207+
name: "response when one stream field is defined and other is free fields",
208+
filename: "test-data/stream_and_free_field",
209209
want: func() backend.DataResponse {
210210
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
211211
labelsField.Name = gLabelsField
@@ -247,7 +247,7 @@ func Test_parseStreamResponse(t *testing.T) {
247247
},
248248
{
249249
name: "response has ANSI chars",
250-
response: `{"_time":"2024-06-26T13:15:15.000Z","_stream_id":"00000000000000009eaf29866f70976a098adc735393deb1","_stream":"{compose_project=\"app\",compose_service=\"gateway\"}","_msg":"\x1b[2m2024-06-26T13:15:15.004Z\x1b[0;39m \x1b[32mTRACE\x1b[0;39m \x1b[35m1\x1b[0;39m \x1b[2m---\x1b[0;39m \x1b[2m[ parallel-19]\x1b[0;39m \x1b[36mo.s.c.g.f.WeightCalculatorWebFilter \x1b[0;39m \x1b[2m:\x1b[0;39m Weights attr: {} ","compose_project":"app","compose_service":"gateway"}`,
250+
filename: "test-data/ANSI_chars",
251251
want: func() backend.DataResponse {
252252
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
253253
labelsField.Name = gLabelsField
@@ -282,7 +282,7 @@ func Test_parseStreamResponse(t *testing.T) {
282282
},
283283
{
284284
name: "response has unicode",
285-
response: `{"_time":"2024-06-26T13:20:34.000Z","_stream":"{compose_project=\"app\",compose_service=\"gateway\"}","_msg":"\u001b[2m2024-06-26T13:20:34.608Z\u001b[0;39m \u001b[33m WARN\u001b[0;39m \u001b[35m1\u001b[0;39m \u001b[2m---\u001b[0;39m \u001b[2m[ main]\u001b[0;39m \u001b[36mjakarta.persistence.spi \u001b[0;39m \u001b[2m:\u001b[0;39m jakarta.persistence.spi::No valid providers found. ","compose_project":"app","compose_service":"gateway"}`,
285+
filename: "test-data/unicode_present",
286286
want: func() backend.DataResponse {
287287
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
288288
labelsField.Name = gLabelsField
@@ -324,7 +324,7 @@ func Test_parseStreamResponse(t *testing.T) {
324324
},
325325
{
326326
name: "response has labels and message, time field is empty",
327-
response: `{"count":"507","_msg":"507"}`,
327+
filename: "test-data/time_field_empty",
328328
want: func() backend.DataResponse {
329329
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
330330
labelsField.Name = gLabelsField
@@ -354,10 +354,8 @@ func Test_parseStreamResponse(t *testing.T) {
354354
},
355355
},
356356
{
357-
name: "new test",
358-
response: `{"_time": "2024-09-10T12:24:38.124811Z","_stream_id": "00000000000000002e3bd2bdc376279a6418761ca20c417c","_stream": "{path=\"/var/lib/docker/containers/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89-json.log\",stream=\"stderr\"}","_msg": "1","path": "/var/lib/docker/containers/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89-json.log","stream": "stderr","time": "2024-09-10T12:24:38.124811792Z"}
359-
{"_time": "2024-09-10T12:36:10.664553169Z","_stream_id": "0000000000000000356bfe9e3c71128c750d94c15df6b908","_stream": "{stream=\"stream1\"}","_msg": "2","date": "0","stream": "stream1","log.level": "info"}
360-
{"_time": "2024-09-10T13:06:56.45147Z","_stream_id": "00000000000000002e3bd2bdc376279a6418761ca20c417c","_stream": "{path=\"/var/lib/docker/containers/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89-json.log\",stream=\"stderr\"}","_msg": "3","path": "/var/lib/docker/containers/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89-json.log","stream": "stderr","time": "2024-09-10T13:06:56.451470093Z"}`,
357+
name: "double labels",
358+
filename: "test-data/double_labels",
361359
want: func() backend.DataResponse {
362360
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
363361
labelsField.Name = gLabelsField
@@ -414,15 +412,60 @@ func Test_parseStreamResponse(t *testing.T) {
414412
frame.Meta = &data.FrameMeta{}
415413
rsp.Frames = append(rsp.Frames, frame)
416414

415+
return rsp
416+
},
417+
},
418+
{
419+
name: "large response more than 1MB",
420+
filename: "test-data/large_msg_response_2MB",
421+
want: func() backend.DataResponse {
422+
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
423+
labelsField.Name = gLabelsField
424+
425+
timeFd := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
426+
timeFd.Name = gTimeField
427+
428+
lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
429+
lineField.Name = gLineField
430+
431+
timeFd.Append(time.Date(2024, 9, 10, 12, 36, 10, 664000000, time.UTC))
432+
433+
// string with more than 1MB
434+
str := strings.Repeat("1", 1024*1024*2)
435+
436+
lineField.Append(str)
437+
438+
labels := data.Labels{
439+
"_stream_id": "0000000000000000356bfe9e3c71128c750d94c15df6b908",
440+
"date": "0",
441+
"stream": "stream1",
442+
"log.level": "info",
443+
}
444+
445+
b, _ := labelsToJSON(labels)
446+
labelsField.Append(b)
447+
448+
frame := data.NewFrame("", timeFd, lineField, labelsField)
449+
450+
rsp := backend.DataResponse{}
451+
frame.Meta = &data.FrameMeta{}
452+
rsp.Frames = append(rsp.Frames, frame)
453+
417454
return rsp
418455
},
419456
},
420457
}
421458
for _, tt := range tests {
422459
t.Run(tt.name, func(t *testing.T) {
423-
r := io.NopCloser(bytes.NewBuffer([]byte(tt.response)))
460+
file, err := os.ReadFile(tt.filename)
461+
if err != nil {
462+
t.Fatalf("error reading file: %s", err)
463+
}
464+
465+
r := io.NopCloser(bytes.NewBuffer(file))
424466
w := tt.want()
425467
resp := parseStreamResponse(r)
468+
426469
if w.Error != nil {
427470
if !reflect.DeepEqual(w, resp) {
428471
t.Errorf("parseStreamResponse() = %#v, want %#v", resp, w)

pkg/plugin/test-data/ANSI_chars

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"_time":"2024-06-26T13:15:15.000Z","_stream_id":"00000000000000009eaf29866f70976a098adc735393deb1","_stream":"{compose_project=\"app\",compose_service=\"gateway\"}","_msg":"\x1b[2m2024-06-26T13:15:15.004Z\x1b[0;39m \x1b[32mTRACE\x1b[0;39m \x1b[35m1\x1b[0;39m \x1b[2m---\x1b[0;39m \x1b[2m[ parallel-19]\x1b[0;39m \x1b[36mo.s.c.g.f.WeightCalculatorWebFilter \x1b[0;39m \x1b[2m:\x1b[0;39m Weights attr: {} ","compose_project":"app","compose_service":"gateway"}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"_msg":"123","_stream":"{application=\"logs-benchmark-Apache.log-1708437847\",hostname=\"e28a622d7792\"}","_time":"2024-02-20T14:04:27Z"}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"_msg":"123","_stream":"{application=\"logs-benchmark-Apache.log-1708437847\",hostname=\"e28a622d7792\"}","_time":"2024-02-20T14:04:27Z", "job": "vlogs"}

pkg/plugin/test-data/double_labels

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{"_time": "2024-09-10T12:24:38.124811Z","_stream_id": "00000000000000002e3bd2bdc376279a6418761ca20c417c","_stream": "{path=\"/var/lib/docker/containers/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89-json.log\",stream=\"stderr\"}","_msg": "1","path": "/var/lib/docker/containers/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89-json.log","stream": "stderr","time": "2024-09-10T12:24:38.124811792Z"}
2+
{"_time": "2024-09-10T12:36:10.664553169Z","_stream_id": "0000000000000000356bfe9e3c71128c750d94c15df6b908","_stream": "{stream=\"stream1\"}","_msg": "2","date": "0","stream": "stream1","log.level": "info"}
3+
{"_time": "2024-09-10T13:06:56.45147Z","_stream_id": "00000000000000002e3bd2bdc376279a6418761ca20c417c","_stream": "{path=\"/var/lib/docker/containers/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89-json.log\",stream=\"stderr\"}","_msg": "3","path": "/var/lib/docker/containers/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89/c01cbe414773fa6b3e4e0976fb27c3583b1a5cd4b7007662477df66987f97f89-json.log","stream": "stderr","time": "2024-09-10T13:06:56.451470093Z"}

pkg/plugin/test-data/empty

Whitespace-only changes.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
abcd
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"_time":"acdf"}

0 commit comments

Comments
 (0)