Skip to content

Commit 761e75d

Browse files
fix: review comments
1 parent cdebdc4 commit 761e75d

10 files changed

Lines changed: 61 additions & 39 deletions

File tree

app/server.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package app
33
import (
44
"context"
55
"fmt"
6+
"github.com/goto/raccoon/health"
67
"os"
78
"os/signal"
89
"runtime"
@@ -31,7 +32,7 @@ func StartServer(ctx context.Context, cancel context.CancelFunc, shutdown chan b
3132
logger.Info("Exiting server")
3233
os.Exit(0)
3334
}
34-
35+
registerHealthCheck(httpServices, kPublisher)
3536
logger.Info("Start worker -->")
3637
workerPool := worker.CreateWorkerPool(config.Worker.WorkersPoolSize, bufferChannel, config.Worker.DeliveryChannelSize, kPublisher)
3738
workerPool.StartWorkers()
@@ -99,3 +100,12 @@ func reportProcMetrics() {
99100
metrics.Gauge("server_mem_gc_pauseTotalNs_current", m.PauseTotalNs, "")
100101
}
101102
}
103+
104+
func registerHealthCheck(svcs services.Services, kafka *publisher.Kafka) {
105+
health.Register("kafka-broker", kafka.HealthCheck)
106+
for _, svc := range svcs.B {
107+
if svc.Name() == "MQTT" {
108+
health.Register("mqtt-broker", svc.HealthCheck)
109+
}
110+
}
111+
}

