Add least_connections load balancing algorithm. #9025
@@ -0,0 +1,115 @@
local util = require("util")
local split = require("util.split")
require("resty.core")

local ngx = ngx
local ipairs = ipairs
local tostring = tostring
local string = string
local tonumber = tonumber
local setmetatable = setmetatable
local string_format = string.format
local ngx_log = ngx.log
local INFO = ngx.INFO
local WARN = ngx.WARN

local _M = { name = "leastconn" }

function _M.new(self, backend)
  local o = {
    peers = backend.endpoints
  }
  setmetatable(o, self)
  self.__index = self
  return o
end

function _M.is_affinitized()
  return false
end

local function get_upstream_name(upstream)
  return upstream.address .. ":" .. upstream.port
end

function _M.balance(self)
  local peers = self.peers
  local endpoint = peers[1]
  local endpoints = ngx.shared.balancer_leastconn
  local feasible_endpoints = {}

  if #peers ~= 1 then
    local lowestconns = 9999
Reviewer: Should we set this to max int or something higher? There very well could be peers with more than 9999 connections.

alowde: I'm not certain what max int is in this case due to the various ways Lua could be compiled, but it's definitely higher than 9999 :). As a safe balance I've changed it to the maximum signed 32-bit int, which should still be more connections than we're likely to support in other places.

Reviewer: This thread is marked as resolved, but I still see this set to 9999.
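The change alowde describes would be a one-line swap; a minimal sketch (assuming a plain Lua number is acceptable as the sentinel, which is not what this revision of the diff does):

    -- 2^31 - 1, the maximum signed 32-bit integer mentioned in the thread above;
    -- math.huge would also work as a "no count seen yet" sentinel
    local lowestconns = 2147483647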
    -- find the lowest connection count
    for _, peer in pairs(peers) do
      local conns = endpoints:get(get_upstream_name(peer))

      if conns == nil then
        endpoints:set(get_upstream_name(peer),0,600)
Reviewer: Why do we set an expire time on this? If we go [...]

alowde: My concern with depending on the sync method to handle garbage collection is trusting that the sync method will be reliably called with updates, and that we won't lose the list of peers between updates. With that said, the main reason I was thinking to distrust sync was in case of nginx restarts - but then in that case the shared dict would be reset anyway.

Reviewer: I also agree the expiration time should be infinite. Do you have a reason to be suspicious of the sync method not being called reliably? I hope it is something we can depend on.

alowde: No specific reason to be suspicious, call it a generalised sense of professional paranoia :). Will fix the expiration time shortly.
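For reference, the no-expiry variant discussed in this thread is a small change, since ngx.shared.DICT:set treats an omitted (or zero) exptime as "never expire". A sketch of that variant, not what this revision of the diff does:

        -- initialize an unseen peer without a TTL; cleanup is then left entirely to sync()
        endpoints:set(get_upstream_name(peer), 0)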
        conns = 0
      end
      ngx_log(WARN, "Found ", conns, " conns for peer ", get_upstream_name(peer))

      if conns <= lowestconns then
        lowestconns = conns
      end
    end

    -- get peers with lowest connections
    for _, peer in pairs(peers) do
Reviewer: Looks like we have two for loops over peers [...]

alowde: Done

(A single-pass sketch follows the end of balance() below.)
      local conns = endpoints:get(get_upstream_name(peer))
      if conns ~= nil and conns == lowestconns then
        feasible_endpoints[#feasible_endpoints+1] = peer
      end
    end
    ngx_log(WARN, "got ", #feasible_endpoints, " feasible endpoints")

    endpoint = feasible_endpoints[math.random(1,#feasible_endpoints)]
  end

  ngx_log(WARN, "chose endpoint ", get_upstream_name(endpoint))
  -- Update the endpoint connection count with a TTL of 10 minutes
  endpoints:incr(get_upstream_name(endpoint),1,1,600)

  return get_upstream_name(endpoint)
end
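Following up on the thread about the two loops over peers: a hypothetical single-pass variant (not part of this diff) that tracks the minimum and collects the tied peers in one traversal, assuming the same endpoints shared dict and get_upstream_name helper as above:

    -- single pass: rebuild the candidate set whenever a strictly lower
    -- connection count is seen, append on ties
    local lowestconns = math.huge
    for _, peer in pairs(peers) do
      local conns = endpoints:get(get_upstream_name(peer)) or 0
      if conns < lowestconns then
        lowestconns = conns
        feasible_endpoints = { peer }
      elseif conns == lowestconns then
        feasible_endpoints[#feasible_endpoints + 1] = peer
      end
    end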

function _M.after_balance(_)
  local endpoints = ngx.shared.balancer_leastconn
  local upstream = split.get_last_value(ngx.var.upstream_addr)

  ngx_log(WARN, "decrement conn count for upstream ", upstream)

  if util.is_blank(upstream) then
    return
  end
  ngx_log(WARN, "decrement endpoints", upstream)
  ngx_log(WARN, endpoints:incr(upstream,-1,0,600))
end
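One detail worth noting about the incr() calls in balance() and after_balance(): in ngx.shared.DICT:incr, the init argument creates a missing key with the value init + value rather than init, which can produce a surprising count the first time an endpoint is seen. A standalone sketch of that behaviour (it assumes an OpenResty context with the balancer_leastconn dict configured; the endpoint key is purely illustrative):

    local dict = ngx.shared.balancer_leastconn
    dict:delete("10.0.0.1:8080")  -- hypothetical key used only for illustration
    -- the key is absent, so incr(key, 1, 1) creates it as init + value = 2, not 1
    local new_count = dict:incr("10.0.0.1:8080", 1, 1, 600)
    ngx.log(ngx.WARN, "count after first incr: ", new_count)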

function _M.sync(self, backend)
  local normalized_endpoints_added, normalized_endpoints_removed =
    util.diff_endpoints(self.peers, backend.endpoints)

  if #normalized_endpoints_added == 0 and #normalized_endpoints_removed == 0 then
    ngx_log(WARN, "endpoints did not change for backend " .. tostring(backend.name))
    return
  end

  ngx_log(WARN, string_format("[%s] peers have changed for backend %s", self.name, backend.name))

  self.peers = backend.endpoints

  for _, endpoint_string in ipairs(normalized_endpoints_removed) do
    ngx.shared.balancer_leastconn:delete(endpoint_string)
Reviewer: Removing these endpoints here may not have the desired effect. If a pod has a bunch of connections, briefly fails its readiness probe (causing it to no longer be included as a backend), we will remove it from being tracked here. But if the pod becomes ready again, it may still be handling a bunch of connections, and we will initialize its value to 0 again. Maybe this is OK, but it is worth calling out.

alowde: Yeah, that's not ideal behaviour. I think it would be safe to instead set the expiration time to (say) 30 minutes. This would ensure we still track connection counts for those connections while not allowing the shared dict to grow indefinitely.

Reviewer: Setting it to expire asynchronously makes some sense, though it will be difficult to choose the TTL. It makes sense that this least-conn load balancer would still track pods that are handling connections, even if the pod is not ready. It should decrement the number of connections on a backend that finishes a request, even if that pod is not ready, for example. Is there a way we can tell the difference between a pod that is not ready now (but may become ready soon) versus a pod that is being killed (and therefore will never become ready again, and would be safe to fully delete)?

alowde: Not to the best of my knowledge, and that's the heart of the issue - all we know is that an upstream address has disappeared from the list of endpoints for that service. We could find out more by querying the API server, but that introduces substantially more complexity that I think the ingress-nginx team would not want to take on.

Reviewer: This thread is still a bit concerning to me. It is unintuitive to me that a pod that becomes unready will result in ingress-nginx thinking it has 0 connections. Maybe setting the TTL to 2 days and just letting the entry in the map expire if we don't see it again in 2 days makes sense? (2 days is chosen somewhat arbitrarily.)

alowde: Update: all of these effects were due to removing nodes from the table when they "go missing", plus there was an off-by-one error that has now been fixed on my fork; see the other thread.

(An expire-based sketch follows the end of this file below.)
  end

  for _, endpoint_string in ipairs(normalized_endpoints_added) do
    ngx.shared.balancer_leastconn:set(endpoint_string,0,600)
  end

end

return _M
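A minimal sketch of the expire-based alternative discussed in the thread attached to the delete() call in sync() (hypothetical, not part of this diff; ngx.shared.DICT:expire is available in recent lua-nginx-module releases, and the 2-day TTL is the arbitrary value floated in that thread):

    local LINGER_TTL = 2 * 24 * 60 * 60  -- keep counters around for 2 days after an endpoint disappears

    for _, endpoint_string in ipairs(normalized_endpoints_removed) do
      -- let the counter age out instead of deleting it, so a pod that briefly
      -- fails its readiness probe keeps its in-flight connection count
      local ok, err = ngx.shared.balancer_leastconn:expire(endpoint_string, LINGER_TTL)
      if not ok and err ~= "not found" then
        ngx_log(WARN, "failed to set TTL for ", endpoint_string, ": ", err)
      end
    end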
@@ -0,0 +1,160 @@
local util = require("util")
local say = require("say")

local original_ngx = ngx
local function reset_ngx()
  _G.ngx = original_ngx
end

local function included_in(state, arguments)
  if type(arguments[1]) ~= "table" or #arguments ~= 2 then
    return false
  end

  local table = arguments[1]
  for _, value in pairs(table) do
    if value == arguments[2] then
      return true
    end
  end
  return false
end
assert:register("assertion", "included_in", included_in, "assertion.has_property.positive", "assertion.has_property.negative")

local function mock_ngx(mock)
  local _ngx = mock
  setmetatable(_ngx, { __index = ngx })
  _G.ngx = _ngx
end

local function flush_connection_count()
  ngx.shared.balancer_leastconn:flush_all()
end

local function set_backend_count(endpoint_string, count)
  ngx.shared.balancer_leastconn:set(endpoint_string, count)
end

describe("Balancer leastconn", function()
  local balancer_leastconn = require("balancer.leastconn")
  local ngx_now = 1543238266
  local backend, instance

  before_each(function()
    package.loaded["balancer.leastconn"] = nil
    balancer_leastconn = require("balancer.leastconn")

    backend = {
      name = "namespace-service-port",
      ["load-balance"] = "least_connections",
      endpoints = {
        { address = "10.10.10.1", port = "8080" },
        { address = "10.10.10.2", port = "8080" },
        { address = "10.10.10.3", port = "8080" },
      }
    }
    set_backend_count("10.10.10.1:8080", 0)
    set_backend_count("10.10.10.2:8080", 1)
    set_backend_count("10.10.10.3:8080", 5)

    instance = balancer_leastconn:new(backend)
  end)

  after_each(function()
    reset_ngx()
    flush_connection_count()
  end)

  describe("after_balance()", function()
    it("updates connection count", function()
      ngx.var = { upstream_addr = "10.10.10.2:8080" }

      local count_before = ngx.shared.balancer_leastconn:get(ngx.var.upstream_addr)
      instance:after_balance()
      local count_after = ngx.shared.balancer_leastconn:get(ngx.var.upstream_addr)

      assert.are.equals(count_before - 1, count_after)
    end)
  end)

  describe("balance()", function()
    it("increments connection count on selected peer", function()
      local single_endpoint_backend = util.deepcopy(backend)
      table.remove(single_endpoint_backend.endpoints, 3)
      table.remove(single_endpoint_backend.endpoints, 2)
      local single_endpoint_instance = balancer_leastconn:new(single_endpoint_backend)

      local upstream = single_endpoint_backend.endpoints[1]
      local upstream_name = upstream.address .. ":" .. upstream.port

      set_backend_count(upstream_name, 0)
      single_endpoint_instance:balance()
      local count_after = ngx.shared.balancer_leastconn:get(upstream_name)

      assert.are.equals(1, count_after)
    end)

    it("returns single endpoint when the given backend has only one endpoint", function()
      local single_endpoint_backend = util.deepcopy(backend)
      table.remove(single_endpoint_backend.endpoints, 3)
      table.remove(single_endpoint_backend.endpoints, 2)
      local single_endpoint_instance = balancer_leastconn:new(single_endpoint_backend)

      local peer = single_endpoint_instance:balance()

      assert.are.equals("10.10.10.1:8080", peer)
    end)

    it("picks the endpoint with lowest connection count", function()
      local two_endpoints_backend = util.deepcopy(backend)
      table.remove(two_endpoints_backend.endpoints, 2)
      local two_endpoints_instance = balancer_leastconn:new(two_endpoints_backend)

      local peer = two_endpoints_instance:balance()

      assert.equal("10.10.10.1:8080", peer)
    end)

    it("picks one of the endpoints with tied lowest connection count", function()
      set_backend_count("10.10.10.1:8080", 8)
      set_backend_count("10.10.10.2:8080", 5)
      set_backend_count("10.10.10.3:8080", 5)

      local peer = instance:balance()
      assert.included_in({"10.10.10.2:8080", "10.10.10.3:8080"}, peer)
    end)
  end)

  describe("sync()", function()
    it("does not reset stats when endpoints do not change", function()
      local new_backend = util.deepcopy(backend)

      instance:sync(new_backend)

      assert.are.same(new_backend.endpoints, instance.peers)
      assert.are.same(new_backend.endpoints, backend.endpoints)
    end)

    it("updates peers, deletes stats for old endpoints and sets connection count to zero for new ones", function()
      local new_backend = util.deepcopy(backend)

      -- existing endpoint 10.10.10.2 got deleted and replaced with 10.10.10.4
      new_backend.endpoints[2].address = "10.10.10.4"
      -- and there's one new extra endpoint
      table.insert(new_backend.endpoints, { address = "10.10.10.5", port = "8080" })

      instance:sync(new_backend)

      assert.are.same(new_backend.endpoints, instance.peers)

      assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.1:8080"), 0)
      assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.2:8080"), nil)
      assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.3:8080"), 5)
      assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.4:8080"), 0)
      assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.5:8080"), 0)
    end)
  end)
end)
Reviewer: How do we behave when peers is empty?

alowde: I believe that if we reached this line it would cause a Lua exception and Nginx would return an internal server error response, but I'm checking the actual behaviour with some more tests.

Reviewer: I haven't come across a test for this yet. Worth adding one?
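A sketch of the kind of test the last comment asks for, in the same busted style as the spec above (hypothetical; it assumes balance() is expected to raise rather than return nil when a backend has no endpoints):

    it("errors when the backend has no endpoints", function()
      local empty_backend = util.deepcopy(backend)
      empty_backend.endpoints = {}
      local empty_instance = balancer_leastconn:new(empty_backend)

      -- with no peers there is nothing to pick from, so balance() is
      -- expected to raise a Lua error rather than return an upstream name
      assert.has_error(function() empty_instance:balance() end)
    end)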