
Commit 0fb6d9a

fix(decoder): use configurable limit for max number of records in a record batch (#3120)
Replace the arbitrary `tmp > 2*math.MaxUint16` safety check with `tmp > int(MaxResponseSize)`: we already have that limit (which a user can change via `sarama.MaxResponseSize = ...`), and it is the same value we use for MaxBytes in a FetchRequest. Also add a test covering large record counts.

Fixes: #3119
1 parent c7ca87e commit 0fb6d9a
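
For context, a minimal sketch of how an application could adjust this limit before decoding very large batches — assuming the current `github.com/IBM/sarama` module path; the 200 MiB value is purely illustrative:

```go
package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

func main() {
	// MaxResponseSize is the package-level cap (in bytes) on broker responses.
	// With this change it also bounds the element count accepted by
	// getArrayLength, replacing the hard-coded 2*math.MaxUint16 limit.
	sarama.MaxResponseSize = 200 * 1024 * 1024 // illustrative 200 MiB ceiling

	fmt.Println("configured limit:", sarama.MaxResponseSize)
}
```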

2 files changed (+50, −1 lines)

real_decoder.go (+1 −1)

```diff
@@ -115,7 +115,7 @@ func (rd *realDecoder) getArrayLength() (int, error) {
     if tmp > rd.remaining() {
         rd.off = len(rd.raw)
         return -1, ErrInsufficientData
-    } else if tmp > 2*math.MaxUint16 {
+    } else if tmp > int(MaxResponseSize) {
         return -1, errInvalidArrayLength
     }
     return tmp, nil
```
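
The reasoning behind the new bound, shown as a standalone sketch (the names below are illustrative stand-ins, not sarama internals): a declared element count can never be valid if it exceeds the bytes remaining in the buffer, and the configurable response-size ceiling is a more meaningful upper bound than an arbitrary constant.

```go
package main

import (
	"errors"
	"fmt"
)

// maxResponseSize stands in for the user-tunable sarama.MaxResponseSize.
var maxResponseSize int32 = 100 * 1024 * 1024

var (
	errInsufficientData   = errors.New("insufficient data")
	errInvalidArrayLength = errors.New("invalid array length")
)

// checkArrayLength mirrors the guard above: reject counts larger than the
// bytes left in the buffer, and counts beyond the configured ceiling.
func checkArrayLength(count, remaining int) (int, error) {
	if count > remaining {
		return -1, errInsufficientData
	}
	if count > int(maxResponseSize) {
		return -1, errInvalidArrayLength
	}
	return count, nil
}

func main() {
	// More than 2*math.MaxUint16 elements, but well under the ceiling: accepted.
	fmt.Println(checkArrayLength(200_000, 1_000_000))
	// Count exceeds the bytes remaining: rejected.
	fmt.Println(checkArrayLength(2_000_000, 1_000))
}
```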

record_test.go (+49)

```diff
@@ -3,6 +3,8 @@
 package sarama

 import (
+    "encoding/binary"
+    "math"
     "reflect"
     "testing"
     "time"
@@ -254,3 +256,50 @@ func TestRecordBatchDecoding(t *testing.T) {
         }
     }
 }
+
+func TestRecordBatchLargeNumRecords(t *testing.T) {
+    numOfRecords := 10 + (2 * math.MaxUint16)
+    numofRecordsBytes := make([]byte, 4)
+    binary.BigEndian.PutUint32(numofRecordsBytes, uint32(numOfRecords))
+
+    encodedBatch := []byte{
+        0, 0, 0, 0, 0, 0, 0, 0, // First Offset
+        0, 42, 0, 250, // Length
+        0, 0, 0, 0, // Partition Leader Epoch
+        2, // Version
+        103, 68, 166, 213, // CRC
+        0, 0, // Attributes
+        0, 0, 0, 0, // Last Offset Delta
+        0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
+        0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
+        0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
+        0, 0, // Producer Epoch
+        0, 0, 0, 0, // First Sequence
+    }
+
+    encodedBatch = append(encodedBatch, numofRecordsBytes...)
+
+    for range numOfRecords {
+        encodedBatch = append(encodedBatch, []byte{
+            40, // Record Length
+            0,  // Attributes
+            10, // Timestamp Delta
+            0,  // Offset Delta
+            8,  // Key Length
+            1, 2, 3, 4,
+            6, // Value Length
+            5, 6, 7,
+            2, // Number of Headers
+            6, // Header Key Length
+            8, 9, 10, // Header Key
+            4, // Header Value Length
+            11, 12, // Header Value
+        }...)
+    }
+
+    var batch RecordBatch
+    err := decode(encodedBatch, &batch, nil)
+    if err != nil {
+        t.Fatal("received error while decoding record batch", err)
+    }
+}
```
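
As a usage note, running `go test -run TestRecordBatchLargeNumRecords .` from a checkout of the repository root should exercise just this regression test.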
