-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
Copy pathproduce_request_test.go
109 lines (100 loc) · 2.75 KB
/
produce_request_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package sarama
import (
"testing"
"time"
)
var (
produceRequestEmpty = []byte{
0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
}
produceRequestHeader = []byte{
0x01, 0x23,
0x00, 0x00, 0x04, 0x44,
0x00, 0x00, 0x00, 0x00,
}
produceRequestOneMessage = []byte{
0x01, 0x23,
0x00, 0x00, 0x04, 0x44,
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0xAD,
0x00, 0x00, 0x00, 0x1C,
// messageSet
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x10,
// message
0x23, 0x96, 0x4a, 0xf7, // CRC
0x00,
0x00,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,
}
produceRequestOneRecord = []byte{
0xFF, 0xFF, // Transaction ID
0x01, 0x23, // Required Acks
0x00, 0x00, 0x04, 0x44, // Timeout
0x00, 0x00, 0x00, 0x01, // Number of Topics
0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
0x00, 0x00, 0x00, 0x01, // Number of Partitions
0x00, 0x00, 0x00, 0xAD, // Partition
0x00, 0x00, 0x00, 0x52, // Records length
// recordBatch
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x46,
0x00, 0x00, 0x00, 0x00,
0x02,
0xCA, 0x33, 0xBC, 0x05,
0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x01, 0x58, 0x8D, 0xCD, 0x59, 0x38,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
// record
0x28,
0x00,
0x0A,
0x00,
0x08, 0x01, 0x02, 0x03, 0x04,
0x06, 0x05, 0x06, 0x07,
0x02,
0x06, 0x08, 0x09, 0x0A,
0x04, 0x0B, 0x0C,
}
)
func TestProduceRequest(t *testing.T) {
request := new(ProduceRequest)
testRequest(t, "empty", request, produceRequestEmpty)
request.RequiredAcks = 0x123
request.Timeout = 0x444
testRequest(t, "header", request, produceRequestHeader)
request.AddMessage("topic", 0xAD, &Message{Codec: CompressionNone, Key: nil, Value: []byte{0x00, 0xEE}})
testRequest(t, "one message", request, produceRequestOneMessage)
request.Version = 3
batch := &RecordBatch{
LastOffsetDelta: 1,
Version: 2,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5 * time.Millisecond,
Key: []byte{0x01, 0x02, 0x03, 0x04},
Value: []byte{0x05, 0x06, 0x07},
Headers: []*RecordHeader{{
Key: []byte{0x08, 0x09, 0x0A},
Value: []byte{0x0B, 0x0C},
}},
}},
}
request.AddBatch("topic", 0xAD, batch)
packet := testRequestEncode(t, "one record", request, produceRequestOneRecord)
// compressRecords field is not populated on decoding because consumers
// are only interested in decoded records.
batch.compressedRecords = nil
testRequestDecode(t, "one record", request, packet)
}