Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ai-proxy-multi): support healthcheck #12063

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apisix/consumer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ local function filter(consumer)
return
end

plugin.set_plugins_meta_parent(consumer.value.plugins, consumer)

-- We expect the id to be the same as the username. Fix it up here if it isn't.
consumer.value.id = consumer.value.username
end
Expand Down
3 changes: 3 additions & 0 deletions apisix/http/service.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
--
local core = require("apisix.core")
local apisix_upstream = require("apisix.upstream")
local plugin = require("apisix.plugin")
local plugin_checker = require("apisix.plugin").plugin_checker
local services
local error = error
Expand Down Expand Up @@ -46,6 +47,8 @@ local function filter(service)
return
end

plugin.set_plugins_meta_parent(service.value.plugins, service)

apisix_upstream.filter_upstream(service.value.upstream, service)

core.log.info("filter service: ", core.json.delay_encode(service, true))
Expand Down
25 changes: 25 additions & 0 deletions apisix/plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ local error = error
-- make linter happy to avoid error: getting the Lua global "load"
-- luacheck: globals load, ignore lua_load
local lua_load = load
local getmetatable = getmetatable
local setmetatable = setmetatable

local is_http = ngx.config.subsystem == "http"
local local_plugins_hash = core.table.new(0, 32)
local stream_local_plugins = core.table.new(32, 0)
Expand Down Expand Up @@ -1161,6 +1164,28 @@ local function run_meta_pre_function(conf, api_ctx, name)
end
end


--- Attach `parent` to every plugin configuration in `plugins` so that it can
-- be read back as `plugin_conf._meta.parent`.
--
-- The reference is stored behind the metatable of `_meta` (via `__index`)
-- instead of as a plain field, so iterating `_meta` with pairs() does not
-- visit it.
--
-- @param plugins  map of plugin name -> plugin conf table; nil is a no-op
-- @param parent   the object owning `plugins`
function _M.set_plugins_meta_parent(plugins, parent)
    if not plugins then
        return
    end

    for _, plugin_conf in pairs(plugins) do
        local meta = plugin_conf._meta
        if not meta then
            meta = {}
            plugin_conf._meta = meta
        end

        -- an already visible parent is never overwritten
        if not meta.parent then
            local mt = getmetatable(meta)
            if not mt then
                setmetatable(meta, { __index = { parent = parent } })

            elseif mt.__index == nil then
                mt.__index = { parent = parent }

            elseif type(mt.__index) == "table" then
                -- field lookups go through `__index`, not through the
                -- metatable itself, so the parent has to be stored there
                -- for `meta.parent` to resolve
                mt.__index.parent = parent

            else
                -- `__index` is a function: keep the previous behaviour and
                -- leave the lookup handler untouched
                mt.parent = parent
            end
        end
    end
end


function _M.run_plugin(phase, plugins, api_ctx)
local plugin_run = false
api_ctx = api_ctx or ngx.ctx.api_ctx
Expand Down
210 changes: 203 additions & 7 deletions apisix/plugins/ai-proxy-multi.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@ local core = require("apisix.core")
local schema = require("apisix.plugins.ai-proxy.schema")
local base = require("apisix.plugins.ai-proxy.base")
local plugin = require("apisix.plugin")
local ipmatcher = require("resty.ipmatcher")
local events = require("apisix.events")

local tonumber = tonumber
local pairs = pairs
local tostring = tostring

local require = require
local pcall = pcall
local ipairs = ipairs
local type = type

local priority_balancer = require("apisix.balancer.priority")
local healthcheck

