Skip to content

Commit 0654524

Browse files
mergify[bot]Andrea Spacca
and
Andrea Spacca
authored
handle EOF on single line content (#33568) (#33877)
* handle EOF on single line content * changelog * fallback to encode_eof if no events in aws-s3 input * lint * lint * collect on EOF in line reader * remove encode eof * remove iterN * fix test * increase test coverage * linting * more linting * increase coverage (cherry picked from commit 7b45320) Co-authored-by: Andrea Spacca <[email protected]>
1 parent 9d25efa commit 0654524

File tree

9 files changed

+221
-97
lines changed

9 files changed

+221
-97
lines changed

CHANGELOG.next.asciidoc

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
5757
- Add handling of AAA operations for Cisco ASA module. {issue}32257[32257] {pull}32789[32789]
5858
- Fix gc.log always shipped even if gc fileset is disabled {issue}30995[30995]
5959
- Fix handling of empty array in httpjson input. {pull}32001[32001]
60+
- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568]
6061
- Fix reporting of `filebeat.events.active` in log events such that the current value is always reported instead of the difference from the last value. {pull}33597[33597]
6162
- Fix splitting array of strings/arrays in httpjson input {issue}30345[30345] {pull}33609[33609]
6263
- Fix Google workspace pagination and document ID generation. {pull}33666[33666]

