Skip to content

Commit 14b86f8

Browse files
authored
Merge pull request #10 from Revolyssup/revolyssup/add-local-cache
feat: add healthcheck for failed master nodes and refactor retry strategy
2 parents ba380ba + 91969db commit 14b86f8

File tree

1 file changed

+147
-26
lines changed

1 file changed

+147
-26
lines changed

lib/resty/rediscluster.lua

+147-26
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,77 @@ local redis_crc = xmodem.redis_crc
2121

2222
local DEFAULT_SHARED_DICT_NAME = "redis_cluster_slot_locks"
2323
local DEFAULT_REFRESH_DICT_NAME = "refresh_lock"
24-
local DEFAULT_MAX_REDIRECTION = 5
25-
local DEFAULT_MAX_CONNECTION_ATTEMPTS = 3
24+
local DEFAULT_MAX_REDIRECTION = 2
25+
local DEFAULT_MAX_CONNECTION_ATTEMPTS = 2
2626
local DEFAULT_KEEPALIVE_TIMEOUT = 55000
2727
local DEFAULT_KEEPALIVE_CONS = 1000
2828
local DEFAULT_CONNECTION_TIMEOUT = 1000
2929
local DEFAULT_SEND_TIMEOUT = 1000
3030
local DEFAULT_READ_TIMEOUT = 1000
31+
local DEFAULT_HEALTH_DICT_NAME = "redis_cluster_health"
32+
local err_unhealthy_master = "master node is unhealthy"
33+
34+
--- Build the shared-dict key that identifies one node of one cluster.
-- The result has the form "<name>:<ip>:<port>".
-- @param name cluster name
-- @param ip   node address
-- @param port node port
-- @return the composed key string
local function generate_key(name, ip, port)
    local endpoint = ip .. ":" .. port
    return name .. ":" .. endpoint
end
37+
38+
39+
--- Timer callback that actively probes every node recorded in the health
-- shared dict and updates its failure counter.
--
-- Each key is parsed as "<name>:<ip>:<port>"; keys that do not match that
-- shape are deleted. For every remaining key a fresh Redis client connects
-- and sends PING:
--   * on success the counter is reset to 0 with no expiry;
--   * on failure the counter is incremented and stored with a 60 s TTL.
--
-- NOTE(review): the get-then-set on failure is not atomic with respect to
-- track_node_failure() running in other workers; it is kept because set()
-- refreshes the TTL on every failed probe, which incr() would not do.
--
-- @param premature true when the timer fires because the worker is exiting
local function health_check_timer(premature)
    if premature then
        return
    end

    local health_dict = ngx.shared[DEFAULT_HEALTH_DICT_NAME]
    if not health_dict then
        return
    end

    for _, key in ipairs(health_dict:get_keys()) do
        local ip, port = string.match(key, "^[^:]+:([^:]+):(%d+)$")
        if not ip or not port then
            -- Not a key we produced; drop it so it is not re-examined.
            health_dict:delete(key)
        else
            port = tonumber(port)

            -- Create a new Redis client for each check
            local red = redis:new()
            red:set_timeouts(500, 500, 500) -- 500ms for connect/send/read
            ngx.log(ngx.WARN, "health check for: ", ip, ":", port)

            local ok, err = red:connect(ip, port)
            if ok then
                -- Only a nil result is treated as a failed probe. A false
                -- result presumably means the server answered with an error
                -- reply (e.g. auth required), which still shows the node is
                -- reachable -- TODO confirm against lua-resty-redis docs.
                local res, ping_err = red:ping()
                if res == nil then
                    ok = false
                    err = ping_err or "PING failed"
                end
                red:close() -- Close connection after check
            end

            if ok then
                health_dict:set(key, 0, 0) -- Healthy: reset failures, no TTL
                ngx.log(ngx.WARN, "health check success for: ", ip, ":", port)
            else
                local failures = (health_dict:get(key) or 0) + 1
                health_dict:set(key, failures, 60) -- Unhealthy: bump failures with TTL
                ngx.log(ngx.WARN, "health check failed for: ", ip, ":", port,
                        " failures: ", failures, " error: ", err)
            end
        end
    end
end
86+
87+
--- Record one observed failure for a node in the health shared dict.
-- Does nothing when the health dict is not configured.
--
-- The counter is created at 0 with a 60 s TTL if it does not exist yet, then
-- incremented by 1.
-- @param ip   node address
-- @param port node port
-- @param name cluster name used as the key prefix
local function track_node_failure(ip, port, name)
    local health_dict = ngx.shared[DEFAULT_HEALTH_DICT_NAME]
    if not health_dict then
        return
    end

    local key = generate_key(name, ip, port)
    local newval, err = health_dict:incr(key, 1, 0, 60)
    if not newval then
        -- Tracking is best-effort: report the problem but never fail the
        -- request path that called us.
        ngx.log(ngx.ERR, "failed to record node failure for ", key, ": ", err)
    end
