Skip to content

Commit e1d86a3

Browse files
authored
Merge pull request #206 from grepplabs/pr-193
Pr 193
2 parents b2a923b + 6d3994b commit e1d86a3

File tree

3 files changed

+148
-5
lines changed

3 files changed

+148
-5
lines changed

Diff for: proxy/processor_default.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,8 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ
158158
if err != nil {
159159
return false, nil, err
160160
}
161-
162-
case 3, 4, 5, 6, 7, 8, 9, 10, 11:
161+
default:
162+
// case 3, 4, 5, 6, 7, 8, 9, 10, 11, 12:
163163
// CorrelationID + ClientID
164164
if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil {
165165
return false, nil, err
@@ -169,8 +169,6 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ
169169
if err != nil {
170170
return false, nil, err
171171
}
172-
default:
173-
return false, nil, fmt.Errorf("produce version %d is not supported", requestKeyVersion.ApiVersion)
174172
}
175173
return acks != 0, bufferRead.Bytes(), nil
176174
}

Diff for: proxy/protocol/responses.go

+26-1
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,32 @@ func createMetadataResponseSchemaVersions() []Schema {
244244
&SchemaTaggedFields{Name: "response_tagged_fields"},
245245
)
246246

247-
return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7, metadataResponseV8, metadataResponseV9, metadataResponseV10, metadataResponseV11, metadataResponseV12}
247+
metadataResponseV13 := NewSchema("metadata_response_v13",
248+
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
249+
&CompactArray{Name: brokersKeyName, Ty: metadataBrokerSchema9},
250+
&Mfield{Name: "cluster_id", Ty: TypeCompactNullableStr},
251+
&Mfield{Name: "controller_id", Ty: TypeInt32},
252+
&CompactArray{Name: "topic_metadata", Ty: topicMetadataSchema12},
253+
&Mfield{Name: "error_code", Ty: TypeInt16},
254+
&SchemaTaggedFields{Name: "response_tagged_fields"},
255+
)
256+
257+
return []Schema{
258+
metadataResponseV0,
259+
metadataResponseV1,
260+
metadataResponseV2,
261+
metadataResponseV3,
262+
metadataResponseV4,
263+
metadataResponseV5,
264+
metadataResponseV6,
265+
metadataResponseV7,
266+
metadataResponseV8,
267+
metadataResponseV9,
268+
metadataResponseV10,
269+
metadataResponseV11,
270+
metadataResponseV12,
271+
metadataResponseV13,
272+
}
248273
}
249274

250275
func createFindCoordinatorResponseSchemaVersions() []Schema {

Diff for: proxy/protocol/responses_test.go

+120
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ var (
4141
return "myhost3", 34003, nil
4242
} else if brokerHost == "localhost" && brokerPort == 9999 {
4343
return "myhost", 34000, nil
44+
} else if brokerHost == "localhost" && brokerPort == 9092 {
45+
return "myhost5", 34005, nil
4446
}
4547
return "", 0, errors.New("unexpected data")
4648
}
@@ -2436,6 +2438,124 @@ func TestMetadataResponseV11(t *testing.T) {
24362438
testMetadataResponse(t, apiVersion, payload, expectedInput, expectedModified)
24372439
}
24382440