local pickers = {}
local lrucache_server_picker = core.lrucache.new({
Expand Down Expand Up @@ -119,16 +126,197 @@ local function transform_instances(new_instances, instance)
end


local function create_server_picker(conf, ups_tab)
-- Resolve the address of `node` in place when it is not an IP literal.
--
-- The address is taken from `node.domain`, falling back to `node.host`.
-- When it parses as neither IPv4 nor IPv6, it is stored in `node.domain`
-- and `node.host` is replaced by the resolved address, if the resolver
-- returned one. A resolver error is logged; nothing is returned.
local function parse_domain_for_node(node)
    local addr = node.domain or node.host

    -- already an IP literal: nothing to resolve
    if ipmatcher.parse_ipv4(addr) or ipmatcher.parse_ipv6(addr) then
        return
    end

    node.domain = addr

    local resolved, err = core.resolver.parse_domain(addr)
    if resolved then
        node.host = resolved
    end

    if err then
        core.log.error("dns resolver domain: ", addr, " error: ", err)
    end
end


-- Build a node table `{scheme, host, port}` from `instance_conf.override.endpoint`.
--
-- The port defaults to 443 for https and 80 otherwise when the URL does not
-- carry one. The node is passed through parse_domain_for_node before being
-- returned.
--
-- NOTE(review): `endpoint` is dereferenced without a nil check, so this
-- presumably relies on every instance passed in having `override.endpoint`
-- set to an http(s) URL -- confirm against the schema and callers.
local function resolve_endpoint(instance_conf)
    local endpoint = core.table.try_read_attr(instance_conf, "override", "endpoint")
    local scheme, host, port = endpoint:match("^(https?)://([^:/]+):?(%d*)(/?.*)$")

    if port == "" then
        if scheme == "https" then
            port = "443"
        else
            port = "80"
        end
    end

    local node = {
        scheme = scheme,
        host = host,
        port = tonumber(port),
    }
    parse_domain_for_node(node)

    return node
end


-- Compose the health checker name as "<plugin_name>#<tostring(conf)>#<instance_name>".
-- NOTE(review): `conf` is presumably the plugin conf table, in which case
-- tostring() yields an identity-based string rather than a content hash.
local function get_healthchecker_name(conf, instance_name)
    local parts = {plugin_name, tostring(conf), instance_name}
    return core.table.concat(parts, "#")
end


-- Clear and stop every checker stored in `healthcheck_parent.ai_checkers`.
-- Registered as a clean handler on the parent object by create_checkers.
--
-- @param healthcheck_parent  object carrying the `ai_checkers` map
local function release_checkers(healthcheck_parent)
    local ai_checkers = healthcheck_parent.ai_checkers
    core.log.info("try to release ai_checkers: ", tostring(ai_checkers))

    -- nothing was attached to this parent: pairs(nil) would raise
    if not ai_checkers then
        return
    end

    for _, checker in pairs(ai_checkers) do
        checker:clear()
        checker:stop()
    end
end


-- Sum the `status_ver` field of every checker in `checkers`.
-- The caller appends the result to the picker cache version.
--
-- @param checkers  map of key -> checker (each exposing a numeric `status_ver`)
-- @return the total of all `status_ver` values, 0 for an empty map
local function get_checkers_status_ver(checkers)
    local total = 0

    for _, item in pairs(checkers) do
        total = total + item.status_ver
    end

    return total
end


-- Clear and stop every checker in `checkers` (map of instance name -> checker).
local function stop_ai_checkers(checkers)
    for _, checker in pairs(checkers) do
        checker:clear()
        checker:stop()
    end
end


-- Create (or reuse) one health checker per instance that declares `checks`.
--
-- The checkers are stored on the parent object of the plugin conf
-- (`conf._meta.parent`) as `ai_checkers`, together with the conf they were
-- built for (`ai_checker_conf`) and the index of the clean handler that
-- releases them (`ai_checkers_idx`).
--
-- @param conf  plugin conf; `conf.instances` is the list of AI instances
-- @return map of instance name -> checker, or nil when
--         * the conf has no parent object attached,
--         * another caller is currently creating the checkers for this conf,
--         * a checker or the clean handler could not be created.
local function create_checkers(conf)
    if healthcheck == nil then
        healthcheck = require("resty.healthcheck")
    end

    -- the parent is only present when the owning object attached it; without
    -- it there is nowhere to store the checkers, so health checking is skipped
    local healthcheck_parent = conf._meta and conf._meta.parent
    if not healthcheck_parent then
        core.log.info("no parent attached to plugin conf, skip creating ai checkers")
        return nil
    end

    -- checkers built for this very conf are reused
    if healthcheck_parent.ai_checkers and healthcheck_parent.ai_checker_conf == conf then
        return healthcheck_parent.ai_checkers
    end

    if conf.is_creating_ai_checkers then
        core.log.info("another request is creating new checker")
        return nil
    end
    conf.is_creating_ai_checkers = true

    local ai_checkers = core.table.new(0, #conf.instances)

    for _, ins in ipairs(conf.instances) do
        if ins.checks then
            core.log.info("create new healthcheck instance for ai_instance: ", ins.name,
                          " checks: ", core.json.delay_encode(ins.checks, true))
            local checker, err = healthcheck.new({
                name = get_healthchecker_name(conf, ins.name),
                shm_name = "upstream-healthcheck",
                checks = ins.checks,
                events_module = events:get_healthcheck_events_modele(),
            })
            if not checker then
                core.log.error("failed to create healthcheck instance: ", err)
                -- do not leak the checkers created by earlier iterations
                stop_ai_checkers(ai_checkers)
                conf.is_creating_ai_checkers = nil
                return nil
            end
            ai_checkers[ins.name] = checker
        end
    end

    -- the parent still carries checkers of a previous conf: cancel their
    -- clean handler, firing it so that they get released
    if healthcheck_parent.ai_checkers then
        local ok, err = pcall(core.config_util.cancel_clean_handler, healthcheck_parent,
                              healthcheck_parent.ai_checkers_idx, true)
        if not ok then
            core.log.error("cancel clean handler error: ", err)
        end
    end

    for _, ins in ipairs(conf.instances) do
        local checker = ai_checkers[ins.name]
        -- only instances with a checker need their endpoint resolved
        if checker then
            local node = resolve_endpoint(ins)
            local host = ins.checks and ins.checks.active and ins.checks.active.host
            local port = ins.checks and ins.checks.active and ins.checks.active.port
            local ok, err = checker:add_target(node.host, port or node.port, host)
            if not ok then
                core.log.error("failed to add new health check target: ", node.host, ":",
                               port or node.port, " err: ", err)
            end
        end
    end

    healthcheck_parent.clean_handlers = healthcheck_parent.clean_handlers or {}
    local check_idx, err = core.config_util.add_clean_handler(healthcheck_parent,
                                                              release_checkers)
    if not check_idx then
        conf.is_creating_ai_checkers = nil
        stop_ai_checkers(ai_checkers)
        core.log.error("failed to add clean handler, err:",
            err, " healthcheck parent:", core.json.delay_encode(healthcheck_parent, true))

        return nil
    end

    healthcheck_parent.ai_checkers = ai_checkers
    healthcheck_parent.ai_checkers_idx = check_idx
    healthcheck_parent.ai_checker_conf = conf

    conf.is_creating_ai_checkers = nil

    return ai_checkers
end


-- Collect the instances of `conf` that may currently receive traffic.
--
-- Every instance is fed through transform_instances into a fresh table:
--   * when `checkers` is nil, all instances are taken;
--   * otherwise an instance is taken when it has no checker, or when its
--     checker reports the target status as truthy;
--   * when nothing was taken at all, all instances are taken as a fallback.
--
-- @param conf      plugin conf holding `instances`
-- @param checkers  map of instance name -> checker, or nil
-- @return the table filled by transform_instances
local function fetch_health_instances(conf, checkers)
    local instances = conf.instances
    local picked = core.table.new(0, #instances)

    -- take every configured instance regardless of its health state
    local function take_all()
        for _, ins in ipairs(instances) do
            transform_instances(picked, ins)
        end
        return picked
    end

    if not checkers then
        return take_all()
    end

    for _, ins in ipairs(instances) do
        local checker = checkers[ins.name]
        if not checker then
            transform_instances(picked, ins)
        else
            local active = ins.checks and ins.checks.active
            local host = active and active.host
            local port = active and active.port

            local node = resolve_endpoint(ins)
            -- an explicit active-check port wins over the endpoint port
            local check_port = port or node.port

            local ok, err = checker:get_target_status(node.host, check_port, host)
            if ok then
                transform_instances(picked, ins)
            elseif err then
                core.log.error("failed to get health check target status, addr: ",
                    node.host, ":", check_port, ", host: ", host, ", err: ", err)
            end
        end
    end

    if core.table.nkeys(picked) == 0 then
        core.log.warn("all upstream nodes is unhealthy, use default")
        take_all()
    end

    return picked
end


local function create_server_picker(conf, ups_tab, checkers)
local picker = pickers[conf.balancer.algorithm] -- nil check
if not picker then
pickers[conf.balancer.algorithm] = require("apisix.balancer." .. conf.balancer.algorithm)
picker = pickers[conf.balancer.algorithm]
end
local new_instances = {}
for _, ins in ipairs(conf.instances) do
transform_instances(new_instances, ins)
end

local new_instances = fetch_health_instances(conf, checkers)
core.log.info("fetch health instances: ", core.json.delay_encode(new_instances))

if #new_instances._priority_index > 1 then
core.log.info("new instances: ", core.json.delay_encode(new_instances))
Expand All @@ -150,10 +338,18 @@ end


local function pick_target(ctx, conf, ups_tab)
local checkers = #conf.instances > 1 and create_checkers(conf)

local version = plugin.conf_version(conf)
if checkers then
local status_ver = get_checkers_status_ver(checkers)
version = version .. "#" .. status_ver
end

local server_picker = ctx.server_picker
if not server_picker then
server_picker = lrucache_server_picker(ctx.matched_route.key, plugin.conf_version(conf),
create_server_picker, conf, ups_tab)
server_picker = lrucache_server_picker(ctx.matched_route.key, version,
create_server_picker, conf, ups_tab, checkers)
end
if not server_picker then
return nil, nil, "failed to fetch server picker"
Expand Down
9 changes: 9 additions & 0 deletions apisix/plugins/ai-proxy/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local schema_def = require("apisix.schema_def")

local _M = {}

local auth_item_schema = {
Expand Down Expand Up @@ -120,6 +122,13 @@ local ai_instance_schema = {
},
},
},
checks = {
type = "object",
properties = {
active = schema_def.health_checker_active,
},
required = {"active"}
},
required = {"name", "provider", "auth", "weight"}
},
}
Expand Down
4 changes: 4 additions & 0 deletions apisix/router.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ local require = require
local http_route = require("apisix.http.route")
local apisix_upstream = require("apisix.upstream")
local core = require("apisix.core")
local set_plugins_meta_parent = require("apisix.plugin").set_plugins_meta_parent

local str_lower = string.lower
local ipairs = ipairs

Expand All @@ -33,6 +35,8 @@ local function filter(route)
return
end

set_plugins_meta_parent(route.value.plugins, route)

if route.value.host then
route.value.host = str_lower(route.value.host)
elseif route.value.hosts then
Expand Down
Loading
Loading