@@ -21,13 +21,77 @@ local redis_crc = xmodem.redis_crc
2121
2222local DEFAULT_SHARED_DICT_NAME = " redis_cluster_slot_locks"
2323local DEFAULT_REFRESH_DICT_NAME = " refresh_lock"
24- local DEFAULT_MAX_REDIRECTION = 5
25- local DEFAULT_MAX_CONNECTION_ATTEMPTS = 3
24+ local DEFAULT_MAX_REDIRECTION = 2
25+ local DEFAULT_MAX_CONNECTION_ATTEMPTS = 2
2626local DEFAULT_KEEPALIVE_TIMEOUT = 55000
2727local DEFAULT_KEEPALIVE_CONS = 1000
2828local DEFAULT_CONNECTION_TIMEOUT = 1000
2929local DEFAULT_SEND_TIMEOUT = 1000
3030local DEFAULT_READ_TIMEOUT = 1000
31+ local DEFAULT_HEALTH_DICT_NAME = " redis_cluster_health"
32+ local err_unhealthy_master = " master node is unhealthy"
33+
34+ local function generate_key (name , ip , port )
35+ return name .. " :" .. ip .. " :" .. port
36+ end
37+
38+
39+ local function health_check_timer (premature )
40+ if premature then
41+ return
42+ end
43+
44+ local health_dict = ngx .shared [DEFAULT_HEALTH_DICT_NAME ]
45+ if not health_dict then
46+ return
47+ end
48+
49+ local all_keys = health_dict :get_keys ()
50+ for _ , key in ipairs (all_keys ) do
51+ local ip , port = string.match (key , " ^[^:]+:([^:]+):(%d+)$" )
52+ if not ip or not port then
53+ health_dict :delete (key )
54+ goto continue
55+ end
56+ port = tonumber (port )
57+ local ok
58+ -- Create a new Redis client for each check
59+ local red = redis :new ()
60+ red :set_timeouts (500 , 500 , 500 ) -- 500ms for connect/send/read
61+ ngx .log (ngx .WARN , " health check for: " , ip , " :" , port )
62+ -- Attempt to connect and send PING
63+ local ok , err = red :connect (ip , port )
64+ if ok then
65+ -- Check if PING succeeds
66+ local res , err = red :ping ()
67+ if res == nil then
68+ ok = false
69+ err = " PING failed"
70+ end
71+ red :close () -- Close connection after check
72+ end
73+ -- Update health status based on check
74+ if ok then
75+ health_dict :set (key , 0 , 0 ) -- Healthy: reset failures, no TTL
76+ ngx .log (ngx .WARN , " health check success for: " , ip , " :" , port )
77+ else
78+ local failures = health_dict :get (key ) or 0
79+ health_dict :set (key , failures + 1 , 60 ) -- Unhealthy: increment failures with TTL
80+ ngx .log (ngx .WARN , " health check failed for: " , ip , " :" , port , " failures: " , failures + 1 )
81+ end
82+
83+ :: continue::
84+ end
85+ end
86+
87+ local function track_node_failure (ip , port , name )
88+ local health_dict = ngx .shared [DEFAULT_HEALTH_DICT_NAME ]
89+ if not health_dict then
90+ return
91+ end
92+ local key = generate_key (name , ip , port )
93+ health_dict :incr (key , 1 , 0 , 60 )
94+ end
3195
3296local function parse_key (key_str )
3397 local left_tag_single_index = string_find (key_str , " {" , 0 )
@@ -48,6 +112,20 @@ local mt = { __index = _M }
48112local slot_cache = {}
49113local master_nodes = {}
50114
115+
116+
117+ local function is_node_healthy (ip , port , name )
118+ local health_dict = ngx .shared [DEFAULT_HEALTH_DICT_NAME ]
119+ if not health_dict then
120+ return true
121+ end
122+
123+ local key = generate_key (name , ip , port )
124+ local is_healthy = (health_dict :get (key ) or 0 ) <= 3
125+ return is_healthy
126+ end
127+
128+
51129local cmds_for_all_master = {
52130 [" flushall" ] = true ,
53131 [" flushdb" ] = true
@@ -104,18 +182,21 @@ local function try_hosts_slots(self, serv_list)
104182 if # serv_list < 1 then
105183 return nil , " failed to fetch slots, serv_list config is empty"
106184 end
107-
108185 for i = 1 , # serv_list do
109186 local ip = serv_list [i ].ip
110187 local port = serv_list [i ].port
188+ local is_healthy = is_node_healthy (ip , port , self .config .name )
189+ if not is_healthy then
190+ goto continue
191+ end
111192 local redis_client = redis :new ()
112193 local ok , err , max_connection_timeout_err
113- redis_client :set_timeouts (config .connect_timeout or DEFAULT_CONNECTION_TIMEOUT ,
114- config .send_timeout or DEFAULT_SEND_TIMEOUT ,
115- config .read_timeout or DEFAULT_READ_TIMEOUT )
116-
117194 -- attempt to connect DEFAULT_MAX_CONNECTION_ATTEMPTS times to redis
118195 for k = 1 , config .max_connection_attempts or DEFAULT_MAX_CONNECTION_ATTEMPTS do
196+ local attempt_timeout = k == 1 and 100 or 800 -- 100ms first attempt, 800ms retry
197+ redis_client :set_timeouts (attempt_timeout ,
198+ config .send_timeout or DEFAULT_SEND_TIMEOUT ,
199+ config .read_timeout or DEFAULT_READ_TIMEOUT )
119200 local total_connection_time_ms = (ngx .now () - start_time ) * 1000
120201 if (config .max_connection_timeout and total_connection_time_ms > config .max_connection_timeout ) then
121202 max_connection_timeout_err = " max_connection_timeout of " .. config .max_connection_timeout .. " ms reached."
@@ -125,13 +206,16 @@ local function try_hosts_slots(self, serv_list)
125206 end
126207
127208 ok , err = redis_client :connect (ip , port , self .config .connect_opts )
128- if ok then break end
209+
210+ if ok then
211+ break
212+ end
129213 if err then
130214 ngx .log (ngx .ERR ," unable to connect, attempt nr " , k , " : error: " , err )
215+ track_node_failure (ip , port , self .config .name )
131216 table_insert (errors , err )
132217 end
133218 end
134-
135219 if ok then
136220 local _ , autherr = check_auth (self , redis_client )
137221 if autherr then
@@ -205,6 +289,7 @@ local function try_hosts_slots(self, serv_list)
205289 if # errors == 0 then
206290 return true , nil
207291 end
292+ :: continue::
208293 end
209294 return nil , errors
210295end
@@ -231,7 +316,6 @@ function _M.fetch_slots(self)
231316 end
232317
233318 serv_list_cached = nil -- important!
234-
235319 local _ , errors = try_hosts_slots (self , serv_list_combined )
236320 if errors then
237321 local err = " failed to fetch slots: " .. table.concat (errors , " ;" )
@@ -257,6 +341,22 @@ function _M.refresh_slots(self)
257341 end
258342
259343 self :fetch_slots ()
344+ -- Cleanup health dict entries for removed nodes
345+ local health_dict = ngx .shared [DEFAULT_HEALTH_DICT_NAME ]
346+ local current_nodes = {}
347+ local servers = slot_cache [self .config .name .. " serv_list" ].serv_list
348+ for _ , node in ipairs (servers ) do
349+ local key = generate_key (self .config .name , node .ip , node .port )
350+ current_nodes [key ] = true
351+ end
352+ -- Cleanup stale nodes
353+ local all_keys = health_dict :get_keys ()
354+ for _ , key in ipairs (all_keys ) do
355+ if not current_nodes [key ] then
356+ health_dict :delete (key )
357+ end
358+ end
359+
260360 ok , err = lock :unlock ()
261361 if not ok then
262362 ngx .log (ngx .ERR , " failed to unlock in refresh slot cache:" , err )
@@ -330,33 +430,47 @@ end
330430
331431
332432local function pick_node (self , serv_list , slot , magic_radom_seed )
433+ local healthy_servers = {}
434+ for i , node in ipairs (serv_list ) do
435+ -- first node here is master. If its unhealthy then we should return err
436+ local is_healthy = is_node_healthy (node .ip , node .port , self .config .name )
437+ if i == 1 and not is_healthy then
438+ return nil , nil , nil , err_unhealthy_master
439+ end
440+ if is_healthy then
441+ table_insert (healthy_servers , node )
442+ end
443+ end
444+ if # healthy_servers == 0 then
445+ return nil , nil , nil , " No healthy nodes"
446+ end
333447 local host
334448 local port
335449 local slave
336450 local index
337- if # serv_list < 1 then
451+ if # healthy_servers < 1 then
338452 return nil , nil , nil , " serv_list for slot " .. slot .. " is empty"
339453 end
340454 if self .config .enable_slave_read then
341455 if magic_radom_seed then
342- index = magic_radom_seed % # serv_list + 1
456+ index = magic_radom_seed % # healthy_servers + 1
343457 else
344- index = math.random (# serv_list )
458+ index = math.random (# healthy_servers )
345459 end
346- host = serv_list [index ].ip
347- port = serv_list [index ].port
460+ host = healthy_servers [index ].ip
461+ port = healthy_servers [index ].port
348462 -- cluster slots will always put the master node as first
349463 if index > 1 then
350464 slave = true
351465 else
352466 slave = false
353467 end
354- -- ngx.log(ngx.NOTICE, "pickup node: ", c(serv_list [index]))
468+ -- ngx.log(ngx.NOTICE, "pickup node: ", c(healthy_servers [index]))
355469 else
356- host = serv_list [1 ].ip
357- port = serv_list [1 ].port
470+ host = healthy_servers [1 ].ip
471+ port = healthy_servers [1 ].port
358472 slave = false
359- -- ngx.log(ngx.NOTICE, "pickup node: ", cjson.encode(serv_list [1]))
473+ -- ngx.log(ngx.NOTICE, "pickup node: ", cjson.encode(healthy_servers [1]))
360474 end
361475 return host , port , slave
362476end
@@ -414,11 +528,10 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
414528
415529 key = tostring (key )
416530 local slot = redis_slot (key )
417-
418531 for k = 1 , config .max_redirection or DEFAULT_MAX_REDIRECTION do
419-
532+ local attempt_timeout = k == 1 and 100 or 800 -- 100ms first attempt, 800ms retry
420533 if k > 1 then
421- ngx .log (ngx .NOTICE , " handle retry attempts:" .. k .. " for cmd:" .. cmd .. " key:" .. key )
534+ ngx .log (ngx .WARN , " handle retry attempts:" .. k .. " for cmd:" .. cmd .. " key:" .. key )
422535 end
423536
424537 local slots = slot_cache [self .config .name ]
@@ -439,18 +552,22 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
439552 else
440553 ip , port , slave , err = pick_node (self , serv_list , slot )
441554 if err then
555+ if err == err_unhealthy_master then
556+ return nil , err
557+ end
442558 ngx .log (ngx .ERR , " pickup node failed, will return failed for this request, meanwhile refereshing slotcache " .. err )
443559 self :refresh_slots ()
444560 return nil , err
445561 end
446562 end
447-
448563 local redis_client = redis :new ()
449- redis_client :set_timeouts (config . connect_timeout or DEFAULT_CONNECTION_TIMEOUT ,
564+ redis_client :set_timeouts (attempt_timeout ,
450565 config .send_timeout or DEFAULT_SEND_TIMEOUT ,
451566 config .read_timeout or DEFAULT_READ_TIMEOUT )
452567 local ok , connerr = redis_client :connect (ip , port , self .config .connect_opts )
453-
568+ if not ok then
569+ track_node_failure (ip , port , self .config .name )
570+ end
454571 if ok then
455572 local authok , autherr = check_auth (self , redis_client )
456573 if autherr then
@@ -481,7 +598,6 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
481598 else
482599 res , err = redis_client [cmd ](redis_client , key , ... )
483600 end
484-
485601 if err then
486602 if string.sub (err , 1 , 5 ) == " MOVED" then
487603 -- ngx.log(ngx.NOTICE, "find MOVED signal, trigger retry for normal commands, cmd:" .. cmd .. " key:" .. key)
@@ -512,6 +628,7 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
512628 return nil , " Cannot executing command, cluster status is failed!"
513629 else
514630 -- There might be node fail, we should also refresh slot cache
631+ track_node_failure (ip , port , self .config .name )
515632 self :refresh_slots ()
516633 return nil , err
517634 end
@@ -802,4 +919,8 @@ setmetatable(_M, {
802919 end
803920})
804921
922+ function _M .init ()
923+ ngx .timer .every (1 , health_check_timer )
924+ end
925+
805926return _M
0 commit comments