Skip to content

Commit 3a415bf

Browse files
feat: add mqtt support on raccoon (#24)
1 parent 1a43f42 commit 3a415bf

38 files changed

Lines changed: 1809 additions & 49 deletions

.env.sample

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,17 @@ METRIC_STATSD_ADDRESS=":8125"
3131
METRIC_STATSD_FLUSH_PERIOD_MS=100
3232

3333
LOG_LEVEL="info"
34+
35+
SERVER_MQTT_CONSUL_ADDRESS="consul:8081"
36+
SERVER_MQTT_CONSUL_KV_KEY="kv/path"
37+
SERVER_MQTT_CONSUL_HEALTH_ONLY="true"
38+
SERVER_MQTT_CONSUL_WAIT_TIME="300"
39+
SERVER_MQTT_AUTH_USERNAME="test"
40+
SERVER_MQTT_AUTH_PASSWORD="pass"
41+
SERVER_MQTT_CONSUMER_RETRY_INTERVAL_IN_SEC=1
42+
SERVER_MQTT_CONSUMER_WRITE_TIMEOUT_IN_SEC=1
43+
SERVER_MQTT_CONSUMER_LOG_LEVEL="warn"
44+
SERVER_MQTT_CONSUMER_POOL_SIZE=1
45+
SERVER_MQTT_CONSUMER_TOPIC_FORMAT="default-topic"
46+
SERVER_MQTT_CONNECTION_GROUP="default"
47+
SERVER_MQTT_CONSUMER_KEEP_ALIVE_IN_SEC=45

.env.test

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,17 @@ METRIC_STATSD_ADDRESS=":8125"
3232
METRIC_STATSD_FLUSH_PERIOD_MS=100
3333

3434
LOG_LEVEL="info"
35+
36+
SERVER_MQTT_CONSUL_ADDRESS="consul:8081"
37+
SERVER_MQTT_CONSUL_KV_KEY="kv/path"
38+
SERVER_MQTT_CONSUL_HEALTH_ONLY="true"
39+
SERVER_MQTT_CONSUL_WAIT_TIME="300"
40+
SERVER_MQTT_AUTH_USERNAME="test"
41+
SERVER_MQTT_AUTH_PASSWORD="pass"
42+
SERVER_MQTT_CONSUMER_RETRY_INTERVAL_IN_SEC=1
43+
SERVER_MQTT_CONSUMER_WRITE_TIMEOUT_IN_SEC=1
44+
SERVER_MQTT_CONSUMER_LOG_LEVEL="warn"
45+
SERVER_MQTT_CONSUMER_POOL_SIZE=1
46+
SERVER_MQTT_CONSUMER_TOPIC_FORMAT="default-topic"
47+
SERVER_MQTT_CONNECTION_GROUP="default"
48+
SERVER_MQTT_CONSUMER_KEEP_ALIVE_IN_SEC=45

.github/workflows/build.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ jobs:
1212
- name: Setup Go
1313
uses: actions/setup-go@v2.1.3
1414
with:
15-
go-version: '1.17'
15+
go-version: '1.24'
1616
- name: Checkout repo
1717
uses: actions/checkout@v2
1818
- name: Setup Project
@@ -28,7 +28,7 @@ jobs:
2828
- name: Setup Go
2929
uses: actions/setup-go@v2.1.3
3030
with:
31-
go-version: '1.17'
31+
go-version: '1.24'
3232
- uses: actions/checkout@v2
3333
- name: Build
3434
run: make all

Dockerfile

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
FROM golang:1.17
1+
FROM golang:1.24
22

33
WORKDIR /app
44
RUN apt-get update && apt-get install unzip --no-install-recommends --assume-yes
55
COPY . .
66
RUN make update-deps && make compile
77

8-
FROM debian:bullseye
8+
#bookworm-slim comparible with golang:1.24 and has glibc 2.34
9+
FROM debian:bookworm-slim
910
WORKDIR /app
1011
COPY --from=0 /app/raccoon ./raccoon
1112
COPY . .

app/server.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package app
33
import (
44
"context"
55
"fmt"
6+
"github.com/goto/raccoon/health"
67
"os"
78
"os/signal"
89
"runtime"
@@ -31,7 +32,7 @@ func StartServer(ctx context.Context, cancel context.CancelFunc, shutdown chan b
3132
logger.Info("Exiting server")
3233
os.Exit(0)
3334
}
34-
35+
registerHealthCheck(httpServices, kPublisher)
3536
logger.Info("Start worker -->")
3637
workerPool := worker.CreateWorkerPool(config.Worker.WorkersPoolSize, bufferChannel, config.Worker.DeliveryChannelSize, kPublisher)
3738
workerPool.StartWorkers()
@@ -99,3 +100,12 @@ func reportProcMetrics() {
99100
metrics.Gauge("server_mem_gc_pauseTotalNs_current", m.PauseTotalNs, "")
100101
}
101102
}
103+
104+
func registerHealthCheck(svcs services.Services, kafka *publisher.Kafka) {
105+
health.Register("kafka-broker", kafka.HealthCheck)
106+
for _, svc := range svcs.B {
107+
if svc.Name() == "MQTT" {
108+
health.Register("mqtt-broker", svc.HealthCheck)
109+
}
110+
}
111+
}

app/server_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,11 @@ func isClosed(ch <-chan collection.CollectRequest) bool {
6969
// mockKafkaClient is a mock for the Client interface
7070
type mockKafkaClient struct {
7171
// Tracking flags
72-
ProduceCalled bool
73-
CloseCalled bool
74-
FlushCalled bool
75-
EventsCalled bool
72+
ProduceCalled bool
73+
CloseCalled bool
74+
FlushCalled bool
75+
EventsCalled bool
76+
GetMetadataCalled bool
7677

7778
ReturnFlushLeft int
7879
EventChan chan kafka.Event
@@ -99,3 +100,8 @@ func (m *mockKafkaClient) Events() chan kafka.Event {
99100
}
100101
return m.EventChan
101102
}
103+
104+
func (m *mockKafkaClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error) {
105+
m.GetMetadataCalled = true
106+
return &kafka.Metadata{}, nil
107+
}

config/load.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ func Load() {
2929
metricStatsdConfigLoader()
3030
eventDistributionConfigLoader()
3131
eventConfigLoader()
32+
serverMQTTConfigLoader()
3233
}

config/load_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,35 @@ func TestGRPCServerConfig(t *testing.T) {
5252
assert.Equal(t, "8081", ServerGRPC.Port)
5353
}
5454

55+
func TestServerMQTTConfig(t *testing.T) {
56+
os.Setenv("SERVER_MQTT_CONSUL_ADDRESS", "consul:8081")
57+
os.Setenv("SERVER_MQTT_CONSUL_KV_KEY", "kv/path")
58+
os.Setenv("SERVER_MQTT_CONSUL_HEALTH_ONLY", "true")
59+
os.Setenv("SERVER_MQTT_CONSUL_WAIT_TIME", "300")
60+
os.Setenv("SERVER_MQTT_AUTH_USERNAME", "test")
61+
os.Setenv("SERVER_MQTT_AUTH_PASSWORD", "pass")
62+
os.Setenv("SERVER_MQTT_CONSUMER_RETRY_INTERVAL_IN_SEC", "1")
63+
os.Setenv("SERVER_MQTT_CONSUMER_WRITE_TIMEOUT_IN_SEC", "1")
64+
os.Setenv("SERVER_MQTT_CONSUMER_LOG_LEVEL", "warn")
65+
os.Setenv("SERVER_MQTT_CONSUMER_POOL_SIZE", "1")
66+
os.Setenv("SERVER_MQTT_CONSUMER_TOPIC_FORMAT", "default-topic")
67+
os.Setenv("SERVER_MQTT_CONNECTION_GROUP", "consumer")
68+
serverMQTTConfigLoader()
69+
assert.Equal(t, "consul:8081", ServerMQTT.ConsulConfig.Address)
70+
assert.Equal(t, "kv/path", ServerMQTT.ConsulConfig.KVKey)
71+
assert.Equal(t, true, ServerMQTT.ConsulConfig.HealthOnly)
72+
assert.Equal(t, 300*time.Second, ServerMQTT.ConsulConfig.WaitTime)
73+
assert.Equal(t, "test", ServerMQTT.AuthConfig.Username)
74+
assert.Equal(t, "pass", ServerMQTT.AuthConfig.Password)
75+
assert.Equal(t, 1*time.Second, ServerMQTT.ConsumerConfig.RetryIntervalInSec)
76+
assert.Equal(t, 1*time.Second, ServerMQTT.ConsumerConfig.WriteTimeoutInSec)
77+
assert.Equal(t, "warn", ServerMQTT.ConsumerConfig.LogLevel)
78+
assert.Equal(t, 1, ServerMQTT.ConsumerConfig.PoolSize)
79+
assert.Equal(t, "default-topic", ServerMQTT.ConsumerConfig.TopicFormat)
80+
assert.Equal(t, "consumer", ServerMQTT.ConnGroup)
81+
82+
}
83+
5584
func TestDynamicConfigLoad(t *testing.T) {
5685
os.Setenv("PUBLISHER_KAFKA_CLIENT_RANDOM", "anything")
5786
os.Setenv("PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SERVERS", "localhost:9092")
@@ -67,6 +96,8 @@ func TestKafkaConfig_ToKafkaConfigMap(t *testing.T) {
6796
os.Setenv("PUBLISHER_KAFKA_CLIENT_ACKS", "1")
6897
os.Setenv("PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES", "10000")
6998
os.Setenv("SOMETHING_PUBLISHER_KAFKA_CLIENT_SOMETHING", "anything")
99+
os.Setenv("PUBLISHER_KAFKA_HEALTHCHECK_TOPIC_NAME", "test-log")
100+
os.Setenv("PUBLISHER_KAFKA_HEALTHCHECK_TIMEOUT_MS", "5000")
70101
publisherKafkaConfigLoader()
71102
kafkaConfig := PublisherKafka.ToKafkaConfigMap()
72103
bootstrapServer, _ := kafkaConfig.Get("bootstrap.servers", "")
@@ -76,6 +107,8 @@ func TestKafkaConfig_ToKafkaConfigMap(t *testing.T) {
76107
assert.Equal(t, "", topic)
77108
assert.NotEqual(t, something, "anything")
78109
assert.Equal(t, 4, len(*kafkaConfig))
110+
assert.Equal(t, "test-log", PublisherKafka.HealthCheckConfig.TopicName)
111+
assert.Equal(t, 5000, PublisherKafka.HealthCheckConfig.TimeOut)
79112
}
80113

81114
func TestWorkerConfig(t *testing.T) {

config/publisher.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,17 @@ import (
1313
var PublisherKafka publisherKafka
1414
var dynamicKafkaClientConfigPrefix = "PUBLISHER_KAFKA_CLIENT_"
1515

16+
// publisherKafka defines configuration parameters for a Kafka-based publisher.
17+
// It includes flushing behavior and health check settings.
1618
type publisherKafka struct {
17-
FlushInterval int
19+
FlushInterval int // Interval (in seconds) to flush the events during shutdown
20+
HealthCheckConfig healthcheck // Configuration for Kafka broker health check
21+
}
22+
23+
// healthcheck holds settings used to monitor the health of Kafka broker
24+
type healthcheck struct {
25+
TopicName string // Kafka topic name used for health check
26+
TimeOut int // Timeout duration (in seconds) for health check operations
1827
}
1928

2029
func (k publisherKafka) ToKafkaConfigMap() *confluent.ConfigMap {
@@ -43,9 +52,15 @@ func dynamicKafkaClientConfigLoad() []byte {
4352
func publisherKafkaConfigLoader() {
4453
viper.SetDefault("PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES", "100000")
4554
viper.SetDefault("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS", "1000")
55+
viper.SetDefault("PUBLISHER_KAFKA_HEALTHCHECK_TOPIC_NAME", "clickstream-test-log")
56+
viper.SetDefault("PUBLISHER_KAFKA_HEALTHCHECK_TIMEOUT_MS", "5000")
4657
viper.MergeConfig(bytes.NewBuffer(dynamicKafkaClientConfigLoad()))
4758

4859
PublisherKafka = publisherKafka{
4960
FlushInterval: util.MustGetInt("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS"),
61+
HealthCheckConfig: healthcheck{
62+
TopicName: util.MustGetString("PUBLISHER_KAFKA_HEALTHCHECK_TOPIC_NAME"),
63+
TimeOut: util.MustGetInt("PUBLISHER_KAFKA_HEALTHCHECK_TIMEOUT_MS"),
64+
},
5065
}
5166
}

config/server.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
var Server server
1111
var ServerWs serverWs
1212
var ServerGRPC serverGRPC
13+
var ServerMQTT serverMQTT
1314

1415
type server struct {
1516
DedupEnabled bool
@@ -37,6 +38,39 @@ type serverGRPC struct {
3738
TLSPublicKey string
3839
}
3940

41+
// serverMQTT represents the complete configuration for an MQTT server setup.
42+
// It includes authentication, Consul configuration, and consumer-specific settings.
43+
type serverMQTT struct {
44+
ConsulConfig consul // Configuration related to Consul service discovery
45+
AuthConfig auth // MQTT authentication credentials
46+
ConsumerConfig consumer // Consumer behavior and connection settings
47+
ConnGroup string // Connection group name used for identifying MQTT client group
48+
}
49+
50+
// auth defines MQTT authentication credentials.
51+
type auth struct {
52+
Username string // Username for authenticating with the MQTT broker
53+
Password string // Password for authenticating with the MQTT broker
54+
}
55+
56+
// consul holds configuration details for connecting and interacting with a Consul agent.
57+
type consul struct {
58+
Address string // Address of the Consul agent (e.g., localhost:8500)
59+
HealthOnly bool // When true, only healthy service instances are used
60+
KVKey string // Key in Consul KV store for retrieving MQTT Broker Address
61+
WaitTime time.Duration // Maximum wait time for Consul blocking queries
62+
}
63+
64+
// consumer contains configuration parameters controlling MQTT message consumption.
65+
type consumer struct {
66+
RetryIntervalInSec time.Duration // Time interval (in seconds) before retrying connection
67+
LogLevel string // Log verbosity level (e.g., "info", "debug", "error")
68+
WriteTimeoutInSec time.Duration // Timeout duration (in seconds) for write operations
69+
PoolSize int // Number of concurrent consumers to consume from MQTT topic
70+
TopicFormat string // Format or pattern for subscribing to MQTT topics (e.g., "share/raccoon/{service}")
71+
KeepAlive time.Duration // Amount of time that the client should wait before sending a PING request to the broker
72+
}
73+
4074
func serverConfigLoader() {
4175
viper.SetDefault("SERVER_BATCH_DEDUP_IN_CONNECTION_ENABLED", "false")
4276
Server = server{
@@ -85,3 +119,41 @@ func serverGRPCConfigLoader() {
85119
TLSPublicKey: util.MustGetString("SERVER_GRPC_TLS_PUBLIC_KEY"),
86120
}
87121
}
122+
123+
func serverMQTTConfigLoader() {
124+
viper.SetDefault("SERVER_MQTT_CONSUL_ADDRESS", "consul:8081")
125+
viper.SetDefault("SERVER_MQTT_CONSUL_KV_KEY", "kv/path")
126+
viper.SetDefault("SERVER_MQTT_CONSUL_HEALTH_ONLY", true)
127+
viper.SetDefault("SERVER_MQTT_CONSUL_WAIT_TIME", 300)
128+
viper.SetDefault("SERVER_MQTT_AUTH_USERNAME", "test")
129+
viper.SetDefault("SERVER_MQTT_AUTH_PASSWORD", "pass")
130+
viper.SetDefault("SERVER_MQTT_CONSUMER_RETRY_INTERVAL_IN_SEC", 1)
131+
viper.SetDefault("SERVER_MQTT_CONSUMER_WRITE_TIMEOUT_IN_SEC", 1)
132+
viper.SetDefault("SERVER_MQTT_CONSUMER_KEEP_ALIVE_IN_SEC", 45)
133+
viper.SetDefault("SERVER_MQTT_CONSUMER_LOG_LEVEL", "warn")
134+
viper.SetDefault("SERVER_MQTT_CONSUMER_POOL_SIZE", 1)
135+
viper.SetDefault("SERVER_MQTT_CONSUMER_TOPIC_FORMAT", "default-topic")
136+
viper.SetDefault("SERVER_MQTT_CONNECTION_GROUP", "default")
137+
138+
ServerMQTT = serverMQTT{
139+
ConsulConfig: consul{
140+
Address: util.MustGetString("SERVER_MQTT_CONSUL_ADDRESS"),
141+
HealthOnly: util.MustGetBool("SERVER_MQTT_CONSUL_HEALTH_ONLY"),
142+
KVKey: util.MustGetString("SERVER_MQTT_CONSUL_KV_KEY"),
143+
WaitTime: util.MustGetDuration("SERVER_MQTT_CONSUL_WAIT_TIME", time.Second),
144+
},
145+
AuthConfig: auth{
146+
Username: util.MustGetString("SERVER_MQTT_AUTH_USERNAME"),
147+
Password: util.MustGetString("SERVER_MQTT_AUTH_PASSWORD"),
148+
},
149+
ConsumerConfig: consumer{
150+
RetryIntervalInSec: util.MustGetDuration("SERVER_MQTT_CONSUMER_RETRY_INTERVAL_IN_SEC", time.Second),
151+
LogLevel: util.MustGetString("SERVER_MQTT_CONSUMER_LOG_LEVEL"),
152+
WriteTimeoutInSec: util.MustGetDuration("SERVER_MQTT_CONSUMER_WRITE_TIMEOUT_IN_SEC", time.Second),
153+
PoolSize: util.MustGetInt("SERVER_MQTT_CONSUMER_POOL_SIZE"),
154+
TopicFormat: util.MustGetString("SERVER_MQTT_CONSUMER_TOPIC_FORMAT"),
155+
KeepAlive: util.MustGetDuration("SERVER_MQTT_CONSUMER_KEEP_ALIVE_IN_SEC", time.Second),
156+
},
157+
ConnGroup: util.MustGetString("SERVER_MQTT_CONNECTION_GROUP"),
158+
}
159+
}

0 commit comments

Comments
 (0)