diff --git a/apisix/discovery/kubernetes/informer_factory.lua b/apisix/discovery/kubernetes/informer_factory.lua index fd434c047391..d31841b1e20d 100644 --- a/apisix/discovery/kubernetes/informer_factory.lua +++ b/apisix/discovery/kubernetes/informer_factory.lua @@ -285,7 +285,7 @@ local function list_watch(informer, apiserver) core.log.info("begin to list ", informer.kind) informer.fetch_state = "listing" - if informer.pre_List then + if informer.pre_list then informer:pre_list() end @@ -298,7 +298,7 @@ local function list_watch(informer, apiserver) end informer.fetch_state = "list finished" - if informer.post_List then + if informer.post_list then informer:post_list() end diff --git a/apisix/discovery/kubernetes/init.lua b/apisix/discovery/kubernetes/init.lua index 695a9dd7f684..b6092d8ee51d 100644 --- a/apisix/discovery/kubernetes/init.lua +++ b/apisix/discovery/kubernetes/init.lua @@ -51,7 +51,7 @@ local function sort_nodes_cmp(left, right) return left.port < right.port end -local function on_endpoint_slices_modified(handle, endpoint) +local function on_endpoint_slices_modified(handle, endpoint, operate) if handle.namespace_selector and not handle:namespace_selector(endpoint.metadata.namespace) then return @@ -114,10 +114,15 @@ local function on_endpoint_slices_modified(handle, endpoint) if err then core.log.error("set endpoint into discovery DICT failed, ", err) handle.endpoint_dict:delete(endpoint_key .. "#version") + return + end + if operate == "list" then + handle.current_keys_hash[endpoint_key] = true + handle.current_keys_hash[endpoint_key .. "#version"] = true end end -local function on_endpoint_modified(handle, endpoint) +local function on_endpoint_modified(handle, endpoint, operate) if handle.namespace_selector and not handle:namespace_selector(endpoint.metadata.namespace) then return @@ -177,6 +182,11 @@ local function on_endpoint_modified(handle, endpoint) if err then core.log.error("set endpoint into discovery DICT failed, ", err) handle.endpoint_dict:delete(endpoint_key .. "#version") + return + end + if operate == "list" then + handle.current_keys_hash[endpoint_key] = true + handle.current_keys_hash[endpoint_key .. "#version"] = true end end @@ -195,12 +205,23 @@ end local function pre_list(handle) - handle.endpoint_dict:flush_all() + handle.current_keys_hash = {} + handle.existing_keys = handle.endpoint_dict:get_keys(0) end local function post_list(handle) - handle.endpoint_dict:flush_expired() + if not handle.existing_keys or not handle.current_keys_hash then + return + end + for _, key in ipairs(handle.existing_keys) do + if not handle.current_keys_hash[key] then + core.log.info("kubernetes discovery module find dirty data in shared dict, key:", key) + handle.endpoint_dict:delete(key) + end + end + handle.existing_keys = nil + handle.current_keys_hash = nil end @@ -369,7 +390,7 @@ local function get_apiserver(conf) end local function create_endpoint_lrucache(endpoint_dict, endpoint_key, endpoint_port) - local endpoint_content = endpoint_dict:get_stale(endpoint_key) + local endpoint_content = endpoint_dict:get(endpoint_key) if not endpoint_content then core.log.error("get empty endpoint content from discovery DIC, this should not happen ", endpoint_key) @@ -497,7 +518,7 @@ local function single_mode_nodes(service_name) local endpoint_dict = ctx local endpoint_key = match[1] local endpoint_port = match[2] - local endpoint_version = endpoint_dict:get_stale(endpoint_key .. "#version") + local endpoint_version = endpoint_dict:get(endpoint_key .. "#version") if not endpoint_version then core.log.info("get empty endpoint version from discovery DICT ", endpoint_key) return nil @@ -612,7 +633,7 @@ local function multiple_mode_nodes(service_name) local endpoint_key = match[2] local endpoint_port = match[3] - local endpoint_version = endpoint_dict:get_stale(endpoint_key .. "#version") + local endpoint_version = endpoint_dict:get(endpoint_key .. "#version") if not endpoint_version then core.log.info("get empty endpoint version from discovery DICT ", endpoint_key) return nil diff --git a/t/kubernetes/discovery/kubernetes3.t b/t/kubernetes/discovery/kubernetes3.t index 60b224820f38..e6a3ed2d0c29 100644 --- a/t/kubernetes/discovery/kubernetes3.t +++ b/t/kubernetes/discovery/kubernetes3.t @@ -242,6 +242,13 @@ _EOC_ } } + location /t { + content_by_lua_block { + ngx.sleep(2) + ngx.exit(200) + } + } + _EOC_ $block->set_value("config", $config); @@ -493,3 +500,133 @@ GET /dump GET /dump --- response_body_like .*"name":"default/kubernetes".* + + + +=== TEST 7: test pre_list and post_list work for single-k8s with endpoint_slices +--- log_level: info +--- yaml_config eval: $::single_yaml_config +--- extra_init_by_lua + local ngx = ngx + local core = require("apisix.core") + + local dict = ngx.shared["kubernetes"] + local ok,err = dict:set("dirty_key", true) + if not ok then + core.log.error("set dirty_key to dict fail, err: ", err) + end +--- request +GET /t +--- no_error_log +[error] +--- grep_error_log eval +qr/kubernetes discovery module find dirty data in shared dict/ +--- grep_error_log_out +kubernetes discovery module find dirty data in shared dict + + + +=== TEST 8: test pre_list and post_list work for multi-k8s with endpoint_slices +--- log_level: info +--- yaml_config eval: $::yaml_config +--- extra_init_by_lua + local ngx = ngx + local core = require("apisix.core") + + local dict = ngx.shared["kubernetes-first"] + local ok,err = dict:set("dirty_key", true) + if not ok then + core.log.error("set dirty_key to dict fail, err: ", err) + end +--- request +GET /t +--- no_error_log +[error] +--- grep_error_log eval +qr/kubernetes discovery module find dirty data in shared dict/ +--- grep_error_log_out +kubernetes discovery module find dirty data in shared dict + + + +=== TEST 9: test pre_list and post_list work for single-k8s with endpoints +--- log_level: info +--- yaml_config +apisix: + node_listen: 1984 +deployment: + role: data_plane + role_data_plane: + config_provider: yaml +discovery: + kubernetes: + service: + host: "127.0.0.1" + port: "6443" + client: + token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token" + watch_endpoint_slices: false +--- extra_init_by_lua + local ngx = ngx + local core = require("apisix.core") + + local dict = ngx.shared["kubernetes"] + local ok,err = dict:set("dirty_key", true) + if not ok then + core.log.error("set dirty_key to dict fail, err: ", err) + end +--- request +GET /t +--- no_error_log +[error] +--- grep_error_log eval +qr/kubernetes discovery module find dirty data in shared dict/ +--- grep_error_log_out +kubernetes discovery module find dirty data in shared dict + + + +=== TEST 10: test pre_list and post_list work for multi-k8s with endpoints +--- log_level: info +--- yaml_config +apisix: + node_listen: 1984 +deployment: + role: data_plane + role_data_plane: + config_provider: yaml +discovery: + kubernetes: + - id: first + service: + host: "127.0.0.1" + port: "6443" + client: + token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token" + watch_endpoint_slices: false + - id: second + service: + schema: "http" + host: "127.0.0.1" + port: "6445" + client: + token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token" + watch_endpoint_slices: false +--- extra_init_by_lua + local ngx = ngx + local core = require("apisix.core") + + local dict = ngx.shared["kubernetes-first"] + local ok,err = dict:set("dirty_key", true) + if not ok then + core.log.error("set dirty_key to dict fail, err: ", err) + end +--- request +GET /t +--- no_error_log +[error] +--- grep_error_log eval +qr/kubernetes discovery module find dirty data in shared dict/ +--- grep_error_log_out +kubernetes discovery module find dirty data in shared dict +