Skip to content

Commit f41e041

Browse files
committed
Added ReqID as Kafka's record key
1 parent f25d26e commit f41e041

File tree

3 files changed

+18
-4
lines changed

3 files changed

+18
-4
lines changed

kafka.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type InputKafkaConfig struct {
3131
Host string `json:"input-kafka-host"`
3232
Topic string `json:"input-kafka-topic"`
3333
UseJSON bool `json:"input-kafka-json-format"`
34-
Offset string `json:"input-kafka-offset"`
34+
Offset string `json:"input-kafka-offset"`
3535
SASLConfig SASLKafkaConfig
3636
}
3737

output_kafka.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ func (o *KafkaOutput) ErrorHandler() {
6565
// PluginWrite writes a message to this plugin
6666
func (o *KafkaOutput) PluginWrite(msg *Message) (n int, err error) {
6767
var message sarama.StringEncoder
68+
meta := payloadMeta(msg.Meta)
69+
recordKey := byteutils.SliceToString(meta[1])
6870

6971
if !o.config.UseJSON {
7072
message = sarama.StringEncoder(byteutils.SliceToString(msg.Meta) + byteutils.SliceToString(msg.Data))
@@ -74,10 +76,7 @@ func (o *KafkaOutput) PluginWrite(msg *Message) (n int, err error) {
7476
for k, v := range mimeHeader {
7577
header[k] = strings.Join(v, ", ")
7678
}
77-
78-
meta := payloadMeta(msg.Meta)
7979
req := msg.Data
80-
8180
kafkaMessage := KafkaMessage{
8281
ReqURL: byteutils.SliceToString(proto.Path(req)),
8382
ReqType: byteutils.SliceToString(meta[0]),
@@ -95,6 +94,9 @@ func (o *KafkaOutput) PluginWrite(msg *Message) (n int, err error) {
9594
Topic: o.config.Topic,
9695
Value: message,
9796
Timestamp: time.Now(),
97+
//
98+
// Making ReqID as Kafka record's key
99+
Key: sarama.StringEncoder(recordKey),
98100
}
99101

100102
return len(message), nil

output_kafka_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ func TestOutputKafkaRAW(t *testing.T) {
2323

2424
resp := <-producer.Successes()
2525

26+
key, _ := resp.Key.Encode()
27+
28+
if string(key) != "2" {
29+
t.Errorf("Key not properly encoded: %q", key)
30+
}
31+
2632
data, _ := resp.Value.Encode()
2733

2834
if string(data) != "1 2 3\nGET / HTTP1.1\r\nHeader: 1\r\n\r\n" {
@@ -46,6 +52,12 @@ func TestOutputKafkaJSON(t *testing.T) {
4652

4753
resp := <-producer.Successes()
4854

55+
key, _ := resp.Key.Encode()
56+
57+
if string(key) != "2" {
58+
t.Errorf("Key not properly encoded: %q", key)
59+
}
60+
4961
data, _ := resp.Value.Encode()
5062

5163
if string(data) != `{"Req_URL":"","Req_Type":"1","Req_ID":"2","Req_Ts":"3","Req_Method":"GET"}` {

0 commit comments

Comments
 (0)