44 "context"
55 "encoding/json"
66 "fmt"
7+ "net"
8+ "os"
79 "time"
810
911 "github.com/dnwe/otelsarama"
@@ -22,6 +24,29 @@ import (
2224 "github.com/IBM/sarama"
2325)
2426
27+ // isRouteserverReachable checks if the routeserver is available and reachable
28+ func isRouteserverReachable (logger log.Factory ) (string , bool ) {
29+ routeserverURL := os .Getenv ("SIGNADOT_ROUTESERVER" )
30+ if routeserverURL == "" {
31+ logger .Bg ().Info ("No routeserver URL found" )
32+ return "" , false
33+ }
34+
35+ // Try to connect to check reachability
36+ timeout := 2 * time .Second
37+ conn , err := net .DialTimeout ("tcp" , routeserverURL , timeout )
38+ if err != nil {
39+ logger .Bg ().Info ("Routeserver not reachable" ,
40+ zap .String ("url" , routeserverURL ),
41+ zap .Error (err ))
42+ return routeserverURL , false
43+ }
44+
45+ conn .Close ()
46+ logger .Bg ().Info ("Routeserver is reachable" , zap .String ("url" , routeserverURL ))
47+ return routeserverURL , true
48+ }
49+
2550// Consumer represents a Sarama consumer group consumer
2651type Consumer struct {
2752 tracer trace.Tracer
@@ -35,11 +60,23 @@ type Consumer struct {
3560
3661func newConsumer (ctx context.Context , tracerProvider trace.TracerProvider ,
3762 logger log.Factory ) * Consumer {
38- // create a routesapi baseline watched instance
39- // TODO: remove this in case we're not running with Signadot
40- routing , err := watched .BaselineWatchedFromEnv ()
41- if err != nil {
42- panic (err )
63+ // Log something funky during initialization
64+ logger .Bg ().Info ("🚕 Vrooom! Driver service starting up! 🚗 Ready to find the fastest routes! 🛣️" )
65+
66+ // Only initialize routing if routeserver URL is set and reachable
67+ var routing watched.BaselineWatched
68+ routeserverURL , reachable := isRouteserverReachable (logger )
69+
70+ if reachable {
71+ logger .Bg ().Info ("Initializing routing with reachable routeserver" ,
72+ zap .String ("url" , routeserverURL ))
73+ var err error
74+ routing , err = watched .BaselineWatchedFromEnv ()
75+ if err != nil {
76+ logger .Bg ().Info ("Failed to create BaselineWatchedFromEnv, continuing without routing" , zap .Error (err ))
77+ }
78+ } else {
79+ logger .Bg ().Info ("Continuing without routing" )
4380 }
4481
4582 tracer := tracerProvider .Tracer ("driver" )
@@ -84,6 +121,10 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
84121}
85122
86123func (consumer * Consumer ) shouldProcess (routingKey string ) bool {
124+ if consumer .routing == nil {
125+ // If routing is not set up, process all messages
126+ return true
127+ }
87128 if sbName := config .SignadotSandboxName (); sbName != "" {
88129 return consumer .routing .RoutesTo (routingKey , sbName )
89130 }
0 commit comments