Implement Consistent Hashing with Bounded Loads #9239

Open · wants to merge 2 commits into base: main

Changes from 1 commit

10 changes: 6 additions & 4 deletions internal/ingress/annotations/upstreamhashby/main.go
@@ -29,9 +29,10 @@ type upstreamhashby struct {

// Config contains the Consistent hash configuration to be used in the Ingress
type Config struct {
UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
UpstreamHashBySubset bool `json:"upstream-hash-by-subset,omitempty"`
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size,omitempty"`
UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
UpstreamHashBySubset bool `json:"upstream-hash-by-subset,omitempty"`
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size,omitempty"`
UpstreamHashByBalanceFactor float32 `json:"upstream-hash-by-balance-factor,omitempty"`
}

// NewParser creates a new UpstreamHashBy annotation parser
@@ -44,10 +45,11 @@ func (a upstreamhashby) Parse(ing *networking.Ingress) (interface{}, error) {
upstreamHashBy, _ := parser.GetStringAnnotation("upstream-hash-by", ing)
upstreamHashBySubset, _ := parser.GetBoolAnnotation("upstream-hash-by-subset", ing)
upstreamHashbySubsetSize, _ := parser.GetIntAnnotation("upstream-hash-by-subset-size", ing)
upstreamHashByBalanceFactor, _ := parser.GetFloatAnnotation("upstream-hash-by-balance-factor", ing)

if upstreamHashbySubsetSize == 0 {
upstreamHashbySubsetSize = 3
}

return &Config{upstreamHashBy, upstreamHashBySubset, upstreamHashbySubsetSize}, nil
return &Config{upstreamHashBy, upstreamHashBySubset, upstreamHashbySubsetSize, upstreamHashByBalanceFactor}, nil
}
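In practice this means the factor can be tuned per Ingress alongside the existing upstream-hash-by annotations, presumably as nginx.ingress.kubernetes.io/upstream-hash-by-balance-factor: "1.5" (the annotation prefix is assumed from the other upstream-hash-by* annotations); any configured value not greater than 1 is later rejected by the Lua balancer with a warning and replaced by the default of 2.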
1 change: 1 addition & 0 deletions internal/ingress/controller/controller.go
@@ -1023,6 +1023,7 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
upstreams[name].UpstreamHashBy.UpstreamHashBy = anns.UpstreamHashBy.UpstreamHashBy
upstreams[name].UpstreamHashBy.UpstreamHashBySubset = anns.UpstreamHashBy.UpstreamHashBySubset
upstreams[name].UpstreamHashBy.UpstreamHashBySubsetSize = anns.UpstreamHashBy.UpstreamHashBySubsetSize
upstreams[name].UpstreamHashBy.UpstreamHashByBalanceFactor = anns.UpstreamHashBy.UpstreamHashByBalanceFactor

upstreams[name].LoadBalancing = anns.LoadBalancing
if upstreams[name].LoadBalancing == "" {
12 changes: 12 additions & 0 deletions internal/ingress/defaults/main.go
@@ -132,6 +132,18 @@ type Backend struct {
// Default 3
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size"`

// Configures how much load, relative to the cluster average, any single upstream host is allowed to take on.
// For example, with a value of 1.5 no upstream host will receive more than 1.5 times
// the average load of all the hosts in the cluster.
//
// This is implemented based on the method described in the paper https://arxiv.org/abs/1608.01350
// Unlike the other load balancers, this is an O(N) algorithm.
// Using a lower balance factor results in more hosts being probed,
// so use a higher value if you require better performance.
//
// Defaults to 2 (meaning a host may receive up to 2x the average load)
UpstreamHashByBalanceFactor float32 `json:"upstream-hash-by-balance-factor"`

// Lets us choose a load balancing algorithm per ingress
LoadBalancing string `json:"load-balance"`

7 changes: 4 additions & 3 deletions pkg/apis/ingress/types.go
@@ -168,9 +168,10 @@ type CookieSessionAffinity struct {

// UpstreamHashByConfig describes settings from the upstream-hash-by* annotations.
type UpstreamHashByConfig struct {
UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
UpstreamHashBySubset bool `json:"upstream-hash-by-subset,omitempty"`
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size,omitempty"`
UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
UpstreamHashBySubset bool `json:"upstream-hash-by-subset,omitempty"`
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size,omitempty"`
UpstreamHashByBalanceFactor float32 `json:"upstream-hash-by-balance-factor,omitempty"`
}

// Endpoint describes a kubernetes endpoint in a backend
6 changes: 5 additions & 1 deletion rootfs/etc/nginx/lua/balancer.lua
@@ -6,6 +6,7 @@ local configuration = require("configuration")
local round_robin = require("balancer.round_robin")
local chash = require("balancer.chash")
local chashsubset = require("balancer.chashsubset")
local chashboundedloads = require("balancer.chashboundedloads")
local sticky_balanced = require("balancer.sticky_balanced")
local sticky_persistent = require("balancer.sticky_persistent")
local ewma = require("balancer.ewma")
@@ -29,6 +30,7 @@ local IMPLEMENTATIONS = {
round_robin = round_robin,
chash = chash,
chashsubset = chashsubset,
chashboundedloads = chashboundedloads,
sticky_balanced = sticky_balanced,
sticky_persistent = sticky_persistent,
ewma = ewma,
@@ -55,7 +57,9 @@ local function get_implementation(backend)

elseif backend["upstreamHashByConfig"] and
backend["upstreamHashByConfig"]["upstream-hash-by"] then
if backend["upstreamHashByConfig"]["upstream-hash-by-subset"] then
if backend["upstreamHashByConfig"]["upstream-hash-by-balance-factor"] then
name = "chashboundedloads"
elseif backend["upstreamHashByConfig"]["upstream-hash-by-subset"] then
name = "chashsubset"
else
name = "chash"
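Note that when both annotations are set, the balance-factor check above takes precedence over upstream-hash-by-subset. As a rough illustration, a backend object shaped like the following would now resolve to the new implementation (field names come from this diff; the rest of the table is simplified and hypothetical):

local backend = {
  name = "default-mysvc-80",  -- hypothetical backend name
  ["upstreamHashByConfig"] = {
    ["upstream-hash-by"] = "$request_uri",
    ["upstream-hash-by-balance-factor"] = 1.5,
    -- even if "upstream-hash-by-subset" were also true, the elseif above
    -- means chashboundedloads is chosen
  },
}
-- get_implementation(backend) would pick the chashboundedloads module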
257 changes: 257 additions & 0 deletions rootfs/etc/nginx/lua/balancer/chashboundedloads.lua
@@ -0,0 +1,257 @@
-- Implements Consistent Hashing with Bounded Loads based on the paper [1].
-- For the specified hash-balance-factor, requests to any upstream host are capped
-- at hash_balance_factor times the average number of requests across the cluster.
-- When a request arrives for an upstream host that is currently serving at its max capacity,
-- linear probing is used to identify the next eligible host.
--
-- This is an O(N) algorithm, unlike other load balancers. Using a lower hash-balance-factor
-- results in more hosts being probed, so use a higher value if you require better performance.
--
-- [1]: https://arxiv.org/abs/1608.01350

local resty_roundrobin = require("resty.roundrobin")
local resty_chash = require("resty.chash")
local setmetatable = setmetatable
local lrucache = require("resty.lrucache")

local util = require("util")
local split = require("util.split")
local reverse_table = util.reverse_table
Member:
We also need to include the reverse_table implementation in this PR.
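For reference, a minimal sketch of what such a helper could look like, assuming reverse_table simply maps each value in an array back to its index (the real util implementation is not included in this diff):

local function reverse_table(t)
  -- value -> index lookup, so endpoints_reverse[endpoint] yields the
  -- endpoint's position in self.endpoints
  local reversed = {}
  for index, value in ipairs(t) do
    reversed[value] = index
  end
  return reversed
end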


local string_format = string.format
local INFO = ngx.INFO
local ngx_ERR = ngx.ERR
local ngx_WARN = ngx.WARN
local ngx_log = ngx.log
local math_ceil = math.ceil
local ipairs = ipairs
local ngx = ngx

local DEFAULT_HASH_BALANCE_FACTOR = 2

local HOST_SEED = util.get_host_seed()

-- Controls how many "tenants" we'll keep track of
-- to avoid routing them to alternative_backends
-- as they were already consistently routed to some endpoint.
-- Lowering this value increases the chances of more
-- tenants being routed to alternative_backends.
-- Similarly, increasing this value will keep more tenants
-- consistently routed to the same endpoint in the main backend.
local SEEN_LRU_SIZE = 1000

local _M = {}

local function incr_req_stats(self, endpoint)
if not self.requests_by_endpoint[endpoint] then
self.requests_by_endpoint[endpoint] = 1
else
self.requests_by_endpoint[endpoint] = self.requests_by_endpoint[endpoint] + 1
end
self.total_requests = self.total_requests + 1
end

local function decr_req_stats(self, endpoint)
if self.requests_by_endpoint[endpoint] then
self.requests_by_endpoint[endpoint] = self.requests_by_endpoint[endpoint] - 1
if self.requests_by_endpoint[endpoint] == 0 then
self.requests_by_endpoint[endpoint] = nil
end
end
self.total_requests = self.total_requests - 1
Member:
Should we check if self.total_requests is greater than 0?

Contributor Author (@kirs), Feb 6, 2023:
That should never happen according to our tests. The only thing I could see adding is a critical, panic-like log; is that what you had in mind?
If the algorithm is wrong in some way and we've missed a bug, I'd rather run with a negative total_requests and have something else break loudly than quietly skip the decrement and hide it.
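For illustration, the guard being suggested would presumably look like this (not adopted in the patch, per the reply above):

if self.total_requests > 0 then
  self.total_requests = self.total_requests - 1
end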

end

local function get_hash_by_value(self)
if not ngx.ctx.chash_hash_by_value then
ngx.ctx.chash_hash_by_value = util.generate_var_value(self.hash_by)
end

local v = ngx.ctx.chash_hash_by_value
if v == "" then
return nil
end
return v
end

local function endpoint_eligible(self, endpoint)
-- allowed = ceil((total_requests + 1) * balance_factor / total_endpoints)
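-- e.g. with 10 endpoints, 100 in-flight requests and balance_factor 2:
-- allowed = ceil((100 + 1) * 2 / 10) = 21, so an endpoint already serving
-- 21 requests is skipped and the next endpoint on the ring is probed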
local allowed = math_ceil(
(self.total_requests + 1) * self.balance_factor / self.total_endpoints)
local current = self.requests_by_endpoint[endpoint]
if current == nil then
return true, 0, allowed
else
return current < allowed, current, allowed
end
end

local function update_balance_factor(self, backend)
local balance_factor = backend["upstreamHashByConfig"]["upstream-hash-by-balance-factor"]
if balance_factor and balance_factor <= 1 then
ngx_log(ngx_WARN,
"upstream-hash-by-balance-factor must be > 1. Forcing it to the default value of ",
DEFAULT_HASH_BALANCE_FACTOR)
balance_factor = DEFAULT_HASH_BALANCE_FACTOR
end
self.balance_factor = balance_factor or DEFAULT_HASH_BALANCE_FACTOR
end

local function normalize_endpoints(endpoints)
local b = {}
for i, endpoint in ipairs(endpoints) do
b[i] = string_format("%s:%s", endpoint.address, endpoint.port)
end
return b
end

local function update_endpoints(self, endpoints)
self.endpoints = endpoints
self.endpoints_reverse = reverse_table(endpoints)
self.total_endpoints = #endpoints
self.ring_seed = util.array_mod(HOST_SEED, self.total_endpoints)
end

function _M.is_affinitized(self)
-- alternative_backends might contain a canary backend that gets a percentage of traffic.
-- If a tenant has already been consistently routed to an endpoint, we want to stick with it
-- to keep a higher cache hit ratio, rather than routing it to an alternative backend.
-- This would mean that alternative backends (== canary) would mostly be seeing "new" tenants.

if not self.alternative_backends or not self.alternative_backends[1] then
return false
end

local hash_by_value = get_hash_by_value(self)
if not hash_by_value then
return false
end

return self.seen_hash_by_values:get(hash_by_value) ~= nil
end

function _M.new(self, backend)
local nodes = util.get_nodes(backend.endpoints)

local complex_val, err =
util.parse_complex_value(backend["upstreamHashByConfig"]["upstream-hash-by"])
if err ~= nil then
ngx_log(ngx_ERR, "could not parse the value of the upstream-hash-by: ", err)
end

local o = {
name = "chashboundedloads",

chash = resty_chash:new(nodes),
roundrobin = resty_roundrobin:new(nodes),
alternative_backends = backend.alternativeBackends,
hash_by = complex_val,

requests_by_endpoint = {},
total_requests = 0,
seen_hash_by_values = lrucache.new(SEEN_LRU_SIZE)
}

update_endpoints(o, normalize_endpoints(backend.endpoints))
update_balance_factor(o, backend)

setmetatable(o, self)
self.__index = self
return o
end

function _M.sync(self, backend)
self.alternative_backends = backend.alternativeBackends

update_balance_factor(self, backend)

local new_endpoints = normalize_endpoints(backend.endpoints)

if util.deep_compare(self.endpoints, new_endpoints) then
ngx_log(INFO, "endpoints did not change for backend", backend.name)
return
end

ngx_log(INFO, string_format("[%s] endpoints have changed for backend %s",
self.name, backend.name))

update_endpoints(self, new_endpoints)

local nodes = util.get_nodes(backend.endpoints)
self.chash:reinit(nodes)
self.roundrobin:reinit(nodes)

self.seen_hash_by_values = lrucache.new(SEEN_LRU_SIZE)

ngx_log(INFO, string_format("[%s] nodes have changed for backend %s", self.name, backend.name))
end

function _M.balance(self)
local hash_by_value = get_hash_by_value(self)

-- Tenant key not available, falling back to round-robin
if not hash_by_value then
local endpoint = self.roundrobin:find()
ngx.var.chashbl_debug = "fallback_round_robin"
return endpoint
end

self.seen_hash_by_values:set(hash_by_value, true)

local tried_endpoints
if not ngx.ctx.balancer_chashbl_tried_endpoints then
tried_endpoints = {}
ngx.ctx.balancer_chashbl_tried_endpoints = tried_endpoints
else
tried_endpoints = ngx.ctx.balancer_chashbl_tried_endpoints
end

local first_endpoint = self.chash:find(hash_by_value)
local index = self.endpoints_reverse[first_endpoint]

-- By design, resty.chash always points to the same element of the ring,
-- regardless of the environment. In this algorithm, we want the consistency
-- to be "seeded" based on the host where it's running.
-- That's how both Envoy and HAProxy implement this.
-- For convenience, we keep resty.chash but manually introduce the seed.
index = util.array_mod(index + self.ring_seed, self.total_endpoints)

for i=0, self.total_endpoints-1 do
local j = util.array_mod(index + i, self.total_endpoints)
local endpoint = self.endpoints[j]

if not tried_endpoints[endpoint] then
local eligible, current, allowed = endpoint_eligible(self, endpoint)

if eligible then
ngx.var.chashbl_debug = string_format(
"attempt=%d score=%d allowed=%d total_requests=%d hash_by_value=%s",
i, current, allowed, self.total_requests, hash_by_value)

incr_req_stats(self, endpoint)
tried_endpoints[endpoint] = true
return endpoint
end
end
end

-- Normally this should never be reached, because with balance_factor > 1
-- there is always an eligible endpoint.
-- It can only happen when the number of endpoints is less than or equal to
-- the maximum number of Nginx retries and tried_endpoints already contains them all.
incr_req_stats(self, first_endpoint)
ngx.var.chashbl_debug = "fallback_first_endpoint"
return first_endpoint
end

function _M.after_balance(self)
local tried_upstreams = split.split_upstream_var(ngx.var.upstream_addr)
if (not tried_upstreams) or (not get_hash_by_value(self)) then
return
end

for _, addr in ipairs(tried_upstreams) do
decr_req_stats(self, addr)
end
end

return _M