end
3195

3296
local function parse_key(key_str)
3397
local left_tag_single_index = string_find(key_str, "{", 0)
@@ -48,6 +112,20 @@ local mt = { __index = _M }
48112
local slot_cache = {}
49113
local master_nodes = {}
50114

115+
116+
117+
--- Tell whether a node is currently considered usable.
-- A node is healthy while its recorded failure count is at most 3. When the
-- health dict is not configured, or the node has no entry, it is healthy.
-- @param ip   node address
-- @param port node port
-- @param name cluster name used as the key prefix
-- @return boolean
local function is_node_healthy(ip, port, name)
    local dict = ngx.shared[DEFAULT_HEALTH_DICT_NAME]
    if dict then
        local failures = dict:get(generate_key(name, ip, port)) or 0
        return failures <= 3
    end
    return true
end
127+
128+
51129
local cmds_for_all_master = {
52130
["flushall"] = true,
53131
["flushdb"] = true
@@ -104,18 +182,21 @@ local function try_hosts_slots(self, serv_list)
104182
if #serv_list < 1 then
105183
return nil, "failed to fetch slots, serv_list config is empty"
106184
end
107-
108185
for i = 1, #serv_list do
109186
local ip = serv_list[i].ip
110187
local port = serv_list[i].port
188+
local is_healthy = is_node_healthy(ip, port, self.config.name)
189+
if not is_healthy then
190+
goto continue
191+
end
111192
local redis_client = redis:new()
112193
local ok, err, max_connection_timeout_err
113-
redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT,
114-
config.send_timeout or DEFAULT_SEND_TIMEOUT,
115-
config.read_timeout or DEFAULT_READ_TIMEOUT)
116-
117194
--attempt to connect DEFAULT_MAX_CONNECTION_ATTEMPTS times to redis
118195
for k = 1, config.max_connection_attempts or DEFAULT_MAX_CONNECTION_ATTEMPTS do
196+
local attempt_timeout = k == 1 and 100 or 800 -- 100ms first attempt, 800ms retry
197+
redis_client:set_timeouts(attempt_timeout,
198+
config.send_timeout or DEFAULT_SEND_TIMEOUT,
199+
config.read_timeout or DEFAULT_READ_TIMEOUT)
119200
local total_connection_time_ms = (ngx.now() - start_time) * 1000
120201
if (config.max_connection_timeout and total_connection_time_ms > config.max_connection_timeout) then
121202
max_connection_timeout_err = "max_connection_timeout of " .. config.max_connection_timeout .. "ms reached."
@@ -125,13 +206,16 @@ local function try_hosts_slots(self, serv_list)
125206
end
126207

127208
ok, err = redis_client:connect(ip, port, self.config.connect_opts)
128-
if ok then break end
209+
210+
if ok then
211+
break
212+
end
129213
if err then
130214
ngx.log(ngx.ERR,"unable to connect, attempt nr ", k, " : error: ", err)
215+
track_node_failure(ip, port, self.config.name)
131216
table_insert(errors, err)
132217
end
133218
end
134-
135219
if ok then
136220
local _, autherr = check_auth(self, redis_client)
137221
if autherr then
@@ -205,6 +289,7 @@ local function try_hosts_slots(self, serv_list)
205289
if #errors == 0 then
206290
return true, nil
207291
end
292+
::continue::
208293
end
209294
return nil, errors
210295
end
@@ -231,7 +316,6 @@ function _M.fetch_slots(self)
231316
end
232317

233318
serv_list_cached = nil -- important!
234-
235319
local _, errors = try_hosts_slots(self, serv_list_combined)
236320
if errors then
237321
local err = "failed to fetch slots: " .. table.concat(errors, ";")
@@ -257,6 +341,22 @@ function _M.refresh_slots(self)
257341
end
258342

259343
self:fetch_slots()
344+
-- Cleanup health dict entries for removed nodes
345+
local health_dict = ngx.shared[DEFAULT_HEALTH_DICT_NAME]
346+
local current_nodes = {}
347+
local servers = slot_cache[self.config.name .. "serv_list"].serv_list
348+
for _, node in ipairs(servers) do
349+
local key = generate_key(self.config.name, node.ip, node.port)
350+
current_nodes[key] = true
351+
end
352+
-- Cleanup stale nodes
353+
local all_keys = health_dict:get_keys()
354+
for _, key in ipairs(all_keys) do
355+
if not current_nodes[key] then
356+
health_dict:delete(key)
357+
end
358+
end
359+
260360
ok, err = lock:unlock()
261361
if not ok then
262362
ngx.log(ngx.ERR, "failed to unlock in refresh slot cache:", err)
@@ -330,33 +430,47 @@ end
330430

331431

