Skip to content

Commit ab530e9

Browse files
feat: add proto decoder
1 parent 262bdbf commit ab530e9

3 files changed

Lines changed: 13 additions & 2 deletions

File tree

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@ require (
77
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.29.0-20230313110213-9a3d240d5293.1
88
github.com/gojek/courier-go v0.7.8
99
github.com/gojek/courier-go/consul v0.7.3
10+
github.com/gojekfarm/xtools/xproto v0.10.0
1011
github.com/gorilla/mux v1.7.4
1112
github.com/gorilla/websocket v1.5.3
1213
github.com/goto/raccoon/clients/go v0.0.0-20250203072106-1dbea749aaf3
1314
github.com/sirupsen/logrus v1.6.0
1415
github.com/spf13/viper v1.7.0
1516
github.com/stretchr/testify v1.9.0
1617
google.golang.org/grpc v1.53.0
17-
google.golang.org/protobuf v1.29.0
18+
google.golang.org/protobuf v1.33.0
1819
gopkg.in/alexcesaro/statsd.v2 v2.0.0
1920
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.4.2
2021
)

go.sum

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,8 @@ github.com/gojek/paho.mqtt.golang v1.7.0/go.mod h1:dsB7ojVcuI8Nw1t35g72omp1Swa6/
501501
github.com/gojek/valkyrie v0.0.0-20180215180059-6aee720afcdf/go.mod h1:QzhUKaYKJmcbTnCYCAVQrroCOY7vOOI8cSQ4NbuhYf0=
502502
github.com/gojekfarm/xtools/generic v0.7.0 h1:iBLSYE+eekH8pUvsf0qHLyBS77mMPWlKK7xUE1kZFfU=
503503
github.com/gojekfarm/xtools/generic v0.7.0/go.mod h1:T0yk+yt4s1jwr9Gbtyb/BADXGKRfCYIBz9pN1GnmkQA=
504+
github.com/gojekfarm/xtools/xproto v0.10.0 h1:cxRe/8OJKkAb5c52NOfnaXvtCmlQkrIwslzHCntQY60=
505+
github.com/gojekfarm/xtools/xproto v0.10.0/go.mod h1:qbna3ipjT8aEFXuOBi91fFlejLfi8f0YohjXheBRyTU=
504506
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
505507
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
506508
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@@ -1402,8 +1404,9 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
14021404
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
14031405
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
14041406
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
1405-
google.golang.org/protobuf v1.29.0 h1:44S3JjaKmLEE4YIkjzexaP+NzZsudE3Zin5Njn/pYX0=
14061407
google.golang.org/protobuf v1.29.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
1408+
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
1409+
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
14071410
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
14081411
gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc=
14091412
gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU=

services/mqtt/client/pubsub.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"fmt"
66
"github.com/gojek/courier-go"
77
"github.com/gojek/courier-go/consul"
8+
"github.com/gojekfarm/xtools/xproto"
89
"github.com/goto/raccoon/config"
910
"github.com/goto/raccoon/logger"
11+
"io"
1012
)
1113

1214
// MqttPubSubClient wraps a courier MQTT client with start/stop lifecycle management.
@@ -46,6 +48,7 @@ func NewMqttPubSubClient(ctx context.Context, handler courier.MessageHandler, cl
4648
courier.WithWriteTimeout(config.ServerMQTT.ConsumerConfig.WriteTimeoutInSec),
4749
courier.WithOnConnect(registerHandler(ctx, handler)),
4850
courier.WithLogger(logger.GetLogger()),
51+
courier.WithCustomDecoder(protoDecoder),
4952
}
5053

5154
client, err := courier.NewClient(clientOpts...)
@@ -91,3 +94,7 @@ func (m *MqttPubSubClient) Stop() error {
9194
func (m *MqttPubSubClient) IsConnected() bool {
9295
return m.client.IsConnected()
9396
}
97+
98+
func protoDecoder(ctx context.Context, r io.Reader) courier.Decoder {
99+
return xproto.NewDecoder(r)
100+
}

0 commit comments

Comments
 (0)