-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
Copy pathoffset_fetch_response_test.go
68 lines (56 loc) · 2.3 KB
/
offset_fetch_response_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package sarama
import (
"fmt"
"testing"
)
// Byte fixtures for OffsetFetchResponse values that carry no blocks.
// TestEmptyOffsetFetchResponse hands each one to testResponse alongside the
// response it is paired with.
var (
	// emptyOffsetFetchResponse is paired with versions 0 and 1, where only
	// Version is set on the response: four zero bytes.
	emptyOffsetFetchResponse = []byte{0x00, 0x00, 0x00, 0x00}

	// emptyOffsetFetchResponseV2 is paired with version 2, where the response
	// also has Err set to ErrInvalidRequest: the same four zero bytes followed
	// by 0x00 0x2A.
	// NOTE(review): the trailing pair is presumably the big-endian error code
	// (42) — confirm against the encoder.
	emptyOffsetFetchResponseV2 = []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x2A}

	// emptyOffsetFetchResponseV3 is paired with versions 3 through 5, where the
	// response has ThrottleTimeMs set to 9 and Err set to ErrInvalidRequest.
	// NOTE(review): the leading 0x00000009 presumably encodes the throttle
	// time — confirm against the encoder.
	emptyOffsetFetchResponseV3 = []byte{
		0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2A,
	}
)
// TestEmptyOffsetFetchResponse runs testResponse on OffsetFetchResponse values
// that have no blocks, for versions 0 through 5, pairing each version with the
// byte fixture that matches the fields populated for it.
func TestEmptyOffsetFetchResponse(t *testing.T) {
	// Versions 0 and 1 set nothing but Version and share one fixture.
	for _, v := range []int16{0, 1} {
		resp := &OffsetFetchResponse{Version: v}
		testResponse(t, fmt.Sprintf("empty v%d", v), resp, emptyOffsetFetchResponse)
	}

	// Version 2 additionally sets Err.
	v2 := &OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
	testResponse(t, "empty V2", v2, emptyOffsetFetchResponseV2)

	// Versions 3 through 5 set both Err and ThrottleTimeMs and share the V3
	// fixture.
	for _, v := range []int16{3, 4, 5} {
		resp := &OffsetFetchResponse{
			Version:        v,
			Err:            ErrInvalidRequest,
			ThrottleTimeMs: 9,
		}
		testResponse(t, fmt.Sprintf("empty v%d", v), resp, emptyOffsetFetchResponseV3)
	}
}
// TestNormalOffsetFetchResponse runs testResponse on OffsetFetchResponse
// values for versions 0 through 5 that hold one block added via AddBlock plus
// an extra "m" entry in Blocks whose value is nil.
func TestNormalOffsetFetchResponse(t *testing.T) {
	// The last argument to testResponse is always nil in this test: the encoded
	// form depends on map traversal order, which is unpredictable, so it cannot
	// be compared against a fixed byte fixture.
	check := func(name string, resp *OffsetFetchResponse, block *OffsetFetchResponseBlock) {
		resp.AddBlock("t", 0, block)
		resp.Blocks["m"] = nil
		testResponse(t, name, resp, nil)
	}

	// Versions 0 and 1: only Version is set on the response.
	for _, v := range []int16{0, 1} {
		check(
			fmt.Sprintf("Normal v%d", v),
			&OffsetFetchResponse{Version: v},
			&OffsetFetchResponseBlock{0, -1, "md", ErrRequestTimedOut},
		)
	}

	// Version 2 additionally sets Err.
	check(
		"normal V2",
		&OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest},
		&OffsetFetchResponseBlock{0, -1, "md", ErrRequestTimedOut},
	)

	// Versions 3 and 4 set Err and ThrottleTimeMs.
	for _, v := range []int16{3, 4} {
		check(
			fmt.Sprintf("Normal v%d", v),
			&OffsetFetchResponse{Version: v, Err: ErrInvalidRequest, ThrottleTimeMs: 9},
			&OffsetFetchResponseBlock{0, -1, "md", ErrRequestTimedOut},
		)
	}

	// Version 5 uses a block with a non-default Offset and LeaderEpoch.
	check(
		"normal V5",
		&OffsetFetchResponse{Version: 5, Err: ErrInvalidRequest, ThrottleTimeMs: 9},
		&OffsetFetchResponseBlock{
			Offset:      10,
			LeaderEpoch: 100,
			Metadata:    "md",
			Err:         ErrRequestTimedOut,
		},
	)
}