diff --git a/cmd/testserver/run.go b/cmd/testserver/run.go index ce75e088..b2fcc15c 100644 --- a/cmd/testserver/run.go +++ b/cmd/testserver/run.go @@ -84,7 +84,7 @@ func run(ctx context.Context) error { } func createK8SClients(cfg loadtest.Config, logger *slog.Logger) (*kubernetes.Clientset, *dynamic.DynamicClient, *apiextensionsclientset.Clientset, helm.Client, error) { - rateLimiter := flowcontrol.NewTokenBucketRateLimiter(20, 50) + rateLimiter := flowcontrol.NewTokenBucketRateLimiter(100, 200) var restConfig *rest.Config var err error diff --git a/hack/loadtest/deploy.sh b/hack/loadtest/deploy.sh index d48da297..ad0d9f17 100755 --- a/hack/loadtest/deploy.sh +++ b/hack/loadtest/deploy.sh @@ -5,6 +5,7 @@ CC_IMAGE_TAG="${IMAGE_TAG:-latest}" LOAD_TEST_IMAGE_REPOSITORY="${LOAD_TEST_IMAGE_REPOSITORY:-$CC_IMAGE_REPOSITORY}" LOAD_TEST_IMAGE_TAG="${LOAD_TEST_IMAGE_TAG:-$CC_IMAGE_TAG}" DEPLOY_CLUSTER_CONTROLLER="${DEPLOY_CLUSTER_CONTROLLER:-true}" +KWOK_REPLICAS="${KWOK_REPLICAS:-15}" # Determine the directory where the script resides. SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" @@ -13,7 +14,7 @@ echo "Deploying kwok" helm repo add kwok https://kwok.sigs.k8s.io/charts/ helm repo update kwok -helm upgrade --namespace castai-agent --create-namespace --install kwok kwok/kwok +helm upgrade --namespace castai-agent --create-namespace --install kwok kwok/kwok --set replicas="$KWOK_REPLICAS" helm upgrade --namespace castai-agent --create-namespace --install kwok-stages kwok/stage-fast helm upgrade --namespace castai-agent --create-namespace --install kwok-metrics kwok/metrics-usage diff --git a/hack/loadtest/grafana/cluster-controller-dashboard.json b/hack/loadtest/grafana/cluster-controller-dashboard.json index 216d778c..42ff81db 100644 --- a/hack/loadtest/grafana/cluster-controller-dashboard.json +++ b/hack/loadtest/grafana/cluster-controller-dashboard.json @@ -69,8 +69,646 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum by (type, success) (rate(action_executed_total[$__rate_interval]))", + "format": "time_series", + "hide": false, + "instant": false, + "interval": "", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Actions executed", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 23, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "exemplar": false, + "expr": "action_executed_total", + "format": "time_series", + "hide": false, + "instant": false, + "interval": "", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Actions executed raw counter", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 8 + }, + "id": 22, + "panels": [], + "title": "apiserver requests", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 9 + }, + "id": 19, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "editorMode": "code", + "expr": "histogram_quantile(0.99, sum(rate(rest_client_rate_limiter_duration_seconds_bucket[1m])) by (le))", + "legendFormat": "p99", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.9, sum(rate(rest_client_rate_limiter_duration_seconds_bucket[1m])) by (le))", + "hide": false, + "instant": false, + "legendFormat": "p90", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.5, sum(rate(rest_client_rate_limiter_duration_seconds_bucket[1m])) by (le))", + "hide": false, + "instant": false, + "legendFormat": "p50", + "range": true, + "refId": "C" + } + ], + "title": "Client rate limit latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 9 + }, + "id": 18, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "editorMode": "code", + "expr": "histogram_quantile(0.99, sum(rate(rest_client_request_duration_seconds_bucket[1m])) by (le))", + "hide": false, + "legendFormat": "p99", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.9, sum(rate(rest_client_request_duration_seconds_bucket[1m])) by (le))", + "hide": false, + "instant": false, + "legendFormat": "p90", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.5, sum(rate(rest_client_request_duration_seconds_bucket[1m])) by (le))", + "hide": false, + "instant": false, + "legendFormat": "p50", + "range": true, + "refId": "B" + } + ], + "title": "apiserver latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Shows total QPS by the client across all methods and plots it against the default QPS. Due to token bucket algorithm, line can go temporarily above the threshold but staying there will cause client throttling. ", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "area" + } + }, + "fieldMinMax": false, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 25 + }, + { + "color": "#EAB839", + "value": 100 + } + ] + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 17 + }, + "id": 21, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "editorMode": "code", + "expr": "sum(rate(rest_client_requests_total[1m]))", + "legendFormat": "Total QPS", + "range": true, + "refId": "A" + } + ], + "title": "apiserver calls total", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 17 + }, + "id": 24, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "editorMode": "code", + "expr": "sum(rate(rest_client_requests_total{code=\"429\"}[1m])) by (method)", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "server-side throttle responses (429)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 } ] } @@ -81,9 +719,9 @@ "h": 8, "w": 12, "x": 0, - "y": 0 + "y": 25 }, - "id": 1, + "id": 20, "options": { "legend": { "calcs": [], @@ -97,26 +735,122 @@ "sort": "none" } }, - "pluginVersion": "11.5.2", + "pluginVersion": "11.6.0", "targets": [ + { + "editorMode": "code", + "expr": "histogram_quantile(0.99, sum(rate(rest_client_request_size_bytes_bucket[1m])) by (le))", + "legendFormat": "p99", + "range": true, + "refId": "A" + }, { "datasource": { "type": "prometheus", "uid": "PBFA97CFB590B2093" }, "editorMode": "code", - "exemplar": false, - "expr": "sum by (type, success) (rate(action_executed_total[$__rate_interval]))", - "format": "time_series", + "expr": "histogram_quantile(0.9, sum(rate(rest_client_request_size_bytes_bucket[1m])) by (le))", "hide": false, "instant": false, - "interval": "", + "legendFormat": "p90", + "range": true, + "refId": "B" + } + ], + "title": "apiserver request size", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 25 + }, + "id": 17, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "editorMode": "code", + "expr": "sum(rate(rest_client_requests_total[1m])) by (method, code)", "legendFormat": "__auto", "range": true, "refId": "A" } ], - "title": "Actions executed", + "title": "apiserver calls", "type": "timeseries" }, { @@ -125,7 +859,7 @@ "h": 1, "w": 24, "x": 0, - "y": 8 + "y": 33 }, "id": 16, "panels": [], @@ -180,8 +914,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -196,7 +929,7 @@ "h": 8, "w": 12, "x": 0, - "y": 9 + "y": 34 }, "id": 12, "options": { @@ -212,11 +945,11 @@ "sort": "none" } }, - "pluginVersion": "11.5.2", + "pluginVersion": "11.6.0", "targets": [ { "editorMode": "code", - "expr": "rate(process_cpu_seconds_total[5m]) * 100", + "expr": "rate(process_cpu_seconds_total[1m]) * 100", "legendFormat": "__auto", "range": true, "refId": "A" @@ -273,8 +1006,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -289,7 +1021,7 @@ "h": 8, "w": 12, "x": 12, - "y": 9 + "y": 34 }, "id": 2, "options": { @@ -305,7 +1037,7 @@ "sort": "none" } }, - "pluginVersion": "11.5.2", + "pluginVersion": "11.6.0", "targets": [ { "editorMode": "code", @@ -366,8 +1098,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -382,7 +1113,7 @@ "h": 8, "w": 12, "x": 0, - "y": 17 + "y": 42 }, "id": 11, "options": { @@ -398,7 +1129,7 @@ "sort": "none" } }, - "pluginVersion": "11.5.2", + "pluginVersion": "11.6.0", "targets": [ { "editorMode": "code", @@ -417,7 +1148,7 @@ "h": 1, "w": 24, "x": 0, - "y": 25 + "y": 50 }, "id": 8, "panels": [], @@ -487,7 +1218,7 @@ "h": 8, "w": 12, "x": 0, - "y": 26 + "y": 51 }, "id": 5, "options": { @@ -503,7 +1234,7 @@ "sort": "none" } }, - "pluginVersion": "11.5.2", + "pluginVersion": "11.6.0", "targets": [ { "editorMode": "code", @@ -513,7 +1244,7 @@ "refId": "A" } ], - "title": "Total MB allocated", + "title": "Heap MB allocated", "type": "timeseries" }, { @@ -579,7 +1310,7 @@ "h": 8, "w": 12, "x": 12, - "y": 26 + "y": 51 }, "id": 4, "options": { @@ -595,17 +1326,30 @@ "sort": "none" } }, - "pluginVersion": "11.5.2", + "pluginVersion": "11.6.0", "targets": [ { "editorMode": "code", "expr": "go_memstats_heap_alloc_bytes / (1024*1024)", - "legendFormat": "__auto", + "legendFormat": "{{instance}} - heap", "range": true, "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "go_memory_classes_total_bytes / (1024*1024)", + "hide": false, + "instant": false, + "legendFormat": "{{instance}} - total", + "range": true, + "refId": "B" } ], - "title": "Heap allocated MB", + "title": "Heap and Total allocated MB", "type": "timeseries" }, { @@ -671,7 +1415,7 @@ "h": 8, "w": 12, "x": 0, - "y": 34 + "y": 59 }, "id": 14, "options": { @@ -687,7 +1431,7 @@ "sort": "none" } }, - "pluginVersion": "11.5.2", + "pluginVersion": "11.6.0", "targets": [ { "editorMode": "code", @@ -763,7 +1507,7 @@ "h": 8, "w": 12, "x": 12, - "y": 34 + "y": 59 }, "id": 10, "options": { @@ -779,7 +1523,7 @@ "sort": "none" } }, - "pluginVersion": "11.5.2", + "pluginVersion": "11.6.0", "targets": [ { "editorMode": "code", @@ -798,7 +1542,7 @@ "h": 1, "w": 24, "x": 0, - "y": 42 + "y": 67 }, "id": 9, "panels": [], @@ -868,7 +1612,7 @@ "h": 8, "w": 12, "x": 0, - "y": 43 + "y": 68 }, "id": 6, "options": { @@ -884,11 +1628,11 @@ "sort": "none" } }, - "pluginVersion": "11.5.2", + "pluginVersion": "11.6.0", "targets": [ { "editorMode": "code", - "expr": "increase(go_gc_duration_seconds_count[5m])", + "expr": "increase(go_gc_duration_seconds_count[1m])", "legendFormat": "__auto", "range": true, "refId": "A" @@ -960,7 +1704,7 @@ "h": 8, "w": 12, "x": 12, - "y": 43 + "y": 68 }, "id": 3, "options": { @@ -976,11 +1720,11 @@ "sort": "none" } }, - "pluginVersion": "11.5.2", + "pluginVersion": "11.6.0", "targets": [ { "editorMode": "code", - "expr": "rate(go_gc_duration_seconds_sum[5m]) / rate(go_gc_duration_seconds_count[5m])", + "expr": "rate(go_gc_duration_seconds_sum[1m]) / rate(go_gc_duration_seconds_count[1m])", "legendFormat": "__auto", "range": true, "refId": "A" @@ -991,15 +1735,18 @@ } ], "preload": false, - "schemaVersion": 40, + "schemaVersion": 41, "tags": [], "templating": { "list": [] }, + "time": { + "from": "now-15m", + "to": "now" + }, "timepicker": {}, "timezone": "", - "title": "Cluster controller", - "uid": "aegw4usai4oowf", - "version": 1, - "weekStart": "" + "title": "Cluster controller 2", + "uid": "aegw4usai4oowf2", + "version": 1 } \ No newline at end of file diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 96ae2082..320f9077 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -8,7 +8,6 @@ import ( "k8s.io/component-base/metrics/legacyregistry" ) -// registry = metrics.NewKubeRegistry() var registry = prometheus.NewRegistry() func NewMetricsMux() *http.ServeMux { diff --git a/loadtest/castai.go b/loadtest/castai.go index 71d5e4b6..641df3d6 100644 --- a/loadtest/castai.go +++ b/loadtest/castai.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "slices" "sync" "time" @@ -23,6 +24,7 @@ type CastAITestServer struct { logMx sync.Mutex actionsLog map[string]chan string + actions map[string]*castai.ClusterAction } func NewTestServer(logger *slog.Logger, cfg TestServerConfig) *CastAITestServer { @@ -31,6 +33,7 @@ func NewTestServer(logger *slog.Logger, cfg TestServerConfig) *CastAITestServer actionsPushChannel: make(chan castai.ClusterAction, 10000), cfg: cfg, actionsLog: make(map[string]chan string), + actions: make(map[string]*castai.ClusterAction), } } @@ -44,9 +47,12 @@ func (c *CastAITestServer) ExecuteActions(ctx context.Context, actions []castai. if action.ID == "" { action.ID = uuid.NewString() } - c.addActionToStore(action.ID, ownerChannel) - c.actionsPushChannel <- action + if action.CreatedAt == (time.Time{}) { + action.CreatedAt = time.Now() + } + c.addActionToStore(action.ID, action, ownerChannel) } + c.log.Info(fmt.Sprintf("added %d actions to local DB", len(actions))) // Read from owner channel until len(actions) times, then close and return. finished := 0 @@ -55,8 +61,7 @@ func (c *CastAITestServer) ExecuteActions(ctx context.Context, actions []castai. case <-ctx.Done(): c.log.Info(fmt.Sprintf("Received signal to stop finished with cause (%q) and err (%v). Closing executor.", context.Cause(ctx), ctx.Err())) return - case finishedAction := <-ownerChannel: - c.removeActionFromStore(finishedAction) + case <-ownerChannel: finished++ if finished == len(actions) { close(ownerChannel) @@ -69,44 +74,30 @@ func (c *CastAITestServer) ExecuteActions(ctx context.Context, actions []castai. /* Start Cluster-hub mock implementation */ func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai.ClusterAction, error) { - c.log.Info(fmt.Sprintf("GetActions called, have %d items in buffer", len(c.actionsPushChannel))) - actionsToReturn := make([]*castai.ClusterAction, 0) - - // Wait for at least one action to arrive from whoever is pushing them. - // If none arrive, we simulate the "empty poll" case of cluster-hub and return empty list. - select { - case x := <-c.actionsPushChannel: - actionsToReturn = append(actionsToReturn, &x) - case <-time.After(c.cfg.TimeoutWaitingForActions): - c.log.Info(fmt.Sprintf("No actions to return in %v", c.cfg.TimeoutWaitingForActions)) - return nil, nil - case <-ctx.Done(): - return nil, fmt.Errorf("context done with cause (%w), err (%w)", context.Cause(ctx), ctx.Err()) - } - - // Attempt to drain up to max items from the channel. - for len(actionsToReturn) < c.cfg.MaxActionsPerCall { - select { - case x := <-c.actionsPushChannel: - actionsToReturn = append(actionsToReturn, &x) - case <-time.After(50 * time.Millisecond): - c.log.Info(fmt.Sprintf("Returning %d actions for processing", len(actionsToReturn))) - // If we haven't received enough items, just flush. - return actionsToReturn, nil - case <-ctx.Done(): - return nil, fmt.Errorf("context done with cause (%w), err (%w)", context.Cause(ctx), ctx.Err()) - } + c.log.Info("GetActions called") + c.logMx.Lock() + actions := lo.MapToSlice(c.actions, func(_ string, value *castai.ClusterAction) *castai.ClusterAction { + return value + }) + c.logMx.Unlock() + + slices.SortStableFunc(actions, func(a, b *castai.ClusterAction) int { + return a.CreatedAt.Compare(b.CreatedAt) + }) + totalActionsInDB := len(actions) + if totalActionsInDB > c.cfg.MaxActionsPerCall { + actions = actions[:c.cfg.MaxActionsPerCall] } - c.log.Info(fmt.Sprintf("Returning %d actions for processing", len(actionsToReturn))) - return actionsToReturn, nil + c.log.Info(fmt.Sprintf("Returning %d actions for processing out of %d", len(actions), totalActionsInDB)) + return actions, nil } func (c *CastAITestServer) AckAction(ctx context.Context, actionID string, req *castai.AckClusterActionRequest) error { errMsg := lo.FromPtr(req.Error) c.log.DebugContext(ctx, fmt.Sprintf("action %q acknowledged; has error: %v; error: %v", actionID, req.Error != nil, errMsg)) - receiver := c.getActionReceiver(actionID) + receiver := c.removeActionFromStore(actionID) if receiver == nil { return fmt.Errorf("action %q does not have a receiver", actionID) } @@ -123,28 +114,26 @@ func (c *CastAITestServer) SendLog(ctx context.Context, e *castai.LogEntry) erro /* End Cluster-hub mock implementation */ -func (c *CastAITestServer) addActionToStore(actionID string, receiver chan string) { +func (c *CastAITestServer) addActionToStore(actionID string, action castai.ClusterAction, receiver chan string) { c.logMx.Lock() defer c.logMx.Unlock() c.actionsLog[actionID] = receiver + c.actions[actionID] = &action } -func (c *CastAITestServer) removeActionFromStore(actionID string) { - c.logMx.Lock() - defer c.logMx.Unlock() - - delete(c.actionsLog, actionID) -} - -func (c *CastAITestServer) getActionReceiver(actionID string) chan string { +func (c *CastAITestServer) removeActionFromStore(actionID string) chan string { c.logMx.Lock() defer c.logMx.Unlock() receiver, ok := c.actionsLog[actionID] if !ok { - c.log.Error(fmt.Sprintf("Receiver for action %s is no longer there, possibly shutting down", actionID)) - return nil + c.log.Error(fmt.Sprintf("Receiver for action %s is no longer there, possibly shutting down or CC got restarted", actionID)) + receiver = nil } + + delete(c.actionsLog, actionID) + delete(c.actions, actionID) + return receiver } diff --git a/loadtest/config.go b/loadtest/config.go index 61cfbb5b..4d0e4da6 100644 --- a/loadtest/config.go +++ b/loadtest/config.go @@ -22,6 +22,7 @@ type TestServerConfig struct { MaxActionsPerCall int // TimeoutWaitingForActions controls how long to wait for at least 1 action to appear on server side. // This mimics CH behavior of not returning early if there are no pending actions and keeping the request "running". + // Note: Currently not implemented TimeoutWaitingForActions time.Duration } diff --git a/loadtest/scenarios/check_node_deleted_stuck.go b/loadtest/scenarios/check_node_deleted_stuck.go index 5b50e075..3b752d64 100644 --- a/loadtest/scenarios/check_node_deleted_stuck.go +++ b/loadtest/scenarios/check_node_deleted_stuck.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "math" "sync" "time" @@ -19,16 +20,16 @@ import ( ) // CheckNodeDeletedStuck simulates a case where the node is not deleted so the checker gets stuck. -func CheckNodeDeletedStuck(nodeCount int, log *slog.Logger) TestScenario { +func CheckNodeDeletedStuck(actionCount int, log *slog.Logger) TestScenario { return &checkNodeDeletedStuckScenario{ - nodeCount: nodeCount, - log: log, + actionCount: actionCount, + log: log, } } type checkNodeDeletedStuckScenario struct { - nodeCount int - log *slog.Logger + actionCount int + log *slog.Logger nodes []*corev1.Node } @@ -38,12 +39,14 @@ func (s *checkNodeDeletedStuckScenario) Name() string { } func (s *checkNodeDeletedStuckScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error { - s.nodes = make([]*corev1.Node, 0, s.nodeCount) + s.nodes = make([]*corev1.Node, 0, s.actionCount) var lock sync.Mutex errGroup, ctx := errgroup.WithContext(ctx) - for i := range s.nodeCount { + nodeCount := int(math.Ceil(float64(s.actionCount) / nodeTestsCountOptimizeFactor)) + + for i := range nodeCount { errGroup.Go(func() error { nodeName := fmt.Sprintf("kwok-check-deleted-%d", i) s.log.Info(fmt.Sprintf("Creating node %s", nodeName)) @@ -102,10 +105,11 @@ func (s *checkNodeDeletedStuckScenario) Cleanup(ctx context.Context, namespace s func (s *checkNodeDeletedStuckScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error { s.log.Info(fmt.Sprintf("Starting check node deleted action with %d nodes", len(s.nodes))) - actions := make([]castai.ClusterAction, 0, len(s.nodes)) // Note: there is no code that should delete the node so each action should fail with timeout // -> this puts more load than "expected" to simulate such edge case. - for _, node := range s.nodes { + actions := make([]castai.ClusterAction, 0, s.actionCount) + for i := range s.actionCount { + node := s.nodes[i%len(s.nodes)] actions = append(actions, castai.ClusterAction{ ID: uuid.NewString(), CreatedAt: time.Now().UTC(), diff --git a/loadtest/scenarios/check_node_status.go b/loadtest/scenarios/check_node_status.go index 189eb6ee..ea3bc7cd 100644 --- a/loadtest/scenarios/check_node_status.go +++ b/loadtest/scenarios/check_node_status.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "math" "sync" "time" @@ -18,16 +19,16 @@ import ( "github.com/castai/cluster-controller/internal/castai" ) -func CheckNodeStatus(nodeCount int, log *slog.Logger) TestScenario { +func CheckNodeStatus(actionCount int, log *slog.Logger) TestScenario { return &checkNodeStatusScenario{ - nodeCount: nodeCount, - log: log, + actionCount: actionCount, + log: log, } } type checkNodeStatusScenario struct { - nodeCount int - log *slog.Logger + actionCount int + log *slog.Logger nodes []*corev1.Node } @@ -37,12 +38,14 @@ func (s *checkNodeStatusScenario) Name() string { } func (s *checkNodeStatusScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error { - s.nodes = make([]*corev1.Node, 0, s.nodeCount) + s.nodes = make([]*corev1.Node, 0, s.actionCount) var lock sync.Mutex errGroup, ctx := errgroup.WithContext(ctx) - for i := range s.nodeCount { + nodeCount := int(math.Ceil(float64(s.actionCount) / nodeTestsCountOptimizeFactor)) + + for i := range nodeCount { errGroup.Go(func() error { nodeName := fmt.Sprintf("kwok-check-status-%d", i) s.log.Info(fmt.Sprintf("Creating node %s", nodeName)) @@ -101,8 +104,9 @@ func (s *checkNodeStatusScenario) Cleanup(ctx context.Context, namespace string, func (s *checkNodeStatusScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error { s.log.Info(fmt.Sprintf("Starting check node status action with %d nodes", len(s.nodes))) - actions := make([]castai.ClusterAction, 0, len(s.nodes)) - for _, node := range s.nodes { + actions := make([]castai.ClusterAction, 0, s.actionCount) + for i := range s.actionCount { + node := s.nodes[i%len(s.nodes)] actions = append(actions, castai.ClusterAction{ ID: uuid.NewString(), CreatedAt: time.Now().UTC(), diff --git a/loadtest/scenarios/delete_node.go b/loadtest/scenarios/delete_node.go index 676e243a..99bea2dc 100644 --- a/loadtest/scenarios/delete_node.go +++ b/loadtest/scenarios/delete_node.go @@ -75,7 +75,7 @@ func (s *deleteNodeScenario) Preparation(ctx context.Context, namespace string, } // Wait for deployment to become ready, otherwise we might start draining before the pod is up. - progressed := WaitUntil(ctx, 60*time.Second, func(ctx context.Context) bool { + progressed := WaitUntil(ctx, 600*time.Second, func(ctx context.Context) bool { d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) if err != nil { s.log.Warn("failed to get deployment after creating", "err", err) diff --git a/loadtest/scenarios/drain_node.go b/loadtest/scenarios/drain_node.go index c9853784..a3dfe914 100644 --- a/loadtest/scenarios/drain_node.go +++ b/loadtest/scenarios/drain_node.go @@ -76,7 +76,7 @@ func (s *drainNodeScenario) Preparation(ctx context.Context, namespace string, c } // Wait for deployment to become ready, otherwise we might start draining before the pod is up. - progressed := WaitUntil(ctx, 60*time.Second, func(ctx context.Context) bool { + progressed := WaitUntil(ctx, 600*time.Second, func(ctx context.Context) bool { d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) if err != nil { s.log.Warn("failed to get deployment after creating", "err", err) diff --git a/loadtest/scenarios/evict_pod.go b/loadtest/scenarios/evict_pod.go index 394657c5..948bff0f 100644 --- a/loadtest/scenarios/evict_pod.go +++ b/loadtest/scenarios/evict_pod.go @@ -35,7 +35,18 @@ func (e *evictPodScenario) Name() string { } func (e *evictPodScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error { - // Create N pods; store in state + // create a kwok node for the pods + nodeName := fmt.Sprintf("kwok-evict-pods-%s", namespace) + node := NewKwokNode(KwokConfig{}, nodeName) + + _, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create fake node: %w", err) + } + if err != nil && apierrors.IsAlreadyExists(err) { + e.log.Warn("node already exists, will reuse but potential conflict between test runs", "nodeName", nodeName) + } + for i := range e.totalPods { select { case <-ctx.Done(): @@ -45,6 +56,7 @@ func (e *evictPodScenario) Preparation(ctx context.Context, namespace string, cl pod := Pod(fmt.Sprintf("evict-pod-%d", i)) pod.ObjectMeta.Namespace = namespace + pod.Spec.NodeName = nodeName e.log.Info(fmt.Sprintf("Creating pod %s", pod.Name)) _, err := clientset.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) diff --git a/loadtest/scenarios/patch_node.go b/loadtest/scenarios/patch_node.go index 219ce78f..b355804c 100644 --- a/loadtest/scenarios/patch_node.go +++ b/loadtest/scenarios/patch_node.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "math" "sync" "time" @@ -19,16 +20,16 @@ import ( "github.com/castai/cluster-controller/internal/castai" ) -func PatchNode(nodeCount int, log *slog.Logger) TestScenario { +func PatchNode(actionCount int, log *slog.Logger) TestScenario { return &patchNodeScenario{ - nodeCount: nodeCount, - log: log, + actionCount: actionCount, + log: log, } } type patchNodeScenario struct { - nodeCount int - log *slog.Logger + actionCount int + log *slog.Logger nodesToPatch []*corev1.Node } @@ -38,12 +39,14 @@ func (s *patchNodeScenario) Name() string { } func (s *patchNodeScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error { - s.nodesToPatch = make([]*corev1.Node, 0, s.nodeCount) + s.nodesToPatch = make([]*corev1.Node, 0, s.actionCount) var lock sync.Mutex errGroup, ctx := errgroup.WithContext(ctx) - for i := range s.nodeCount { + nodeCount := int(math.Ceil(float64(s.actionCount) / nodeTestsCountOptimizeFactor)) + + for i := range nodeCount { errGroup.Go(func() error { nodeName := fmt.Sprintf("kwok-patch-%d", i) s.log.Info(fmt.Sprintf("Creating node %s", nodeName)) @@ -100,10 +103,11 @@ func (s *patchNodeScenario) Cleanup(ctx context.Context, namespace string, clien } func (s *patchNodeScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error { - s.log.Info(fmt.Sprintf("Starting patch node action creation with %d nodes", len(s.nodesToPatch))) + s.log.Info(fmt.Sprintf("Starting patch node action creation with %d nodes and %d actions", len(s.nodesToPatch), s.actionCount)) - actions := make([]castai.ClusterAction, 0, len(s.nodesToPatch)) - for _, node := range s.nodesToPatch { + actions := make([]castai.ClusterAction, 0, s.actionCount) + for i := range s.actionCount { + node := s.nodesToPatch[i%len(s.nodesToPatch)] actions = append(actions, castai.ClusterAction{ ID: uuid.NewString(), CreatedAt: time.Now().UTC(), diff --git a/loadtest/scenarios/stuck_drain.go b/loadtest/scenarios/stuck_drain.go index b15e501f..ea53f80e 100644 --- a/loadtest/scenarios/stuck_drain.go +++ b/loadtest/scenarios/stuck_drain.go @@ -5,10 +5,13 @@ import ( "errors" "fmt" "log/slog" + "math" + "sync" "time" "github.com/google/uuid" "github.com/samber/lo" + "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -17,16 +20,18 @@ import ( "github.com/castai/cluster-controller/internal/castai" ) -func StuckDrain(nodeCount, deploymentReplicas int, log *slog.Logger) TestScenario { +// StuckDrain tests a scenario where DrainNode gets stuck due to PDB and has to put continuous load on the system. +// Note: It reuses nodes to make setup for high action count easier. +func StuckDrain(actionCount, deploymentReplicas int, log *slog.Logger) TestScenario { return &stuckDrainScenario{ - nodeCount: nodeCount, + actionCount: actionCount, deploymentReplicas: deploymentReplicas, log: log, } } type stuckDrainScenario struct { - nodeCount int + actionCount int deploymentReplicas int log *slog.Logger @@ -38,54 +43,68 @@ func (s *stuckDrainScenario) Name() string { } func (s *stuckDrainScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error { - s.nodesToDrain = make([]*corev1.Node, 0, s.nodeCount) - for i := range s.nodeCount { - nodeName := fmt.Sprintf("kwok-stuck-drain-%d", i) - s.log.Info(fmt.Sprintf("Creating node %s", nodeName)) - node := NewKwokNode(KwokConfig{}, nodeName) - - _, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) - if err != nil && !apierrors.IsAlreadyExists(err) { - return fmt.Errorf("failed to create fake node: %w", err) - } - if err != nil && apierrors.IsAlreadyExists(err) { - s.log.Warn("node already exists, will reuse but potential conflict between test runs", "nodeName", nodeName) - } - s.nodesToDrain = append(s.nodesToDrain, node) - - s.log.Info(fmt.Sprintf("Creating deployment on node %s", nodeName)) - deployment, pdb := DeploymentWithStuckPDB(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i)) - deployment.ObjectMeta.Namespace = namespace - //nolint:gosec // Not afraid of overflow here. - deployment.Spec.Replicas = lo.ToPtr(int32(s.deploymentReplicas)) - deployment.Spec.Template.Spec.NodeName = nodeName - pdb.ObjectMeta.Namespace = namespace - - _, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("failed to create fake deployment: %w", err) - } + s.nodesToDrain = make([]*corev1.Node, 0, s.actionCount) - _, err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Create(ctx, pdb, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("failed to create fake pod disruption budget: %w", err) - } + var lock sync.Mutex + errGroup, ctx := errgroup.WithContext(ctx) + + // We create 1/10 of the nodes only to optimize setup performance. + // Since the Drain will be stuck; nothing will change on the nodes, and we can just reuse the same node. + nodeCount := int(math.Ceil(float64(s.actionCount) / nodeTestsCountOptimizeFactor)) + + for i := range nodeCount { + errGroup.Go(func() error { + nodeName := fmt.Sprintf("kwok-stuck-drain-%d", i) + s.log.Info(fmt.Sprintf("Creating node %s", nodeName)) + node := NewKwokNode(KwokConfig{}, nodeName) + + _, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create fake node: %w", err) + } + if err != nil && apierrors.IsAlreadyExists(err) { + s.log.Warn("node already exists, will reuse but potential conflict between test runs", "nodeName", nodeName) + } + lock.Lock() + s.nodesToDrain = append(s.nodesToDrain, node) + lock.Unlock() + + s.log.Info(fmt.Sprintf("Creating deployment on node %s", nodeName)) + deployment, pdb := DeploymentWithStuckPDB(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i)) + deployment.ObjectMeta.Namespace = namespace + //nolint:gosec // Not afraid of overflow here. + deployment.Spec.Replicas = lo.ToPtr(int32(s.deploymentReplicas)) + deployment.Spec.Template.Spec.NodeName = nodeName + pdb.ObjectMeta.Namespace = namespace + + _, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create fake deployment: %w", err) + } - // Wait for deployment to become ready, otherwise we might start draining before the pod is up. - progressed := WaitUntil(ctx, 30*time.Second, func(ctx context.Context) bool { - d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) + _, err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Create(ctx, pdb, metav1.CreateOptions{}) if err != nil { - s.log.Warn("failed to get deployment after creating", "err", err) - return false + return fmt.Errorf("failed to create fake pod disruption budget: %w", err) + } + + // Wait for deployment to become ready, otherwise we might start draining before the pod is up. + progressed := WaitUntil(ctx, 300*time.Second, func(ctx context.Context) bool { + d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) + if err != nil { + s.log.Warn("failed to get deployment after creating", "err", err) + return false + } + return d.Status.ReadyReplicas == *d.Spec.Replicas + }) + if !progressed { + return fmt.Errorf("deployment %s did not progress to ready state in time", deployment.Name) } - return d.Status.ReadyReplicas == *d.Spec.Replicas + + return nil }) - if !progressed { - return fmt.Errorf("deployment %s did not progress to ready state in time", deployment.Name) - } } - return nil + return errGroup.Wait() } func (s *stuckDrainScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error { @@ -137,15 +156,16 @@ func (s *stuckDrainScenario) Cleanup(ctx context.Context, namespace string, clie func (s *stuckDrainScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error { s.log.Info(fmt.Sprintf("Starting drain action creation with %d nodes", len(s.nodesToDrain))) - actions := make([]castai.ClusterAction, 0, len(s.nodesToDrain)) - for _, node := range s.nodesToDrain { + actions := make([]castai.ClusterAction, 0, s.actionCount) + for i := range s.actionCount { + node := s.nodesToDrain[i%len(s.nodesToDrain)] actions = append(actions, castai.ClusterAction{ ID: uuid.NewString(), CreatedAt: time.Now().UTC(), ActionDrainNode: &castai.ActionDrainNode{ NodeName: node.Name, NodeID: "", - DrainTimeoutSeconds: 60, + DrainTimeoutSeconds: 65, Force: false, }, }) diff --git a/loadtest/scenarios/util.go b/loadtest/scenarios/util.go index 548f8fce..b586eab2 100644 --- a/loadtest/scenarios/util.go +++ b/loadtest/scenarios/util.go @@ -5,6 +5,11 @@ import ( "time" ) +const ( + // nodeTestsCountOptimizeFactor controls the ratio of nodes to actions for load tests where node count can be < action count for optimization. + nodeTestsCountOptimizeFactor = 10 +) + func WaitUntil(ctx context.Context, duration time.Duration, condition func(ctx context.Context) bool) bool { start := time.Now() for {