Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
666986d
fix: improve least_conn balancer implementation
coder2z May 29, 2025
ff0ddb2
Merge branch 'master' of github.com:apache/apisix into fix/websocket-…
coder2z Jun 10, 2025
197330b
fix: md lint
coder2z Jun 10, 2025
78eeadc
fix: lint+md lint
coder2z Jun 13, 2025
30ae704
fix: lint+md lint
coder2z Jun 13, 2025
cec5172
fix: lint+md lint
coder2z Jun 13, 2025
007ed0b
fix: lint+md lint
coder2z Jun 13, 2025
5dbf4c3
Merge branch 'master' of https://github.com/apache/apisix into fix/we…
Jun 20, 2025
164b5bc
fix: fix test .t lua_shared_dict config
Jun 20, 2025
7b63ecc
feat: master
coder2z Aug 26, 2025
a84e799
fix: test file
coder2z Aug 26, 2025
91f33be
fix: test file
coder2z Aug 27, 2025
13ac9c1
fix: test file
coder2z Aug 27, 2025
a000604
fix: test file
coder2z Aug 27, 2025
cfa31ad
fix: test file
coder2z Aug 27, 2025
900d3dd
fix: lint
coder2z Aug 28, 2025
6161f3c
fix: lint
coder2z Aug 28, 2025
cf62d64
fix: lint
coder2z Aug 29, 2025
e9e311a
Merge branch 'master' into fix/websocket-least-conn
coder2z Sep 3, 2025
983ef0e
fix(test): improve sls-logger.t TEST 16 error log pattern matching
coder2z Sep 4, 2025
072f40f
Merge branch 'master' of github.com:apache/apisix into fix/websocket-…
coder2z Oct 9, 2025
cece69c
fix: fix test
coder2z Oct 9, 2025
8a123d0
fix: fix test
coder2z Oct 9, 2025
8708d2a
Merge branch 'fix/websocket-least-conn' of github.com:coder2z/apisix …
coder2z Oct 9, 2025
08abdce
fix: fix test
coder2z Oct 9, 2025
8cf000e
Merge branch 'master' of https://github.com/apache/apisix into fix/we…
coder2z Oct 11, 2025
02824ed
Merge branch 'master' of https://github.com/apache/apisix into fix/we…
coder2z Oct 11, 2025
2550721
fix: lint
coder2z Oct 11, 2025
4ebc7d5
fix: lint
coder2z Oct 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions apisix/balancer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,21 @@ local function pick_server(route, ctx)

local nodes_count = #up_conf.nodes
if nodes_count == 1 then
local node = up_conf.nodes[1]
ctx.balancer_ip = node.host
ctx.balancer_port = node.port
node.upstream_host = parse_server_for_upstream_host(node, ctx.upstream_scheme)
return node
-- For least_conn balancer, we still need to use the balancer even with single node
-- to track connection counts for future load balancing decisions
if up_conf.type == "least_conn" then
core.log.debug(
"single node with least_conn balancer",
"still using balancer for connection tracking"
)
else
core.log.info("single node with ", up_conf.type, " balancer - skipping balancer")
local node = up_conf.nodes[1]
ctx.balancer_ip = node.host
ctx.balancer_port = node.port
node.upstream_host = parse_server_for_upstream_host(node, ctx.upstream_scheme)
return node
end
end

local version = ctx.upstream_version
Expand Down
133 changes: 124 additions & 9 deletions apisix/balancer/least_conn.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,134 @@

local core = require("apisix.core")
local binaryHeap = require("binaryheap")
local dkjson = require("dkjson")
local ipairs = ipairs
local pairs = pairs

local ngx = ngx
local ngx_shared = ngx.shared
local tostring = tostring

local _M = {}

-- Shared dictionary to store connection counts across balancer recreations
local CONN_COUNT_DICT_NAME = "balancer-least-conn"
local conn_count_dict

-- Heap comparator: orders two heap entries so that the one with the smaller
-- `score` field comes first.
local function least_score(lhs, rhs)
    return rhs.score > lhs.score
end

