Skip to content

Commit b9d35de

Browse files
committed
consum correctly
1 parent 59ddf9f commit b9d35de

File tree

2 files changed

+7
-9
lines changed

2 files changed

+7
-9
lines changed

examples/kustomization/kafka.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@ spec:
1818
port: 9092
1919
type: internal
2020
tls: false
21+
configuration:
22+
useServiceDnsDomain: true
2123
- name: tls
2224
port: 9093
2325
type: internal
2426
tls: true
27+
configuration:
28+
useServiceDnsDomain: true
2529
config:
2630
offsets.topic.replication.factor: 3
2731
transaction.state.log.replication.factor: 3

telemetry/consumer.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package telemetry
22

33
import (
4-
"encoding/base64"
54
"fmt"
65
"log"
76
"os"
@@ -84,15 +83,10 @@ func (c *Consumer) Start() error {
8483
}
8584

8685
func (c *Consumer) handleMessage(msg *kafka.Message) error {
87-
// The message value should be a base64 encoded protobuf
88-
data, err := base64.StdEncoding.DecodeString(string(msg.Value))
89-
if err != nil {
90-
return fmt.Errorf("failed to decode base64 message: %v", err)
91-
}
92-
93-
// Unmarshal the protobuf message
86+
// The message value should be raw protobuf data
87+
// Unmarshal the protobuf message directly
9488
payload := &protos.Payload{}
95-
if err := proto.Unmarshal(data, payload); err != nil {
89+
if err := proto.Unmarshal(msg.Value, payload); err != nil {
9690
return fmt.Errorf("failed to unmarshal protobuf: %v", err)
9791
}
9892

0 commit comments

Comments
 (0)