332432
--- Choose the node that should serve a request for the given slot.
--
-- serv_list is the node list for the slot with the master first. The master
-- must be healthy; otherwise err_unhealthy_master is returned so the caller
-- can fail fast. When slave reads are enabled, one of the healthy nodes
-- (master included) is picked, deterministically if magic_radom_seed is
-- given and randomly otherwise.
--
-- @param self             cluster instance (reads self.config)
-- @param serv_list        array of { ip = ..., port = ... }, master first
-- @param slot             slot number, used only in the error message
-- @param magic_radom_seed optional number for a reproducible pick
-- @return host, port, slave on success (slave is true for a replica)
-- @return nil, nil, nil, err on failure
local function pick_node(self, serv_list, slot, magic_radom_seed)
    -- Report an empty list as such instead of as "No healthy nodes".
    if #serv_list < 1 then
        return nil, nil, nil, "serv_list for slot " .. slot .. " is empty"
    end

    local cluster_name = self.config.name

    -- first node here is master. If its unhealthy then we should return err
    local master = serv_list[1]
    if not is_node_healthy(master.ip, master.port, cluster_name) then
        return nil, nil, nil, err_unhealthy_master
    end

    if not self.config.enable_slave_read then
        -- Only the master can be used, so replica health is irrelevant.
        return master.ip, master.port, false
    end

    -- The master is healthy, so this list is never empty.
    local healthy_servers = { master }
    for i = 2, #serv_list do
        local node = serv_list[i]
        if is_node_healthy(node.ip, node.port, cluster_name) then
            healthy_servers[#healthy_servers + 1] = node
        end
    end

    local index
    if magic_radom_seed then
        index = magic_radom_seed % #healthy_servers + 1
    else
        index = math.random(#healthy_servers)
    end

    local picked = healthy_servers[index]
    --cluster slots will always put the master node as first
    return picked.ip, picked.port, index > 1
end
@@ -414,11 +528,10 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
414528

415529
key = tostring(key)
416530
local slot = redis_slot(key)
417-
418531
for k = 1, config.max_redirection or DEFAULT_MAX_REDIRECTION do
419-
532+
local attempt_timeout = k == 1 and 100 or 800 -- 100ms first attempt, 800ms retry
420533
if k > 1 then
421-
ngx.log(ngx.NOTICE, "handle retry attempts:" .. k .. " for cmd:" .. cmd .. " key:" .. key)
534+
ngx.log(ngx.WARN, "handle retry attempts:" .. k .. " for cmd:" .. cmd .. " key:" .. key)
422535
end
423536

424537
local slots = slot_cache[self.config.name]
@@ -439,18 +552,22 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
439552
else
440553
ip, port, slave, err = pick_node(self, serv_list, slot)
441554
if err then
555+
if err == err_unhealthy_master then
556+
return nil, err
557+
end
442558
ngx.log(ngx.ERR, "pickup node failed, will return failed for this request, meanwhile refereshing slotcache " .. err)
443559
self:refresh_slots()
444560
return nil, err
445561
end
446562
end
447-
448563
local redis_client = redis:new()
449-
redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT,
564+
redis_client:set_timeouts(attempt_timeout,
450565
config.send_timeout or DEFAULT_SEND_TIMEOUT,
451566
config.read_timeout or DEFAULT_READ_TIMEOUT)
452567
local ok, connerr = redis_client:connect(ip, port, self.config.connect_opts)
453-
568+
if not ok then
569+
track_node_failure(ip, port, self.config.name)
570+
end
454571
if ok then
455572
local authok, autherr = check_auth(self, redis_client)
456573
if autherr then
@@ -481,7 +598,6 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
481598
else
482599
res, err = redis_client[cmd](redis_client, key, ...)
483600
end
484-
485601
if err then
486602
if string.sub(err, 1, 5) == "MOVED" then
487603
--ngx.log(ngx.NOTICE, "find MOVED signal, trigger retry for normal commands, cmd:" .. cmd .. " key:" .. key)
@@ -512,6 +628,7 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
512628
return nil, "Cannot executing command, cluster status is failed!"
513629
else
514630
--There might be node fail, we should also refresh slot cache
631+
track_node_failure(ip, port, self.config.name)
515632
self:refresh_slots()
516633
return nil, err
517634
end
@@ -802,4 +919,8 @@ setmetatable(_M, {
802919
end
803920
})
804921

922+
--- Start the recurring health check for nodes recorded in the health dict.
-- Registers health_check_timer to run every second in the calling worker.
-- NOTE(review): each call registers another timer; presumably this is meant
-- to be called once per worker (e.g. from init_worker) -- confirm with callers.
-- @return true on success
-- @return nil, err when the timer could not be created
function _M.init()
    local hdl, err = ngx.timer.every(1, health_check_timer)
    if not hdl then
        ngx.log(ngx.ERR, "failed to create health check timer: ", err)
        return nil, err
    end
    return true
end
925+
805926
return _M

0 commit comments

Comments
 (0)