Add least_connections load balancing algorithm. #9025
base: main
Changes from 3 commits
@@ -0,0 +1,108 @@
local util = require("util")
local split = require("util.split")
require("resty.core")

local ngx = ngx
local ipairs = ipairs
local setmetatable = setmetatable
local string_format = string.format
local ngx_log = ngx.log
local DEBUG = ngx.DEBUG


local _M = { name = "leastconn" }

function _M.new(self, backend)
  local o = {
    peers = backend.endpoints
  }
  setmetatable(o, self)
  self.__index = self
  return o
end

function _M.is_affinitized()
  return false
end


local function get_upstream_name(upstream)
  return upstream.address .. ":" .. upstream.port
end


function _M.balance(self)
  local peers = self.peers
  local endpoint = peers[1]
  local endpoints = ngx.shared.balancer_leastconn
  local feasible_endpoints = {}

  if #peers ~= 1 then
    local lowestconns = 2147483647
    -- find the lowest connection count
    for _, peer in pairs(peers) do
      local peer_name = get_upstream_name(peer)
      local peer_conn_count = endpoints:get(peer_name)
      if peer_conn_count == nil then
        -- Peer has never been recorded as having connections - add it to the connection
        -- tracking table and the list of feasible peers
        endpoints:set(peer_name, 0, 0)
Review comment: I'm surprised we don't have a default expiry on these entries. Does it make sense to add a long expiry (2 days) by default, just to catch any leaks?
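A sketch of what that suggestion might look like, not part of this PR: pass a non-zero exptime when the counter is first created, so entries that are never cleaned up eventually expire on their own. The two-day TTL is an assumed, illustrative value.

```lua
-- Hypothetical variant of the initialization above. ngx.shared.DICT:set
-- accepts an expiry time in seconds as its third argument; 172800 seconds
-- (2 days) is only an example default, not a value from this PR.
local TWO_DAYS = 2 * 24 * 60 * 60

endpoints:set(peer_name, 0, TWO_DAYS)
```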
        lowestconns = 0
        feasible_endpoints[#feasible_endpoints+1] = peer
      elseif peer_conn_count < lowestconns then
        -- Peer has fewer connections than any other peer evaluated so far - add it as the
        -- only feasible endpoint for now
        feasible_endpoints = {peer}
        lowestconns = peer_conn_count
      elseif peer_conn_count == lowestconns then
        -- Peer has equal fewest connections as other peers - add it to the list of
        -- feasible peers
        feasible_endpoints[#feasible_endpoints+1] = peer
      end
    end
    ngx_log(DEBUG, "select from ", #feasible_endpoints, " feasible endpoints out of ", #peers)
    endpoint = feasible_endpoints[math.random(1,#feasible_endpoints)]
  end

  local selected_endpoint = get_upstream_name(endpoint)
  ngx_log(DEBUG, "selected endpoint ", selected_endpoint)

  -- Update the endpoint connection count
  endpoints:incr(selected_endpoint,1,1,0)

  return selected_endpoint
end

function _M.after_balance(_)
  local endpoints = ngx.shared.balancer_leastconn
  local upstream = split.get_last_value(ngx.var.upstream_addr)

  if util.is_blank(upstream) then
    return
  end
  endpoints:incr(upstream,-1,0,0)
Review comment: I have a feeling there is a bug somewhere (most likely with removing/adding endpoints) which either calls this twice, or otherwise does this the "wrong" way. [graphs omitted] These graphs show massively divergent usage, all similar in increasing amount, which leads me to think that this either never reaches 0 after some TTL, or otherwise does not properly reset when it sees that it has gone below zero.

Review comment: The way it is currently written, once a pod becomes unready, ingress-nginx will effectively set the "number of connections" it is tracking to 0. I mentioned this, as well as an alternative solution, in a comment above.

Review comment: I know, that code is totally unnecessary; I've changed it around in my fork. (It still seems to exhibit an off-by-one error somewhere after an indeterminate amount of time, after an indeterminate event, which is why I haven't yet suggested it here.)

Review comment: I've found the off-by-one error, and I've committed it here, if anyone's interested: https://github.com/ShadowJonathan/ingress-nginx/tree/add-least_connections-load-balancing
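One defensive option for the decrement being discussed, sketched here as an assumption rather than code from the PR or the linked fork, is to clamp the counter at zero so a stray double decrement cannot drive it negative:

```lua
-- Hypothetical guarded decrement for after_balance: if the counter would go
-- negative (for example because the decrement ran twice for one request),
-- reset it to zero instead of letting it drift downwards.
local new_count, err = endpoints:incr(upstream, -1, 0, 0)
if err then
  ngx_log(ngx.ERR, "failed to decrement connection count: ", err)
elseif new_count and new_count < 0 then
  endpoints:set(upstream, 0)
end
```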
end

function _M.sync(self, backend)
  local normalized_endpoints_added, normalized_endpoints_removed =
    util.diff_endpoints(self.peers, backend.endpoints)

  if #normalized_endpoints_added == 0 and #normalized_endpoints_removed == 0 then
    return
  end

  ngx_log(DEBUG, string_format("[%s] peers have changed for backend %s", self.name, backend.name))

  self.peers = backend.endpoints

  for _, endpoint_string in ipairs(normalized_endpoints_removed) do
    ngx.shared.balancer_leastconn:delete(endpoint_string)
Review comment: Removing these endpoints here may not have the desired effect. If a pod has a bunch of connections and briefly fails its readiness probe (causing it to no longer be included as a backend), we will remove it from being tracked here. But if the pod becomes ready again, it may still be handling a bunch of connections, yet we will initialize its value to 0 again. Maybe this is OK, but it is worth calling out.

Review comment: Yeah, that's not ideal behaviour. I think it would be safe to instead set the expiration time to (say) 30 minutes. This would ensure we still track connection counts for up to that long while not allowing the shared dict to grow indefinitely.

Review comment: Setting it to expire asynchronously makes some sense, though it will be difficult to choose the TTL. It makes sense that this least-connections load balancer would still track pods that are handling connections even if the pod is not ready; it should decrement the number of connections on a backend that finishes a request even if that pod is not ready, for example. Is there a way we can tell the difference between a pod that is not ready now (but may become ready soon) versus a pod that is being killed (and therefore will never become ready again, and would be safe to fully delete)?

Review comment: Not to the best of my knowledge, and that's the heart of the issue - all we know is that an upstream address has disappeared from the list of endpoints for that service. We could find out more by querying the API server, but that introduces substantially more complexity than I think the ingress-nginx team would want to take on.

Review comment: This thread is still a bit concerning to me. It is unintuitive that a pod becoming unready results in ingress-nginx thinking it has 0 connections. Maybe setting the TTL to 2 days and just letting the entry in the map expire if we don't see it again makes sense? (2 days is chosen somewhat arbitrarily.)

Review comment: Update: all of these effects were due to removing nodes from the table when they "go missing", plus there was an off-by-one error that has now been fixed on my fork; see the other thread.
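The expire-instead-of-delete idea from this thread could look roughly like the sketch below inside sync(). This is only an illustration of the reviewers' suggestion, not code from the PR; the 30-minute TTL is the example value mentioned above, and ngx.shared.DICT:expire needs lua-resty-core, which this module already loads.

```lua
-- Hypothetical replacement for the delete loop above: keep counters for
-- removed endpoints but let them expire, so a pod that briefly drops out of
-- the endpoint list does not come back with its count reset to zero.
local REMOVED_ENDPOINT_TTL = 30 * 60  -- 30 minutes; illustrative value only

for _, endpoint_string in ipairs(normalized_endpoints_removed) do
  local ok, err = ngx.shared.balancer_leastconn:expire(endpoint_string, REMOVED_ENDPOINT_TTL)
  if not ok and err ~= "not found" then
    ngx_log(ngx.ERR, "failed to set expiry for ", endpoint_string, ": ", err)
  end
end
```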
  end

  for _, endpoint_string in ipairs(normalized_endpoints_added) do
    ngx.shared.balancer_leastconn:set(endpoint_string,0,0)
  end

end

return _M
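To make the selection step easier to follow in isolation: balance() finds the minimum tracked connection count, collects every peer tied at that minimum, and picks one of those at random. Below is a standalone sketch of that idea in plain Lua; the `counts` table stands in for ngx.shared.balancer_leastconn, and all names and values are illustrative, not taken from the PR.

```lua
-- Self-contained illustration of the least-connections selection used above.
-- `peers` mirrors backend.endpoints; `counts` stands in for the shared dict.
local function pick_least_conn(peers, counts)
  local lowest = math.huge
  local feasible = {}
  for _, peer in ipairs(peers) do
    local name = peer.address .. ":" .. peer.port
    local count = counts[name] or 0  -- unseen peers count as zero connections
    if count < lowest then
      lowest, feasible = count, { peer }
    elseif count == lowest then
      feasible[#feasible + 1] = peer
    end
  end
  return feasible[math.random(#feasible)]
end

-- Example: 10.10.10.1 and 10.10.10.3 are tied at 1 connection, so one of the
-- two is returned; 10.10.10.2 (3 connections) is never chosen.
local peers = {
  { address = "10.10.10.1", port = "8080" },
  { address = "10.10.10.2", port = "8080" },
  { address = "10.10.10.3", port = "8080" },
}
local counts = {
  ["10.10.10.1:8080"] = 1,
  ["10.10.10.2:8080"] = 3,
  ["10.10.10.3:8080"] = 1,
}
print(pick_least_conn(peers, counts).address)
```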
@@ -0,0 +1,160 @@
local util = require("util")
local say = require("say")

local original_ngx = ngx
local function reset_ngx()
  _G.ngx = original_ngx
end
local function included_in(state, arguments)
  if type(arguments[1]) ~= "table" or #arguments ~= 2 then
    return false
  end

  local table = arguments[1]
  for _, value in pairs(table) do
    if value == arguments[2] then
      return true
    end
  end
  return false
end
assert:register("assertion", "included_in", included_in, "assertion.has_property.positive", "assertion.has_property.negative")
local function mock_ngx(mock)
  local _ngx = mock
  setmetatable(_ngx, { __index = ngx })
  _G.ngx = _ngx
end

local function flush_connection_count()
  ngx.shared.balancer_leastconn:flush_all()
end

local function set_backend_count(endpoint_string, count)
  ngx.shared.balancer_leastconn:set(endpoint_string, count)
end
describe("Balancer leastconn", function() | ||
local balancer_leastconn = require("balancer.leastconn") | ||
local ngx_now = 1543238266 | ||
local backend, instance | ||
|
||
before_each(function() | ||
package.loaded["balancer.leastconn"] = nil | ||
balancer_leastconn = require("balancer.leastconn") | ||
|
||
backend = { | ||
name = "namespace-service-port", | ||
["load-balance"] = "least_connections", | ||
endpoints = { | ||
{ address = "10.10.10.1", port = "8080" }, | ||
{ address = "10.10.10.2", port = "8080" }, | ||
{ address = "10.10.10.3", port = "8080" }, | ||
} | ||
} | ||
set_backend_count("10.10.10.1:8080", 0) | ||
set_backend_count("10.10.10.2:8080", 1) | ||
set_backend_count("10.10.10.3:8080", 5) | ||
|
||
instance = balancer_leastconn:new(backend) | ||
end) | ||
|
||
after_each(function() | ||
reset_ngx() | ||
flush_connection_count() | ||
end) | ||
|
||
describe("after_balance()", function() | ||
it("updates connection count", function() | ||
ngx.var = { upstream_addr = "10.10.10.2:8080" } | ||
|
||
local count_before = ngx.shared.balancer_leastconn:get(ngx.var.upstream_addr) | ||
instance:after_balance() | ||
local count_after = ngx.shared.balancer_leastconn:get(ngx.var.upstream_addr) | ||
|
||
assert.are.equals(count_before - 1, count_after) | ||
end) | ||
end) | ||
|
||
describe("balance()", function() | ||
it("increments connection count on selected peer", function() | ||
local single_endpoint_backend = util.deepcopy(backend) | ||
table.remove(single_endpoint_backend.endpoints, 3) | ||
table.remove(single_endpoint_backend.endpoints, 2) | ||
local single_endpoint_instance = balancer_leastconn:new(single_endpoint_backend) | ||
|
||
local upstream = single_endpoint_backend.endpoints[1] | ||
local upstream_name = upstream.address .. ":" .. upstream.port | ||
|
||
set_backend_count(upstream_name, 0) | ||
single_endpoint_instance:balance() | ||
local count_after = ngx.shared.balancer_leastconn:get(upstream_name) | ||
|
||
assert.are.equals(1, count_after) | ||
end) | ||
|
||
it("returns single endpoint when the given backend has only one endpoint", function() | ||
local single_endpoint_backend = util.deepcopy(backend) | ||
table.remove(single_endpoint_backend.endpoints, 3) | ||
table.remove(single_endpoint_backend.endpoints, 2) | ||
local single_endpoint_instance = balancer_leastconn:new(single_endpoint_backend) | ||
|
||
local peer = single_endpoint_instance:balance() | ||
|
||
assert.are.equals("10.10.10.1:8080", peer) | ||
end) | ||
|
||
it("picks the endpoint with lowest connection count", function() | ||
local two_endpoints_backend = util.deepcopy(backend) | ||
table.remove(two_endpoints_backend.endpoints, 2) | ||
local two_endpoints_instance = balancer_leastconn:new(two_endpoints_backend) | ||
|
||
local peer = two_endpoints_instance:balance() | ||
|
||
assert.equal("10.10.10.1:8080", peer) | ||
end) | ||
|
||
it("picks one of the endpoints with tied lowest connection count", function() | ||
set_backend_count("10.10.10.1:8080", 8) | ||
set_backend_count("10.10.10.2:8080", 5) | ||
set_backend_count("10.10.10.3:8080", 5) | ||
|
||
local peer = instance:balance() | ||
assert.included_in({"10.10.10.2:8080", "10.10.10.3:8080"}, peer) | ||
end) | ||
|
||
end) | ||
|
||
describe("sync()", function() | ||
it("does not reset stats when endpoints do not change", function() | ||
local new_backend = util.deepcopy(backend) | ||
|
||
instance:sync(new_backend) | ||
|
||
assert.are.same(new_backend.endpoints, instance.peers) | ||
assert.are.same(new_backend.endpoints, backend.endpoints) | ||
end) | ||
|
||
it("updates peers, deletes stats for old endpoints and sets connection count to zero for new ones", function() | ||
local new_backend = util.deepcopy(backend) | ||
|
||
-- existing endpoint 10.10.10.2 got deleted | ||
-- and replaced with 10.10.10.4 | ||
new_backend.endpoints[2].address = "10.10.10.4" | ||
-- and there's one new extra endpoint | ||
table.insert(new_backend.endpoints, { address = "10.10.10.5", port = "8080" }) | ||
|
||
instance:sync(new_backend) | ||
|
||
assert.are.same(new_backend.endpoints, instance.peers) | ||
|
||
assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.1:8080"), 0) | ||
assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.2:8080"), nil) | ||
assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.3:8080"), 5) | ||
assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.4:8080"), 0) | ||
assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.5:8080"), 0) | ||
end) | ||
end) | ||
|
||
end) |
Review comment: How do we behave when `peers` is empty?

Review comment: I believe that if we reached this line it would cause a Lua exception and Nginx would return an internal server error response, but I'm checking the actual behaviour with some more tests.

Review comment: I haven't come across a test for this yet. Worth adding one?
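A test along the lines discussed here might look like the sketch below. It is not part of the PR; it assumes the behaviour described above (an empty peers list makes balance() raise a Lua error rather than return a peer) and reuses the existing spec's helpers.

```lua
-- Hypothetical spec for the empty-peers case raised in this thread.
it("errors when the backend has no endpoints", function()
  local empty_backend = util.deepcopy(backend)
  empty_backend.endpoints = {}
  local empty_instance = balancer_leastconn:new(empty_backend)

  -- With no peers, feasible_endpoints stays empty and indexing it via
  -- math.random(1, 0) raises an error, so balance() is expected to throw.
  assert.has_error(function()
    empty_instance:balance()
  end)
end)
```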