diff --git a/README.md b/README.md index 908c84b..59a73da 100644 --- a/README.md +++ b/README.md @@ -52,8 +52,9 @@ While building the client, thanks for https://github.com/cuiweixie/lua-resty-red 2. nginx.conf add config: lua_shared_dict redis_cluster_slot_locks 100k; + lua_shared_dict redis_cluster_slots_info 100k; -3. or install by luarock, link: https://luarocks.org/modules/steve0511/resty-redis-cluster +4. or install by luarock, link: https://luarocks.org/modules/steve0511/resty-redis-cluster ### Sample usage @@ -61,10 +62,11 @@ While building the client, thanks for https://github.com/cuiweixie/lua-resty-red ```lua local config = { - dict_name = "test_locks", --shared dictionary name for locks, if default value is not used - refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used - name = "testCluster", --rediscluster name - serv_list = { --redis cluster node list(host and port), + dict_name = "test_locks", --shared dictionary name for locks, if default value is not used + refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used + slots_info_dict_name = "test_slots_info", --shared dictionary name for slots_info + name = "testCluster", --rediscluster name + serv_list = { --redis cluster node list (host and port) { ip = "127.0.0.1", port = 7001 }, { ip = "127.0.0.1", port = 7002 }, { ip = "127.0.0.1", port = 7003 }, @@ -72,11 +74,11 @@ local config = { { ip = "127.0.0.1", port = 7005 }, { ip = "127.0.0.1", port = 7006 } }, - keepalive_timeout = 60000, --redis connection pool idle timeout - keepalive_cons = 1000, --redis connection pool size - connect_timeout = 1000, --timeout while connecting - max_redirection = 5, --maximum retry attempts for redirection - max_connection_attempts = 1 --maximum retry attempts for connection + keepalive_timeout = 60000, --redis connection pool idle timeout + keepalive_cons = 1000, --redis connection pool size + connect_timeout = 1000, --timeout while connecting + max_redirection = 5, --maximum retry attempts for redirection + max_connection_attempts = 1 --maximum retry attempts for connection } local redis_cluster = require "rediscluster" @@ -93,10 +95,11 @@ end ```lua local config = { - dict_name = "test_locks", --shared dictionary name for locks, if default value is not used - refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used - name = "testCluster", --rediscluster name - serv_list = { --redis cluster node list(host and port), + dict_name = "test_locks", --shared dictionary name for locks, if default value is not used + refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used + name = "testCluster", --rediscluster name + slots_info_dict_name = "test_slots_info", --shared dictionary name for slots_info + serv_list = { --redis cluster node list (host and port) { ip = "127.0.0.1", port = 7001 }, { ip = "127.0.0.1", port = 7002 }, { ip = "127.0.0.1", port = 7003 }, @@ -104,14 +107,14 @@ local config = { { ip = "127.0.0.1", port = 7005 }, { ip = "127.0.0.1", port = 7006 } }, - keepalive_timeout = 60000, --redis connection pool idle timeout - keepalive_cons = 1000, --redis connection pool size - connect_timeout = 1000, --timeout while connecting - read_timeout = 1000, --timeout while reading - send_timeout = 1000, --timeout while sending - max_redirection = 5, --maximum retry attempts for redirection, 
- max_connection_attempts = 1, --maximum retry attempts for connection - auth = "pass" --set password while setting auth + keepalive_timeout = 60000, --redis connection pool idle timeout + keepalive_cons = 1000, --redis connection pool size + connect_timeout = 1000, --timeout while connecting + read_timeout = 1000, --timeout while reading + send_timeout = 1000, --timeout while sending + max_redirection = 5, --maximum retry attempts for redirection + max_connection_attempts = 1, --maximum retry attempts for connection + auth = "pass" --set password while setting auth } local redis_cluster = require "rediscluster" @@ -131,8 +134,9 @@ end local cjson = require "cjson" local config = { - dict_name = "test_locks", --shared dictionary name for locks, if default value is not used - refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used + dict_name = "test_locks", --shared dictionary name for locks, if default value is not used + refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used + slots_info_dict_name = "test_slots_info", --shared dictionary name for slots_info name = "testCluster", serv_list = { { ip = "127.0.0.1", port = 7001 }, @@ -181,8 +185,9 @@ end local cjson = require "cjson" local config = { - dict_name = "test_locks", --shared dictionary name for locks, if default value is not used - refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used + dict_name = "test_locks", --shared dictionary name for locks, if default value is not used + refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used + slots_info_dict_name = "test_slots_info", --shared dictionary name for slots_info name = "testCluster", enable_slave_read = true, serv_list = { @@ -218,8 +223,9 @@ end local cjson = require "cjson" local config = { - dict_name = "test_locks", --shared dictionary name for locks, if default value is not used - refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used + dict_name = "test_locks", --shared dictionary name for locks, if default value is not used + refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used + slots_info_dict_name = "test_slots_info", --shared dictionary name for slots_info name = "testCluster", enable_slave_read = true, serv_list = { @@ -261,10 +267,11 @@ end ```lua local config = { - dict_name = "test_locks", --shared dictionary name for locks, if default value is not used - refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used - name = "testCluster", --rediscluster name - serv_list = { --redis cluster node list(host and port), + dict_name = "test_locks", --shared dictionary name for locks, if default value is not used + refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used + slots_info_dict_name = "test_slots_info", --shared dictionary name for slots_info + name = "testCluster", --rediscluster name + serv_list = { --redis cluster node list (host and port) { ip = "127.0.0.1", port = 7001 }, { ip = "127.0.0.1", port = 7002 }, { ip = "127.0.0.1", port = 7003 }, @@ -272,13 +279,13 @@ local config = { { ip = "127.0.0.1", port = 7005 }, { ip = "127.0.0.1", port = 
7006 } }, - keepalive_timeout = 60000, --redis connection pool idle timeout - keepalive_cons = 1000, --redis connection pool size - connect_timeout = 1000, --timeout while connecting - read_timeout = 1000, --timeout while reading - send_timeout = 1000, --timeout while sending - max_redirection = 5, --maximum retry attempts for redirection - max_connection_attempts = 1 --maximum retry attempts for connection + keepalive_timeout = 60000, --redis connection pool idle timeout + keepalive_cons = 1000, --redis connection pool size + connect_timeout = 1000, --timeout while connecting + read_timeout = 1000, --timeout while reading + send_timeout = 1000, --timeout while sending + max_redirection = 5, --maximum retry attempts for redirection + max_connection_attempts = 1 --maximum retry attempts for connection } local redis_cluster = require "rediscluster" @@ -298,10 +305,11 @@ end ```lua local config = { - dict_name = "test_locks", --shared dictionary name for locks, if default value is not used - refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used - name = "testCluster", --rediscluster name - serv_list = { --redis cluster node list(host and port), + dict_name = "test_locks", --shared dictionary name for locks, if default value is not used + refresh_lock_key = "refresh_lock", --shared dictionary name prefix for lock of each worker, if default value is not used + slots_info_dict_name = "test_slots_info", --shared dictionary name for slots_info + name = "testCluster", --rediscluster name + serv_list = { --redis cluster node list (host and port) { ip = "127.0.0.1", port = 7001 }, { ip = "127.0.0.1", port = 7002 }, { ip = "127.0.0.1", port = 7003 }, @@ -309,11 +317,11 @@ local config = { { ip = "127.0.0.1", port = 7005 }, { ip = "127.0.0.1", port = 7006 } }, - keepalive_timeout = 60000, --redis connection pool idle timeout - keepalive_cons = 1000, --redis connection pool size - connect_timeout = 1000, --timeout while connecting - max_redirection = 5, --maximum retry attempts for redirection - max_connection_attempts = 1, --maximum retry attempts for connection + keepalive_timeout = 60000, --redis connection pool idle timeout + keepalive_cons = 1000, --redis connection pool size + connect_timeout = 1000, --timeout while connecting + max_redirection = 5, --maximum retry attempts for redirection + max_connection_attempts = 1, --maximum retry attempts for connection connect_opts = { ssl = true, ssl_verify = true, diff --git a/lib/resty/rediscluster.lua b/lib/resty/rediscluster.lua index ced2167..ccd38cb 100644 --- a/lib/resty/rediscluster.lua +++ b/lib/resty/rediscluster.lua @@ -1,6 +1,8 @@ local redis = require "resty.redis" local resty_lock = require "resty.lock" local xmodem = require "resty.xmodem" +local cjson = require "cjson.safe" + local setmetatable = setmetatable local tostring = tostring local string = string @@ -12,14 +14,12 @@ local rawget = rawget local pairs = pairs local unpack = unpack local ipairs = ipairs -local tonumber = tonumber -local match = string.match -local char = string.char local table_insert = table.insert local string_find = string.find local redis_crc = xmodem.redis_crc local DEFAULT_SHARED_DICT_NAME = "redis_cluster_slot_locks" +local DEFAULT_SLOTS_INFO_DICT_NAME = "redis_cluster_slots_info" local DEFAULT_REFRESH_DICT_NAME = "refresh_lock" local DEFAULT_MAX_REDIRECTION = 5 local DEFAULT_MAX_CONNECTION_ATTEMPTS = 3 @@ -28,6 +28,8 @@ local DEFAULT_KEEPALIVE_CONS = 1000 local DEFAULT_CONNECTION_TIMEOUT = 
1000 local DEFAULT_SEND_TIMEOUT = 1000 local DEFAULT_READ_TIMEOUT = 1000 +local TOO_MANY_CONNECTIONS = "too many waiting connect operations" +local CONNECTION_POOL_TIMEOUT = "timeout" local function parse_key(key_str) local left_tag_single_index = string_find(key_str, "{", 0) @@ -40,13 +42,11 @@ local function parse_key(key_str) end end - local _M = {} local mt = { __index = _M } local slot_cache = {} -local master_nodes = {} local cmds_for_all_master = { ["flushall"] = true, @@ -58,8 +58,11 @@ local cluster_invalid_cmds = { ["shutdown"] = true } -local function redis_slot(str) - return redis_crc(parse_key(str)) +local function redis_slot(key) + if key == "no_key" then + return 1 + end + return redis_crc(parse_key(key)) end local function check_auth(self, redis_client) @@ -75,26 +78,49 @@ local function check_auth(self, redis_client) else return nil, err end - else return true, nil end end local function release_connection(red, config) - local ok,err = red:set_keepalive(config.keepalive_timeout + local ok, err = red:set_keepalive(config.keepalive_timeout or DEFAULT_KEEPALIVE_TIMEOUT, config.keepalive_cons or DEFAULT_KEEPALIVE_CONS) if not ok then - ngx.log(ngx.ERR,"set keepalive failed:", err) + ngx.log(ngx.ERR, "set keepalive failed: ", err) end end -local function split(s, delimiter) - local result = {}; - for m in (s..delimiter):gmatch("(.-)"..delimiter) do - table_insert(result, m); +local function generate_full_slots_cache_info(slots_info) + local slots = {} + -- while slots are updated, create a list of servers present in cluster + -- this can differ from self.config.serv_list if a cluster is resized (added/removed nodes) + local servers = { serv_list = {} } + for n = 1, #slots_info do + local sub_info = slots_info[n] + -- slot info item 1 and 2 are the subrange start end slots + local start_slot, end_slot = sub_info[1], sub_info[2] + local list = { serv_list = {} } + -- from 3, here lists the host/port/nodeid of in charge nodes + for j = 3, #sub_info do + table.insert(list.serv_list, { + ip = sub_info[j][1], + port = sub_info[j][2], + slave = (j > 3) -- first node in the list is the master + }) + end + + for slot = start_slot, end_slot do + slots[slot] = list + end + + -- append to the list of all servers + for _, serv in ipairs(list.serv_list) do + table.insert(servers.serv_list, serv) + end end - return result; + + return slots, servers end local function try_hosts_slots(self, serv_list) @@ -111,8 +137,8 @@ local function try_hosts_slots(self, serv_list) local redis_client = redis:new() local ok, err, max_connection_timeout_err redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, - config.send_timeout or DEFAULT_SEND_TIMEOUT, - config.read_timeout or DEFAULT_READ_TIMEOUT) + config.send_timeout or DEFAULT_SEND_TIMEOUT, + config.read_timeout or DEFAULT_READ_TIMEOUT) --attempt to connect DEFAULT_MAX_CONNECTION_ATTEMPTS times to redis for k = 1, config.max_connection_attempts or DEFAULT_MAX_CONNECTION_ATTEMPTS do @@ -125,76 +151,43 @@ local function try_hosts_slots(self, serv_list) end ok, err = redis_client:connect(ip, port, self.config.connect_opts) - if ok then break end + if ok then + break + end if err then - ngx.log(ngx.ERR,"unable to connect, attempt nr ", k, " : error: ", err) + ngx.log(ngx.ERR, "unable to connect, attempt number ", k, ", error: ", err) table_insert(errors, err) end end if ok then - local _, autherr = check_auth(self, redis_client) - if autherr then - table_insert(errors, autherr) + local _, auth_err = check_auth(self, redis_client) 
+ if auth_err then + table_insert(errors, auth_err) return nil, errors end - local slots_info - slots_info, err = redis_client:cluster("slots") - if slots_info then - local slots = {} - -- while slots are updated, create a list of servers present in cluster - -- this can differ from self.config.serv_list if a cluster is resized (added/removed nodes) - local servers = { serv_list = {} } - for n = 1, #slots_info do - local sub_info = slots_info[n] - --slot info item 1 and 2 are the subrange start end slots - local start_slot, end_slot = sub_info[1], sub_info[2] - - -- generate new list of servers - for j = 3, #sub_info do - servers.serv_list[#servers.serv_list + 1 ] = { ip = sub_info[j][1], port = sub_info[j][2] } - end - for slot = start_slot, end_slot do - local list = { serv_list = {} } - --from 3, here lists the host/port/nodeid of in charge nodes - for j = 3, #sub_info do - list.serv_list[#list.serv_list + 1] = { ip = sub_info[j][1], port = sub_info[j][2] } - slots[slot] = list - end - end - end - --ngx.log(ngx.NOTICE, "finished initializing slotcache...") + local slots_info, slots_err = redis_client:cluster("slots") + release_connection(redis_client, config) + + if slots_info then + local slots, servers = generate_full_slots_cache_info(slots_info) + --ngx.log(ngx.NOTICE, "finished initializing slot cache...") slot_cache[self.config.name] = slots slot_cache[self.config.name .. "serv_list"] = servers - else - table_insert(errors, err) - end - -- cache master nodes - local nodes_res, nerr = redis_client:cluster("nodes") - if nodes_res then - local nodes_info = split(nodes_res, char(10)) - for _, node in ipairs(nodes_info) do - local node_info = split(node, " ") - if #node_info > 2 then - local is_master = match(node_info[3], "master") ~= nil - if is_master then - local ip_port = split(split(node_info[2], "@")[1], ":") - table_insert(master_nodes, { - ip = ip_port[1], - port = tonumber(ip_port[2]) - }) - end - end + + -- cache slots_info to memory + _, err = self:try_cache_slots_info_to_memory(slots_info) + if err then + ngx.log(ngx.ERR, 'failed to cache slots to memory: ', err) end else - table_insert(errors, nerr) + table_insert(errors, slots_err) end - release_connection(redis_client, config) - -- refresh of slots and master nodes successful + -- refreshed slots successfully -- not required to connect/iterate over additional hosts - if nodes_res and slots_info then + if slots_info then return true, nil end elseif max_connection_timeout_err then @@ -202,6 +195,7 @@ local function try_hosts_slots(self, serv_list) else table_insert(errors, err) end + if #errors == 0 then return true, nil end @@ -209,11 +203,9 @@ local function try_hosts_slots(self, serv_list) return nil, errors end - function _M.fetch_slots(self) local serv_list = self.config.serv_list local serv_list_cached = slot_cache[self.config.name .. "serv_list"] - local serv_list_combined -- if a cached serv_list is present, start with that @@ -240,11 +232,59 @@ function _M.fetch_slots(self) end end +function _M.try_load_slots_from_memory_cache(self) + local dict_name = self.config.slots_info_dict_name or DEFAULT_SLOTS_INFO_DICT_NAME + local slots_cache_dict = ngx.shared[dict_name] + if slots_cache_dict == nil then + return false, dict_name .. 
' is nil' + end + + local slots_info_str = slots_cache_dict:get(self.config.name) + if not slots_info_str or slots_info_str == '' then + return false, 'slots_info_str is nil or empty' + end + + local slots_info = cjson.decode(slots_info_str) + if not slots_info then + return false, 'slots_info is nil' + end + + local slots, servers = generate_full_slots_cache_info(slots_info) + if not slots or not servers then + return false, 'slots or servers is nil' + end + + --ngx.log(ngx.NOTICE, "finished initializing slot cache...") + slot_cache[self.config.name] = slots + slot_cache[self.config.name .. "serv_list"] = servers + return true +end + +function _M.try_cache_slots_info_to_memory(self, slots_info) + local dict_name = self.config.slots_info_dict_name or DEFAULT_SLOTS_INFO_DICT_NAME + local slots_cache_dict = ngx.shared[dict_name] + if slots_cache_dict == nil then + return false, dict_name .. ' is nil' + end + + if not slots_info then + return false, 'slots_info is nil' + end + + local slots_info_str = cjson.encode(slots_info) + local success, err = slots_cache_dict:set(self.config.name, slots_info_str) + if not success then + ngx.log(ngx.ERR, 'error set slots_info: ', err, ', slots_info_str: ', slots_info_str) + return false, err + end + return true +end function _M.refresh_slots(self) local worker_id = ngx.worker.id() local lock, err, elapsed, ok - lock, err = resty_lock:new(self.config.dict_name or DEFAULT_SHARED_DICT_NAME, {time_out = 0}) + lock, err = resty_lock:new(self.config.dict_name or DEFAULT_SHARED_DICT_NAME, { time_out = 0 }) + if not lock then ngx.log(ngx.ERR, "failed to create lock in refresh slot cache: ", err) return nil, err @@ -253,23 +293,23 @@ function _M.refresh_slots(self) local refresh_lock_key = (self.config.refresh_lock_key or DEFAULT_REFRESH_DICT_NAME) .. worker_id elapsed, err = lock:lock(refresh_lock_key) if not elapsed then - return nil, 'race refresh lock fail, ' .. err + return nil, "race refresh lock fail, " .. 
err end self:fetch_slots() ok, err = lock:unlock() if not ok then - ngx.log(ngx.ERR, "failed to unlock in refresh slot cache:", err) + ngx.log(ngx.ERR, "failed to unlock in refresh slot cache: ", err) return nil, err end end - function _M.init_slots(self) if slot_cache[self.config.name] then -- already initialized return true end + local ok, lock, elapsed, err lock, err = resty_lock:new(self.config.dict_name or DEFAULT_SHARED_DICT_NAME) if not lock then @@ -292,143 +332,113 @@ function _M.init_slots(self) return true end + -- try to fetch slots from memory cache + ok, err = self:try_load_slots_from_memory_cache() + if ok then + ok, err = lock:unlock() + if not ok then + ngx.log(ngx.ERR, "failed to unlock in initialization slot cache: ", err) + end + return true + end + local _, errs = self:fetch_slots() if errs then ok, err = lock:unlock() if not ok then - ngx.log(ngx.ERR, "failed to unlock in initialization slot cache:", err) + ngx.log(ngx.ERR, "failed to unlock in initialization slot cache: ", err) end return nil, errs end + ok, err = lock:unlock() if not ok then - ngx.log(ngx.ERR, "failed to unlock in initialization slot cache:", err) + ngx.log(ngx.ERR, "failed to unlock in initialization slot cache: ", err) end -- initialized return true end - - function _M.new(_, config) if not config.name then - return nil, " redis cluster config name is empty" + return nil, "redis cluster config name is empty" end if not config.serv_list or #config.serv_list < 1 then - return nil, " redis cluster config serv_list is empty" + return nil, "redis cluster config serv_list is empty" end - - local inst = { config = config } - inst = setmetatable(inst, mt) - local _, err = inst:init_slots() + local instance = { config = config } + instance = setmetatable(instance, mt) + local _, err = instance:init_slots() if err then return nil, err end - return inst + return instance end - -local function pick_node(self, serv_list, slot, magic_radom_seed) - local host - local port - local slave - local index +local function pick_node(self, serv_list, magic_random_seed) if #serv_list < 1 then - return nil, nil, nil, "serv_list for slot " .. slot .. 
" is empty" + return nil, nil, nil, "serv_list is empty" end - if self.config.enable_slave_read then - if magic_radom_seed then - index = magic_radom_seed % #serv_list + 1 - else - index = math.random(#serv_list) - end - host = serv_list[index].ip - port = serv_list[index].port - --cluster slots will always put the master node as first - if index > 1 then - slave = true - else - slave = false - end - --ngx.log(ngx.NOTICE, "pickup node: ", c(serv_list[index])) - else - host = serv_list[1].ip - port = serv_list[1].port - slave = false + + if not self.config.enable_slave_read then --ngx.log(ngx.NOTICE, "pickup node: ", cjson.encode(serv_list[1])) + return serv_list[1].ip, serv_list[1].port, false end - return host, port, slave -end + local index + if magic_random_seed then + index = magic_random_seed % #serv_list + 1 + else + index = math.random(#serv_list) + end -local ask_host_and_port = {} - + --cluster slots will always put the master node as first + return serv_list[index].ip, serv_list[index].port, index > 1 +end -local function parse_ask_signal(res) +local function parse_signal(res, prefix) --ask signal sample:ASK 12191 127.0.0.1:7008, so we need to parse and get 127.0.0.1, 7008 if res ~= ngx.null then - if type(res) == "string" and string.sub(res, 1, 3) == "ASK" then - local matched = ngx.re.match(res, [[^ASK [^ ]+ ([^:]+):([^ ]+)]], "jo", nil, ask_host_and_port) - if not matched then - return nil, nil - end - return matched[1], matched[2] + if type(res) == "string" then + res = { res } end + if type(res) == "table" then for i = 1, #res do - if type(res[i]) == "string" and string.sub(res[i], 1, 3) == "ASK" then - local matched = ngx.re.match(res[i], [[^ASK [^ ]+ ([^:]+):([^ ]+)]], "jo", nil, ask_host_and_port) + if type(res[i]) == "string" and string.sub(res[i], 1, #prefix) == prefix then + local matched = ngx.re.match(string.sub(res[i], #prefix + 1), "^ [^ ]+ ([^:]+):([^ ]+)", "jo", nil, nil) if not matched then - return nil, nil + ngx.log(ngx.ERR, "failed to parse redirection host and port. msg is: ", res[i]) + return nil, nil, "failed to parse redirection host and port" end return matched[1], matched[2] end end end end - return nil, nil -end - -local function has_moved_signal(res) - if res ~= ngx.null then - if type(res) == "string" and string.sub(res, 1, 5) == "MOVED" then - return true - else - if type(res) == "table" then - for i = 1, #res do - if type(res[i]) == "string" and string.sub(res[i], 1, 5) == "MOVED" then - return true - end - end - end - end - end - return false + return nil, nil end - local function handle_command_with_retry(self, target_ip, target_port, asking, cmd, key, ...) local config = self.config - key = tostring(key) local slot = redis_slot(key) for k = 1, config.max_redirection or DEFAULT_MAX_REDIRECTION do - if k > 1 then - ngx.log(ngx.NOTICE, "handle retry attempts:" .. k .. " for cmd:" .. cmd .. " key:" .. key) + ngx.log(ngx.NOTICE, "handle retry attempt " .. k .. " for cmd: " .. cmd .. ", key: " .. key) end local slots = slot_cache[self.config.name] if slots == nil or slots[slot] == nil then - return nil, "not slots information present, nginx might have never successfully executed cluster(\"slots\")" + return nil, "no slots information present, nginx might have never successfully executed cluster(\"slots\")" end local serv_list = slots[slot].serv_list -- We must empty local reference to slots cache, otherwise there will be memory issue while - -- coroutine swich happens(eg. ngx.sleep, cosocket), very important! + -- coroutine switch happens(eg. 
ngx.sleep, cosocket), very important! slots = nil local ip, port, slave, err @@ -437,9 +447,9 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c -- asking redirection should only happens at master nodes ip, port, slave = target_ip, target_port, false else - ip, port, slave, err = pick_node(self, serv_list, slot) + ip, port, slave, err = pick_node(self, serv_list) if err then - ngx.log(ngx.ERR, "pickup node failed, will return failed for this request, meanwhile refereshing slotcache " .. err) + ngx.log(ngx.ERR, "pickup node failed, will return failed for this request. meanwhile refreshing slot cache " .. err) self:refresh_slots() return nil, err end @@ -447,14 +457,14 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c local redis_client = redis:new() redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, - config.send_timeout or DEFAULT_SEND_TIMEOUT, - config.read_timeout or DEFAULT_READ_TIMEOUT) - local ok, connerr = redis_client:connect(ip, port, self.config.connect_opts) + config.send_timeout or DEFAULT_SEND_TIMEOUT, + config.read_timeout or DEFAULT_READ_TIMEOUT) + local ok, connect_err = redis_client:connect(ip, port, self.config.connect_opts) if ok then - local authok, autherr = check_auth(self, redis_client) - if autherr then - return nil, autherr + local _, auth_err = check_auth(self, redis_client) + if auth_err then + return nil, auth_err end if slave then --set readonly @@ -474,7 +484,6 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c end end - local need_to_retry = false local res if cmd == "eval" or cmd == "evalsha" then res, err = redis_client[cmd](redis_client, ...) @@ -482,82 +491,108 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c res, err = redis_client[cmd](redis_client, key, ...) end - if err then - if string.sub(err, 1, 5) == "MOVED" then - --ngx.log(ngx.NOTICE, "find MOVED signal, trigger retry for normal commands, cmd:" .. cmd .. " key:" .. key) - --if retry with moved, we will not asking to specific ip,port anymore + if not err then + release_connection(redis_client, config) + return res, err + end + + local moved_host, moved_port, moved_err = parse_signal(err, "MOVED") + if moved_err or (moved_host ~= nil and moved_port ~= nil) then + --ngx.log(ngx.NOTICE, "found MOVED signal, trigger retry for normal commands. cmd: " .. cmd .. ", key: " .. key) + if moved_host == ip and moved_port == tostring(port) then + -- there's some issue with connection returning bad responses continuously + -- even though this node is the master of the data. + -- close it instead of returning to pool + local _, close_err = redis_client:close() + if close_err then + ngx.log(ngx.ERR, "close connection failed: ", close_err) + return nil, "failed to close redis connection!" + end + else release_connection(redis_client, config) - target_ip = nil - target_port = nil - self:refresh_slots() - need_to_retry = true + end + + target_ip = moved_host + target_port = moved_port + self:refresh_slots() + else + local ask_host, ask_port, ask_err = parse_signal(err, "ASK") + if ask_err then + return nil, ask_err + end - elseif string.sub(err, 1, 3) == "ASK" then - --ngx.log(ngx.NOTICE, "handle asking for normal commands, cmd:" .. cmd .. " key:" .. key) + if ask_host ~= nil and ask_port ~= nil then + --ngx.log(ngx.NOTICE, "handle asking for normal commands, cmd: " .. cmd .. ", key: " .. 
key) release_connection(redis_client, config) - if asking then - --Should not happen after asking target ip,port and still return ask, if so, return error. - return nil, "nested asking redirection occurred, client cannot retry " - else - local ask_host, ask_port = parse_ask_signal(err) - if ask_host ~= nil and ask_port ~= nil then - return handle_command_with_retry(self, ask_host, ask_port, true, cmd, key, ...) - else - return nil, " cannot parse ask redirection host and port: msg is " .. err - end + if asking then + --Should not happen after asking target ip,port and still return ask, if so, return error + return nil, "nested asking redirection occurred, client cannot retry" end + target_ip = ask_host + target_port = ask_port + asking = true elseif string.sub(err, 1, 11) == "CLUSTERDOWN" then - return nil, "Cannot executing command, cluster status is failed!" + release_connection(redis_client, config) + return nil, "cannot execute command, cluster status is failed!" else - --There might be node fail, we should also refresh slot cache + release_connection(redis_client, config) + -- There might be a node failure, we should also refresh slot cache self:refresh_slots() return nil, err end end - if not need_to_retry then - release_connection(redis_client, config) - return res, err - end else - --There might be node fail, we should also refresh slot cache - self:refresh_slots() + -- `too many waiting connect operations` means queued connect operations is out of backlog + -- `timeout` means timeout while wait for connection release + -- If connect timeout caused by server's issue, the connect_err is `connection timed out` + if connect_err ~= TOO_MANY_CONNECTIONS and connect_err ~= CONNECTION_POOL_TIMEOUT then + -- There might be a node failure, we should also refresh slot cache + self:refresh_slots() + end + if k == config.max_redirection or k == DEFAULT_MAX_REDIRECTION then -- only return after allowing for `k` attempts - return nil, connerr + return nil, connect_err end end end - return nil, "failed to execute command, reaches maximum redirection attempts" -end + return nil, "failed to execute command, reached maximum redirection attempts" +end local function generate_magic_seed(self) - --For pipeline, We don't want request to be forwarded to all channels, eg. if we have 3*3 cluster(3 master 2 replicas) we - --alway want pick up specific 3 nodes for pipeline requests, instead of 9. - --Currently we simply use (num of allnode)%count as a randomly fetch. Might consider a better way in the future. - -- use the dynamic serv_list instead of the static config serv_list - local nodeCount = #slot_cache[self.config.name .. "serv_list"].serv_list - return math.random(nodeCount) + -- For pipeline, we don't want request to be forwarded to all channels, e.g. if we have 3*3 cluster(3 master 2 replicas) + -- we always want pick up specific 3 nodes for pipeline requests, instead of 9. + -- Currently we simply use (num of allnode)%count as a randomly fetch. Might consider a better way in the future. + -- Use the dynamic serv_list instead of the static config serv_list + local node_count = #slot_cache[self.config.name .. "serv_list"].serv_list + return math.random(node_count) end local function _do_cmd_master(self, cmd, key, ...) 
local errors = {} - for _, master in ipairs(master_nodes) do - local redis_client = redis:new() - redis_client:set_timeouts(self.config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, - self.config.send_timeout or DEFAULT_SEND_TIMEOUT, - self.config.read_timeout or DEFAULT_READ_TIMEOUT) - local ok, err = redis_client:connect(master.ip, master.port, self.config.connect_opts) - if ok then - _, err = redis_client[cmd](redis_client, key, ...) - end - if err then - table_insert(errors, err) + local serv_list = slot_cache[self.config.name .. "serv_list"].serv_list + + for _, server in ipairs(serv_list) do + if not server.slave then + local redis_client = redis:new() + redis_client:set_timeouts(self.config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, + self.config.send_timeout or DEFAULT_SEND_TIMEOUT, + self.config.read_timeout or DEFAULT_READ_TIMEOUT) + + local ok, err = redis_client:connect(server.ip, server.port, self.config.connect_opts) + if ok then + _, err = redis_client[cmd](redis_client, key, ...) + end + if err then + table_insert(errors, err) + end + release_connection(redis_client, self.config) end - release_connection(redis_client, self.config) end + return #errors == 0, table.concat(errors, ";") end @@ -566,11 +601,11 @@ local function _do_cmd(self, cmd, key, ...) return nil, "command not supported" end + -- check if we're in middle of pipeline local _reqs = rawget(self, "_reqs") if _reqs then - local args = { ... } - local t = { cmd = cmd, key = key, args = args } - table_insert(_reqs, t) + local execution = { cmd = cmd, key = key, args = { ... } } + table_insert(_reqs, execution) return end @@ -578,50 +613,57 @@ local function _do_cmd(self, cmd, key, ...) return _do_cmd_master(self, cmd, key, ...) end - local res, err = handle_command_with_retry(self, nil, nil, false, cmd, key, ...) - return res, err + return handle_command_with_retry(self, nil, nil, false, cmd, key, ...) end - local function construct_final_pipeline_resp(self, node_res_map, node_req_map) - --construct final result with origin index - local finalret = {} + --construct final result with original index + local final_response = {} for k, v in pairs(node_res_map) do local reqs = node_req_map[k].reqs local res = v local need_to_fetch_slots = true + for i = 1, #reqs do --deal with redis cluster ask redirection - local ask_host, ask_port = parse_ask_signal(res[i]) + local ask_host, ask_port, ask_err = parse_signal(res[i], "ASK") + if ask_err then + return nil, ask_err + end + if ask_host ~= nil and ask_port ~= nil then - --ngx.log(ngx.NOTICE, "handle ask signal for cmd:" .. reqs[i]["cmd"] .. " key:" .. reqs[i]["key"] .. " target host:" .. ask_host .. " target port:" .. ask_port) - local askres, err = handle_command_with_retry(self, ask_host, ask_port, true, reqs[i]["cmd"], reqs[i]["key"], unpack(reqs[i]["args"])) + --ngx.log(ngx.NOTICE, "handle ask signal for cmd: " .. reqs[i]["cmd"] .. ", key: " .. reqs[i]["key"] .. ", target host: " .. ask_host .. ", target port: " .. ask_port) + local ask_res, err = handle_command_with_retry(self, ask_host, ask_port, true, reqs[i]["cmd"], reqs[i]["key"], unpack(reqs[i]["args"])) if err then return nil, err - else - finalret[reqs[i].origin_index] = askres end - elseif has_moved_signal(res[i]) then - --ngx.log(ngx.NOTICE, "handle moved signal for cmd:" .. reqs[i]["cmd"] .. " key:" .. reqs[i]["key"]) - if need_to_fetch_slots then - -- if there is multiple signal for moved, we just need to fetch slot cache once, and do retry. 
- self:refresh_slots() - need_to_fetch_slots = false - end - local movedres, err = handle_command_with_retry(self, nil, nil, false, reqs[i]["cmd"], reqs[i]["key"], unpack(reqs[i]["args"])) - if err then - return nil, err + + final_response[reqs[i].origin_index] = ask_res + else + local moved_host, moved_port, moved_err = parse_signal(res[i], "MOVED") + if moved_err or (moved_host ~= nil and moved_port ~= nil) then + --ngx.log(ngx.NOTICE, "handle moved signal for cmd: " .. reqs[i]["cmd"] .. ", key: " .. reqs[i]["key"]) + if need_to_fetch_slots then + -- if there are multiple signals for moved, we just need to fetch slot cache once and retry + self:refresh_slots() + need_to_fetch_slots = false + end + + local moved_res, err = handle_command_with_retry(self, moved_host, moved_port, false, reqs[i]["cmd"], reqs[i]["key"], unpack(reqs[i]["args"])) + if err then + return nil, err + end + + final_response[reqs[i].origin_index] = moved_res else - finalret[reqs[i].origin_index] = movedres + final_response[reqs[i].origin_index] = res[i] end - else - finalret[reqs[i].origin_index] = res[i] end end end - return finalret -end + return final_response +end local function has_cluster_fail_signal_in_pipeline(res) for i = 1, #res do @@ -636,16 +678,14 @@ local function has_cluster_fail_signal_in_pipeline(res) return false end - function _M.init_pipeline(self) self._reqs = {} end - function _M.commit_pipeline(self) local _reqs = rawget(self, "_reqs") - - if not _reqs or #_reqs == 0 then return + if not _reqs or #_reqs == 0 then + return nil, "no pipeline" end self._reqs = nil @@ -653,30 +693,29 @@ function _M.commit_pipeline(self) local slots = slot_cache[config.name] if slots == nil then - return nil, "not slots information present, nginx might have never successfully executed cluster(\"slots\")" + return nil, "no slots information present, nginx might have never successfully executed cluster(\"slots\")" end local node_res_map = {} - local node_req_map = {} - local magicRandomPickupSeed = generate_magic_seed(self) + local magic_random_seed = generate_magic_seed(self) --construct req to real node mapping for i = 1, #_reqs do - -- Because we will forward req to different nodes, so the result will not be the origin order, - -- we need to record the original index and finally we can construct the result with origin order + -- Because we forward requests to different nodes, the result might not be the original order, + -- we need to record the original index to be able to later construct the result with original order _reqs[i].origin_index = i local key = _reqs[i].key local slot = redis_slot(tostring(key)) if slots[slot] == nil then - return nil, "not slots information present, nginx might have never successfully executed cluster(\"slots\")" + return nil, "no slots information present, nginx might have never successfully executed cluster(\"slots\")" end local slot_item = slots[slot] - local ip, port, slave, err = pick_node(self, slot_item.serv_list, slot, magicRandomPickupSeed) + local ip, port, slave, err = pick_node(self, slot_item.serv_list, magic_random_seed) if err then -- We must empty local reference to slots cache, otherwise there will be memory issue while - -- coroutine swich happens(eg. ngx.sleep, cosocket), very important! + -- coroutine switch happens (e.g. ngx.sleep, cosocket), very important! 
slots = nil self:refresh_slots() return nil, err @@ -692,7 +731,7 @@ function _M.commit_pipeline(self) end -- We must empty local reference to slots cache, otherwise there will be memory issue while - -- coroutine swich happens(eg. ngx.sleep, cosocket), very important! + -- coroutine switch happens (e.g. ngx.sleep, cosocket), very important! slots = nil for k, v in pairs(node_req_map) do @@ -702,22 +741,22 @@ function _M.commit_pipeline(self) local slave = v.slave local redis_client = redis:new() redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, - config.send_timeout or DEFAULT_SEND_TIMEOUT, - config.read_timeout or DEFAULT_READ_TIMEOUT) - local ok, err = redis_client:connect(ip, port, self.config.connect_opts) + config.send_timeout or DEFAULT_SEND_TIMEOUT, + config.read_timeout or DEFAULT_READ_TIMEOUT) + local ok, connect_err = redis_client:connect(ip, port, self.config.connect_opts) if ok then - local authok, autherr = check_auth(self, redis_client) - if autherr then - return nil, autherr + local _, auth_err = check_auth(self, redis_client) + if auth_err then + return nil, auth_err end if slave then --set readonly - local ok, err = redis_client:readonly() - if not ok then + local readonly_ok, readonly_err = redis_client:readonly() + if not readonly_ok then self:refresh_slots() - return nil, err + return nil, readonly_err end end @@ -734,61 +773,67 @@ function _M.commit_pipeline(self) redis_client[req.cmd](redis_client, req.key) end end - local res, err = redis_client:commit_pipeline() - if err then - --There might be node fail, we should also refresh slot cache + + local res, commit_err = redis_client:commit_pipeline() + if commit_err then + -- There might be a node failure, we should also refresh slot cache self:refresh_slots() - return nil, err .. " return from " .. tostring(ip) .. ":" .. tostring(port) + return nil, commit_err .. " returned from " .. tostring(ip) .. ":" .. tostring(port) end + release_connection(redis_client, config) if has_cluster_fail_signal_in_pipeline(res) then - return nil, "Cannot executing pipeline command, cluster status is failed!" + return nil, "cannot execute pipeline command, cluster status is failed!" end - release_connection(redis_client, config) + node_res_map[k] = res else - --There might be node fail, we should also refresh slot cache - self:refresh_slots() - return nil, err .. "pipeline commit failed while connecting to " .. tostring(ip) .. ":" .. tostring(port) + -- `too many waiting connect operations` means queued connect operations is out of backlog + -- `timeout` means timeout while wait for connection release + -- If connect timeout caused by server's issue, the connect_err is `connection timed out` + if connect_err ~= TOO_MANY_CONNECTIONS and connect_err ~= CONNECTION_POOL_TIMEOUT then + -- There might be a node failure, we should also refresh slot cache + self:refresh_slots() + end + return nil, connect_err .. " pipeline commit failed while connecting to " .. tostring(ip) .. ":" .. tostring(port) end end - --construct final result with origin index + --construct final result with original index local final_res, err = construct_final_pipeline_resp(self, node_res_map, node_req_map) - if not err then - return final_res - else - return nil, err .. " failed to construct final pipeline result " + if err then + return nil, err .. " failed to construct final pipeline result" end + return final_res end - function _M.cancel_pipeline(self) self._reqs = nil end local function _do_eval_cmd(self, cmd, ...) 
---[[ -eval command usage: -eval(script, 1, key, arg1, arg2 ...) -eval(script, 0, arg1, arg2 ...) -]] - local args = {...} + --[[ + eval command usage: + eval(script, 1, key, arg1, arg2 ...) + eval(script, 0, arg1, arg2 ...) + ]] + local args = { ... } local keys_num = args[2] if type(keys_num) ~= "number" then - return nil, "Cannot execute eval without keys number" + return nil, "cannot execute eval without keys number" end if keys_num > 1 then - return nil, "Cannot execute eval with more than one keys for redis cluster" + return nil, "cannot execute eval with more than one keys for redis cluster" end - local key = args[3] or "no_key" + + local key = (keys_num == 1 and args[3]) or "no_key" return _do_cmd(self, cmd, key, ...) end + -- dynamic cmd setmetatable(_M, { __index = function(_, cmd) - local method = - function(self, ...) + local method = function(self, ...) if cmd == "eval" or cmd == "evalsha" then return _do_eval_cmd(self, cmd, ...) else @@ -796,8 +841,7 @@ setmetatable(_M, { end end - -- cache the lazily generated method in our - -- module table + -- cache the lazily generated method in our module table _M[cmd] = method return method end
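
For reference, the new `generate_full_slots_cache_info` helper turns the raw `CLUSTER SLOTS` reply into two structures: a `slots` table mapping every slot number to the `serv_list` of nodes that serve it (master first, replicas flagged with `slave = true`), and a flat `servers.serv_list` of every node seen in the reply, which replaces the old `cluster nodes` parsing. Below is a standalone sketch of that mapping using a made-up two-range reply; the addresses, ports, node ids and ranges are illustrative only.

```lua
-- Standalone sketch (plain Lua): the shape of a CLUSTER SLOTS reply and the
-- slot map built from it, mirroring what generate_full_slots_cache_info produces.
-- Addresses, ports and slot ranges are illustrative, not real cluster data.
local slots_info = {
    -- { start_slot, end_slot, master, replica, ... }
    { 0,    8191,  { "127.0.0.1", 7001, "node-a" }, { "127.0.0.1", 7004, "node-d" } },
    { 8192, 16383, { "127.0.0.1", 7002, "node-b" }, { "127.0.0.1", 7005, "node-e" } },
}

local slots, servers = {}, { serv_list = {} }
for n = 1, #slots_info do
    local sub_info = slots_info[n]
    local start_slot, end_slot = sub_info[1], sub_info[2]
    local list = { serv_list = {} }
    for j = 3, #sub_info do
        local node = { ip = sub_info[j][1], port = sub_info[j][2], slave = (j > 3) }
        table.insert(list.serv_list, node)     -- first node of each range is the master
        table.insert(servers.serv_list, node)  -- flat list of every node in the cluster
    end
    for slot = start_slot, end_slot do
        slots[slot] = list                     -- all slots in the range share one serv_list
    end
end

print(slots[100].serv_list[1].port)   --> 7001 (master of the 0..8191 range)
print(slots[100].serv_list[2].slave)  --> true (its replica)
print(#servers.serv_list)             --> 4
```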
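
The headline change is that the decoded `CLUSTER SLOTS` reply is now also serialized with `cjson` into a shared dict (`redis_cluster_slots_info` by default, overridable with `slots_info_dict_name`), so a worker initializing its slot cache first calls `try_load_slots_from_memory_cache` and only falls back to querying Redis when the dict is empty. The round trip is roughly the sketch below; it assumes an OpenResty context with the `lua_shared_dict test_slots_info 100k;` declaration from the README samples, and the single-range reply is illustrative only.

```lua
-- Sketch of the shared-dict round trip; OpenResty context assumed, with
-- `lua_shared_dict test_slots_info 100k;` declared in nginx.conf.
local cjson = require "cjson.safe"

local dict = ngx.shared["test_slots_info"]
local cluster_name = "testCluster"

-- writer side: what try_cache_slots_info_to_memory stores after a successful
-- cluster("slots") call (illustrative single-range reply)
local slots_info = {
    { 0, 16383, { "127.0.0.1", 7001, "node-a" }, { "127.0.0.1", 7004, "node-d" } },
}
local ok, err = dict:set(cluster_name, cjson.encode(slots_info))
if not ok then
    ngx.log(ngx.ERR, "failed to cache slots_info: ", err)
end

-- reader side: what try_load_slots_from_memory_cache does during init_slots
local cached = dict:get(cluster_name)
if cached and cached ~= "" then
    local decoded = cjson.decode(cached)
    if decoded then
        -- decoded is fed to generate_full_slots_cache_info to rebuild the
        -- per-worker slot map without touching Redis
    end
end
```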
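
For redirections, `parse_ask_signal` and `has_moved_signal` are merged into a single `parse_signal(res, prefix)` that extracts the target host and port from either kind of error. The extraction is essentially the following sketch; it assumes an OpenResty context (it uses `ngx.re.match`), and the slot numbers and addresses are examples.

```lua
-- Sketch of the redirection parsing; OpenResty context assumed (ngx.re).
-- Example error strings as returned by Redis Cluster; addresses are illustrative.
local moved_err = "MOVED 3999 127.0.0.1:7002"
local ask_err   = "ASK 3999 127.0.0.1:7005"

local function extract(err, prefix)
    if type(err) ~= "string" or string.sub(err, 1, #prefix) ~= prefix then
        return nil, nil
    end
    -- same shape as parse_signal's pattern: "<PREFIX> <slot> <host>:<port>"
    local m = ngx.re.match(string.sub(err, #prefix + 1), "^ [^ ]+ ([^:]+):([^ ]+)", "jo")
    if not m then
        return nil, nil
    end
    return m[1], m[2]   -- note: the port comes back as a string
end

local host, port = extract(moved_err, "MOVED")
ngx.log(ngx.NOTICE, "MOVED target: ", host, ":", port)   -- 127.0.0.1:7002

host, port = extract(ask_err, "ASK")
ngx.log(ngx.NOTICE, "ASK target: ", host, ":", port)     -- 127.0.0.1:7005
```

Because the captured port is a string, the MOVED handler compares it against `tostring(port)` when deciding whether a node redirected to itself, in which case the connection is closed rather than returned to the pool.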
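
The connect-failure path now also distinguishes client-side pool pressure from real node failures: when `connect()` fails with `too many waiting connect operations` (backlog full) or `timeout` (waited too long for a pooled connection), the slot cache is left alone instead of being refreshed. Those errors only show up when the cosocket pool is bounded through `connect_opts`, as in the sketch below; the `pool_size`/`backlog` values are arbitrary examples, and the options are simply passed through to `resty.redis`'s `connect`.

```lua
-- Sketch: bounding the per-node connection pool through connect_opts.
-- pool_size/backlog values are examples, not recommendations.
local config = {
    name = "testCluster",
    dict_name = "test_locks",
    slots_info_dict_name = "test_slots_info",
    serv_list = {
        { ip = "127.0.0.1", port = 7001 },
        { ip = "127.0.0.1", port = 7002 },
        { ip = "127.0.0.1", port = 7003 },
    },
    connect_timeout = 1000,
    connect_opts = {
        pool_size = 100,  -- cap idle connections kept per node
        backlog = 50,     -- queue up to 50 connect() calls when the pool is exhausted
    },
}
-- With a full backlog, connect() returns "too many waiting connect operations";
-- a queued connect() that waits too long returns "timeout". Both are treated as
-- pool pressure by the retry loop, so refresh_slots() is skipped for them.
```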
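
For pipelines, commands issued between `init_pipeline` and `commit_pipeline` are buffered in `_reqs`, grouped per node using the cached slot map (with `generate_magic_seed` keeping the node choice consistent across one pipeline), and stitched back into the original order by `construct_final_pipeline_resp`; an empty pipeline now returns `nil, "no pipeline"` instead of returning nothing. A usage sketch in the style of the README samples, with a trimmed-down config and arbitrary key names:

```lua
-- Pipeline usage sketch, README style; key names are arbitrary.
local cjson = require "cjson"
local redis_cluster = require "rediscluster"

local config = {
    name = "testCluster",
    dict_name = "test_locks",
    slots_info_dict_name = "test_slots_info",
    serv_list = {
        { ip = "127.0.0.1", port = 7001 },
        { ip = "127.0.0.1", port = 7002 },
        { ip = "127.0.0.1", port = 7003 },
    },
}

local red_c, new_err = redis_cluster:new(config)
if not red_c then
    ngx.log(ngx.ERR, "failed to create cluster client: ", new_err)
    return
end

red_c:init_pipeline()
red_c:set("dog", "an animal")   -- buffered, not sent yet
red_c:get("dog")
red_c:get("cat")

local res, err = red_c:commit_pipeline()
if not res then
    ngx.log(ngx.ERR, "pipeline failed: ", err)  -- e.g. "no pipeline" if nothing was buffered
    return
end

-- replies come back indexed by the order the commands were issued
ngx.say(cjson.encode(res))
```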
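
Finally, `eval`/`evalsha` routing: `_do_eval_cmd` now only uses `args[3]` as the routing key when the declared key count is exactly 1, and `redis_slot` maps the `"no_key"` placeholder used for zero-key scripts to a fixed slot (slot 1) instead of hashing the literal string. In practice that behaves as in the sketch below; the script bodies and key names are illustrative, and `config` is assumed to be the same shape as in the sketches above.

```lua
-- Sketch of eval routing against the cluster client; scripts and keys are examples.
local redis_cluster = require "rediscluster"
local red_c = redis_cluster:new(config)   -- config: same shape as the sketches above

-- one declared key: routed by the slot of "dog", like any single-key command
local res, err = red_c:eval("return redis.call('get', KEYS[1])", 1, "dog")
if err then
    ngx.log(ngx.ERR, "eval failed: ", err)
end

-- zero declared keys: routed through the "no_key" placeholder, i.e. a fixed slot,
-- so the script simply runs on whichever master owns that slot
local pong, perr = red_c:eval("return 'pong'", 0)

-- more than one declared key is refused, because the keys could live on different nodes
local _, keys_err = red_c:eval("return 1", 2, "k1", "k2")
-- keys_err: "cannot execute eval with more than one keys for redis cluster"
```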