diff --git a/docs/user-guide/nginx-configuration/configmap.md b/docs/user-guide/nginx-configuration/configmap.md index 782b9bc927..b4ac9d861a 100644 --- a/docs/user-guide/nginx-configuration/configmap.md +++ b/docs/user-guide/nginx-configuration/configmap.md @@ -773,6 +773,11 @@ The value can either be: - round_robin: to use the default round robin loadbalancer - ewma: to use the Peak EWMA method for routing ([implementation](https://github.com/kubernetes/ingress-nginx/blob/main/rootfs/etc/nginx/lua/balancer/ewma.lua)) +- least_connections: to route incoming connections to the upstream with the least connections open at the time. This is + recommended for use with evenly-resourced upstream servers when requests have a broad distribution in time to + process, for example if some requests require the upstream server to make a connection to a slow external service. If + all requests take a fairly similar time to process or the upstream servers serve at different speeds then ewma or + round_robin are likely more appropriate. The default is `round_robin`. diff --git a/internal/ingress/controller/template/configmap.go b/internal/ingress/controller/template/configmap.go index c73f3b6c05..1a75431a01 100644 --- a/internal/ingress/controller/template/configmap.go +++ b/internal/ingress/controller/template/configmap.go @@ -78,6 +78,7 @@ var ( "balancer_ewma": 10240, "balancer_ewma_last_touched_at": 10240, "balancer_ewma_locks": 1024, + "balancer_leastconn": 5120, "certificate_servers": 5120, "ocsp_response_cache": 5120, // keep this same as certificate_servers "global_throttle_cache": 10240, diff --git a/rootfs/etc/nginx/lua/balancer.lua b/rootfs/etc/nginx/lua/balancer.lua index 00104c89d7..4996c7affd 100644 --- a/rootfs/etc/nginx/lua/balancer.lua +++ b/rootfs/etc/nginx/lua/balancer.lua @@ -9,6 +9,7 @@ local chashsubset = require("balancer.chashsubset") local sticky_balanced = require("balancer.sticky_balanced") local sticky_persistent = require("balancer.sticky_persistent") local ewma = require("balancer.ewma") +local leastconn = require("balancer.leastconn") local string = string local ipairs = ipairs local table = table @@ -32,6 +33,7 @@ local IMPLEMENTATIONS = { sticky_balanced = sticky_balanced, sticky_persistent = sticky_persistent, ewma = ewma, + least_connections = leastconn, } local PROHIBITED_LOCALHOST_PORT = configuration.prohibited_localhost_port or '10246' diff --git a/rootfs/etc/nginx/lua/balancer/leastconn.lua b/rootfs/etc/nginx/lua/balancer/leastconn.lua new file mode 100644 index 0000000000..4afe2a10c3 --- /dev/null +++ b/rootfs/etc/nginx/lua/balancer/leastconn.lua @@ -0,0 +1,108 @@ +local util = require("util") +local split = require("util.split") +require("resty.core") + +local ngx = ngx +local ipairs = ipairs +local setmetatable = setmetatable +local string_format = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG + + +local _M = { name = "leastconn" } + +function _M.new(self, backend) + local o = { + peers = backend.endpoints + } + setmetatable(o, self) + self.__index = self + return o +end + +function _M.is_affinitized() + return false +end + + +local function get_upstream_name(upstream) + return upstream.address .. ":" .. 
upstream.port +end + + +function _M.balance(self) + local peers = self.peers + local endpoint = peers[1] + local endpoints = ngx.shared.balancer_leastconn + local feasible_endpoints = {} + + if #peers ~= 1 then + local lowestconns = 2147483647 + -- find the lowest connection count + for _, peer in pairs(peers) do + local peer_name = get_upstream_name(peer) + local peer_conn_count = endpoints:get(peer_name) + if peer_conn_count == nil then + -- Peer has never been recorded as having connections - add it to the connection + -- tracking table and the list of feasible peers + endpoints:set(peer_name,0,0) + lowestconns = 0 + feasible_endpoints[#feasible_endpoints+1] = peer + elseif peer_conn_count < lowestconns then + -- Peer has fewer connections than any other peer evaluated so far - add it as the + -- only feasible endpoint for now + feasible_endpoints = {peer} + lowestconns = peer_conn_count + elseif peer_conn_count == lowestconns then + -- Peer has equal fewest connections as other peers - add it to the list of + -- feasible peers + feasible_endpoints[#feasible_endpoints+1] = peer + end + end + ngx_log(DEBUG, "select from ", #feasible_endpoints, " feasible endpoints out of ", #peers) + endpoint = feasible_endpoints[math.random(1,#feasible_endpoints)] + end + + local selected_endpoint = get_upstream_name(endpoint) + ngx_log(DEBUG, "selected endpoint ", selected_endpoint) + + -- Update the endpoint connection count + endpoints:incr(selected_endpoint,1,1,0) + + return selected_endpoint +end + +function _M.after_balance(_) + local endpoints = ngx.shared.balancer_leastconn + local upstream = split.get_last_value(ngx.var.upstream_addr) + + if util.is_blank(upstream) then + return + end + endpoints:incr(upstream,-1,0,0) +end + +function _M.sync(self, backend) + local normalized_endpoints_added, normalized_endpoints_removed = + util.diff_endpoints(self.peers, backend.endpoints) + + if #normalized_endpoints_added == 0 and #normalized_endpoints_removed == 0 then + return + end + + ngx_log(DEBUG, string_format("[%s] peers have changed for backend %s", self.name, backend.name)) + + self.peers = backend.endpoints + + for _, endpoint_string in ipairs(normalized_endpoints_removed) do + ngx.shared.balancer_leastconn:delete(endpoint_string) + end + + for _, endpoint_string in ipairs(normalized_endpoints_added) do + ngx.shared.balancer_leastconn:set(endpoint_string,0,0) + end + +end + +return _M diff --git a/rootfs/etc/nginx/lua/test/balancer/leastconn_test.lua b/rootfs/etc/nginx/lua/test/balancer/leastconn_test.lua new file mode 100644 index 0000000000..82510a7f99 --- /dev/null +++ b/rootfs/etc/nginx/lua/test/balancer/leastconn_test.lua @@ -0,0 +1,160 @@ +local util = require("util") +local say = require("say") + +local original_ngx = ngx +local function reset_ngx() + _G.ngx = original_ngx +end + +local function included_in(state, arguments) + if not type(arguments[1]) == "table" or #arguments ~= 2 then + return false + end + + local table = arguments[1] + for _, value in pairs(table) do + if value == arguments[2] then + return true + end + end + return false +end +assert:register("assertion", "included_in", included_in, "assertion.has_property.positive", "assertion.has_property.negative") + +local function mock_ngx(mock) + local _ngx = mock + setmetatable(_ngx, { __index = ngx }) + _G.ngx = _ngx +end + +local function flush_connection_count() + ngx.shared.balancer_leastconn:flush_all() +end + +local function set_backend_count(endpoint_string, count) + ngx.shared.balancer_leastconn:set(endpoint_string, 
count) +end + +describe("Balancer leastconn", function() + local balancer_leastconn = require("balancer.leastconn") + local ngx_now = 1543238266 + local backend, instance + + before_each(function() + package.loaded["balancer.leastconn"] = nil + balancer_leastconn = require("balancer.leastconn") + + backend = { + name = "namespace-service-port", + ["load-balance"] = "least_connections", + endpoints = { + { address = "10.10.10.1", port = "8080" }, + { address = "10.10.10.2", port = "8080" }, + { address = "10.10.10.3", port = "8080" }, + } + } + set_backend_count("10.10.10.1:8080", 0) + set_backend_count("10.10.10.2:8080", 1) + set_backend_count("10.10.10.3:8080", 5) + + instance = balancer_leastconn:new(backend) + end) + + after_each(function() + reset_ngx() + flush_connection_count() + end) + + describe("after_balance()", function() + it("updates connection count", function() + ngx.var = { upstream_addr = "10.10.10.2:8080" } + + local count_before = ngx.shared.balancer_leastconn:get(ngx.var.upstream_addr) + instance:after_balance() + local count_after = ngx.shared.balancer_leastconn:get(ngx.var.upstream_addr) + + assert.are.equals(count_before - 1, count_after) + end) + end) + + describe("balance()", function() + it("increments connection count on selected peer", function() + local single_endpoint_backend = util.deepcopy(backend) + table.remove(single_endpoint_backend.endpoints, 3) + table.remove(single_endpoint_backend.endpoints, 2) + local single_endpoint_instance = balancer_leastconn:new(single_endpoint_backend) + + local upstream = single_endpoint_backend.endpoints[1] + local upstream_name = upstream.address .. ":" .. upstream.port + + set_backend_count(upstream_name, 0) + single_endpoint_instance:balance() + local count_after = ngx.shared.balancer_leastconn:get(upstream_name) + + assert.are.equals(1, count_after) + end) + + it("returns single endpoint when the given backend has only one endpoint", function() + local single_endpoint_backend = util.deepcopy(backend) + table.remove(single_endpoint_backend.endpoints, 3) + table.remove(single_endpoint_backend.endpoints, 2) + local single_endpoint_instance = balancer_leastconn:new(single_endpoint_backend) + + local peer = single_endpoint_instance:balance() + + assert.are.equals("10.10.10.1:8080", peer) + end) + + it("picks the endpoint with lowest connection count", function() + local two_endpoints_backend = util.deepcopy(backend) + table.remove(two_endpoints_backend.endpoints, 2) + local two_endpoints_instance = balancer_leastconn:new(two_endpoints_backend) + + local peer = two_endpoints_instance:balance() + + assert.equal("10.10.10.1:8080", peer) + end) + + it("picks one of the endpoints with tied lowest connection count", function() + set_backend_count("10.10.10.1:8080", 8) + set_backend_count("10.10.10.2:8080", 5) + set_backend_count("10.10.10.3:8080", 5) + + local peer = instance:balance() + assert.included_in({"10.10.10.2:8080", "10.10.10.3:8080"}, peer) + end) + + end) + + describe("sync()", function() + it("does not reset stats when endpoints do not change", function() + local new_backend = util.deepcopy(backend) + + instance:sync(new_backend) + + assert.are.same(new_backend.endpoints, instance.peers) + assert.are.same(new_backend.endpoints, backend.endpoints) + end) + + it("updates peers, deletes stats for old endpoints and sets connection count to zero for new ones", function() + local new_backend = util.deepcopy(backend) + + -- existing endpoint 10.10.10.2 got deleted + -- and replaced with 10.10.10.4 + 
new_backend.endpoints[2].address = "10.10.10.4" + -- and there's one new extra endpoint + table.insert(new_backend.endpoints, { address = "10.10.10.5", port = "8080" }) + + instance:sync(new_backend) + + assert.are.same(new_backend.endpoints, instance.peers) + + assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.1:8080"), 0) + assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.2:8080"), nil) + assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.3:8080"), 5) + assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.4:8080"), 0) + assert.are.equals(ngx.shared.balancer_leastconn:get("10.10.10.5:8080"), 0) + end) + end) + +end) \ No newline at end of file diff --git a/rootfs/etc/nginx/lua/test/balancer_test.lua b/rootfs/etc/nginx/lua/test/balancer_test.lua index 2d42ad3306..f4c1c92d3a 100644 --- a/rootfs/etc/nginx/lua/test/balancer_test.lua +++ b/rootfs/etc/nginx/lua/test/balancer_test.lua @@ -37,7 +37,8 @@ local function reset_expected_implementations() ["my-dummy-app-3"] = package.loaded["balancer.sticky_persistent"], ["my-dummy-app-4"] = package.loaded["balancer.ewma"], ["my-dummy-app-5"] = package.loaded["balancer.sticky_balanced"], - ["my-dummy-app-6"] = package.loaded["balancer.chashsubset"] + ["my-dummy-app-6"] = package.loaded["balancer.chashsubset"], + ["my-dummy-app-7"] = package.loaded["balancer.leastconn"] } end @@ -88,6 +89,10 @@ local function reset_backends() ["load-balance"] = "ewma", -- upstreamHashByConfig will take priority. upstreamHashByConfig = { ["upstream-hash-by"] = "$request_uri", ["upstream-hash-by-subset"] = "true", } }, + { + name = "my-dummy-app-7", + ["load-balance"] = "least_connections", + }, } end diff --git a/test/data/cleanConf.expected.conf b/test/data/cleanConf.expected.conf index 1666c19f63..3f47083bea 100644 --- a/test/data/cleanConf.expected.conf +++ b/test/data/cleanConf.expected.conf @@ -23,6 +23,7 @@ http { lua_shared_dict balancer_ewma 10M; lua_shared_dict balancer_ewma_last_touched_at 10M; lua_shared_dict balancer_ewma_locks 1M; + lua_shared_dict balancer_leastconn 5M; lua_shared_dict certificate_data 20M; lua_shared_dict certificate_servers 5M; lua_shared_dict configuration_data 20M; diff --git a/test/data/cleanConf.src.conf b/test/data/cleanConf.src.conf index 0e572faa50..89cefe2754 100644 --- a/test/data/cleanConf.src.conf +++ b/test/data/cleanConf.src.conf @@ -40,6 +40,7 @@ http { lua_shared_dict balancer_ewma 10M; lua_shared_dict balancer_ewma_last_touched_at 10M; lua_shared_dict balancer_ewma_locks 1M; +lua_shared_dict balancer_leastconn 5M; lua_shared_dict certificate_data 20M; lua_shared_dict certificate_servers 5M; lua_shared_dict configuration_data 20M; diff --git a/test/e2e/framework/deployment.go b/test/e2e/framework/deployment.go index 565b8f4ac2..49c942da5a 100644 --- a/test/e2e/framework/deployment.go +++ b/test/e2e/framework/deployment.go @@ -37,6 +37,9 @@ const EchoService = "echo" // SlowEchoService name of the deployment for the echo app const SlowEchoService = "slow-echo" +// AlwaysSlowEchoService name of the deployment for the always slow echo app +const AlwaysSlowEchoService = "always-slow-echo" + // HTTPBunService name of the deployment for the httpbun app const HTTPBunService = "httpbun" @@ -44,11 +47,13 @@ const HTTPBunService = "httpbun" const NIPService = "external-nip" type deploymentOptions struct { - namespace string - name string - replicas int - svcAnnotations map[string]string - image string + namespace string + name string + replicas int + svcAnnotations map[string]string + 
image string + serviceName string + duplicateService bool } // WithDeploymentNamespace allows configuring the deployment's namespace @@ -94,30 +99,52 @@ func WithImage(i string) func(*deploymentOptions) { } } +// WithServiceName overrides the default service name connected to the deployment. To allow multiple deployments to +// match the same service an additional tag is used for the pods and ignored by the specified service. +func WithServiceName(s string) func(*deploymentOptions) { + return func(o *deploymentOptions) { + o.serviceName = s + } +} + // NewEchoDeployment creates a new single replica deployment of the echo server image in a particular namespace func (f *Framework) NewEchoDeployment(opts ...func(*deploymentOptions)) { options := &deploymentOptions{ - namespace: f.Namespace, - name: EchoService, - replicas: 1, - image: "registry.k8s.io/ingress-nginx/e2e-test-echo@sha256:6fc5aa2994c86575975bb20a5203651207029a0d28e3f491d8a127d08baadab4", + namespace: f.Namespace, + name: EchoService, + replicas: 1, + image: "registry.k8s.io/ingress-nginx/e2e-test-echo@sha256:6fc5aa2994c86575975bb20a5203651207029a0d28e3f491d8a127d08baadab4", + serviceName: "", } for _, o := range opts { o(options) } + // If a serviceName is defined then pods from any app will be labelled such that the specified service matches them + podLabels := make(map[string]string) + selectorLabels := make(map[string]string) + podLabels["app"] = options.name + if options.serviceName == "" { + options.serviceName = options.name + selectorLabels["app"] = options.name + } else { + podLabels["service"] = options.serviceName + selectorLabels["service"] = options.serviceName + } + deployment := newDeployment(options.name, options.namespace, options.image, 80, int32(options.replicas), nil, nil, nil, []corev1.VolumeMount{}, []corev1.Volume{}, true, + podLabels, ) f.EnsureDeployment(deployment) service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: options.name, + Name: options.serviceName, Namespace: options.namespace, Annotations: options.svcAnnotations, }, @@ -130,15 +157,24 @@ func (f *Framework) NewEchoDeployment(opts ...func(*deploymentOptions)) { Protocol: corev1.ProtocolTCP, }, }, - Selector: map[string]string{ - "app": options.name, - }, + Selector: selectorLabels, }, } - f.EnsureService(service) + // If a service name is provided then don't throw an error if it already exists + if options.serviceName == options.name { + f.EnsureService(service) + } else { + f.EnsureServiceExists(service) + } - err := WaitForEndpoints(f.KubeClientSet, DefaultTimeout, options.name, options.namespace, options.replicas) + var err error + // If service name is different to name then we could have more replicas than just those created by this ingress + if options.serviceName == options.name { + err = WaitForEndpoints(f.KubeClientSet, DefaultTimeout, options.serviceName, options.namespace, options.replicas) + } else { + err = WaitForMinimumEndpoints(f.KubeClientSet, DefaultTimeout, options.serviceName, options.namespace, options.replicas) + } assert.Nil(ginkgo.GinkgoT(), err, "waiting for endpoints to become ready") } @@ -188,6 +224,7 @@ func (f *Framework) NewHttpbunDeployment(opts ...func(*deploymentOptions)) strin []corev1.VolumeMount{}, []corev1.Volume{}, true, + nil, ) f.EnsureDeployment(deployment) @@ -223,6 +260,12 @@ func (f *Framework) NewHttpbunDeployment(opts ...func(*deploymentOptions)) strin // NewSlowEchoDeployment creates a new deployment of the slow echo server image in a particular namespace. 
func (f *Framework) NewSlowEchoDeployment() { + f.NewSlowEchoDeploymentWithOptions() + return +} + +// NewSlowEchoDeploymentWithOptions creates a new deployment of the slow echo server with functional options. +func (f *Framework) NewSlowEchoDeploymentWithOptions(opts ...func(*deploymentOptions)) { cfg := `# events { worker_connections 1024; @@ -256,7 +299,44 @@ http { ` - f.NGINXWithConfigDeployment(SlowEchoService, cfg) + f.NGINXWithConfigDeploymentWithOptions(SlowEchoService, cfg, true, opts...) +} + +// NewAlwaysSlowEchoDeployment creates a new deployment of the always slow echo server. This server will always sleep +// for the specified number of milliseconds before responding regardless of path. +func (f *Framework) NewAlwaysSlowEchoDeployment(sleepMillis int) { + f.NewAlwaysSlowEchoDeploymentWithOptions(sleepMillis) + return +} + +// NewAlwaysSlowEchoDeploymentWithOptions creates a new deployment of the always slow echo server with functional options. +// This server always sleeps for the specified number of milliseconds before responding. NOTE: values for sleepMillis +// >= 2000 will cause the deployment to fail health checks, causing false positive test failures. +func (f *Framework) NewAlwaysSlowEchoDeploymentWithOptions(sleepMillis int, opts ...func(*deploymentOptions)) { + delay := float32(sleepMillis) * 0.001 + cfg := fmt.Sprintf(`# +events { + worker_connections 1024; + multi_accept on; +} +http { + default_type 'text/plain'; + client_max_body_size 0; + server { + access_log on; + access_log /dev/stdout; + listen 80; + location / { + content_by_lua_block { + ngx.sleep(%.3f) + ngx.print("echo ok after %.3f seconds") + } + } + } +} +`, delay, delay) + + f.NGINXWithConfigDeploymentWithOptions(AlwaysSlowEchoService, cfg, true, opts...) } func (f *Framework) GetNginxBaseImage() string { @@ -272,24 +352,52 @@ func (f *Framework) GetNginxBaseImage() string { // NGINXDeployment creates a new simple NGINX Deployment using NGINX base image // and passing the desired configuration func (f *Framework) NGINXDeployment(name string, cfg string, waitendpoint bool) { + f.NGINXDeploymentWithOptions(name, cfg, waitendpoint) + return +} + +// NGINXDeploymentWithOptions creates a new simple NGINX Deployment using NGINX base image and the desired configuration, +// with overrides applied by any supplied functional options. 
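+// For example (names here are illustrative, not taken from any existing test), a second NGINX
+// deployment can be placed behind an already-created Service by overriding the service name:
+//
+//	f.NGINXDeploymentWithOptions("echo-slow", cfg, true, WithServiceName("shared-echo"))
+//
+// When WithServiceName is used the Service selects pods by a "service" label instead of the
+// "app" label, so several deployments can back the same Service.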
+func (f *Framework) NGINXDeploymentWithOptions(name string, cfg string, waitendpoint bool, opts ...func(options *deploymentOptions)) { cfgMap := map[string]string{ "nginx.conf": cfg, } + options := &deploymentOptions{ + namespace: f.Namespace, + name: name, + replicas: 1, + serviceName: "", + } + for _, o := range opts { + o(options) + } + + // If a serviceName is defined then pods from any app will be labelled such that that service matches them + podLabels := make(map[string]string) + selectorLabels := make(map[string]string) + podLabels["app"] = options.name + if options.serviceName == "" { + options.serviceName = options.name + selectorLabels["app"] = options.name + } else { + podLabels["service"] = options.serviceName + selectorLabels["service"] = options.serviceName + } - _, err := f.KubeClientSet.CoreV1().ConfigMaps(f.Namespace).Create(context.TODO(), &corev1.ConfigMap{ + _, err := f.KubeClientSet.CoreV1().ConfigMaps(options.namespace).Create(context.TODO(), &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: f.Namespace, + Name: options.name, + Namespace: options.namespace, }, Data: cfgMap, }, metav1.CreateOptions{}) assert.Nil(ginkgo.GinkgoT(), err, "creating configmap") - deployment := newDeployment(name, f.Namespace, f.GetNginxBaseImage(), 80, 1, + deployment := newDeployment(options.name, f.Namespace, f.GetNginxBaseImage(), 80, int32(options.replicas), nil, nil, nil, []corev1.VolumeMount{ { - Name: name, + Name: options.name, MountPath: "/etc/nginx/nginx.conf", SubPath: "nginx.conf", ReadOnly: true, @@ -297,24 +405,24 @@ func (f *Framework) NGINXDeployment(name string, cfg string, waitendpoint bool) }, []corev1.Volume{ { - Name: name, + Name: options.name, VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ - Name: name, + Name: options.name, }, }, }, }, - }, true, + }, true, podLabels, ) f.EnsureDeployment(deployment) service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: f.Namespace, + Name: options.serviceName, + Namespace: options.namespace, }, Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ @@ -325,23 +433,35 @@ func (f *Framework) NGINXDeployment(name string, cfg string, waitendpoint bool) Protocol: corev1.ProtocolTCP, }, }, - Selector: map[string]string{ - "app": name, - }, + Selector: selectorLabels, }, } - f.EnsureService(service) + // If a service name is provided then don't throw an error if it already exists + if options.serviceName == options.name { + f.EnsureService(service) + } else { + f.EnsureServiceExists(service) + } if waitendpoint { - err = WaitForEndpoints(f.KubeClientSet, DefaultTimeout, name, f.Namespace, 1) + if options.serviceName == options.name { + err = WaitForEndpoints(f.KubeClientSet, DefaultTimeout, name, options.namespace, 1) + } else { + err = WaitForMinimumEndpoints(f.KubeClientSet, DefaultTimeout, options.serviceName, options.namespace, 1) + } assert.Nil(ginkgo.GinkgoT(), err, "waiting for endpoints to become ready") } } // NGINXWithConfigDeployment creates an NGINX deployment using a configmap containing the nginx.conf configuration func (f *Framework) NGINXWithConfigDeployment(name string, cfg string) { - f.NGINXDeployment(name, cfg, true) + f.NGINXWithConfigDeploymentWithOptions(name, cfg, true) +} + +// NGINXWithConfigDeploymentWithOptions creates an NGINX deployment with nginx.conf override and functional options +func (f *Framework) NGINXWithConfigDeploymentWithOptions(name string, cfg string, 
waitendpoint bool, opts ...func(*deploymentOptions)) { + f.NGINXDeploymentWithOptions(name, cfg, waitendpoint, opts...) } // NewGRPCBinDeployment creates a new deployment of the @@ -438,7 +558,14 @@ func (f *Framework) NewGRPCBinDeployment() { } func newDeployment(name, namespace, image string, port int32, replicas int32, command []string, args []string, env []corev1.EnvVar, - volumeMounts []corev1.VolumeMount, volumes []corev1.Volume, setProbe bool) *appsv1.Deployment { + volumeMounts []corev1.VolumeMount, volumes []corev1.Volume, setProbe bool, podLabels map[string]string) *appsv1.Deployment { + + if podLabels == nil { + podLabels = map[string]string{ + "app": name, + } + } + probe := &corev1.Probe{ InitialDelaySeconds: 2, PeriodSeconds: 1, @@ -461,15 +588,11 @@ func newDeployment(name, namespace, image string, port int32, replicas int32, co Spec: appsv1.DeploymentSpec{ Replicas: NewInt32(replicas), Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": name, - }, + MatchLabels: podLabels, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "app": name, - }, + Labels: podLabels, }, Spec: corev1.PodSpec{ TerminationGracePeriodSeconds: NewInt64(0), @@ -516,7 +639,7 @@ func (f *Framework) NewDeployment(name, image string, port int32, replicas int32 // NewDeployment creates a new deployment in a particular namespace. func (f *Framework) NewDeploymentWithOpts(name, image string, port int32, replicas int32, command []string, args []string, env []corev1.EnvVar, volumeMounts []corev1.VolumeMount, volumes []corev1.Volume, setProbe bool) { - deployment := newDeployment(name, f.Namespace, image, port, replicas, command, args, env, volumeMounts, volumes, setProbe) + deployment := newDeployment(name, f.Namespace, image, port, replicas, command, args, env, volumeMounts, volumes, setProbe, nil) f.EnsureDeployment(deployment) diff --git a/test/e2e/framework/k8s.go b/test/e2e/framework/k8s.go index fc3e59b086..f29b837857 100644 --- a/test/e2e/framework/k8s.go +++ b/test/e2e/framework/k8s.go @@ -130,6 +130,19 @@ func (f *Framework) EnsureService(service *core.Service) *core.Service { return f.GetService(service.Namespace, service.Name) } +// EnsureServiceExists creates a Service object and returns it. If it already exists and matches the provided selectors +// then the service object is returned, if it does not match then an error is thrown. +func (f *Framework) EnsureServiceExists(service *core.Service) *core.Service { + err := createServiceWithRetries(f.KubeClientSet, service.Namespace, service) + if k8sErrors.IsAlreadyExists(err) { + err = nil + } + assert.Nil(ginkgo.GinkgoT(), err, "creating service") + result := f.GetService(service.Namespace, service.Name) + assert.Equal(ginkgo.GinkgoT(), result.Spec.Selector, service.Spec.Selector, "service already exists with different selectors") + return result +} + // EnsureDeployment creates a Deployment object and returns it, throws error if it already exists. func (f *Framework) EnsureDeployment(deployment *appsv1.Deployment) *appsv1.Deployment { err := createDeploymentWithRetries(f.KubeClientSet, deployment.Namespace, deployment) @@ -203,6 +216,24 @@ func WaitForEndpoints(kubeClientSet kubernetes.Interface, timeout time.Duration, }) } +// WaitForMinimumEndpoints waits for a given amount of time until the number of endpoints >= expectedEndpoints. 
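+// For example (illustrative values), waiting until a shared service has at least two ready
+// endpoints, however many deployments contribute them:
+//
+//	err := WaitForMinimumEndpoints(f.KubeClientSet, DefaultTimeout, "shared-echo", f.Namespace, 2)
+//
+// The wait succeeds as soon as the ready-endpoint count reaches the minimum, so extra replicas
+// created by other deployments backing the same service do not affect the result.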
+func WaitForMinimumEndpoints(kubeClientSet kubernetes.Interface, timeout time.Duration, name, ns string, minimumEndpoints int) error { + if minimumEndpoints == 0 { + return nil + } + return wait.PollImmediate(Poll, timeout, func() (bool, error) { + endpoint, err := kubeClientSet.CoreV1().Endpoints(ns).Get(context.TODO(), name, metav1.GetOptions{}) + if k8sErrors.IsNotFound(err) { + return false, nil + } + assert.Nil(ginkgo.GinkgoT(), err, "getting endpoints") + if countReadyEndpoints(endpoint) >= minimumEndpoints { + return true, nil + } + return false, nil + }) +} + func countReadyEndpoints(e *core.Endpoints) int { if e == nil || e.Subsets == nil { return 0 diff --git a/test/e2e/loadbalance/leastconn.go b/test/e2e/loadbalance/leastconn.go new file mode 100644 index 0000000000..fe5650e5da --- /dev/null +++ b/test/e2e/loadbalance/leastconn.go @@ -0,0 +1,141 @@ +/* +Copyright 2018 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package loadbalance + +import ( + "fmt" + "github.com/onsi/ginkgo/v2" + "github.com/stretchr/testify/assert" + "k8s.io/ingress-nginx/test/e2e/framework" + "net/http" + "regexp" + "sort" + "strings" + "sync" + "time" +) + +var _ = framework.DescribeSetting("[Load Balancer] least-connections", func() { + f := framework.NewDefaultFramework("leastconn") + ginkgo.BeforeEach(func() { + f.NewAlwaysSlowEchoDeploymentWithOptions(100, framework.WithDeploymentReplicas(1), framework.WithName("echo-fast"), framework.WithServiceName("leastconn-slow")) + f.NewAlwaysSlowEchoDeploymentWithOptions(1800, framework.WithDeploymentReplicas(1), framework.WithName("echo-slow"), framework.WithServiceName("leastconn-slow")) + + f.SetNginxConfigMapData(map[string]string{ + "worker-processes": "2", + "load-balance": "least_connections"}, + ) + }) + + ginkgo.It("does not fail requests", func() { + host := "load-balance.com" + + f.EnsureIngress(framework.NewSingleIngress(host, "/", host, f.Namespace, "leastconn-slow", 80, nil)) + f.WaitForNginxServer(host, + func(server string) bool { + return strings.Contains(server, "server_name load-balance.com") + }) + + algorithm, err := f.GetLbAlgorithm("leastconn-slow", 80) + assert.Nil(ginkgo.GinkgoT(), err) + assert.Equal(ginkgo.GinkgoT(), "least_connections", algorithm) + + re, _ := regexp.Compile(fmt.Sprintf(`%v.*`, framework.EchoService)) + replicaRequestCount := map[string]int{} + + for i := 0; i < 30; i++ { + body := f.HTTPTestClient(). + GET("/"). + WithHeader("Host", host). + Expect(). 
+ Status(http.StatusOK).Body().Raw() + replica := re.FindString(body) + assert.NotEmpty(ginkgo.GinkgoT(), replica) + if _, ok := replicaRequestCount[replica]; !ok { + replicaRequestCount[replica] = 1 + } else { + replicaRequestCount[replica]++ + } + } + framework.Logf("Request distribution: %v", replicaRequestCount) + + actualCount := 0 + for _, v := range replicaRequestCount { + actualCount += v + } + assert.Equal(ginkgo.GinkgoT(), actualCount, 30) + }) + + ginkgo.It("sends fewer requests to a slower server", func() { + host := "load-balance.com" + + f.EnsureIngress(framework.NewSingleIngress(host, "/", host, f.Namespace, "leastconn-slow", 80, nil)) + f.WaitForNginxServer(host, + func(server string) bool { + return strings.Contains(server, "server_name load-balance.com") + }) + + algorithm, err := f.GetLbAlgorithm("leastconn-slow", 80) + assert.Nil(ginkgo.GinkgoT(), err) + assert.Equal(ginkgo.GinkgoT(), "least_connections", algorithm) + + re, _ := regexp.Compile(fmt.Sprintf(`%v.*`, framework.EchoService)) + replicaRequestCount := map[string]int{} + reqCount := 30 + + var wg sync.WaitGroup + wg.Add(reqCount) + results := make(chan string, reqCount) + for i := 0; i < reqCount; i++ { + time.Sleep(100 * time.Millisecond) + go func() { + defer wg.Done() + body := f.HTTPTestClient(). + GET("/"). + WithHeader("Host", host). + Expect(). + Status(http.StatusOK).Body().Raw() + replica := re.FindString(body) + assert.NotEmpty(ginkgo.GinkgoT(), replica) + results <- replica + }() + } + wg.Wait() + close(results) + + for r := range results { + if _, ok := replicaRequestCount[r]; !ok { + replicaRequestCount[r] = 1 + } else { + replicaRequestCount[r]++ + } + } + + framework.Logf("Request distribution: %v", replicaRequestCount) + + replicaCount := len(replicaRequestCount) + assert.Equal(ginkgo.GinkgoT(), replicaCount, 2, "expected responses from two replicas") + + values := make([]int, 2) + i := 0 + for _, v := range replicaRequestCount { + values[i] = v + i++ + } + sort.Ints(values) + // we expect to see at least twice as many requests to the echo server compared to the slow echo server + assert.GreaterOrEqual(ginkgo.GinkgoT(), values[1], 2*values[0], "expected at least twice as many responses from the faster server") + }) +}) diff --git a/test/test-lua.sh b/test/test-lua.sh index fc60023f8b..55c9cc6075 100755 --- a/test/test-lua.sh +++ b/test/test-lua.sh @@ -36,6 +36,7 @@ SHDICT_ARGS=( "--shdict" "high_throughput_tracker 1M" "--shdict" "balancer_ewma_last_touched_at 1M" "--shdict" "balancer_ewma_locks 512k" + "--shdict" "balancer_leastconn 5M" "--shdict" "global_throttle_cache 5M" "./rootfs/etc/nginx/lua/test/run.lua" )
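
Note: taken together, these changes let operators opt in to the new balancer by setting `load-balance: least_connections` in the controller ConfigMap, which is what the e2e test's `SetNginxConfigMapData` call does and what the updated configmap.md documents. The sketch below is illustrative only — the module is normally driven by `balancer.lua` inside the controller's OpenResty workers — and it assumes the `balancer_leastconn` shared dict declared above plus the controller's `util` helpers are on the Lua path; the backend table shape mirrors the unit tests.

```lua
-- Illustrative call sequence for balancer.leastconn (assumes an OpenResty worker with
-- `lua_shared_dict balancer_leastconn 5M;` configured, as in the cleanConf test data).
local leastconn = require("balancer.leastconn")

local backend = {
  name = "namespace-service-port",
  ["load-balance"] = "least_connections",
  endpoints = {
    { address = "10.10.10.1", port = "8080" },
    { address = "10.10.10.2", port = "8080" },
  },
}

local instance = leastconn:new(backend)

-- balance() picks one of the endpoints with the fewest tracked connections and
-- increments its counter in ngx.shared.balancer_leastconn.
local peer = instance:balance() -- e.g. "10.10.10.2:8080"

-- Once the request finishes, after_balance() decrements the counter for the peer
-- recorded in ngx.var.upstream_addr (only meaningful inside a real request).
instance:after_balance()

-- sync() reconciles the tracking table when the endpoint list changes: counters for
-- removed endpoints are deleted and newly added endpoints start at zero.
instance:sync({
  name = backend.name,
  endpoints = {
    { address = "10.10.10.1", port = "8080" },
    { address = "10.10.10.3", port = "8080" },
  },
})
```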