Skip to content

Commit f137af0

Browse files
committed
Redis-backed queues are no longer listened to once a list is empty.
* LPop does not wait for new messages. * Use BLPop which is blocking version for LPop and waits for new messages. Signed-off-by: Md Soharab Ansari <[email protected]>
1 parent 14e5b81 commit f137af0

File tree

1 file changed

+19
-19
lines changed

1 file changed

+19
-19
lines changed

redis-http-connector/main.go

+19-19
Original file line numberDiff line numberDiff line change
@@ -29,34 +29,34 @@ func (conn redisConnector) consumeMessage(ctx context.Context) {
2929
"KEDA-Source-Name": {conn.connectordata.SourceName},
3030
}
3131

32-
var messages []string
33-
var listItr int64
3432
forever := make(chan bool)
3533

3634
go func() {
37-
listLength, err := conn.rdbConnection.LLen(ctx, conn.connectordata.Topic).Result()
38-
if err != nil {
39-
conn.logger.Fatal("Error in consuming queue: ", zap.Error(err))
40-
}
41-
for listItr = 0; listItr < listLength; listItr++ {
42-
msg, err := conn.rdbConnection.LPop(ctx, conn.connectordata.Topic).Result()
35+
for {
36+
// BLPop will block and wait for a new message if the list is empty
37+
msg, err := conn.rdbConnection.BLPop(ctx, 0, conn.connectordata.Topic).Result()
4338
if err != nil {
4439
conn.logger.Fatal("Error in consuming queue: ", zap.Error((err)))
4540
}
46-
messages = append(messages, msg)
47-
}
48-
for _, message := range messages {
49-
response, err := common.HandleHTTPRequest(message, headers, conn.connectordata, conn.logger)
50-
if err != nil {
51-
conn.errorHandler(ctx, err)
52-
} else {
53-
defer response.Body.Close()
54-
body, err := io.ReadAll(response.Body)
41+
42+
if len(msg) > 1 {
43+
// BLPop returns a slice with topic and message, we need the second item
44+
message := msg[1]
45+
response, err := common.HandleHTTPRequest(message, headers, conn.connectordata, conn.logger)
5546
if err != nil {
5647
conn.errorHandler(ctx, err)
5748
} else {
58-
if success := conn.responseHandler(ctx, string(body)); success {
59-
conn.logger.Info("Message sending to response successful")
49+
body, err := io.ReadAll(response.Body)
50+
if err != nil {
51+
conn.errorHandler(ctx, err)
52+
} else {
53+
if success := conn.responseHandler(ctx, string(body)); success {
54+
conn.logger.Info("Message sending to response successful")
55+
}
56+
}
57+
err = response.Body.Close()
58+
if err != nil {
59+
conn.logger.Error(err.Error())
6060
}
6161
}
6262
}

0 commit comments

Comments
 (0)