-- Build the shared-dict key that holds the connection count of `server`
-- inside `upstream`. The key has the form "conn_count:<upstream id>:<server>".
-- When the upstream has no `id`, a CRC32 of its dkjson encoding is used in its place.
-- NOTE(review): the fallback id is only as stable as the encoded form of the
-- upstream table -- confirm that the table is not mutated between calls.
local function get_conn_count_key(upstream, server)
    local id = upstream.id
    if not id then
        id = ngx.crc32_short(dkjson.encode(upstream))
        core.log.debug("generated upstream_id from hash: ", id)
    end

    local conn_key = "conn_count:" .. tostring(id) .. ":" .. server
    core.log.debug("generated connection count key: ", conn_key)
    return conn_key
end


-- Read the connection count currently recorded for `server` of `upstream`
-- from the shared dict.
-- Returns the stored count, or 0 when the key is absent or the lookup failed
-- (a failure is also logged at error level).
local function get_server_conn_count(upstream, server)
    local key = get_conn_count_key(upstream, server)
    -- shdict:get returns `value, flags` on a hit and `nil, err` on failure, so
    -- the second return value is an error only when the first one is nil.
    local count, err = conn_count_dict:get(key)
    if count == nil and err then
        core.log.error("failed to get connection count for ", server, ": ", err)
        return 0
    end

    count = count or 0
    core.log.debug("retrieved connection count for server ", server, ": ", count)
    return count
end

-- Atomically add `delta` (default 1, may be negative) to the connection count
-- recorded for `server` of `upstream` in the shared dict.
-- Returns the resulting count, or 0 when the shared-dict operation failed.
-- The stored count is never left below zero: a decrement that hits a key
-- which was deleted or evicted in the meantime would otherwise persist a
-- negative value and skew later scoring.
local function incr_server_conn_count(upstream, server, delta)
    delta = delta or 1

    local key = get_conn_count_key(upstream, server)
    -- the third argument initialises a missing key to 0 before adding `delta`
    local new_count, err = conn_count_dict:incr(key, delta, 0)
    if not new_count then
        core.log.error("failed to increment connection count for ", server, ": ", err)
        return 0
    end

    if new_count < 0 then
        core.log.warn("connection count for server ", server,
                      " dropped below zero: ", new_count, ", compensating")
        -- Undo the surplus decrement with another atomic incr rather than a
        -- plain set, so increments made concurrently by other workers are kept.
        local fixed, fix_err = conn_count_dict:incr(key, -new_count, 0)
        if not fixed then
            core.log.error("failed to reset negative connection count for ",
                           server, ": ", fix_err)
            return 0
        end

        new_count = fixed
        if new_count < 0 then
            new_count = 0
        end
    end

    core.log.debug("incremented connection count for server ", server, " by ", delta,
                   ", new count: ", new_count)
    return new_count
end


-- Delete the shared-dict connection counters of `upstream` whose server is no
-- longer a key of `current_servers` (a table indexed by server name).
-- Counters that belong to other upstreams are left untouched.
local function cleanup_stale_conn_counts(upstream, current_servers)
    -- Derive the prefix through the key builder so both always agree on the
    -- key layout; an empty server name yields exactly "conn_count:<id>:".
    local prefix = get_conn_count_key(upstream, "")
    local prefix_len = #prefix
    core.log.debug("cleaning up stale connection counts with prefix: ", prefix)

    -- get_keys(0) returns every key in the dict as a single return value (no
    -- error value). The dict is locked during the scan, so this can be costly
    -- on a dict that holds many keys.
    local keys = conn_count_dict:get_keys(0)
    if not keys then
        return
    end

    for _, key in ipairs(keys) do
        if core.string.has_prefix(key, prefix) then
            local server = key:sub(prefix_len + 1)
            if not current_servers[server] then
                -- This server is no longer in the upstream, clean it up
                local ok, delete_err = conn_count_dict:delete(key)
                if not ok and delete_err then
                    core.log.error("failed to delete stale connection count for server ",
                                   server, ": ", delete_err)
                else
                    core.log.info("cleaned up stale connection count for server: ", server)
                end
            end
        end
    end
end

function _M.new(up_nodes, upstream)
if not conn_count_dict then
conn_count_dict = ngx_shared[CONN_COUNT_DICT_NAME]
end

if not conn_count_dict then
core.log.error("shared dict '", CONN_COUNT_DICT_NAME, "' not found")
return nil, "shared dict not found"
end

local servers_heap = binaryHeap.minUnique(least_score)

-- Clean up stale connection counts for removed servers
cleanup_stale_conn_counts(upstream, up_nodes)

for server, weight in pairs(up_nodes) do
local score = 1 / weight
-- Get the persisted connection count for this server
local conn_count = get_server_conn_count(upstream, server)
-- Score directly reflects weighted connection count
local score = (conn_count + 1) / weight

