6
6
"log"
7
7
"net/http"
8
8
"os"
9
+ "os/signal"
10
+ "syscall"
9
11
10
12
"github.com/go-redis/redis/v8"
11
13
"go.uber.org/zap"
@@ -19,7 +21,7 @@ type redisConnector struct {
19
21
logger * zap.Logger
20
22
}
21
23
22
- func (conn redisConnector ) consumeMessage (ctx context. Context ) {
24
+ func (conn redisConnector ) consumeMessage (sigterm chan os. Signal ) {
23
25
24
26
headers := http.Header {
25
27
"KEDA-Topic" : {conn .connectordata .Topic },
@@ -29,14 +31,16 @@ func (conn redisConnector) consumeMessage(ctx context.Context) {
29
31
"KEDA-Source-Name" : {conn .connectordata .SourceName },
30
32
}
31
33
34
+ var ctx = context .Background ()
32
35
forever := make (chan bool )
33
36
34
37
go func () {
35
38
for {
36
39
// BLPop will block and wait for a new message if the list is empty
37
40
msg , err := conn .rdbConnection .BLPop (ctx , 0 , conn .connectordata .Topic ).Result ()
38
41
if err != nil {
39
- conn .logger .Fatal ("Error in consuming queue: " , zap .Error ((err )))
42
+ conn .logger .Error ("Error in consuming queue: " , zap .Error ((err )))
43
+ forever <- false
40
44
}
41
45
42
46
if len (msg ) > 1 {
@@ -57,13 +61,15 @@ func (conn redisConnector) consumeMessage(ctx context.Context) {
57
61
err = response .Body .Close ()
58
62
if err != nil {
59
63
conn .logger .Error (err .Error ())
64
+ forever <- false
60
65
}
61
66
}
62
67
}
63
68
}
64
69
}()
65
70
conn .logger .Info ("Redis consumer up and running!" )
66
71
<- forever
72
+ sigterm <- syscall .SIGTERM
67
73
}
68
74
69
75
func (conn redisConnector ) errorHandler (ctx context.Context , err error ) {
@@ -118,16 +124,20 @@ func main() {
118
124
}
119
125
password := os .Getenv ("PASSWORD_FROM_ENV" )
120
126
121
- var ctx = context .Background ()
122
127
rdb := redis .NewClient (& redis.Options {
123
128
Addr : address ,
124
129
Password : password ,
125
130
})
126
131
132
+ sigterm := make (chan os.Signal , 1 )
127
133
conn := redisConnector {
128
134
rdbConnection : rdb ,
129
135
connectordata : connectordata ,
130
136
logger : logger ,
131
137
}
132
- conn .consumeMessage (ctx )
138
+ conn .consumeMessage (sigterm )
139
+
140
+ signal .Notify (sigterm , syscall .SIGINT , syscall .SIGTERM )
141
+ <- sigterm
142
+ logger .Info ("Terminating: Redis consumer" )
133
143
}
0 commit comments