config/publisher.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,17 @@ import (
1313
var PublisherKafka publisherKafka
1414
var dynamicKafkaClientConfigPrefix = "PUBLISHER_KAFKA_CLIENT_"
1515

16+
// publisherKafka defines configuration parameters for a Kafka-based publisher.
17+
// It includes flushing behavior and health check settings.
1618
type publisherKafka struct {
17-
FlushInterval int
18-
HealthCheckConfig healthcheck
19+
FlushInterval int // Interval (in seconds) to flush the events during shutdown
20+
HealthCheckConfig healthcheck // Configuration for Kafka nroker health check
1921
}
2022

23+
// healthcheck holds settings used to monitor the health of Kafka broker
2124
type healthcheck struct {
22-
TopicName string
23-
TimeOut int
25+
TopicName string // Kafka topic name used for health check
26+
TimeOut int // Timeout duration (in seconds) for health check operations
2427
}
2528

2629
func (k publisherKafka) ToKafkaConfigMap() *confluent.ConfigMap {

config/server.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,31 +38,36 @@ type serverGRPC struct {
3838
TLSPublicKey string
3939
}
4040

41+
// serverMQTT represents the complete configuration for an MQTT server setup.
42+
// It includes authentication, Consul configuration, and consumer-specific settings.
4143
type serverMQTT struct {
42-
ConsulConfig consul
43-
AuthConfig auth
44-
ConsumerConfig consumer
45-
ConnGroup string
44+
ConsulConfig consul // Configuration related to Consul service discovery
45+
AuthConfig auth // MQTT authentication credentials
46+
ConsumerConfig consumer // Consumer behavior and connection settings
47+
ConnGroup string // Connection group name used for identifying MQTT client group
4648
}
4749

50+
// auth defines MQTT authentication credentials.
4851
type auth struct {
49-
Username string
50-
Password string
52+
Username string // Username for authenticating with the MQTT broker
53+
Password string // Password for authenticating with the MQTT broker
5154
}
5255

56+
// consul holds configuration details for connecting and interacting with a Consul agent.
5357
type consul struct {
54-
Address string
55-
HealthOnly bool
56-
KVKey string
57-
WaitTime time.Duration
58+
Address string // Address of the Consul agent (e.g., localhost:8500)
59+
HealthOnly bool // When true, only healthy service instances are used
60+
KVKey string // Key in Consul KV store for retrieving configuration or metadata
61+
WaitTime time.Duration // Maximum wait time for Consul blocking queries
5862
}
5963

64+
// consumer contains configuration parameters controlling MQTT message consumption.
6065
type consumer struct {
61-
RetryIntervalInSec time.Duration
62-
LogLevel string
63-
WriteTimeoutInSec time.Duration
64-
PoolSize int
65-
TopicFormat string
66+
RetryIntervalInSec time.Duration // Time interval (in seconds) before retrying connection or message processing
67+
LogLevel string // Log verbosity level (e.g., "info", "debug", "error")
68+
WriteTimeoutInSec time.Duration // Timeout duration (in seconds) for write operations
69+
PoolSize int // Number of concurrent consumer workers or goroutines
70+
TopicFormat string // Format or pattern for subscribing to MQTT topics (e.g., "topic/{region}/{service}")
6671
}
6772

6873
func serverConfigLoader() {
@@ -142,7 +147,7 @@ func serverMQTTConfigLoader() {
142147
ConsumerConfig: consumer{
143148
RetryIntervalInSec: util.MustGetDuration("SERVER_MQTT_CONSUMER_RETRY_INTERVAL_IN_SEC", time.Second),
144149
LogLevel: util.MustGetString("SERVER_MQTT_CONSUMER_LOG_LEVEL"),
145-
WriteTimeoutInSec: util.MustGetDuration("SERVER_MQTT_CONSUMER_POOL_SIZE", time.Second),
150+
WriteTimeoutInSec: util.MustGetDuration("SERVER_MQTT_CONSUMER_WRITE_TIMEOUT_IN_SEC", time.Second),
146151
PoolSize: util.MustGetInt("SERVER_MQTT_CONSUMER_POOL_SIZE"),
147152
TopicFormat: util.MustGetString("SERVER_MQTT_CONSUMER_TOPIC_FORMAT"),
148153
},

health/registry.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,16 @@
11
package health
22

3-
import "sync"
4-
53
type Checker func() error
64

75
var (
8-
mu sync.RWMutex
96
checkers = make(map[string]Checker)
107
)
118

129
func Register(name string, fn Checker) {
13-
mu.Lock()
14-
defer mu.Unlock()
1510
checkers[name] = fn
1611
}
1712

1813
func CheckAll() map[string]string {
19-
mu.RLock()
20-
defer mu.RUnlock()
21-
2214
results := make(map[string]string)
2315
for name, fn := range checkers {
2416
if err := fn(); err != nil {

health/registry_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@ func TestRegister(t *testing.T) {
1313

1414
Register("db", mockChecker)
1515

16-
mu.RLock()
17-
defer mu.RUnlock()
18-
1916
if len(checkers) != 1 {
2017
t.Fatalf("expected 1 checker, got %d", len(checkers))
2118
}
@@ -80,7 +77,5 @@ func TestCheckAll(t *testing.T) {
8077
}
8178

8279
func resetCheckers() {
83-
mu.Lock()
84-
defer mu.Unlock()
8580
checkers = make(map[string]Checker)
8681
}

services/grpc/service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ func (s *Service) Shutdown(context.Context) error {
4747
return nil
4848
}
4949

50+
// HealthCheck check for grpc
51+
func (s *Service) HealthCheck() error {
52+
return nil
53+
}
54+
5055
func newGRPCServer() *grpc.Server {
5156
if config.ServerGRPC.TLSEnabled {
5257
return grpc.NewServer(grpc.Creds(loadTLSCredentials()))

services/mqtt/handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ func (h *Handler) MQTTHandler(ctx context.Context, c courier.PubSub, message *co
4141
}
4242
//to be removed post end-to-end test
4343
for _, event := range req.Events {
44-
log.Infof("MQTT message content post deserialization %v", event)
44+
log.Infof("MQTT message content post deserialization event : %v", event)
4545
}
46+
log.Infof("MQTT message request id %v", req.ReqGuid)
4647

4748
//instrument the request number
4849
metrics.Increment(

services/pprof/service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,8 @@ func (*Service) Name() string {
2929
func (s *Service) Shutdown(ctx context.Context) error {
3030
return s.s.Shutdown(ctx)
3131
}
32+
33+
// HealthCheck check for pprof
34+
func (s *Service) HealthCheck() error {
35+
return nil
36+
}

services/rest/service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,8 @@ func (*Service) Name() string {
8686
func (s *Service) Shutdown(ctx context.Context) error {
8787
return s.s.Shutdown(ctx)
8888
}
89+
90+
// HealthCheck check for rest
91+
func (s *Service) HealthCheck() error {
92+
return nil
93+
}

services/services.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@ type bootstrapper interface {
1717
Init(ctx context.Context) error
1818
Shutdown(ctx context.Context) error
1919
Name() string
20+
HealthCheck() error
2021
}
2122

2223
type Services struct {
23-
b []bootstrapper
24+
B []bootstrapper
2425
}
2526

2627
func (s *Services) Start(ctx context.Context, cancel context.CancelFunc) {
2728
logger.Info("starting servers")
28-
for _, init := range s.b {
29+
for _, init := range s.B {
2930
i := init
3031
go func() {
3132
logger.Infof("%s Server --> startServers", i.Name())
@@ -38,7 +39,7 @@ func (s *Services) Start(ctx context.Context, cancel context.CancelFunc) {
3839
}
3940

4041
func (s *Services) Shutdown(ctx context.Context) {
41-
for _, b := range s.b {
42+
for _, b := range s.B {
4243
logger.Infof("%s Server --> shutting down", b.Name())
4344
b.Shutdown(ctx)
4445
}
@@ -47,7 +48,7 @@ func (s *Services) Shutdown(ctx context.Context) {
4748
func Create(b chan collection.CollectRequest, ctx context.Context) Services {
4849
c := collection.NewChannelCollector(b)
4950
return Services{
50-
b: []bootstrapper{
51+
B: []bootstrapper{
5152
grpc.NewGRPCService(c),
5253
pprof.NewPprofService(),
5354
rest.NewRestService(c, ctx),

0 commit comments

Comments
 (0)