libbeat/reader/readfile/bench_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package readfile
2020
import (
2121
"bytes"
2222
"encoding/hex"
23+
"errors"
2324
"fmt"
2425
"io"
2526
"io/ioutil"
@@ -39,7 +40,7 @@ func BenchmarkEncoderReader(b *testing.B) {
3940
b.Run(name, func(b *testing.B) {
4041
b.ReportAllocs()
4142
for bN := 0; bN < b.N; bN++ {
42-
reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit})
43+
reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit, false})
4344
if err != nil {
4445
b.Fatal("failed to initialize reader:", err)
4546
}
@@ -48,7 +49,7 @@ func BenchmarkEncoderReader(b *testing.B) {
4849
for i := 0; ; i++ {
4950
msg, err := reader.Next()
5051
if err != nil {
51-
if err == io.EOF {
52+
if errors.Is(err, io.EOF) {
5253
b.ReportMetric(float64(i), "processed_lines")
5354
break
5455
} else {

libbeat/reader/readfile/encode.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,15 @@ type Config struct {
4040
BufferSize int
4141
Terminator LineTerminator
4242
MaxBytes int
43+
// If CollectOnEOF is set to true (default false) the line reader will return the buffer if EOF is reached: this
44+
// will ensure full content including last line with no EOL will be returned for fully retrieved content that's
45+
// not appended anymore between reads.
46+
// If CollectOnEOF is set to false the line reader will return 0 content and keep the buffer at the current
47+
// state of appending data after a temporary EOF.
48+
CollectOnEOF bool
4349
}
4450

45-
// New creates a new Encode reader from input reader by applying
51+
// NewEncodeReader creates a new Encode reader from input reader by applying
4652
// the given codec.
4753
func NewEncodeReader(r io.ReadCloser, config Config) (EncoderReader, error) {
4854
eReader, err := NewLineReader(r, config)

libbeat/reader/readfile/line.go

+68-28
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package readfile
1919

2020
import (
2121
"bytes"
22+
"errors"
2223
"fmt"
2324
"io"
2425

@@ -33,18 +34,22 @@ const unlimited = 0
3334
// LineReader reads lines from underlying reader, decoding the input stream
3435
// using the configured codec. The reader keeps track of bytes consumed
3536
// from raw input stream for every decoded line.
37+
// If collectOnEOF is set to true (default false) it will return the buffer if EOF is reached.
38+
// If collectOnEOF is set to false it will return 0 content and keep the buffer at the current
39+
// state of appending data after a temporary EOF.
3640
type LineReader struct {
37-
reader io.ReadCloser
38-
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
39-
nl []byte
40-
decodedNl []byte
41-
inBuffer *streambuf.Buffer
42-
outBuffer *streambuf.Buffer
43-
inOffset int // input buffer read offset
44-
byteCount int // number of bytes decoded from input buffer into output buffer
45-
decoder transform.Transformer
46-
tempBuffer []byte
47-
logger *logp.Logger
41+
reader io.ReadCloser
42+
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
43+
nl []byte
44+
decodedNl []byte
45+
collectOnEOF bool
46+
inBuffer *streambuf.Buffer
47+
outBuffer *streambuf.Buffer
48+
inOffset int // input buffer read offset
49+
byteCount int // number of bytes decoded from input buffer into output buffer
50+
decoder transform.Transformer
51+
tempBuffer []byte
52+
logger *logp.Logger
4853
}
4954

5055
// NewLineReader creates a new reader object
@@ -63,15 +68,16 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
6368
}
6469

6570
return &LineReader{
66-
reader: input,
67-
maxBytes: config.MaxBytes,
68-
decoder: config.Codec.NewDecoder(),
69-
nl: nl,
70-
decodedNl: terminator,
71-
inBuffer: streambuf.New(nil),
72-
outBuffer: streambuf.New(nil),
73-
tempBuffer: make([]byte, config.BufferSize),
74-
logger: logp.NewLogger("reader_line"),
71+
reader: input,
72+
maxBytes: config.MaxBytes,
73+
decoder: config.Codec.NewDecoder(),
74+
nl: nl,
75+
decodedNl: terminator,
76+
collectOnEOF: config.CollectOnEOF,
77+
inBuffer: streambuf.New(nil),
78+
outBuffer: streambuf.New(nil),
79+
tempBuffer: make([]byte, config.BufferSize),
80+
logger: logp.NewLogger("reader_line"),
7581
}, nil
7682
}
7783

@@ -88,12 +94,46 @@ func (r *LineReader) Next() (b []byte, n int, err error) {
8894
// read next 'potential' line from input buffer/reader
8995
err := r.advance()
9096
if err != nil {
97+
if errors.Is(err, io.EOF) && r.collectOnEOF {
98+
// Found EOF and collectOnEOF is true
99+
// -> decode input sequence into outBuffer
100+
// let's take whole buffer len without len(nl) if it ends with it
101+
end := r.inBuffer.Len()
102+
if bytes.HasSuffix(r.inBuffer.Bytes(), r.decodedNl) {
103+
end -= len(r.nl)
104+
}
105+
106+
sz, err := r.decode(end)
107+
if err != nil {
108+
r.logger.Errorf("Error decoding line: %s", err)
109+
// In case of error increase size by unencoded length
110+
sz = r.inBuffer.Len()
111+
}
112+
113+
// Consume transformed bytes from input buffer
114+
_ = r.inBuffer.Advance(sz)
115+
r.inBuffer.Reset()
116+
117+
// output buffer contains the data up until EOF. Extract
118+
// byte slice from buffer and reset output buffer.
119+
bytes, err := r.outBuffer.Collect(r.outBuffer.Len())
120+
r.outBuffer.Reset()
121+
if err != nil {
122+
// This should never happen as otherwise we have a broken state
123+
panic(err)
124+
}
125+
126+
// return and reset consumed bytes count
127+
sz = r.byteCount
128+
r.byteCount = 0
129+
return bytes, sz, io.EOF
130+
}
131+
91132
// return and reset consumed bytes count
92133
sz := r.byteCount
93134
r.byteCount = 0
94135
return nil, sz, err
95136
}
96-
97137
// Check last decoded byte really being newline also unencoded
98138
// if not, continue reading
99139
buf := r.outBuffer.Bytes()
@@ -144,13 +184,13 @@ func (r *LineReader) advance() error {
144184
// Try to read more bytes into buffer
145185
n, err := r.reader.Read(r.tempBuffer)
146186

147-
if err == io.EOF && n > 0 {
187+
if errors.Is(err, io.EOF) && n > 0 {
148188
// Continue processing the returned bytes. The next call will yield EOF with 0 bytes.
149189
err = nil
150190
}
151191

152192
// Write to buffer also in case of err
153-
r.inBuffer.Write(r.tempBuffer[:n])
193+
_, _ = r.inBuffer.Write(r.tempBuffer[:n])
154194

155195
if err != nil {
156196
return err
@@ -169,7 +209,7 @@ func (r *LineReader) advance() error {
169209
// If newLine is found, drop the lines longer than maxBytes
170210
for idx != -1 && idx > r.maxBytes {
171211
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
172-
err = r.inBuffer.Advance(idx + len(r.nl))
212+
_ = r.inBuffer.Advance(idx + len(r.nl))
173213
r.byteCount += idx + len(r.nl)
174214
r.inBuffer.Reset()
175215
r.inOffset = 0
@@ -237,7 +277,7 @@ func (r *LineReader) skipUntilNewLine() (int, error) {
237277
idx = bytes.Index(r.tempBuffer[:n], r.nl)
238278

239279
if idx != -1 {
240-
r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n])
280+
_, _ = r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n])
241281
skipped += idx + len(r.nl)
242282
} else {
243283
skipped += n
@@ -267,8 +307,8 @@ func (r *LineReader) decode(end int) (int, error) {
267307
nDst, nSrc, err = r.decoder.Transform(r.tempBuffer, inBytes[start:end], false)
268308
if err != nil {
269309
// Check if error is different from destination buffer too short
270-
if err != transform.ErrShortDst {
271-
r.outBuffer.Write(inBytes[0:end])
310+
if !errors.Is(err, transform.ErrShortDst) {
311+
_, _ = r.outBuffer.Write(inBytes[0:end])
272312
start = end
273313
break
274314
}
@@ -278,7 +318,7 @@ func (r *LineReader) decode(end int) (int, error) {
278318
}
279319

280320
start += nSrc
281-
r.outBuffer.Write(r.tempBuffer[:nDst])
321+
_, _ = r.outBuffer.Write(r.tempBuffer[:nDst])
282322
}
283323

284324
r.byteCount += start

0 commit comments

Comments
 (0)