@@ -19,7 +19,12 @@ local table_insert = table.insert
19
19
local string_find = string.find
20
20
local redis_crc = xmodem .redis_crc
21
21
22
+ local cjson = require (' cjson.safe' )
23
+ local cjson_encode = cjson .encode
24
+ local cjson_deocde = cjson .decode
25
+
22
26
local DEFAULT_SHARED_DICT_NAME = " redis_cluster_slot_locks"
27
+ local DEFAULT_SLOTS_INFO_DICT_NAME = " redis_cluster_slots_info"
23
28
local DEFAULT_REFRESH_DICT_NAME = " refresh_lock"
24
29
local DEFAULT_MAX_REDIRECTION = 5
25
30
local DEFAULT_MAX_CONNECTION_ATTEMPTS = 3
@@ -96,6 +101,42 @@ local function split(s, delimiter)
96
101
return result ;
97
102
end
98
103
104
+ local function generate_full_slots_cache_info (slots_info )
105
+ if not slots_info then
106
+ return nil , nil , ' slots_info is nil'
107
+ end
108
+
109
+ local slots = {}
110
+ -- while slots are updated, create a list of servers present in cluster
111
+ -- this can differ from self.config.serv_list if a cluster is resized (added/removed nodes)
112
+ local servers = { serv_list = {} }
113
+ for n = 1 , # slots_info do
114
+ local sub_info = slots_info [n ]
115
+ -- slot info item 1 and 2 are the subrange start end slots
116
+ local start_slot , end_slot = sub_info [1 ], sub_info [2 ]
117
+ local list = { serv_list = {} }
118
+ -- from 3, here lists the host/port/nodeid of in charge nodes
119
+ for j = 3 , # sub_info do
120
+ table.insert (list .serv_list ,{
121
+ ip = sub_info [j ][1 ],
122
+ port = sub_info [j ][2 ],
123
+ slave = (j > 3 ) -- first node in the list is the master
124
+ })
125
+ end
126
+
127
+ for slot = start_slot , end_slot do
128
+ slots [slot ] = list
129
+ end
130
+
131
+ -- append to the list of all servers
132
+ for _ , serv in ipairs (list .serv_list ) do
133
+ table.insert (servers .serv_list ,serv )
134
+ end
135
+ end
136
+
137
+ return slots , servers
138
+ end
139
+
99
140
local function try_hosts_slots (self , serv_list )
100
141
local start_time = ngx .now ()
101
142
local errors = {}
@@ -140,36 +181,16 @@ local function try_hosts_slots(self, serv_list)
140
181
local slots_info
141
182
slots_info , err = redis_client :cluster (" slots" )
142
183
if slots_info then
143
- local slots = {}
144
- -- while slots are updated, create a list of servers present in cluster
145
- -- this can differ from self.config.serv_list if a cluster is resized (added/removed nodes)
146
- local servers = { serv_list = {} }
147
- for n = 1 , # slots_info do
148
- local sub_info = slots_info [n ]
149
- -- slot info item 1 and 2 are the subrange start end slots
150
- local start_slot , end_slot = sub_info [1 ], sub_info [2 ]
151
- local list = { serv_list = {} }
152
- -- from 3, here lists the host/port/nodeid of in charge nodes
153
- for j = 3 , # sub_info do
154
- table.insert (list .serv_list ,{
155
- ip = sub_info [j ][1 ],
156
- port = sub_info [j ][2 ],
157
- slave = (j > 3 ) -- first node in the list is the master
158
- })
159
- end
160
-
161
- for slot = start_slot , end_slot do
162
- slots [slot ] = list
163
- end
164
-
165
- -- append to the list of all servers
166
- for _ , serv in ipairs (list .serv_list ) do
167
- table.insert (servers .serv_list ,serv )
168
- end
169
- end
184
+ local slots , servers = generate_full_slots_cache_info (slots_info )
170
185
-- ngx.log(ngx.NOTICE, "finished initializing slotcache...")
171
186
slot_cache [self .config .name ] = slots
172
187
slot_cache [self .config .name .. " serv_list" ] = servers
188
+
189
+ -- cache slots_info to memory
190
+ _ , err = self :try_cache_slots_info_to_memory (slots_info )
191
+ if err then
192
+ ngx .log (ngx .ERR , ' failed to cache slots to memory: ' , err )
193
+ end
173
194
else
174
195
table_insert (errors , err )
175
196
end
@@ -223,6 +244,56 @@ function _M.fetch_slots(self)
223
244
end
224
245
end
225
246
247
+ function _M .try_load_slots_from_memory_cache (self )
248
+ local dict_name = self .config .slots_info_dict_name or DEFAULT_SLOTS_INFO_DICT_NAME
249
+ local slots_cache_dict = ngx .shared [dict_name ]
250
+ if slots_cache_dict == nil then
251
+ return false , dict_name .. ' is nil'
252
+ end
253
+
254
+
255
+ local slots_info_str = slots_cache_dict :get (self .config .name )
256
+ if not slots_info_str or slots_info_str == ' ' then
257
+ ngx .log (ngx .ERR , ' slots_info_str: ' , slots_info_str )
258
+ return false , ' slots_info_str is nil or empty'
259
+ end
260
+
261
+ local slots_info = cjson_decode (slots_info_str )
262
+ if not slots_info then
263
+ return false , ' slots_info is nil'
264
+ end
265
+
266
+ local slots , servers = generate_full_slots_cache_info (slots_info )
267
+ if not slots or not servers then
268
+ return false , ' slots or servers is nil'
269
+ end
270
+
271
+ -- ngx.log(ngx.NOTICE, "finished initializing slotcache...")
272
+ slot_cache [self .config .name ] = slots
273
+ slot_cache [self .config .name .. " serv_list" ] = servers
274
+
275
+ return true
276
+ end
277
+
278
+ function _M .try_cache_slots_info_to_memory (self , slots_info )
279
+ local dict_name = self .config .slots_info_dict_name or DEFAULT_SLOTS_INFO_DICT_NAME
280
+ local slots_cache_dict = ngx .shared [dict_name ]
281
+ if slots_cache_dict == nil then
282
+ return false , dict_name .. ' is nil'
283
+ end
284
+
285
+ if not slots_info then
286
+ return false , ' slots_info is nil'
287
+ end
288
+
289
+ local slots_info_str = cjson_encode (slots_info )
290
+ local success , err = slots_cache_dict :set (self .config .name , slots_info_str )
291
+ if not success then
292
+ ngx .log (ngx .ERR , ' error set slots_info: ' , err , ' , slots_info_str: ' , slots_info_str )
293
+ return false , err
294
+ end
295
+ return true
296
+ end
226
297
227
298
function _M .refresh_slots (self )
228
299
local worker_id = ngx .worker .id ()
@@ -275,6 +346,19 @@ function _M.init_slots(self)
275
346
return true
276
347
end
277
348
349
+ -- fetch slots from memory cache
350
+ ok , err = self :try_load_slots_from_memory_cache ()
351
+ if err then
352
+ ngx .log (ngx .ERR , ' failed to fetch slots from memory cache: ' , err )
353
+ end
354
+ if ok then
355
+ ok , err = lock :unlock ()
356
+ if not ok then
357
+ ngx .log (ngx .ERR , " failed to unlock in initialization slot cache:" , err )
358
+ end
359
+ return true
360
+ end
361
+
278
362
local _ , errs = self :fetch_slots ()
279
363
if errs then
280
364
ok , err = lock :unlock ()
@@ -487,8 +571,13 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
487
571
return res , err
488
572
end
489
573
else
574
+ -- `too many waiting connect operations` means queued connect operations is out of backlog
575
+ -- `timeout` means timeout while wait for connection release
576
+ -- if connect timeout caused by server's issue, the connerr is `connection timed out`
577
+ if connerr ~= ' too many waiting connect operations' and connerr ~= ' timeout' then
578
+ self :refresh_slots ()
579
+ end
490
580
-- There might be node fail, we should also refresh slot cache
491
- self :refresh_slots ()
492
581
if k == config .max_redirection or k == DEFAULT_MAX_REDIRECTION then
493
582
-- only return after allowing for `k` attempts
494
583
return nil , connerr
0 commit comments