2441+
func TestMetadataResponseV13(t *testing.T) {
2442+
apiVersion := int16(13)
2443+
payload := "0000000002000000010a6c6f63616c686f737400002384000017354c3667336e5368542d654d43744b2d2d58383673770000000102000010746573742d6e6f2d686561646572737d064a8944b14f078805ea02998647bf00040000000000010000000100000000020000000102000000010100000000000002000000010000000002000000010200000001010000000000000000000001000000000200000001020000000101008000000000000000"
2444+
expectedInput := []string{
2445+
"throttle_time_ms int32 0",
2446+
"[brokers]",
2447+
"brokers struct",
2448+
"node_id int32 1",
2449+
"host string localhost",
2450+
"port int32 9092",
2451+
"rack *string <nil>",
2452+
"[broker_tagged_fields]",
2453+
"cluster_id *string 5L6g3nShT-eMCtK--X86sw",
2454+
"controller_id int32 1",
2455+
"[topic_metadata]",
2456+
"topic_metadata struct",
2457+
"error_code int16 0",
2458+
"name *string test-no-headers",
2459+
"topic_id uuid 7d064a89-44b1-4f07-8805-ea02998647bf",
2460+
"is_internal bool false",
2461+
"[partition_metadata]",
2462+
"partition_metadata struct",
2463+
"error_code int16 0",
2464+
"partition int32 1",
2465+
"leader int32 1",
2466+
"leader_epoch int32 0",
2467+
"[replicas]",
2468+
"replicas int32 1",
2469+
"[isr]",
2470+
"isr int32 1",
2471+
"[offline_replicas]",
2472+
"[partition_metadata_tagged_fields]",
2473+
"partition_metadata struct",
2474+
"error_code int16 0",
2475+
"partition int32 2",
2476+
"leader int32 1",
2477+
"leader_epoch int32 0",
2478+
"[replicas]",
2479+
"replicas int32 1",
2480+
"[isr]",
2481+
"isr int32 1",
2482+
"[offline_replicas]",
2483+
"[partition_metadata_tagged_fields]",
2484+
"partition_metadata struct",
2485+
"error_code int16 0",
2486+
"partition int32 0",
2487+
"leader int32 1",
2488+
"leader_epoch int32 0",
2489+
"[replicas]",
2490+
"replicas int32 1",
2491+
"[isr]",
2492+
"isr int32 1",
2493+
"[offline_replicas]",
2494+
"[partition_metadata_tagged_fields]",
2495+
"topic_authorized_operations int32 -2147483648",
2496+
"[topic_metadata_tagged_fields]",
2497+
"error_code int16 0",
2498+
"[response_tagged_fields]",
2499+
}
2500+
expectedModified := []string{
2501+
"throttle_time_ms int32 0",
2502+
"[brokers]",
2503+
"brokers struct",
2504+
"node_id int32 1",
2505+
"host string myhost5",
2506+
"port int32 34005",
2507+
"rack *string <nil>",
2508+
"[broker_tagged_fields]",
2509+
"cluster_id *string 5L6g3nShT-eMCtK--X86sw",
2510+
"controller_id int32 1",
2511+
"[topic_metadata]",
2512+
"topic_metadata struct",
2513+
"error_code int16 0",
2514+
"name *string test-no-headers",
2515+
"topic_id uuid 7d064a89-44b1-4f07-8805-ea02998647bf",
2516+
"is_internal bool false",
2517+
"[partition_metadata]",
2518+
"partition_metadata struct",
2519+
"error_code int16 0",
2520+
"partition int32 1",
2521+
"leader int32 1",
2522+
"leader_epoch int32 0",
2523+
"[replicas]",
2524+
"replicas int32 1",
2525+
"[isr]",
2526+
"isr int32 1",
2527+
"[offline_replicas]",
2528+
"[partition_metadata_tagged_fields]",
2529+
"partition_metadata struct",
2530+
"error_code int16 0",
2531+
"partition int32 2",
2532+
"leader int32 1",
2533+
"leader_epoch int32 0",
2534+
"[replicas]",
2535+
"replicas int32 1",
2536+
"[isr]",
2537+
"isr int32 1",
2538+
"[offline_replicas]",
2539+
"[partition_metadata_tagged_fields]",
2540+
"partition_metadata struct",
2541+
"error_code int16 0",
2542+
"partition int32 0",
2543+
"leader int32 1",
2544+
"leader_epoch int32 0",
2545+
"[replicas]",
2546+
"replicas int32 1",
2547+
"[isr]",
2548+
"isr int32 1",
2549+
"[offline_replicas]",
2550+
"[partition_metadata_tagged_fields]",
2551+
"topic_authorized_operations int32 -2147483648",
2552+
"[topic_metadata_tagged_fields]",
2553+
"error_code int16 0",
2554+
"[response_tagged_fields]",
2555+
}
2556+
testMetadataResponse(t, apiVersion, payload, expectedInput, expectedModified)
2557+
}
2558+
24392559
func testMetadataResponse(t *testing.T, apiVersion int16, payload string, expectedInput, expectedModified []string) {
24402560
bytes, err := hex.DecodeString(payload)
24412561
if err != nil {

0 commit comments

Comments
 (0)