Skip to content

Commit 960b5b1

Browse files
feat: add mqtt support on raccoon
1 parent 1a43f42 commit 960b5b1

14 files changed

Lines changed: 658 additions & 30 deletions

File tree

config/load.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ func Load() {
2929
metricStatsdConfigLoader()
3030
eventDistributionConfigLoader()
3131
eventConfigLoader()
32+
serverMQTTConfigLoader()
3233
}

config/server.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
var Server server
1111
var ServerWs serverWs
1212
var ServerGRPC serverGRPC
13+
var ServerMQTT serverMQTT
1314

1415
type server struct {
1516
DedupEnabled bool
@@ -37,6 +38,33 @@ type serverGRPC struct {
3738
TLSPublicKey string
3839
}
3940

41+
type serverMQTT struct {
42+
ConsulConfig consul
43+
AuthConfig auth
44+
ConsumerConfig consumer
45+
ConnGroup string
46+
}
47+
48+
type auth struct {
49+
Username string
50+
Password string
51+
}
52+
53+
type consul struct {
54+
Address string
55+
HealthOnly bool
56+
KVKey string
57+
WaitTime time.Duration
58+
}
59+
60+
type consumer struct {
61+
RetryIntervalInSec time.Duration
62+
LogLevel string
63+
WriteTimeoutInSec time.Duration
64+
PoolSize int
65+
TopicFormat string
66+
}
67+
4068
func serverConfigLoader() {
4169
viper.SetDefault("SERVER_BATCH_DEDUP_IN_CONNECTION_ENABLED", "false")
4270
Server = server{
@@ -85,3 +113,39 @@ func serverGRPCConfigLoader() {
85113
TLSPublicKey: util.MustGetString("SERVER_GRPC_TLS_PUBLIC_KEY"),
86114
}
87115
}
116+
117+
func serverMQTTConfigLoader() {
118+
viper.SetDefault("SERVER_MQTT_CONSUL_ADDRESS", "consul:8081")
119+
viper.SetDefault("SERVER_MQTT_CONSUL_KV_KEY", "kv/path")
120+
viper.SetDefault("SERVER_MQTT_CONSUL_HEALTH_ONLY", true)
121+
viper.SetDefault("SERVER_MQTT_CONSUL_WAIT_TIME", 300)
122+
viper.SetDefault("SERVER_MQTT_AUTH_USERNAME", "test")
123+
viper.SetDefault("SERVER_MQTT_AUTH_PASSWORD", "pass")
124+
viper.SetDefault("SERVER_MQTT_CONSUMER_RETRY_INTERVAL_IN_SEC", 1)
125+
viper.SetDefault("SERVER_MQTT_CONSUMER_WRITE_TIMEOUT_IN_SEC", 1)
126+
viper.SetDefault("SERVER_MQTT_CONSUMER_LOG_LEVEL", "warn")
127+
viper.SetDefault("SERVER_MQTT_CONSUMER_POOL_SIZE", 1)
128+
viper.SetDefault("SERVER_MQTT_CONSUMER_TOPIC_FORMAT", "$share/clickstream/v1/+/+")
129+
viper.SetDefault("SERVER_MQTT_CONNECTION_GROUP", "consumer")
130+
131+
ServerMQTT = serverMQTT{
132+
ConsulConfig: consul{
133+
Address: util.MustGetString("SERVER_MQTT_CONSUL_ADDRESS"),
134+
HealthOnly: util.MustGetBool("SERVER_MQTT_CONSUL_HEALTH_ONLY"),
135+
KVKey: util.MustGetString("SERVER_MQTT_CONSUL_KV_KEY"),
136+
WaitTime: util.MustGetDuration("SERVER_MQTT_CONSUL_WAIT_TIME", time.Second),
137+
},
138+
AuthConfig: auth{
139+
Username: util.MustGetString("SERVER_MQTT_AUTH_USERNAME"),
140+
Password: util.MustGetString("SERVER_MQTT_AUTH_USERNAME"),
141+
},
142+
ConsumerConfig: consumer{
143+
RetryIntervalInSec: util.MustGetDuration("SERVER_MQTT_CONSUMER_RETRY_INTERVAL_IN_SEC", time.Second),
144+
LogLevel: util.MustGetString("SERVER_MQTT_CONSUMER_LOG_LEVEL"),
145+
WriteTimeoutInSec: util.MustGetDuration("SERVER_MQTT_CONSUMER_POOL_SIZE", time.Second),
146+
PoolSize: util.MustGetInt("SERVER_MQTT_CONSUMER_POOL_SIZE"),
147+
TopicFormat: util.MustGetString("SERVER_MQTT_CONSUMER_TOPIC_FORMAT"),
148+
},
149+
ConnGroup: util.MustGetString("SERVER_MQTT_CONNECTION_GROUP"),
150+
}
151+
}

go.mod

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,65 @@
11
module github.com/goto/raccoon
22

3-
go 1.17
3+
go 1.24
44

55
require (
66
buf.build/gen/go/gotocompany/proton/grpc/go v1.3.0-20230313110213-9a3d240d5293.1
77
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.29.0-20230313110213-9a3d240d5293.1
8+
github.com/gojek/courier-go v0.7.8
9+
github.com/gojek/courier-go/consul v0.7.3
810
github.com/gorilla/mux v1.7.4
9-
github.com/gorilla/websocket v1.4.2
11+
github.com/gorilla/websocket v1.5.3
12+
github.com/goto/raccoon/clients/go v0.0.0-20250203072106-1dbea749aaf3
1013
github.com/sirupsen/logrus v1.6.0
1114
github.com/spf13/viper v1.7.0
12-
github.com/stretchr/testify v1.8.1
15+
github.com/stretchr/testify v1.9.0
1316
google.golang.org/grpc v1.53.0
1417
google.golang.org/protobuf v1.29.0
1518
gopkg.in/alexcesaro/statsd.v2 v2.0.0
1619
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.4.2
1720
)
1821

1922
require (
23+
github.com/armon/go-metrics v0.4.1 // indirect
2024
github.com/confluentinc/confluent-kafka-go v1.4.2 // indirect
21-
github.com/davecgh/go-spew v1.1.1 // indirect
25+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
26+
github.com/fatih/color v1.16.0 // indirect
2227
github.com/fsnotify/fsnotify v1.4.7 // indirect
28+
github.com/gojek/paho.mqtt.golang v1.7.0 // indirect
29+
github.com/gojekfarm/xtools/generic v0.7.0 // indirect
2330
github.com/golang/protobuf v1.5.2 // indirect
31+
github.com/hashicorp/consul/api v1.32.1 // indirect
32+
github.com/hashicorp/errwrap v1.1.0 // indirect
33+
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
34+
github.com/hashicorp/go-hclog v1.5.0 // indirect
35+
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
36+
github.com/hashicorp/go-multierror v1.1.1 // indirect
37+
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
38+
github.com/hashicorp/golang-lru v0.5.4 // indirect
2439
github.com/hashicorp/hcl v1.0.0 // indirect
40+
github.com/hashicorp/serf v0.10.1 // indirect
2541
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
42+
github.com/kr/text v0.2.0 // indirect
2643
github.com/magiconair/properties v1.8.1 // indirect
27-
github.com/mitchellh/mapstructure v1.1.2 // indirect
44+
github.com/mattn/go-colorable v0.1.13 // indirect
45+
github.com/mattn/go-isatty v0.0.20 // indirect
46+
github.com/mitchellh/go-homedir v1.1.0 // indirect
47+
github.com/mitchellh/mapstructure v1.5.0 // indirect
2848
github.com/pelletier/go-toml v1.2.0 // indirect
29-
github.com/pmezard/go-difflib v1.0.0 // indirect
49+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
3050
github.com/spf13/afero v1.9.2 // indirect
3151
github.com/spf13/cast v1.3.0 // indirect
3252
github.com/spf13/jwalterweatherman v1.0.0 // indirect
3353
github.com/spf13/pflag v1.0.3 // indirect
34-
github.com/stretchr/objx v0.5.0 // indirect
54+
github.com/stretchr/objx v0.5.2 // indirect
3555
github.com/subosito/gotenv v1.2.0 // indirect
36-
golang.org/x/net v0.5.0 // indirect
37-
golang.org/x/sys v0.4.0 // indirect
38-
golang.org/x/text v0.6.0 // indirect
56+
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 // indirect
57+
golang.org/x/net v0.38.0 // indirect
58+
golang.org/x/sync v0.12.0 // indirect
59+
golang.org/x/sys v0.31.0 // indirect
60+
golang.org/x/text v0.23.0 // indirect
3961
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
4062
gopkg.in/ini.v1 v1.51.0 // indirect
41-
gopkg.in/yaml.v2 v2.2.4 // indirect
63+
gopkg.in/yaml.v2 v2.2.5 // indirect
4264
gopkg.in/yaml.v3 v3.0.1 // indirect
4365
)

0 commit comments

Comments
 (0)