core.log.debug("initializing server ", server,
" | weight: ", weight,
" | conn_count: ", conn_count,
" | score: ", score,
" | upstream_id: ", upstream.id or "no-id")

-- Note: the argument order of insert is different from others
servers_heap:insert({
server = server,
effect_weight = 1 / weight,
weight = weight,
score = score,
}, server)
end

return {
upstream = upstream,
get = function (ctx)
get = function(ctx)
local server, info, err
if ctx.balancer_tried_servers then
local tried_server_list = {}
Expand Down Expand Up @@ -75,15 +176,29 @@ function _M.new(up_nodes, upstream)
return nil, err
end

info.score = info.score + info.effect_weight
-- Get current connection count for detailed logging
local current_conn_count = get_server_conn_count(upstream, server)
info.score = (current_conn_count + 1) / info.weight
servers_heap:update(server, info)
incr_server_conn_count(upstream, server, 1)
return server
end,
after_balance = function (ctx, before_retry)
after_balance = function(ctx, before_retry)
local server = ctx.balancer_server
local info = servers_heap:valueByPayload(server)
info.score = info.score - info.effect_weight
if not info then
core.log.error("server info not found for: ", server)
return
end

local current_conn_count = get_server_conn_count(upstream, server)
info.score = (current_conn_count - 1) / info.weight
if info.score < 0 then
info.score = 0 -- Prevent negative scores
end
servers_heap:update(server, info)
-- Decrement connection count in shared dict
incr_server_conn_count(upstream, server, -1)

if not before_retry then
if ctx.balancer_tried_servers then
Expand All @@ -100,7 +215,7 @@ function _M.new(up_nodes, upstream)

ctx.balancer_tried_servers[server] = true
end,
before_retry_next_priority = function (ctx)
before_retry_next_priority = function(ctx)
if ctx.balancer_tried_servers then
core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
ctx.balancer_tried_servers = nil
Expand All @@ -109,5 +224,5 @@ function _M.new(up_nodes, upstream)
}
end


return _M

3 changes: 3 additions & 0 deletions apisix/cli/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ local _M = {
["prometheus-metrics"] = "15m",
["standalone-config"] = "10m",
["status-report"] = "1m",
["balancer-least-conn"] = "10m",
}
},
stream = {
Expand All @@ -113,6 +114,7 @@ local _M = {
["worker-events-stream"] = "10m",
["tars-stream"] = "1m",
["upstream-healthcheck-stream"] = "10m",
["balancer-least-conn"] = "10m",
}
},
main_configuration_snippet = "",
Expand Down Expand Up @@ -160,6 +162,7 @@ local _M = {
["balancer-ewma"] = "10m",
["balancer-ewma-locks"] = "10m",
["balancer-ewma-last-touched-at"] = "10m",
["balancer-least-conn"] = "10m",
["plugin-limit-req-redis-cluster-slot-lock"] = "1m",
["plugin-limit-count-redis-cluster-slot-lock"] = "1m",
["plugin-limit-conn-redis-cluster-slot-lock"] = "1m",
Expand Down
6 changes: 6 additions & 0 deletions apisix/cli/ngx_tpl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ lua {
{% if status then %}
lua_shared_dict status-report {* meta.lua_shared_dict["status-report"] *};
{% end %}
{% if enable_stream then %}
lua_shared_dict balancer-least-conn {* meta.lua_shared_dict["balancer-least-conn"] *};
{% end %}
}

{% if enabled_stream_plugins["prometheus"] and not enable_http then %}
Expand Down Expand Up @@ -284,6 +287,9 @@ http {
lua_shared_dict balancer-ewma {* http.lua_shared_dict["balancer-ewma"] *};
lua_shared_dict balancer-ewma-locks {* http.lua_shared_dict["balancer-ewma-locks"] *};
lua_shared_dict balancer-ewma-last-touched-at {* http.lua_shared_dict["balancer-ewma-last-touched-at"] *};
{% if not enable_stream then %}
lua_shared_dict balancer-least-conn {* http.lua_shared_dict["balancer-least-conn"] *};
{% end %}
lua_shared_dict etcd-cluster-health-check {* http.lua_shared_dict["etcd-cluster-health-check"] *}; # etcd health check

# for discovery shared dict
Expand Down
Loading
Loading