@@ -46,7 +46,6 @@ local _M = {}
46
46
local mt = { __index = _M }
47
47
48
48
local slot_cache = {}
49
- local master_nodes = {}
50
49
51
50
local cmds_for_all_master = {
52
51
[" flushall" ] = true ,
@@ -147,21 +146,25 @@ local function try_hosts_slots(self, serv_list)
147
146
local servers = { serv_list = {} }
148
147
for n = 1 , # slots_info do
149
148
local sub_info = slots_info [n ]
150
- -- slot info item 1 and 2 are the subrange start end slots
149
+ -- slot info item 1 and 2 are the subrange start end slots
151
150
local start_slot , end_slot = sub_info [1 ], sub_info [2 ]
152
-
153
- -- generate new list of servers
151
+ local list = { serv_list = {} }
152
+ -- from 3, here lists the host/port/nodeid of in charge nodes
154
153
for j = 3 , # sub_info do
155
- servers .serv_list [# servers .serv_list + 1 ] = { ip = sub_info [j ][1 ], port = sub_info [j ][2 ] }
154
+ table.insert (list .serv_list ,{
155
+ ip = sub_info [j ][1 ],
156
+ port = sub_info [j ][2 ],
157
+ slave = (j > 3 ) -- first node in the list is the master
158
+ })
156
159
end
157
160
158
161
for slot = start_slot , end_slot do
159
- local list = { serv_list = {} }
160
- -- from 3, here lists the host/port/nodeid of in charge nodes
161
- for j = 3 , # sub_info do
162
- list . serv_list [ # list . serv_list + 1 ] = { ip = sub_info [ j ][ 1 ], port = sub_info [ j ][ 2 ] }
163
- slots [ slot ] = list
164
- end
162
+ slots [ slot ] = list
163
+ end
164
+
165
+ -- append to the list of all servers
166
+ for _ , serv in ipairs ( list . serv_list ) do
167
+ table.insert ( servers . serv_list , serv )
165
168
end
166
169
end
167
170
-- ngx.log(ngx.NOTICE, "finished initializing slotcache...")
@@ -170,31 +173,11 @@ local function try_hosts_slots(self, serv_list)
170
173
else
171
174
table_insert (errors , err )
172
175
end
173
- -- cache master nodes
174
- local nodes_res , nerr = redis_client :cluster (" nodes" )
175
- if nodes_res then
176
- local nodes_info = split (nodes_res , char (10 ))
177
- for _ , node in ipairs (nodes_info ) do
178
- local node_info = split (node , " " )
179
- if # node_info > 2 then
180
- local is_master = match (node_info [3 ], " master" ) ~= nil
181
- if is_master then
182
- local ip_port = split (split (node_info [2 ], " @" )[1 ], " :" )
183
- table_insert (master_nodes , {
184
- ip = ip_port [1 ],
185
- port = tonumber (ip_port [2 ])
186
- })
187
- end
188
- end
189
- end
190
- else
191
- table_insert (errors , nerr )
192
- end
193
176
release_connection (redis_client , config )
194
177
195
- -- refresh of slots and master nodes successful
178
+ -- refresh of slots successfully
196
179
-- not required to connect/iterate over additional hosts
197
- if nodes_res and slots_info then
180
+ if slots_info then
198
181
return true , nil
199
182
end
200
183
elseif max_connection_timeout_err then
@@ -321,11 +304,7 @@ function _M.new(_, config)
321
304
return inst
322
305
end
323
306
324
-
325
307
local function pick_node (self , serv_list , slot , magic_radom_seed )
326
- local host
327
- local port
328
- local slave
329
308
local index
330
309
if # serv_list < 1 then
331
310
return nil , nil , nil , " serv_list for slot " .. slot .. " is empty"
@@ -336,28 +315,15 @@ local function pick_node(self, serv_list, slot, magic_radom_seed)
336
315
else
337
316
index = math.random (# serv_list )
338
317
end
339
- host = serv_list [index ].ip
340
- port = serv_list [index ].port
341
- -- cluster slots will always put the master node as first
342
- if index > 1 then
343
- slave = true
344
- else
345
- slave = false
346
- end
347
- -- ngx.log(ngx.NOTICE, "pickup node: ", c(serv_list[index]))
348
318
else
349
- host = serv_list [1 ].ip
350
- port = serv_list [1 ].port
351
- slave = false
352
- -- ngx.log(ngx.NOTICE, "pickup node: ", cjson.encode(serv_list[1]))
319
+ index = 1
353
320
end
354
- return host , port , slave
321
+ -- ngx.log(ngx.NOTICE, "pickup node: ", cjson.encode(serv_list[index]))
322
+ return serv_list [index ].ip , serv_list [index ].port , serv_list [index ].slave
355
323
end
356
324
357
-
358
325
local ask_host_and_port = {}
359
326
360
-
361
327
local function parse_ask_signal (res )
362
328
-- ask signal sample:ASK 12191 127.0.0.1:7008, so we need to parse and get 127.0.0.1, 7008
363
329
if res ~= ngx .null then
@@ -537,19 +503,22 @@ end
537
503
538
504
local function _do_cmd_master (self , cmd , key , ...)
539
505
local errors = {}
540
- for _ , master in ipairs (master_nodes ) do
541
- local redis_client = redis :new ()
542
- redis_client :set_timeouts (self .config .connect_timeout or DEFAULT_CONNECTION_TIMEOUT ,
543
- self .config .send_timeout or DEFAULT_SEND_TIMEOUT ,
544
- self .config .read_timeout or DEFAULT_READ_TIMEOUT )
545
- local ok , err = redis_client :connect (master .ip , master .port , self .config .connect_opts )
546
- if ok then
547
- _ , err = redis_client [cmd ](redis_client , key , ... )
548
- end
549
- if err then
550
- table_insert (errors , err )
506
+ local serv_list = slot_cache [self .config .name .. " serv_list" ].serv_list
507
+ for _ , server in ipairs (serv_list ) do
508
+ if not server .slave then
509
+ local redis_client = redis :new ()
510
+ redis_client :set_timeouts (self .config .connect_timeout or DEFAULT_CONNECTION_TIMEOUT ,
511
+ self .config .send_timeout or DEFAULT_SEND_TIMEOUT ,
512
+ self .config .read_timeout or DEFAULT_READ_TIMEOUT )
513
+ local ok , err = redis_client :connect (server .ip , server .port , self .config .connect_opts )
514
+ if ok then
515
+ _ , err = redis_client [cmd ](redis_client , key , ... )
516
+ end
517
+ if err then
518
+ table_insert (errors , err )
519
+ end
520
+ release_connection (redis_client , self .config )
551
521
end
552
- release_connection (redis_client , self .config )
553
522
end
554
523
return # errors == 0 , table.concat (errors , " ;" )
555
524
end
0 commit comments