getClusters()
+ {
+ return clusters;
+ }
+
+ public MultipleKubernetesTaskRunner.KubernetesClusterSelector getClusterSelector()
+ {
+ return clusterSelector;
+ }
+
+ @VisibleForTesting
+ static MultipleKubernetesTaskRunnerConfig fromProperties(Properties properties)
+ {
+ return fromProperties(new ObjectMapper().configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false)
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false),
+ properties);
+ }
+
+ /**
+ * Using JavaPropsMapper to parse properties so that we can support properties config formats.
+ * property config format is much simpler for array/map config.
+ *
+ * Special handling for labels and annotations properties:
+ *
+ * Option 1 - JSON format (recommended for complex keys with dots):
+ * - If the value starts with '{', it's treated as JSON and parsed directly
+ * - This allows complex map keys containing dots and special characters
+ * - Example: druid.indexer.runner.labels={"lxcfs-admission-webhook.k8s.io/no-mutating":""}
+ * - Can be set via environment variable: druid_indexer_runner_labels='{"key":"value"}'
+ *
+ * Option 2 - Dot notation (for simple keys without dots):
+ * - Use standard dot notation for simple keys
+ * - Example: druid.indexer.runner.labels.simpleKey=simpleValue
+ *
+ * Arrays continue to work with JavaPropsMapper's bracket notation:
+ * - Example: druid.indexer.runner.clusters[0].taskNamespace=namespace1
+ */
+ public static MultipleKubernetesTaskRunnerConfig fromProperties(ObjectMapper jsonMapper, Properties properties)
+ {
+ String prefix = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".";
+
+ JsonNode labelNode = null;
+ JsonNode annotationNode = null;
+
+ Properties props = new Properties();
+ for (String prop : properties.stringPropertyNames()) {
+ if (prop.startsWith(prefix)) {
+ String key = prop.substring(prefix.length());
+ String value = properties.getProperty(prop);
+
+ // Special handling for labels: if value starts with '{', treat as JSON
+ if ("labels".equals(key) && value.trim().startsWith("{")) {
+ try {
+ labelNode = jsonMapper.readTree(value);
+ // Don't add to props - we'll handle this in the tree
+ continue;
+ }
+ catch (IOException ignored) {
+ // If JSON parsing fails, fall back to treating it as a regular property
+ }
+ }
+
+ // Special handling for annotations: if value starts with '{', treat as JSON
+ if ("annotations".equals(key) && value.trim().startsWith("{")) {
+ try {
+ annotationNode = jsonMapper.readTree(value);
+ // Don't add to props - we'll handle this in the tree
+ continue;
+ }
+ catch (IOException ignored) {
+ // If JSON parsing fails, fall back to treating it as a regular property
+ }
+ }
+
+ props.setProperty(key, value);
+ }
+ }
+
+ JavaPropsMapper propsMapper = (JavaPropsMapper) new JavaPropsMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
+
+ try {
+ // Step 1: Parse properties to JsonNode (tree model)
+ JsonNode tree = propsMapper.readPropertiesAs(props, JsonNode.class);
+
+ // Step 2: If we have JSON labels or annotations, merge them into the tree
+ if (labelNode != null) {
+ mergeJsonNode((ObjectNode) tree, "labels", (ObjectNode) labelNode);
+ }
+
+ if (annotationNode != null) {
+ mergeJsonNode((ObjectNode) tree, "annotations", (ObjectNode) annotationNode);
+ }
+
+ // Step 3: Deserialize the tree to final config object
+ // Reuse propsMapper since it already has all necessary modules (e.g., Joda Time) registered
+ return jsonMapper.treeToValue(tree, MultipleKubernetesTaskRunnerConfig.class);
+ }
+ catch (IOException e) {
+ throw new RE(e);
+ }
+ }
+
+ /**
+ * Merge JSON object node into the tree at the specified field.
+ * If the field already exists in the tree, merge the JSON values into it (JSON takes precedence).
+ * If the field doesn't exist, just set it to the JSON node.
+ */
+ private static void mergeJsonNode(ObjectNode tree, String fieldName, ObjectNode jsonNode)
+ {
+ if (tree.has(fieldName) && tree.get(fieldName).isObject()) {
+ // Merge with existing field from dot notation
+ ObjectNode existing = (ObjectNode) tree.get(fieldName);
+
+ // JSON values take precedence
+ Iterator fieldNames = jsonNode.fieldNames();
+ while (fieldNames.hasNext()) {
+ String name = fieldNames.next();
+ existing.set(name, jsonNode.get(name));
+ }
+ } else {
+ // No existing field or not an object, just use JSON
+ tree.set(fieldName, jsonNode);
+ }
+ }
+}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerDelegate.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerDelegate.java
new file mode 100644
index 000000000000..3f9a0d43e34f
--- /dev/null
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerDelegate.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient;
+import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+
+public class MultipleKubernetesTaskRunnerDelegate implements Closeable
+{
+ private static final Logger log = new Logger(MultipleKubernetesTaskRunnerDelegate.class);
+
+ private final KubernetesTaskRunner runner;
+ private final String k8sCluster;
+ private final boolean disabled;
+ private final DruidKubernetesClient kubernetesClient;
+ private final DruidKubernetesCachingClient cachingClient;
+
+ @VisibleForTesting
+ MultipleKubernetesTaskRunnerDelegate(KubernetesTaskRunner runner)
+ {
+ this(runner, null, false, null, null);
+ }
+
+ public MultipleKubernetesTaskRunnerDelegate(KubernetesTaskRunner runner,
+ String k8sCluster,
+ boolean disabled,
+ @Nullable DruidKubernetesClient kubernetesClient)
+ {
+ this(runner, k8sCluster, disabled, kubernetesClient, null);
+ }
+
+ public MultipleKubernetesTaskRunnerDelegate(KubernetesTaskRunner runner,
+ String k8sCluster,
+ boolean disabled,
+ @Nullable DruidKubernetesClient kubernetesClient,
+ @Nullable DruidKubernetesCachingClient cachingClient)
+ {
+ this.k8sCluster = k8sCluster;
+ this.runner = runner;
+ this.disabled = disabled;
+ this.kubernetesClient = kubernetesClient;
+ this.cachingClient = cachingClient;
+ }
+
+ public KubernetesTaskRunner getRunner()
+ {
+ return runner;
+ }
+
+ public boolean isDisabled()
+ {
+ return disabled;
+ }
+
+ public String getK8sCluster()
+ {
+ return k8sCluster;
+ }
+
+ @VisibleForTesting
+ DruidKubernetesClient getKubernetesClient()
+ {
+ return kubernetesClient;
+ }
+
+ @Override
+ public void close()
+ {
+ runner.stop();
+
+ if (cachingClient != null) {
+ try {
+ log.info("Stopping Kubernetes caching client for cluster[%s]", k8sCluster);
+ cachingClient.stop();
+ }
+ catch (Exception e) {
+ log.warn(e, "Error while stopping Kubernetes caching client for cluster[%s]", k8sCluster);
+ }
+ }
+
+ // Close the associated Kubernetes client if present
+ if (kubernetesClient != null) {
+ try {
+ log.info("Stopping Kubernetes client for cluster[%s]", k8sCluster);
+ kubernetesClient.getClient().close();
+ }
+ catch (Exception e) {
+ log.warn(e, "Error while closing Kubernetes client for cluster[%s]", k8sCluster);
+ }
+ }
+ }
+}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerFactory.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerFactory.java
new file mode 100644
index 000000000000..46a9008a9868
--- /dev/null
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerFactory.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.k8s.overlord;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
+import com.google.inject.Inject;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import org.apache.druid.common.config.ConfigManager;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.TaskRunnerFactory;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.k8s.overlord.common.CachingKubernetesPeonClient;
+import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient;
+import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
+import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.k8s.overlord.common.httpclient.DruidKubernetesHttpClientFactory;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.taskadapter.DynamicConfigPodTemplateSelector;
+import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
+import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
+import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
+import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class MultipleKubernetesTaskRunnerFactory implements TaskRunnerFactory
+{
+ public static final String TYPE_NAME = "multik8s";
+ private final ObjectMapper smileMapper;
+ private final HttpClient httpClient;
+ private final TaskLogs taskLogs;
+ private final ServiceEmitter emitter;
+ private final Supplier dynamicConfigSupplier;
+ private final ConfigManager configManager;
+ private final MultipleKubernetesTaskRunnerConfig runnerConfig;
+ private final TaskConfig taskConfig;
+ private final StartupLoggingConfig startupLoggingConfig;
+ private final DruidNode druidNode;
+ private final Properties properties;
+ private final DruidKubernetesHttpClientFactory httpClientFactory;
+ private TaskRunner runner;
+
+ @Inject
+ public MultipleKubernetesTaskRunnerFactory(
+ @Json ObjectMapper objectMapper,
+ @Smile ObjectMapper smileMapper,
+ @EscalatedGlobal final HttpClient httpClient,
+ TaskLogs taskLogs,
+ Properties properties,
+ ServiceEmitter emitter,
+ Supplier dynamicConfigSupplier,
+ @Nullable ConfigManager configManager,
+ TaskConfig taskConfig,
+ StartupLoggingConfig startupLoggingConfig,
+ @Self DruidNode druidNode,
+ DruidKubernetesHttpClientFactory httpClientFactory
+ )
+ {
+ this.runnerConfig = MultipleKubernetesTaskRunnerConfig.fromProperties(objectMapper, properties);
+
+ this.smileMapper = smileMapper;
+ this.httpClient = httpClient;
+ this.taskLogs = taskLogs;
+ this.emitter = emitter;
+ this.dynamicConfigSupplier = dynamicConfigSupplier;
+ this.configManager = configManager;
+ this.taskConfig = taskConfig;
+ this.startupLoggingConfig = startupLoggingConfig;
+ this.druidNode = druidNode;
+ this.properties = properties;
+ this.httpClientFactory = httpClientFactory;
+ }
+
+ @Override
+ public TaskRunner build()
+ {
+ final List enabledClusters = this.runnerConfig.getClusters()
+ .stream()
+ .filter(cluster -> !cluster.isDisabled())
+ .collect(Collectors.toList());
+
+ if (enabledClusters.isEmpty()) {
+ throw new IllegalArgumentException("At least one task runner must be enabled");
+ }
+
+ final String adapter = getAdapterType();
+ final boolean requiresOverlordPodSource = requiresOverlordPodSource(adapter);
+ final String overlordPodSourceNamespace = requiresOverlordPodSource
+ ? getOverlordPodSourceNamespace(enabledClusters)
+ : null;
+ DruidKubernetesClient overlordPodSourceClient = null;
+ AutoscalableThreadPoolExecutor sharedExecutor = null;
+ final List taskRunners = new ArrayList<>();
+
+ try {
+ overlordPodSourceClient = requiresOverlordPodSource
+ ? createOverlordPodSourceClient(overlordPodSourceNamespace)
+ : null;
+ final int totalCapacity = new KubernetesTaskRunnerEffectiveConfig(this.runnerConfig, this.dynamicConfigSupplier)
+ .getCapacity();
+ sharedExecutor = new AutoscalableThreadPoolExecutor(totalCapacity, this.configManager);
+
+ for (MultipleKubernetesTaskRunnerConfig.KubernetesCluster kubernetesCluster : this.runnerConfig.getClusters()) {
+ DruidKubernetesClient client = null;
+ DruidKubernetesCachingClient cachingClient = null;
+ boolean delegateCreated = false;
+
+ try {
+ final KubernetesTaskRunnerStaticConfig clusterConfig = getPerClusterConfiguration(kubernetesCluster);
+ final KubernetesTaskRunnerEffectiveConfig effectiveConfig = new KubernetesTaskRunnerEffectiveConfig(
+ clusterConfig,
+ this.dynamicConfigSupplier
+ );
+
+ client = createClientForCluster(kubernetesCluster, clusterConfig);
+ final TaskAdapter clusterTaskAdapter = buildTaskAdapter(
+ adapter,
+ client,
+ overlordPodSourceClient,
+ overlordPodSourceNamespace,
+ effectiveConfig
+ );
+ final boolean useOverlordNamespace = PodTemplateTaskAdapter.TYPE.equals(clusterTaskAdapter.getAdapterType());
+ cachingClient = effectiveConfig.isUseK8sSharedInformers()
+ ? createCachingClient(client, effectiveConfig)
+ : null;
+
+ final KubernetesPeonClient peonClient = createPeonClient(
+ client,
+ cachingClient,
+ effectiveConfig,
+ useOverlordNamespace
+ );
+
+ final KubernetesTaskRunner clusterRunner = new KubernetesTaskRunner(
+ clusterTaskAdapter,
+ effectiveConfig,
+ peonClient,
+ httpClient,
+ new KubernetesPeonLifecycleFactory(
+ peonClient,
+ taskLogs,
+ smileMapper,
+ effectiveConfig.getLogSaveTimeout().toStandardDuration().getMillis()
+ ),
+ emitter,
+ sharedExecutor,
+ configManager
+ );
+
+ taskRunners.add(
+ new MultipleKubernetesTaskRunnerDelegate(
+ clusterRunner,
+ kubernetesCluster.getName(),
+ kubernetesCluster.isDisabled(),
+ client,
+ cachingClient
+ )
+ );
+ delegateCreated = true;
+ }
+ catch (RuntimeException e) {
+ if (!delegateCreated) {
+ closeCachingClient(e, cachingClient);
+ closeKubernetesClient(e, client);
+ }
+ throw e;
+ }
+ }
+
+ this.runner = new MultipleKubernetesTaskRunner(
+ new KubernetesTaskRunnerEffectiveConfig(
+ this.runnerConfig,
+ this.dynamicConfigSupplier
+ ),
+ runnerConfig.getClusterSelector(),
+ taskRunners,
+ sharedExecutor,
+ overlordPodSourceClient
+ );
+ return this.runner;
+ }
+ catch (RuntimeException e) {
+ for (MultipleKubernetesTaskRunnerDelegate taskRunner : taskRunners) {
+ try {
+ taskRunner.close();
+ }
+ catch (Exception closeException) {
+ e.addSuppressed(closeException);
+ }
+ }
+ if (sharedExecutor != null) {
+ sharedExecutor.shutdownNow();
+ }
+ closeKubernetesClient(e, overlordPodSourceClient);
+ throw e;
+ }
+ }
+
+ private void closeCachingClient(RuntimeException e, @Nullable DruidKubernetesCachingClient cachingClient)
+ {
+ if (cachingClient == null) {
+ return;
+ }
+
+ try {
+ cachingClient.stop();
+ }
+ catch (Exception closeException) {
+ e.addSuppressed(closeException);
+ }
+ }
+
+ private void closeKubernetesClient(RuntimeException e, @Nullable DruidKubernetesClient client)
+ {
+ if (client == null) {
+ return;
+ }
+
+ try {
+ client.getClient().close();
+ }
+ catch (Exception closeException) {
+ e.addSuppressed(closeException);
+ }
+ }
+
+ @Override
+ public TaskRunner get()
+ {
+ return runner;
+ }
+
+ private DruidKubernetesClient createClientForCluster(
+ MultipleKubernetesTaskRunnerConfig.KubernetesCluster cluster,
+ KubernetesTaskRunnerStaticConfig clusterConfig
+ )
+ {
+ Config config;
+ if (cluster.getKubeconfigPath() != null && !cluster.getKubeconfigPath().trim().isEmpty()) {
+ config = Config.fromKubeconfig(new File(cluster.getKubeconfigPath()));
+ } else {
+ config = new ConfigBuilder().build();
+ }
+
+ if (clusterConfig.isDisableClientProxy()) {
+ config.setHttpsProxy(null);
+ config.setHttpProxy(null);
+ }
+
+ config.setNamespace(clusterConfig.getNamespace());
+
+ return new DruidKubernetesClient(httpClientFactory, config);
+ }
+
+ protected DruidKubernetesClient createOverlordPodSourceClient(String overlordPodSourceNamespace)
+ {
+ final Config config = new ConfigBuilder().build();
+ if (runnerConfig.isDisableClientProxy()) {
+ config.setHttpsProxy(null);
+ config.setHttpProxy(null);
+ }
+ config.setNamespace(overlordPodSourceNamespace);
+ return new DruidKubernetesClient(httpClientFactory, config);
+ }
+
+ protected DruidKubernetesCachingClient createCachingClient(
+ DruidKubernetesClient client,
+ KubernetesTaskRunnerEffectiveConfig effectiveConfig
+ )
+ {
+ return new DruidKubernetesCachingClient(
+ client,
+ effectiveConfig.getNamespace(),
+ effectiveConfig.getK8sSharedInformerResyncPeriod().toStandardDuration().getMillis()
+ );
+ }
+
+ private KubernetesPeonClient createPeonClient(
+ DruidKubernetesClient client,
+ @Nullable DruidKubernetesCachingClient cachingClient,
+ KubernetesTaskRunnerEffectiveConfig effectiveConfig,
+ boolean useOverlordNamespace
+ )
+ {
+ if (cachingClient != null) {
+ return new CachingKubernetesPeonClient(
+ cachingClient,
+ effectiveConfig.getNamespace(),
+ useOverlordNamespace ? effectiveConfig.getOverlordNamespace() : "",
+ effectiveConfig.isDebugJobs(),
+ emitter
+ );
+ }
+
+ return new KubernetesPeonClient(
+ client,
+ effectiveConfig.getNamespace(),
+ useOverlordNamespace ? effectiveConfig.getOverlordNamespace() : "",
+ effectiveConfig.isDebugJobs(),
+ emitter
+ );
+ }
+
+ private TaskAdapter buildTaskAdapter(
+ @Nullable String adapter,
+ DruidKubernetesClient client,
+ @Nullable DruidKubernetesClient overlordPodSourceClient,
+ @Nullable String overlordPodSourceNamespace,
+ KubernetesTaskRunnerEffectiveConfig effectiveConfig
+ )
+ {
+ if (adapter != null
+ && !MultiContainerTaskAdapter.TYPE.equals(adapter)
+ && effectiveConfig.isSidecarSupport()) {
+ throw new IAE(
+ "Invalid pod adapter [%s], only pod adapter [%s] can be specified when sidecarSupport is enabled",
+ adapter,
+ MultiContainerTaskAdapter.TYPE
+ );
+ }
+
+ if (MultiContainerTaskAdapter.TYPE.equals(adapter) || effectiveConfig.isSidecarSupport()) {
+ return new MultiContainerTaskAdapter(
+ overlordPodSourceClient,
+ overlordPodSourceNamespace,
+ client,
+ effectiveConfig,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ smileMapper,
+ taskLogs
+ );
+ } else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
+ return new PodTemplateTaskAdapter(
+ effectiveConfig,
+ taskConfig,
+ druidNode,
+ smileMapper,
+ taskLogs,
+ new DynamicConfigPodTemplateSelector(properties, effectiveConfig)
+ );
+ } else {
+ return new SingleContainerTaskAdapter(
+ overlordPodSourceClient,
+ overlordPodSourceNamespace,
+ client,
+ effectiveConfig,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ smileMapper,
+ taskLogs
+ );
+ }
+ }
+
+ private String getAdapterType()
+ {
+ return properties.getProperty(String.format(
+ Locale.ROOT,
+ "%s.%s.adapter.type",
+ IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX,
+ KubernetesTaskRunnerFactory.TYPE_NAME
+ ));
+ }
+
+ private boolean requiresOverlordPodSource(@Nullable String adapter)
+ {
+ return !PodTemplateTaskAdapter.TYPE.equals(adapter);
+ }
+
+ private String getOverlordPodSourceNamespace(
+ List enabledClusters
+ )
+ {
+ final String overlordNamespace = runnerConfig.getOverlordNamespace();
+ if (overlordNamespace != null && !overlordNamespace.trim().isEmpty()) {
+ return overlordNamespace;
+ }
+
+ final String namespace = runnerConfig.getNamespace();
+ if (namespace != null && !namespace.trim().isEmpty()) {
+ return namespace;
+ }
+
+ if (enabledClusters.size() == 1) {
+ final MultipleKubernetesTaskRunnerConfig.KubernetesCluster cluster = enabledClusters.get(0);
+ if (cluster.getKubeconfigPath() == null || cluster.getKubeconfigPath().trim().isEmpty()) {
+ return cluster.getTaskNamespace();
+ }
+ }
+
+ throw new IAE(
+ "Pod adapter [%s] requires the local Overlord pod namespace for multik8s. Set either "
+ + "[druid.indexer.runner.overlordNamespace] or [druid.indexer.runner.namespace], or set "
+ + "[druid.indexer.runner.k8s.adapter.type=%s].",
+ getAdapterType() == null ? SingleContainerTaskAdapter.TYPE : getAdapterType(),
+ PodTemplateTaskAdapter.TYPE
+ );
+ }
+
+ private KubernetesTaskRunnerStaticConfig getPerClusterConfiguration(
+ MultipleKubernetesTaskRunnerConfig.KubernetesCluster cluster
+ )
+ {
+ return new KubernetesTaskRunnerStaticConfig(
+ cluster.getTaskNamespace(),
+ cluster.getOverlordIdentifier(),
+ this.runnerConfig.getK8sTaskPodNamePrefix(),
+ this.runnerConfig.isDebugJobs(),
+ this.runnerConfig.isSidecarSupport(),
+ this.runnerConfig.getPrimaryContainerName(),
+ this.runnerConfig.getKubexitImage(),
+ this.runnerConfig.getGraceTerminationPeriodSeconds(),
+ this.runnerConfig.isDisableClientProxy(),
+ this.runnerConfig.getTaskTimeout(),
+ this.runnerConfig.getTaskCleanupDelay(),
+ this.runnerConfig.getTaskCleanupInterval(),
+ this.runnerConfig.getTaskLaunchTimeout(),
+ this.runnerConfig.getLogSaveTimeout(),
+ this.runnerConfig.getPeonMonitors(),
+ this.runnerConfig.getJavaOptsArray(),
+ this.runnerConfig.getCpuCoreInMicro(),
+ this.runnerConfig.getLabels(),
+ this.runnerConfig.getAnnotations(),
+ this.runnerConfig.getCapacity(),
+ this.runnerConfig.getTaskJoinTimeout(),
+ this.runnerConfig.isUseK8sSharedInformers(),
+ this.runnerConfig.getK8sSharedInformerResyncPeriod(),
+ this.runnerConfig.isAllowTaskPodTemplateSelection()
+ );
+ }
+}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java
index 9d6b6d6ee7e8..9bc1c172f110 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java
@@ -45,6 +45,7 @@ public DefaultKubernetesTaskRunnerDynamicConfig(
@Override
@JsonProperty
+ @Nullable
public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
{
return podTemplateSelectStrategy;
@@ -52,6 +53,7 @@ public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
@Override
@JsonProperty
+ @Nullable
public Integer getCapacity()
{
return capacity;
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
index 6f7d36dbe74b..cef4588e08ac 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
@@ -90,6 +90,8 @@ public abstract class K8sTaskAdapter implements TaskAdapter
private static final EmittingLogger log = new EmittingLogger(K8sTaskAdapter.class);
protected final KubernetesClientApi client;
+ protected final KubernetesClientApi podSourceClient;
+ protected final String podSourceNamespace;
protected final KubernetesTaskRunnerConfig taskRunnerConfig;
protected final TaskConfig taskConfig;
protected final StartupLoggingConfig startupLoggingConfig;
@@ -106,8 +108,25 @@ public K8sTaskAdapter(
ObjectMapper mapper,
TaskLogs taskLogs
)
+ {
+ this(client, taskRunnerConfig.getNamespace(), client, taskRunnerConfig, taskConfig, startupLoggingConfig, node, mapper, taskLogs);
+ }
+
+ public K8sTaskAdapter(
+ KubernetesClientApi podSourceClient,
+ String podSourceNamespace,
+ KubernetesClientApi client,
+ KubernetesTaskRunnerConfig taskRunnerConfig,
+ TaskConfig taskConfig,
+ StartupLoggingConfig startupLoggingConfig,
+ DruidNode node,
+ ObjectMapper mapper,
+ TaskLogs taskLogs
+ )
{
this.client = client;
+ this.podSourceClient = podSourceClient;
+ this.podSourceNamespace = podSourceNamespace;
this.taskRunnerConfig = taskRunnerConfig;
this.taskConfig = taskConfig;
this.startupLoggingConfig = startupLoggingConfig;
@@ -119,8 +138,20 @@ public K8sTaskAdapter(
@Override
public Job fromTask(Task task) throws IOException
{
- String myPodName = System.getenv("HOSTNAME");
- Pod pod = client.executeRequest(client -> client.pods().inNamespace(taskRunnerConfig.getNamespace()).withName(myPodName).get());
+ String myPodName = getCurrentPodName();
+ Pod pod = podSourceClient.executeRequest(
+ client -> client.pods().inNamespace(podSourceNamespace).withName(myPodName).get()
+ );
+ if (pod == null) {
+ throw InternalServerError.exception(
+ "Could not load Overlord pod [%s] from namespace [%s] while creating Kubernetes job for task [%s]."
+ + " Check that the Overlord Kubernetes client can get its own pod, or use pod adapter [%s].",
+ myPodName,
+ podSourceNamespace,
+ task.getId(),
+ PodTemplateTaskAdapter.TYPE
+ );
+ }
PeonCommandContext context = new PeonCommandContext(
generateCommand(task),
javaOpts(task),
@@ -133,6 +164,24 @@ public Job fromTask(Task task) throws IOException
return createJobFromPodSpec(podSpec, task, context);
}
+ @VisibleForTesting
+ protected String getCurrentPodName()
+ {
+ return System.getenv("HOSTNAME");
+ }
+
+ @VisibleForTesting
+ public KubernetesClientApi getPodSourceClient()
+ {
+ return podSourceClient;
+ }
+
+ @VisibleForTesting
+ public String getPodSourceNamespace()
+ {
+ return podSourceNamespace;
+ }
+
@Override
public Task toTask(Job from) throws IOException
{
@@ -499,4 +548,3 @@ public boolean shouldUseDeepStorageForTaskPayload(Task task) throws IOException
return compressedTaskPayload.length() > DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
}
}
-
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
index 401e7a3fab2c..e56b153dcc8e 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
@@ -67,6 +67,31 @@ public MultiContainerTaskAdapter(
super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper, taskLogs);
}
+ public MultiContainerTaskAdapter(
+ KubernetesClientApi podSourceClient,
+ String podSourceNamespace,
+ KubernetesClientApi client,
+ KubernetesTaskRunnerConfig taskRunnerConfig,
+ TaskConfig taskConfig,
+ StartupLoggingConfig startupLoggingConfig,
+ DruidNode druidNode,
+ ObjectMapper mapper,
+ TaskLogs taskLogs
+ )
+ {
+ super(
+ podSourceClient,
+ podSourceNamespace,
+ client,
+ taskRunnerConfig,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ mapper,
+ taskLogs
+ );
+ }
+
@Override
public String getAdapterType()
{
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
index 8d23d22d864f..4f6dffa6c7b7 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
@@ -55,6 +55,31 @@ public SingleContainerTaskAdapter(
super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper, taskLogs);
}
+ public SingleContainerTaskAdapter(
+ KubernetesClientApi podSourceClient,
+ String podSourceNamespace,
+ KubernetesClientApi client,
+ KubernetesTaskRunnerConfig taskRunnerConfig,
+ TaskConfig taskConfig,
+ StartupLoggingConfig startupLoggingConfig,
+ DruidNode druidNode,
+ ObjectMapper mapper,
+ TaskLogs taskLogs
+ )
+ {
+ super(
+ podSourceClient,
+ podSourceNamespace,
+ client,
+ taskRunnerConfig,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ mapper,
+ taskLogs
+ );
+ }
+
@Override
public String getAdapterType()
{
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/AutoscalableThreadPoolExecutorTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/AutoscalableThreadPoolExecutorTest.java
new file mode 100644
index 000000000000..e43724c174e7
--- /dev/null
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/AutoscalableThreadPoolExecutorTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import org.apache.druid.common.config.ConfigManager;
+import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class AutoscalableThreadPoolExecutorTest
+{
+ @Test
+ public void testConstructorWithNullConfigManager()
+ {
+ final AutoscalableThreadPoolExecutor executor = new AutoscalableThreadPoolExecutor(2, null);
+
+ Assertions.assertEquals(2, executor.getCorePoolSize());
+ executor.shutdownNow();
+ }
+
+ @Test
+ public void testDynamicConfigWithNullCapacityDoesNotChangePoolSize()
+ {
+ final ConfigManager configManager = EasyMock.createMock(ConfigManager.class);
+ final Capture> listenerCapture = EasyMock.newCapture();
+
+ EasyMock.expect(configManager.addListener(
+ EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY),
+ EasyMock.anyString(),
+ EasyMock.capture(listenerCapture)
+ )).andReturn(true);
+ EasyMock.expect(configManager.removeListener(
+ EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY),
+ EasyMock.anyString(),
+ EasyMock.anyObject()
+ )).andReturn(true).anyTimes();
+ EasyMock.replay(configManager);
+
+ final AutoscalableThreadPoolExecutor executor = new AutoscalableThreadPoolExecutor(2, configManager);
+ listenerCapture.getValue().accept(new DefaultKubernetesTaskRunnerDynamicConfig(null, null));
+
+ Assertions.assertEquals(2, executor.getCorePoolSize());
+ Assertions.assertEquals(2, executor.getMaximumPoolSize());
+
+ executor.shutdownNow();
+ EasyMock.verify(configManager);
+ }
+
+ @Test
+ public void testDynamicConfigWithCapacityChangesPoolSize() throws InterruptedException
+ {
+ final ConfigManager configManager = EasyMock.createMock(ConfigManager.class);
+ final Capture> listenerCapture = EasyMock.newCapture();
+
+ EasyMock.expect(configManager.addListener(
+ EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY),
+ EasyMock.anyString(),
+ EasyMock.capture(listenerCapture)
+ )).andReturn(true);
+ EasyMock.expect(configManager.removeListener(
+ EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY),
+ EasyMock.anyString(),
+ EasyMock.anyObject()
+ )).andReturn(true).anyTimes();
+ EasyMock.replay(configManager);
+
+ final AutoscalableThreadPoolExecutor executor = new AutoscalableThreadPoolExecutor(2, configManager);
+ listenerCapture.getValue().accept(new DefaultKubernetesTaskRunnerDynamicConfig(null, 4));
+
+ Assertions.assertEquals(4, executor.getCorePoolSize());
+ Assertions.assertEquals(4, executor.getMaximumPoolSize());
+
+ executor.shutdown();
+ Assertions.assertTrue(executor.isShutdown());
+ Assertions.assertTrue(executor.getQueue().isEmpty());
+ Assertions.assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
+ EasyMock.verify(configManager);
+ }
+}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
index 0f1bc803672c..e37313ebb0fb 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
@@ -36,6 +36,7 @@
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
+import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -118,6 +119,19 @@ public void testDefaultHttpRemoteTaskRunnerFactoryBindSuccessfully()
Assertions.assertNotNull(taskRunnerFactory.build());
}
+ @Test
+ public void testMultipleKubernetesTaskRunnerFactoryBindSuccessfully()
+ {
+ final Properties props = initializePropertes(false);
+ props.setProperty("druid.indexer.runner.type", MultipleKubernetesTaskRunnerFactory.TYPE_NAME);
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "NAMESPACE");
+ injector = makeInjectorWithProperties(props, false, true);
+
+ final TaskRunnerFactory> taskRunnerFactory = injector.getInstance(TaskRunnerFactory.class);
+
+ Assertions.assertInstanceOf(MultipleKubernetesTaskRunnerFactory.class, taskRunnerFactory);
+ }
+
@Test
public void testRemoteTaskRunnerFactoryBindSuccessfully()
{
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index cbca7e757298..426feb33aed1 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -65,6 +65,7 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -166,6 +167,33 @@ protected KubernetesWorkItem joinAsync(Task task)
Assertions.assertEquals(1, runner.tasks.size());
}
+ @Test
+ public void test_stop_doesNotShutDownSharedExecutor()
+ {
+ final ThreadPoolExecutor sharedExecutor = new ThreadPoolExecutor(
+ 1,
+ 1,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>()
+ );
+ final KubernetesTaskRunner sharedExecutorRunner = new KubernetesTaskRunner(
+ taskAdapter,
+ config,
+ peonClient,
+ httpClient,
+ new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+ emitter,
+ sharedExecutor,
+ configManager
+ );
+
+ sharedExecutorRunner.stop();
+
+ Assertions.assertFalse(sharedExecutor.isShutdown());
+ sharedExecutor.shutdownNow();
+ }
+
@Test
public void test_start_withExistingJobs_oneJobFails() throws IOException
{
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerConfigTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerConfigTest.java
new file mode 100644
index 000000000000..3dcfeb610d89
--- /dev/null
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerConfigTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import org.apache.druid.java.util.common.RE;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+public class MultipleKubernetesTaskRunnerConfigTest
+{
+ @Test
+ public void test_fromProperties_withSingleCluster()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ props.setProperty("druid.indexer.runner.clusters[0].kubeconfigPath", "/path/to/kubeconfig1");
+ props.setProperty("druid.indexer.runner.clusters[0].overlordIdentifier", "overlord1");
+
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertNotNull(config.getClusters());
+ Assertions.assertEquals(1, config.getClusters().size());
+
+ MultipleKubernetesTaskRunnerConfig.KubernetesCluster cluster = config.getClusters().get(0);
+ Assertions.assertEquals("namespace1", cluster.getTaskNamespace());
+ Assertions.assertEquals("/path/to/kubeconfig1", cluster.getKubeconfigPath());
+ Assertions.assertEquals("overlord1", cluster.getOverlordIdentifier());
+
+ // Should default to RoundRobinSelectionStrategy
+ Assertions.assertInstanceOf(MultipleKubernetesTaskRunner.RoundRobinSelector.class, config.getClusterSelector());
+ }
+
+ @Test
+ public void test_fromProperties_withMultipleClusters()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ props.setProperty("druid.indexer.runner.clusters[0].kubeconfigPath", "/path/to/kubeconfig1");
+ props.setProperty("druid.indexer.runner.clusters[0].overlordIdentifier", "overlord1");
+
+ props.setProperty("druid.indexer.runner.clusters[1].taskNamespace", "namespace2");
+ props.setProperty("druid.indexer.runner.clusters[1].kubeconfigPath", "/path/to/kubeconfig2");
+ props.setProperty("druid.indexer.runner.clusters[1].overlordIdentifier", "overlord2");
+
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertNotNull(config.getClusters());
+ Assertions.assertEquals(2, config.getClusters().size());
+
+ MultipleKubernetesTaskRunnerConfig.KubernetesCluster cluster1 = config.getClusters().get(0);
+ Assertions.assertEquals("namespace1", cluster1.getTaskNamespace());
+ Assertions.assertEquals("/path/to/kubeconfig1", cluster1.getKubeconfigPath());
+ Assertions.assertEquals("overlord1", cluster1.getOverlordIdentifier());
+
+ MultipleKubernetesTaskRunnerConfig.KubernetesCluster cluster2 = config.getClusters().get(1);
+ Assertions.assertEquals("namespace2", cluster2.getTaskNamespace());
+ Assertions.assertEquals("/path/to/kubeconfig2", cluster2.getKubeconfigPath());
+ Assertions.assertEquals("overlord2", cluster2.getOverlordIdentifier());
+ }
+
+ @Test
+ public void test_fromProperties_withNullableFields()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ // kubeconfigPath and overlordIdentifier are optional, so we don't set them
+
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertNotNull(config.getClusters());
+ Assertions.assertEquals(1, config.getClusters().size());
+
+ MultipleKubernetesTaskRunnerConfig.KubernetesCluster cluster = config.getClusters().get(0);
+ Assertions.assertEquals("namespace1", cluster.getTaskNamespace());
+ Assertions.assertNull(cluster.getKubeconfigPath());
+ Assertions.assertNull(cluster.getOverlordIdentifier());
+ }
+
+ @Test
+ public void test_fromProperties_withClusterSelector_roundRobin()
+ {
+ final Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ props.setProperty("druid.indexer.runner.clusterSelector.type", "roundrobin");
+
+ final MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertInstanceOf(MultipleKubernetesTaskRunner.RoundRobinSelector.class, config.getClusterSelector());
+ }
+
+ @Test
+ public void test_fromProperties_withSelectionStrategy_random()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ props.setProperty("druid.indexer.runner.clusterSelector.type", "random");
+
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertInstanceOf(MultipleKubernetesTaskRunner.RandomSelector.class, config.getClusterSelector());
+ }
+
+ @Test
+ public void test_fromProperties_withLabels_simpleKeys()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ // Test with simple label keys using dot notation
+ props.setProperty("druid.indexer.runner.labels.simpleKey", "simpleValue");
+ props.setProperty("druid.indexer.runner.labels.anotherKey", "anotherValue");
+
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertNotNull(config.getLabels());
+ Assertions.assertEquals(2, config.getLabels().size());
+ Assertions.assertEquals("simpleValue", config.getLabels().get("simpleKey"));
+ Assertions.assertEquals("anotherValue", config.getLabels().get("anotherKey"));
+ }
+
+ @Test
+ public void test_fromProperties_withLabels_jsonFormat()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ // Test with JSON format for labels - supports complex keys with dots and special characters
+ props.setProperty("druid.indexer.runner.labels", "{\"lxcfs-admission-webhook.k8s.io/no-mutating\":\"\",\"another.key.with.dots\":\"value\"}");
+
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertNotNull(config.getLabels());
+ Assertions.assertEquals(2, config.getLabels().size());
+ Assertions.assertEquals("", config.getLabels().get("lxcfs-admission-webhook.k8s.io/no-mutating"));
+ Assertions.assertEquals("value", config.getLabels().get("another.key.with.dots"));
+ }
+
+ @Test
+ public void test_fromProperties_withAnnotations_jsonFormat()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ // Test with JSON format for annotations
+ props.setProperty("druid.indexer.runner.annotations", "{\"prometheus.io/scrape\":\"true\",\"prometheus.io/port\":\"8080\"}");
+
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertNotNull(config.getAnnotations());
+ Assertions.assertEquals(2, config.getAnnotations().size());
+ Assertions.assertEquals("true", config.getAnnotations().get("prometheus.io/scrape"));
+ Assertions.assertEquals("8080", config.getAnnotations().get("prometheus.io/port"));
+ }
+
+ @Test
+ public void test_fromProperties_withLabels_mixedJsonAndDotNotation()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ // Mix JSON format for complex keys
+ props.setProperty("druid.indexer.runner.labels", "{\"lxcfs-admission-webhook.k8s.io/no-mutating\":\"\"}");
+ // And dot notation for simple keys
+ props.setProperty("druid.indexer.runner.labels.simpleKey", "simpleValue");
+
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertNotNull(config.getLabels());
+ Assertions.assertEquals(2, config.getLabels().size());
+ Assertions.assertEquals("", config.getLabels().get("lxcfs-admission-webhook.k8s.io/no-mutating"));
+ Assertions.assertEquals("simpleValue", config.getLabels().get("simpleKey"));
+ }
+
+ @Test
+ public void test_fromProperties_withSelectionStrategy_leastTask()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ props.setProperty("druid.indexer.runner.clusterSelector.type", "leastTask");
+
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertInstanceOf(MultipleKubernetesTaskRunner.LeastTaskSelector.class, config.getClusterSelector());
+ }
+
+ @Test
+ public void test_fromProperties_withAllowTaskPodTemplateSelection()
+ {
+ final Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ props.setProperty("druid.indexer.runner.allowTaskPodTemplateSelection", "true");
+
+ final MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertTrue(config.isAllowTaskPodTemplateSelection());
+ }
+
+ @Test
+ public void test_fromProperties_withoutSelectionStrategy_defaultsToRoundRobin()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ // No selectionStrategy specified
+
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertInstanceOf(MultipleKubernetesTaskRunner.RoundRobinSelector.class, config.getClusterSelector());
+ }
+
+ @Test
+ public void test_fromProperties_ignoresPropertiesWithoutPrefix()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+ props.setProperty("druid.indexer.runner.k8s.someOtherProperty", "shouldBeIgnored");
+ props.setProperty("some.other.property", "shouldAlsoBeIgnored");
+
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertNotNull(config.getClusters());
+ Assertions.assertEquals(1, config.getClusters().size());
+ Assertions.assertEquals("namespace1", config.getClusters().get(0).getTaskNamespace());
+ }
+
+ @Test
+ public void test_fromProperties_withEmptyProperties_throwsException()
+ {
+ final Properties props = new Properties();
+ // No properties with the prefix - should throw exception due to validation
+
+ Assertions.assertThrows(
+ RE.class,
+ () -> MultipleKubernetesTaskRunnerConfig.fromProperties(props)
+ );
+ }
+
+ @Test
+ public void test_fromProperties_withDotNotation()
+ {
+ Properties props = new Properties();
+ // Test with dot notation instead of bracket notation
+ props.setProperty("druid.indexer.runner.clusters.0.taskNamespace", "namespace1");
+ props.setProperty("druid.indexer.runner.clusters.0.kubeconfigPath", "/path/to/kubeconfig1");
+ props.setProperty("druid.indexer.runner.clusters.0.overlordIdentifier", "overlord1");
+
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertNotNull(config.getClusters());
+ Assertions.assertEquals(1, config.getClusters().size());
+
+ MultipleKubernetesTaskRunnerConfig.KubernetesCluster cluster = config.getClusters().get(0);
+ Assertions.assertEquals("namespace1", cluster.getTaskNamespace());
+ Assertions.assertEquals("/path/to/kubeconfig1", cluster.getKubeconfigPath());
+ Assertions.assertEquals("overlord1", cluster.getOverlordIdentifier());
+
+ // Should default to RoundRobinSelectionStrategy
+ Assertions.assertInstanceOf(MultipleKubernetesTaskRunner.RoundRobinSelector.class, config.getClusterSelector());
+ }
+
+ @Test
+ public void test_fromProperties_withUnknownProperties_ignored()
+ {
+ Properties props = new Properties();
+ // Set an unknown property - should be ignored due to FAIL_ON_UNKNOWN_PROPERTIES = false
+ props.setProperty("druid.indexer.runner.unknownProperty", "someValue");
+ props.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace1");
+
+ // Should not throw an exception due to FAIL_ON_UNKNOWN_PROPERTIES = false
+ MultipleKubernetesTaskRunnerConfig config = MultipleKubernetesTaskRunnerConfig.fromProperties(props);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertNotNull(config.getClusters());
+ Assertions.assertEquals(1, config.getClusters().size());
+ Assertions.assertEquals("namespace1", config.getClusters().get(0).getTaskNamespace());
+ }
+}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerFactoryTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerFactoryTest.java
new file mode 100644
index 000000000000..7d0c3f20a6ba
--- /dev/null
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerFactoryTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.druid.common.config.ConfigManager;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.k8s.overlord.common.CachingKubernetesPeonClient;
+import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient;
+import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
+import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.k8s.overlord.common.httpclient.vertx.DruidKubernetesVertxHttpClientConfig;
+import org.apache.druid.k8s.overlord.common.httpclient.vertx.DruidKubernetesVertxHttpClientFactory;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.taskadapter.K8sTaskAdapter;
+import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
+import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class MultipleKubernetesTaskRunnerFactoryTest
+{
+ @Test
+ public void test_build_withSharedInformers_usesCachingPeonClient()
+ {
+ final TestMultipleKubernetesTaskRunnerFactory factory = createFactory(true);
+
+ final TaskRunner taskRunner = factory.build();
+
+ final KubernetesPeonClient peonClient = getOnlyPeonClient(taskRunner);
+ Assertions.assertInstanceOf(CachingKubernetesPeonClient.class, peonClient);
+ Assertions.assertEquals(1, factory.getCachingClients().size());
+
+ ((MultipleKubernetesTaskRunner) taskRunner).stop();
+ }
+
+ @Test
+ public void test_build_withoutSharedInformers_usesPlainPeonClient()
+ {
+ final TestMultipleKubernetesTaskRunnerFactory factory = createFactory(false);
+
+ final TaskRunner taskRunner = factory.build();
+
+ final KubernetesPeonClient peonClient = getOnlyPeonClient(taskRunner);
+ Assertions.assertEquals(KubernetesPeonClient.class, peonClient.getClass());
+ Assertions.assertTrue(factory.getCachingClients().isEmpty());
+
+ ((MultipleKubernetesTaskRunner) taskRunner).stop();
+ }
+
+ @Test
+ public void test_build_withDefaultAdapter_usesOverlordPodSourceClient()
+ {
+ final TestMultipleKubernetesTaskRunnerFactory factory = createFactory(false);
+
+ final TaskRunner taskRunner = factory.build();
+ final MultipleKubernetesTaskRunner multipleRunner = (MultipleKubernetesTaskRunner) taskRunner;
+ final MultipleKubernetesTaskRunnerDelegate delegate = multipleRunner.getTaskRunners().get(0);
+ final K8sTaskAdapter adapter = (K8sTaskAdapter) delegate.getRunner().adapter;
+
+ Assertions.assertSame(factory.getOverlordPodSourceClient(), adapter.getPodSourceClient());
+ Assertions.assertEquals("local-overlord-namespace", adapter.getPodSourceNamespace());
+ Assertions.assertNotSame(factory.getOverlordPodSourceClient(), delegate.getKubernetesClient());
+
+ multipleRunner.stop();
+ }
+
+ @Test
+ public void test_build_withSidecarSupport_usesOverlordPodSourceClient()
+ {
+ final TestMultipleKubernetesTaskRunnerFactory factory = createFactory(
+ false,
+ properties -> properties.setProperty("druid.indexer.runner.sidecarSupport", "true")
+ );
+
+ final TaskRunner taskRunner = factory.build();
+ final MultipleKubernetesTaskRunner multipleRunner = (MultipleKubernetesTaskRunner) taskRunner;
+ final K8sTaskAdapter adapter = (K8sTaskAdapter) multipleRunner.getTaskRunners().get(0).getRunner().adapter;
+
+ Assertions.assertInstanceOf(MultiContainerTaskAdapter.class, adapter);
+ Assertions.assertSame(factory.getOverlordPodSourceClient(), adapter.getPodSourceClient());
+ Assertions.assertEquals("local-overlord-namespace", adapter.getPodSourceNamespace());
+
+ multipleRunner.stop();
+ }
+
+ @Test
+ public void test_build_withCustomTemplateAdapter_doesNotCreateOverlordPodSourceClient()
+ {
+ final URL url = this.getClass().getClassLoader().getResource("basePodTemplate.yaml");
+ final TestMultipleKubernetesTaskRunnerFactory factory = createFactory(false, properties -> {
+ properties.setProperty("druid.indexer.runner.k8s.adapter.type", PodTemplateTaskAdapter.TYPE);
+ properties.setProperty("druid.indexer.runner.k8s.podTemplate.base", url.getPath());
+ properties.remove("druid.indexer.runner.overlordNamespace");
+ properties.remove("druid.indexer.runner.namespace");
+ });
+
+ final TaskRunner taskRunner = factory.build();
+ final MultipleKubernetesTaskRunner multipleRunner = (MultipleKubernetesTaskRunner) taskRunner;
+
+ Assertions.assertInstanceOf(PodTemplateTaskAdapter.class, multipleRunner.getTaskRunners().get(0).getRunner().adapter);
+ Assertions.assertNull(factory.getOverlordPodSourceClient());
+
+ multipleRunner.stop();
+ }
+
+ @Test
+ public void test_build_withDefaultAdapterAndSingleDefaultCluster_fallsBackToTaskNamespace()
+ {
+ final TestMultipleKubernetesTaskRunnerFactory factory = createFactory(false, properties -> {
+ properties.remove("druid.indexer.runner.overlordNamespace");
+ properties.remove("druid.indexer.runner.namespace");
+ });
+
+ final TaskRunner taskRunner = factory.build();
+ final MultipleKubernetesTaskRunner multipleRunner = (MultipleKubernetesTaskRunner) taskRunner;
+ final K8sTaskAdapter adapter = (K8sTaskAdapter) multipleRunner.getTaskRunners().get(0).getRunner().adapter;
+
+ Assertions.assertEquals("namespace-a", adapter.getPodSourceNamespace());
+
+ multipleRunner.stop();
+ }
+
+ @Test
+ public void test_build_withDefaultAdapterAndAmbiguousOverlordPodSourceNamespace_throwsException()
+ {
+ final TestMultipleKubernetesTaskRunnerFactory factory = createFactory(false, properties -> {
+ properties.remove("druid.indexer.runner.overlordNamespace");
+ properties.remove("druid.indexer.runner.namespace");
+ properties.setProperty("druid.indexer.runner.clusters[1].name", "cluster-b");
+ properties.setProperty("druid.indexer.runner.clusters[1].taskNamespace", "namespace-b");
+ });
+
+ final IAE exception = Assertions.assertThrows(IAE.class, factory::build);
+ Assertions.assertTrue(exception.getMessage().contains("druid.indexer.runner.overlordNamespace"));
+ Assertions.assertTrue(exception.getMessage().contains("druid.indexer.runner.namespace"));
+ Assertions.assertTrue(exception.getMessage().contains("customTemplateAdapter"));
+ }
+
+ private KubernetesPeonClient getOnlyPeonClient(TaskRunner taskRunner)
+ {
+ final MultipleKubernetesTaskRunner multipleRunner = (MultipleKubernetesTaskRunner) taskRunner;
+ Assertions.assertEquals(1, multipleRunner.getTaskRunners().size());
+ return multipleRunner.getTaskRunners().get(0).getRunner().getPeonClient();
+ }
+
+ private TestMultipleKubernetesTaskRunnerFactory createFactory(boolean useSharedInformers)
+ {
+ return createFactory(useSharedInformers, properties -> {});
+ }
+
+ private TestMultipleKubernetesTaskRunnerFactory createFactory(
+ boolean useSharedInformers,
+ Consumer propertiesCustomizer
+ )
+ {
+ final ObjectMapper objectMapper = new TestUtils().getTestObjectMapper();
+ final Properties properties = new Properties();
+ properties.setProperty("druid.indexer.runner.overlordNamespace", "local-overlord-namespace");
+ properties.setProperty("druid.indexer.runner.clusters[0].name", "cluster-a");
+ properties.setProperty("druid.indexer.runner.clusters[0].taskNamespace", "namespace-a");
+ properties.setProperty("druid.indexer.runner.clusters[0].overlordIdentifier", "overlord-a");
+ properties.setProperty("druid.indexer.runner.useK8sSharedInformers", Boolean.toString(useSharedInformers));
+ propertiesCustomizer.accept(properties);
+
+ final ConfigManager configManager = EasyMock.createNiceMock(ConfigManager.class);
+ final HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+ EasyMock.replay(configManager, httpClient);
+
+ return new TestMultipleKubernetesTaskRunnerFactory(
+ objectMapper,
+ objectMapper,
+ httpClient,
+ new NoopTaskLogs(),
+ properties,
+ new NoopServiceEmitter(),
+ () -> null,
+ configManager,
+ new TaskConfig(null, null, false, null, null, null, false, null, false, null, false),
+ new StartupLoggingConfig(),
+ new DruidNode("test-overlord", "localhost", false, 8080, null, true, false),
+ new DruidKubernetesVertxHttpClientFactory(new DruidKubernetesVertxHttpClientConfig(), objectMapper)
+ );
+ }
+
+ private static class TestMultipleKubernetesTaskRunnerFactory extends MultipleKubernetesTaskRunnerFactory
+ {
+ private final List cachingClients = new ArrayList<>();
+ private DruidKubernetesClient overlordPodSourceClient;
+
+ TestMultipleKubernetesTaskRunnerFactory(
+ ObjectMapper objectMapper,
+ ObjectMapper smileMapper,
+ HttpClient httpClient,
+ NoopTaskLogs taskLogs,
+ Properties properties,
+ NoopServiceEmitter emitter,
+ com.google.common.base.Supplier dynamicConfigSupplier,
+ ConfigManager configManager,
+ TaskConfig taskConfig,
+ StartupLoggingConfig startupLoggingConfig,
+ DruidNode druidNode,
+ DruidKubernetesVertxHttpClientFactory httpClientFactory
+ )
+ {
+ super(
+ objectMapper,
+ smileMapper,
+ httpClient,
+ taskLogs,
+ properties,
+ emitter,
+ dynamicConfigSupplier,
+ configManager,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ httpClientFactory
+ );
+ }
+
+ @Override
+ protected DruidKubernetesCachingClient createCachingClient(
+ DruidKubernetesClient client,
+ KubernetesTaskRunnerEffectiveConfig effectiveConfig
+ )
+ {
+ final DruidKubernetesCachingClient cachingClient = EasyMock.createMock(DruidKubernetesCachingClient.class);
+ EasyMock.expect(cachingClient.getBaseClient()).andReturn(client).anyTimes();
+ cachingClient.stop();
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(cachingClient);
+ cachingClients.add(cachingClient);
+ return cachingClient;
+ }
+
+ @Override
+ protected DruidKubernetesClient createOverlordPodSourceClient(String overlordPodSourceNamespace)
+ {
+ final KubernetesClient kubernetesClient = EasyMock.createMock(KubernetesClient.class);
+ kubernetesClient.close();
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(kubernetesClient);
+
+ overlordPodSourceClient = EasyMock.createMock(DruidKubernetesClient.class);
+ EasyMock.expect(overlordPodSourceClient.getClient()).andReturn(kubernetesClient).anyTimes();
+ EasyMock.replay(overlordPodSourceClient);
+ return overlordPodSourceClient;
+ }
+
+ List getCachingClients()
+ {
+ return cachingClients;
+ }
+
+ DruidKubernetesClient getOverlordPodSourceClient()
+ {
+ return overlordPodSourceClient;
+ }
+ }
+}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerTest.java
new file mode 100644
index 000000000000..2033b23fb608
--- /dev/null
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerTest.java
@@ -0,0 +1,1340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.common.config.ConfigManager;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.query.DruidMetrics;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockExtension;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(EasyMockExtension.class)
+public class MultipleKubernetesTaskRunnerTest extends EasyMockSupport
+{
+ private static final String TASK_ID = "task-id";
+ private static final String REASON = "reason";
+
+ /**
+ * Helper method to create an immediately completed ListenableFuture.
+ * Replaces Futures.immediateFuture() which is marked as beta.
+ */
+ private static ListenableFuture immediateFuture(T value)
+ {
+ SettableFuture future = SettableFuture.create();
+ future.set(value);
+ return future;
+ }
+
+ /**
+ * Simple test implementation of TaskRunnerWorkItem for use in tests.
+ * This avoids type incompatibility issues with wildcard generics.
+ */
+ private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
+ {
+ public TestTaskRunnerWorkItem(String id)
+ {
+ super(id, null);
+ }
+
+ @Override
+ public TaskLocation getLocation()
+ {
+ return TaskLocation.unknown();
+ }
+
+ @Override
+ public String getTaskType()
+ {
+ return null;
+ }
+
+ @Override
+ public String getDataSource()
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Test implementation of KubernetesTaskRunner that allows setting return values for testing.
+ */
+ private static class TestKubernetesTaskRunner extends KubernetesTaskRunner
+ {
+ private Collection knownTasks = new ArrayList<>();
+ private Collection runningTasks = new ArrayList<>();
+ private Collection pendingTasks = new ArrayList<>();
+ private final Map runnerTaskStates = new HashMap<>();
+ private final Map taskLocations = new HashMap<>();
+ private final Map> taskLogs = new HashMap<>();
+ private final Map> taskReports = new HashMap<>();
+ private final Map> runResults = new HashMap<>();
+
+ public TestKubernetesTaskRunner(KubernetesTaskRunnerEffectiveConfig config, ConfigManager configManager)
+ {
+ super(null, config, null, null, null, null, configManager);
+ }
+
+ public void setKnownTasks(Collection tasks)
+ {
+ this.knownTasks = tasks;
+ }
+
+ public void setRunningTasks(Collection tasks)
+ {
+ this.runningTasks = tasks;
+ }
+
+ public void setPendingTasks(Collection tasks)
+ {
+ this.pendingTasks = tasks;
+ }
+
+ public void setRunnerTaskState(String taskId, RunnerTaskState state)
+ {
+ if (state == null) {
+ runnerTaskStates.remove(taskId);
+ } else {
+ runnerTaskStates.put(taskId, state);
+ }
+ }
+
+ public void setTaskLocation(String taskId, TaskLocation location)
+ {
+ taskLocations.put(taskId, location);
+ }
+
+ public void setTaskLog(String taskId, Optional log)
+ {
+ taskLogs.put(taskId, log);
+ }
+
+ public void setTaskReports(String taskId, Optional reports)
+ {
+ taskReports.put(taskId, reports);
+ }
+
+ public void setRunResult(Task task, ListenableFuture result)
+ {
+ runResults.put(task.getId(), result);
+ }
+
+ @Override
+ public Collection extends TaskRunnerWorkItem> getKnownTasks()
+ {
+ return knownTasks;
+ }
+
+ @Override
+ public Collection getRunningTasks()
+ {
+ return runningTasks;
+ }
+
+ @Override
+ public Collection getPendingTasks()
+ {
+ return pendingTasks;
+ }
+
+ @Override
+ public RunnerTaskState getRunnerTaskState(String taskId)
+ {
+ return runnerTaskStates.get(taskId);
+ }
+
+ @Override
+ public TaskLocation getTaskLocation(String taskId)
+ {
+ TaskLocation location = taskLocations.get(taskId);
+ return location != null ? location : TaskLocation.unknown();
+ }
+
+ @Override
+ public Optional streamTaskLog(String taskid, long offset)
+ {
+ Optional log = taskLogs.get(taskid);
+ return log != null ? log : Optional.absent();
+ }
+
+ @Override
+ public Optional streamTaskReports(String taskid)
+ {
+ Optional reports = taskReports.get(taskid);
+ return reports != null ? reports : Optional.absent();
+ }
+
+ @Override
+ public ListenableFuture run(Task task)
+ {
+ ListenableFuture result = runResults.get(task.getId());
+ if (result != null) {
+ return result;
+ }
+ return immediateFuture(TaskStatus.success(task.getId()));
+ }
+
+ @Override
+ public void start()
+ {
+ // No-op for testing
+ }
+
+ @Override
+ public void stop()
+ {
+ // No-op for testing
+ }
+
+ @Override
+ public void shutdown(String taskid, String reason)
+ {
+ // No-op for testing - just remove from tasks if present
+ tasks.remove(taskid);
+ }
+ }
+
+ @Mock
+ private KubernetesTaskRunner kubernetesTaskRunner1;
+
+ @Mock
+ private KubernetesTaskRunner kubernetesTaskRunner2;
+
+ private KubernetesTaskRunnerConfig config;
+ private KubernetesTaskRunnerEffectiveConfig effectiveConfig1;
+ private KubernetesTaskRunnerEffectiveConfig effectiveConfig2;
+ private MultipleKubernetesTaskRunner runner;
+ private ConfigManager testConfigManager;
+
+ @BeforeEach
+ public void setup()
+ {
+ testConfigManager = EasyMock.createNiceMock(ConfigManager.class);
+ EasyMock.replay(testConfigManager);
+
+ // Create configs
+ config = KubernetesTaskRunnerConfig.builder()
+ .withCapacity(10)
+ .build();
+
+ effectiveConfig1 = new KubernetesTaskRunnerEffectiveConfig(
+ KubernetesTaskRunnerConfig.builder().withNamespace("namespace1").withCapacity(10).build(),
+ () -> null
+ );
+
+ effectiveConfig2 = new KubernetesTaskRunnerEffectiveConfig(
+ KubernetesTaskRunnerConfig.builder().withNamespace("namespace2").withCapacity(10).build(),
+ () -> null
+ );
+
+ }
+
+ @Test
+ public void test_run_withRoundRobinSelectionStrategy() throws ExecutionException, InterruptedException
+ {
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner2)
+ )
+ );
+
+ Task task1 = new NoopTask("task-1", null, null, 0, 0, null);
+
+ // First call: both runners report no existing task, round-robin selects runner1
+ // Both runners should be checked for existing tasks (return null = no existing task)
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task1.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner2.getRunnerTaskState(task1.getId())).andReturn(null);
+ // Round-robin selects first runner (kubernetesTaskRunner1)
+ EasyMock.expect(kubernetesTaskRunner1.run(task1)).andReturn(immediateFuture(TaskStatus.success("task-1")));
+
+ replayAll();
+
+ ListenableFuture result = runner.run(task1);
+ Assertions.assertEquals("task-1", result.get().getId());
+
+ verifyAll();
+ resetAll();
+
+ Task task2 = new NoopTask("task-2", null, null, 0, 0, null);
+
+ // Second call: both runners report no existing task, round-robin selects runner2
+ // Both runners should be checked for existing tasks (return null = no existing task)
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task2.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner2.getRunnerTaskState(task2.getId())).andReturn(null);
+ // Round-robin selects second runner (kubernetesTaskRunner2)
+ EasyMock.expect(kubernetesTaskRunner2.run(task2)).andReturn(immediateFuture(TaskStatus.success("task-2")));
+
+ replayAll();
+
+ result = runner.run(task2);
+ Assertions.assertEquals("task-2", result.get().getId());
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_run_withExistingTask_returnsToSameRunner() throws ExecutionException, InterruptedException
+ {
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner2)
+ )
+ );
+
+ Task task = new NoopTask("existing-task", null, null, 0, 0, null);
+
+ // Task already exists in runner1 - should check runner1 first and find it
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task.getId())).andReturn(RunnerTaskState.RUNNING);
+ EasyMock.expect(kubernetesTaskRunner1.run(task)).andReturn(immediateFuture(TaskStatus.success(task.getId())));
+
+ replayAll();
+
+ ListenableFuture result = runner.run(task);
+ Assertions.assertEquals(task.getId(), result.get().getId());
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_run_withRoundRobinSelectionStrategy_disabledRunnerNotSelected() throws ExecutionException, InterruptedException
+ {
+ // Create runner with round-robin strategy, runner2 is disabled
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner2, null, true, null) // runner2 is disabled
+ )
+ );
+
+ Task task1 = new NoopTask("task-1", null, null, 0, 0, null);
+
+ // First call: both runners checked for existing task, but only enabled runner1 should be selected
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task1.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner2.getRunnerTaskState(task1.getId())).andReturn(null);
+ // Round-robin should select runner1 (only enabled runner)
+ EasyMock.expect(kubernetesTaskRunner1.run(task1)).andReturn(immediateFuture(TaskStatus.success("task-1")));
+
+ replayAll();
+
+ ListenableFuture result = runner.run(task1);
+ Assertions.assertEquals("task-1", result.get().getId());
+
+ verifyAll();
+ resetAll();
+
+ Task task2 = new NoopTask("task-2", null, null, 0, 0, null);
+
+ // Second call: should still select runner1 (only enabled runner, round-robin wraps around)
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task2.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner2.getRunnerTaskState(task2.getId())).andReturn(null);
+ // Round-robin should still select runner1 (only enabled runner)
+ EasyMock.expect(kubernetesTaskRunner1.run(task2)).andReturn(immediateFuture(TaskStatus.success("task-2")));
+
+ replayAll();
+
+ result = runner.run(task2);
+ Assertions.assertEquals("task-2", result.get().getId());
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_run_withAllRunnersDisabled_throwsException()
+ {
+ // Constructor rejects when every delegate is disabled (no runner can run tasks).
+ Assertions.assertThrows(
+ IllegalStateException.class,
+ () -> new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1, null, true, null), // runner1 is disabled
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner2, null, true, null) // runner2 is disabled
+ )
+ )
+ );
+ }
+
+ @Test
+ public void test_run_withRandomSelectionStrategy() throws ExecutionException, InterruptedException
+ {
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RandomSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner2)
+ )
+ );
+
+ Task task = new NoopTask("random-task", null, null, 0, 0, null);
+
+ // Random selection - check both runners for existing task, then pick one
+ // Since it's random, we need to set up expectations for whichever runner gets selected
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner2.getRunnerTaskState(task.getId())).andReturn(null);
+ // Random strategy will pick one of them - set up both possibilities for execution
+ EasyMock.expect(kubernetesTaskRunner1.run(task)).andReturn(immediateFuture(TaskStatus.success(task.getId()))).anyTimes();
+ EasyMock.expect(kubernetesTaskRunner2.run(task)).andReturn(immediateFuture(TaskStatus.success(task.getId()))).anyTimes();
+
+ replayAll();
+
+ ListenableFuture result = runner.run(task);
+ Assertions.assertEquals(task.getId(), result.get().getId());
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_run_withLeastTaskSelectionStrategy() throws ExecutionException, InterruptedException
+ {
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.LeastTaskSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner2)
+ )
+ );
+
+ TestTaskRunnerWorkItem workItem1 = new TestTaskRunnerWorkItem("work-item-1");
+ TestTaskRunnerWorkItem workItem2 = new TestTaskRunnerWorkItem("work-item-2");
+
+ // Runner1 has 2 tasks, runner2 has 1 task - should pick runner2
+ Task task = new NoopTask("least-task", null, null, 0, 0, null);
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner2.getRunnerTaskState(task.getId())).andReturn(null);
+
+ // Least task strategy checks known tasks to find the runner with the fewest tasks
+ // getKnownTasks() is called multiple times, so allow anyTimes
+ EasyMock.expect((Object) kubernetesTaskRunner1.getKnownTasks()).andReturn(ImmutableList.of(workItem1, workItem2)).anyTimes();
+ EasyMock.expect((Object) kubernetesTaskRunner2.getKnownTasks()).andReturn(ImmutableList.of(workItem1)).anyTimes();
+
+ // Should pick runner2 (has fewer tasks)
+ EasyMock.expect(kubernetesTaskRunner2.run(task)).andReturn(immediateFuture(TaskStatus.success(task.getId())));
+
+ replayAll();
+
+ ListenableFuture result = runner.run(task);
+ Assertions.assertEquals(task.getId(), result.get().getId());
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_shutdown()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ // shutdown() is called on all runners - no need to set expectations
+ runner.shutdown(TASK_ID, REASON);
+ }
+
+ @Test
+ public void test_getRunningTasks()
+ {
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner2)
+ )
+ );
+
+ TestTaskRunnerWorkItem workItem1 = new TestTaskRunnerWorkItem("work-item-1");
+ TestTaskRunnerWorkItem workItem2 = new TestTaskRunnerWorkItem("work-item-2");
+
+ EasyMock.expect(kubernetesTaskRunner1.getRunningTasks()).andReturn(Collections.singletonList(workItem1));
+ EasyMock.expect(kubernetesTaskRunner2.getRunningTasks()).andReturn(Collections.singletonList(workItem2));
+
+ replayAll();
+
+ Collection extends TaskRunnerWorkItem> runningTasks = runner.getRunningTasks();
+
+ assertEquals(2, runningTasks.size());
+ verifyAll();
+ }
+
+ @Test
+ public void test_getPendingTasks()
+ {
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner2)
+ )
+ );
+
+ TestTaskRunnerWorkItem workItem1 = new TestTaskRunnerWorkItem("work-item-1");
+ TestTaskRunnerWorkItem workItem2 = new TestTaskRunnerWorkItem("work-item-2");
+
+ EasyMock.expect(kubernetesTaskRunner1.getPendingTasks()).andReturn(Collections.singletonList(workItem1));
+ EasyMock.expect(kubernetesTaskRunner2.getPendingTasks()).andReturn(Collections.singletonList(workItem2));
+
+ replayAll();
+
+ Collection extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
+
+ assertEquals(2, pendingTasks.size());
+ verifyAll();
+ }
+
+ @Test
+ public void test_getKnownTasks()
+ {
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner2)
+ )
+ );
+
+ TestTaskRunnerWorkItem workItem1 = new TestTaskRunnerWorkItem("work-item-1");
+ TestTaskRunnerWorkItem workItem2 = new TestTaskRunnerWorkItem("work-item-2");
+
+ EasyMock.expect((Object) kubernetesTaskRunner1.getKnownTasks()).andReturn(ImmutableList.of(workItem1));
+ EasyMock.expect((Object) kubernetesTaskRunner2.getKnownTasks()).andReturn(ImmutableList.of(workItem2));
+
+ replayAll();
+
+ Collection extends TaskRunnerWorkItem> knownTasks = runner.getKnownTasks();
+
+ assertEquals(2, knownTasks.size());
+ verifyAll();
+ }
+
+ @Test
+ public void test_getUsedCapacity()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ TestTaskRunnerWorkItem workItem1 = new TestTaskRunnerWorkItem("work-item-1");
+ TestTaskRunnerWorkItem workItem2 = new TestTaskRunnerWorkItem("work-item-2");
+ TestTaskRunnerWorkItem workItem3 = new TestTaskRunnerWorkItem("work-item-3");
+
+ testRunner1.setKnownTasks(Lists.newArrayList(workItem1, workItem2));
+ testRunner2.setKnownTasks(Collections.singletonList(workItem3));
+
+ int usedCapacity = runner.getUsedCapacity();
+
+ assertEquals(3, usedCapacity);
+ assertEquals(10, runner.getTotalCapacity());
+ }
+
+ @Test
+ public void test_getTotalCapacity()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ int totalCapacity = runner.getTotalCapacity();
+
+ assertEquals(10, totalCapacity);
+ }
+
+ @Test
+ public void test_getTotalTaskSlotCount()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ Map slotCount = runner.getTotalTaskSlotCount();
+
+ assertEquals(1, slotCount.size());
+ assertEquals(Long.valueOf(10), slotCount.get(KubernetesTaskRunner.WORKER_CATEGORY));
+ }
+
+ @Test
+ public void test_getIdleTaskSlotCount()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ TestTaskRunnerWorkItem workItem1 = new TestTaskRunnerWorkItem("work-item-1");
+ TestTaskRunnerWorkItem workItem2 = new TestTaskRunnerWorkItem("work-item-2");
+
+ testRunner1.setKnownTasks(Collections.singletonList(workItem1));
+ testRunner2.setKnownTasks(Collections.singletonList(workItem2));
+
+ Map idleSlotCount = runner.getIdleTaskSlotCount();
+
+ assertEquals(1, idleSlotCount.size());
+ assertEquals(Long.valueOf(8), idleSlotCount.get(KubernetesTaskRunner.WORKER_CATEGORY)); // 10 - 2 = 8
+ }
+
+ @Test
+ public void test_getUsedTaskSlotCount()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+
+ TestTaskRunnerWorkItem workItem1 = new TestTaskRunnerWorkItem("work-item-1");
+ TestTaskRunnerWorkItem workItem2 = new TestTaskRunnerWorkItem("work-item-2");
+
+ testRunner1.setKnownTasks(Collections.singletonList(workItem1));
+ testRunner2.setKnownTasks(Collections.singletonList(workItem2));
+
+ Map usedSlotCount = runner.getUsedTaskSlotCount();
+
+ assertEquals(1, usedSlotCount.size());
+ assertEquals(Long.valueOf(2), usedSlotCount.get(KubernetesTaskRunner.WORKER_CATEGORY));
+ }
+
+ @Test
+ public void test_getLazyTaskSlotCount()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ Map lazySlotCount = runner.getLazyTaskSlotCount();
+
+ assertTrue(lazySlotCount.isEmpty());
+ }
+
+ @Test
+ public void test_getBlacklistedTaskSlotCount()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ Map blacklistedSlotCount = runner.getBlacklistedTaskSlotCount();
+
+ assertTrue(blacklistedSlotCount.isEmpty());
+ }
+
+ @Test
+ public void test_getTaskLocation_withExistingTask()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ String taskId = "location-task";
+ TaskLocation location = TaskLocation.create("host", 8080, 8081, false);
+
+ testRunner1.setTaskLocation(taskId, TaskLocation.unknown());
+ testRunner2.setTaskLocation(taskId, location);
+
+ TaskLocation result = runner.getTaskLocation(taskId);
+
+ assertEquals(location, result);
+ }
+
+ @Test
+ public void test_getTaskLocation_withoutExistingTask()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ String taskId = "unknown-location-task";
+ testRunner1.setTaskLocation(taskId, TaskLocation.unknown());
+ testRunner2.setTaskLocation(taskId, TaskLocation.unknown());
+
+ TaskLocation result = runner.getTaskLocation(taskId);
+
+ assertEquals(TaskLocation.unknown(), result);
+ }
+
+ @Test
+ public void test_getRunnerTaskState_withExistingTask()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ String taskId = "running-task";
+ testRunner1.setRunnerTaskState(taskId, null);
+ testRunner2.setRunnerTaskState(taskId, RunnerTaskState.RUNNING);
+
+ RunnerTaskState result = runner.getRunnerTaskState(taskId);
+
+ assertEquals(RunnerTaskState.RUNNING, result);
+ }
+
+ @Test
+ public void test_getRunnerTaskState_withoutExistingTask()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ String taskId = "unknown-state-task";
+ testRunner1.setRunnerTaskState(taskId, null);
+ testRunner2.setRunnerTaskState(taskId, null);
+
+ RunnerTaskState result = runner.getRunnerTaskState(taskId);
+
+ assertNull(result);
+ }
+
+ @Test
+ public void test_streamTaskLog_withExistingTask() throws Exception
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ String taskId = "log-task";
+ InputStream inputStream = IOUtils.toInputStream("log content", StandardCharsets.UTF_8);
+
+ testRunner1.setRunnerTaskState(taskId, null);
+ testRunner2.setRunnerTaskState(taskId, RunnerTaskState.RUNNING);
+ testRunner2.setTaskLog(taskId, Optional.of(inputStream));
+
+ Optional result = runner.streamTaskLog(taskId, 0L);
+
+ assertTrue(result.isPresent());
+ assertEquals("log content", IOUtils.toString(result.get(), StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void test_streamTaskLog_withoutExistingTask()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ String taskId = "no-log-task";
+ testRunner1.setRunnerTaskState(taskId, null);
+ testRunner2.setRunnerTaskState(taskId, null);
+
+ Optional result = runner.streamTaskLog(taskId, 0L);
+
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void test_streamTaskReports_withExistingTask() throws Exception
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ final TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ final TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ final String taskId = "report-task";
+ final InputStream inputStream = IOUtils.toInputStream("report content", StandardCharsets.UTF_8);
+
+ testRunner1.setRunnerTaskState(taskId, null);
+ testRunner2.setRunnerTaskState(taskId, RunnerTaskState.RUNNING);
+ testRunner2.setTaskReports(taskId, Optional.of(inputStream));
+
+ final Optional result = runner.streamTaskReports(taskId);
+
+ assertTrue(result.isPresent());
+ assertEquals("report content", IOUtils.toString(result.get(), StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void test_streamTaskReports_withoutExistingTask() throws Exception
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ final TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ final TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ final String taskId = "no-report-task";
+ testRunner1.setRunnerTaskState(taskId, null);
+ testRunner2.setRunnerTaskState(taskId, null);
+
+ final Optional result = runner.streamTaskReports(taskId);
+
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void test_restore()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ List>> result = runner.restore();
+
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void test_getScalingStats()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ Optional> result = runner.getScalingStats();
+
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void test_start()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ // start() is called on all runners - no need to set expectations
+ runner.start();
+ }
+
+ @Test
+ public void test_stop()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ // stop() is called on all runners - no need to set expectations
+ runner.stop();
+ }
+
+ @Test
+ public void test_registerListener()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ // registerListener() is called on all runners - no need to set expectations
+ runner.registerListener(
+ new TaskRunnerListener()
+ {
+ @Override
+ public String getListenerId()
+ {
+ return "";
+ }
+
+ @Override
+ public void locationChanged(String taskId, TaskLocation newLocation)
+ {
+
+ }
+
+ @Override
+ public void statusChanged(String taskId, TaskStatus status)
+ {
+
+ }
+ },
+ Executors.newSingleThreadExecutor()
+ );
+ }
+
+ @Test
+ public void test_unregisterListener()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ )
+ );
+
+ String listenerId = "listener-id";
+ // unregisterListener() is called on all runners - no need to set expectations
+ runner.unregisterListener(listenerId);
+ }
+
+ @Test
+ public void test_roundRobinSelectionStrategy_wrapsAround()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+ List testTaskRunners = ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ );
+
+ MultipleKubernetesTaskRunner.RoundRobinSelector strategy =
+ new MultipleKubernetesTaskRunner.RoundRobinSelector();
+
+ // First call should return runner1 (index 0)
+ MultipleKubernetesTaskRunnerDelegate result1 = strategy.next(testTaskRunners);
+ assertEquals(testRunner1, result1.getRunner());
+
+ // Second call should return runner2 (index 1)
+ MultipleKubernetesTaskRunnerDelegate result2 = strategy.next(testTaskRunners);
+ assertEquals(testRunner2, result2.getRunner());
+
+ // Third call should wrap around to runner1 (index 0)
+ MultipleKubernetesTaskRunnerDelegate result3 = strategy.next(testTaskRunners);
+ assertEquals(testRunner1, result3.getRunner());
+ }
+
+ @Test
+ public void test_leastTaskSelectionStrategy_withEqualTasks()
+ {
+ // Create test runners with clear setup - similar to Mockito style
+ TestKubernetesTaskRunner testRunner1 = new TestKubernetesTaskRunner(effectiveConfig1, testConfigManager);
+ TestKubernetesTaskRunner testRunner2 = new TestKubernetesTaskRunner(effectiveConfig2, testConfigManager);
+ List testTaskRunners = ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(testRunner1),
+ new MultipleKubernetesTaskRunnerDelegate(testRunner2)
+ );
+
+ MultipleKubernetesTaskRunner.LeastTaskSelector strategy =
+ new MultipleKubernetesTaskRunner.LeastTaskSelector();
+
+ TestTaskRunnerWorkItem workItem = new TestTaskRunnerWorkItem("work-item");
+
+ testRunner1.setKnownTasks(Collections.singletonList(workItem));
+ testRunner2.setKnownTasks(Collections.singletonList(workItem));
+
+ // Both have same number of tasks, should pick one randomly
+ MultipleKubernetesTaskRunnerDelegate result = strategy.next(testTaskRunners);
+
+ // Should be one of the two runners
+ assertTrue(result.getRunner() == testRunner1 || result.getRunner() == testRunner2);
+ }
+
+ @Test
+ public void test_run_injectsK8sClusterContextWhenClusterNameIsSet()
+ {
+ String clusterName = "test-cluster-a";
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1, clusterName, false, null)
+ )
+ );
+
+ Task task = new NoopTask("task-with-cluster", null, null, 0, 0, null);
+ task.addToContext(DruidMetrics.TAGS, ImmutableMap.of("existing_tag", "tag_value"));
+
+ // Task doesn't exist in any runner
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner1.run(task)).andReturn(immediateFuture(TaskStatus.success(task.getId())));
+
+ replayAll();
+
+ runner.run(task);
+
+ verifyAll();
+
+ // Verify that k8s_cluster was added to the task context
+ Map tags = task.getContextValue(DruidMetrics.TAGS);
+ Assertions.assertEquals(clusterName, tags.get("k8s_cluster"));
+
+ // Verify that existing tags are preserved
+ Assertions.assertEquals("tag_value", tags.get("existing_tag"));
+ }
+
+ @Test
+ public void test_run_doesNotInjectK8sClusterContextWhenClusterNameIsNull()
+ {
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1)
+ )
+ );
+
+ Task task = new NoopTask("task-without-cluster", null, null, 0, 0, null);
+
+ // Task doesn't exist in any runner
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner1.run(task)).andReturn(immediateFuture(TaskStatus.success(task.getId())));
+
+ replayAll();
+
+ runner.run(task);
+
+ verifyAll();
+
+ // Verify that k8s_cluster was NOT added to the task context
+ Object contextValue = task.getContextValue("k8s_cluster");
+ Assertions.assertNull(contextValue, "k8s_cluster should not be injected when cluster name is null");
+ }
+
+ @Test
+ public void test_run_injectsDifferentK8sClusterNamesForDifferentRunners()
+ {
+ String clusterNameA = "cluster-a";
+ String clusterNameB = "cluster-b";
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1, clusterNameA, false, null),
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner2, clusterNameB, false, null)
+ )
+ );
+
+ Task task1 = new NoopTask("task-1", null, null, 0, 0, null);
+
+ // First call: round-robin selects runner1 with cluster-a
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task1.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner2.getRunnerTaskState(task1.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner1.run(task1)).andReturn(immediateFuture(TaskStatus.success(task1.getId())));
+
+ replayAll();
+
+ runner.run(task1);
+
+ verifyAll();
+
+ // Verify that k8s_cluster=cluster-a was injected into tags
+ Map tags1 = task1.getContextValue(DruidMetrics.TAGS);
+ Assertions.assertEquals(clusterNameA, tags1.get("k8s_cluster"));
+
+ resetAll();
+
+ Task task2 = new NoopTask("task-2", null, null, 0, 0, null);
+
+ // Second call: round-robin selects runner2 with cluster-b
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task2.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner2.getRunnerTaskState(task2.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner2.run(task2)).andReturn(immediateFuture(TaskStatus.success(task2.getId())));
+
+ replayAll();
+
+ runner.run(task2);
+
+ verifyAll();
+
+ // Verify that k8s_cluster=cluster-b was injected into tags
+ Map tags2 = task2.getContextValue(DruidMetrics.TAGS);
+ Assertions.assertEquals(clusterNameB, tags2.get("k8s_cluster"));
+ }
+
+ @Test
+ public void test_run_existingTaskDoesNotOverwriteK8sClusterContext()
+ {
+ String clusterName = "cluster-a";
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1, clusterName, false, null)
+ )
+ );
+
+ Task task = new NoopTask("existing-task", null, null, 0, 0, null);
+ // Pre-set a different cluster value in the task context
+ task.addToContext("k8s_cluster", "original-cluster");
+
+ // Task already exists in runner1 - should be found and delegated directly
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task.getId())).andReturn(RunnerTaskState.RUNNING);
+ EasyMock.expect(kubernetesTaskRunner1.run(task)).andReturn(immediateFuture(TaskStatus.success(task.getId())));
+
+ replayAll();
+
+ runner.run(task);
+
+ verifyAll();
+
+ // Verify that the original k8s_cluster value is preserved (not overwritten)
+ // When a task already exists in a runner, we don't inject the cluster name again
+ Assertions.assertEquals("original-cluster", task.getContextValue("k8s_cluster"));
+ }
+
+ @Test
+ public void test_run_k8sClusterContextCanBeUsedForPodTemplateSelection()
+ {
+ // This test verifies the integration between k8s_cluster context injection
+ // and pod template selection (which happens inside KubernetesTaskRunner.run())
+ String clusterName = "production-cluster";
+
+ runner = new MultipleKubernetesTaskRunner(
+ config,
+ new MultipleKubernetesTaskRunner.RoundRobinSelector(),
+ ImmutableList.of(
+ new MultipleKubernetesTaskRunnerDelegate(kubernetesTaskRunner1, clusterName, false, null)
+ )
+ );
+
+ Task task = new NoopTask("task-for-template-selection", null, null, 0, 0, null);
+
+ // Task doesn't exist in any runner
+ EasyMock.expect(kubernetesTaskRunner1.getRunnerTaskState(task.getId())).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunner1.run(task)).andReturn(immediateFuture(TaskStatus.success(task.getId())));
+
+ replayAll();
+
+ runner.run(task);
+
+ verifyAll();
+
+ // Verify that k8s_cluster was injected into tags
+ Map tags = task.getContextValue(DruidMetrics.TAGS);
+ Assertions.assertEquals(clusterName, tags.get("k8s_cluster"));
+
+ // Note: The actual pod template selection happens inside KubernetesTaskRunner.run()
+ // Users can configure SelectorBasedPodTemplateSelectStrategy to check for k8s_cluster
+ // in the task's context tags (DruidMetrics.TAGS) to select different pod templates
+ }
+}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
index 3f5ce3c6e3c5..7275e1278cbf 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
@@ -31,6 +31,7 @@
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
@@ -56,6 +57,7 @@
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
+import org.apache.druid.k8s.overlord.common.KubernetesClientApi;
import org.apache.druid.k8s.overlord.common.KubernetesExecutor;
import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
@@ -166,6 +168,90 @@ public PodSpec getSpec()
Assertions.assertFalse(jobFromSpec.getMetadata().getLabels().containsKey("annotation_key"));
}
+ @Test
+ void testFromTaskUsesOverlordPodSourceClient() throws IOException
+ {
+ final Pod pod = new PodBuilder()
+ .withNewMetadata()
+ .withName("overlord-pod")
+ .withNamespace("overlord-namespace")
+ .endMetadata()
+ .withSpec(K8sTestUtils.getDummyPodSpec())
+ .build();
+ client.pods().inNamespace("overlord-namespace").resource(pod).create();
+ final KubernetesClientApi podSourceClient = new TestKubernetesClient(client, "overlord-namespace");
+ final KubernetesClientApi targetClient = new TestKubernetesClient(client, "remote-task-namespace")
+ {
+ @Override
+ public T executeRequest(KubernetesExecutor executor)
+ {
+ throw new AssertionError("fromTask should not read the Overlord pod from the target cluster client");
+ }
+ };
+ final KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("remote-task-namespace")
+ .build();
+ final K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ podSourceClient,
+ "overlord-namespace",
+ targetClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ taskLogs
+ )
+ {
+ @Override
+ protected String getCurrentPodName()
+ {
+ return "overlord-pod";
+ }
+ };
+
+ final Job job = adapter.fromTask(K8sTestUtils.getTask());
+
+ Assertions.assertNotNull(job);
+ Assertions.assertNotNull(job.getSpec().getTemplate().getSpec());
+ }
+
+ @Test
+ void testFromTaskWithMissingOverlordPodFailsWithActionableMessage()
+ {
+ final KubernetesClientApi podSourceClient = new TestKubernetesClient(client, "overlord-namespace")
+ {
+ @SuppressWarnings("unchecked")
+ @Override
+ public T executeRequest(KubernetesExecutor executor)
+ {
+ return null;
+ }
+ };
+ final KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("remote-task-namespace")
+ .build();
+ final K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ podSourceClient,
+ "overlord-namespace",
+ new TestKubernetesClient(client, "remote-task-namespace"),
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ taskLogs
+ );
+
+ final DruidException exception = Assertions.assertThrows(
+ DruidException.class,
+ () -> adapter.fromTask(K8sTestUtils.getTask())
+ );
+ Assertions.assertTrue(exception.getMessage().contains("Could not load Overlord pod"));
+ Assertions.assertTrue(exception.getMessage().contains("overlord-namespace"));
+ Assertions.assertTrue(exception.getMessage().contains("customTemplateAdapter"));
+ }
+
@Test
public void serializingAndDeserializingATask() throws IOException
{
diff --git a/licenses.yaml b/licenses.yaml
index 3b76c7490b4d..943f69cd07d0 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -998,6 +998,37 @@ libraries:
- io.fabric8: kubernetes-httpclient-okhttp
---
+name: Jackson
+license_category: binary
+module: extensions-core/kubernetes-overlord-extensions
+license_name: Apache License version 2.0
+version: 2.20.2
+libraries:
+ - com.fasterxml.jackson.dataformat: jackson-dataformat-properties
+notice: |
+ # Jackson JSON processor
+
+ Jackson is a high-performance, Free/Open Source JSON processing library.
+ It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+ been in development since 2007.
+ It is currently developed by a community of developers, as well as supported
+ commercially by FasterXML.com.
+
+ ## Licensing
+
+ Jackson core and extension components may licensed under different licenses.
+ To find the details that apply to this artifact see the accompanying LICENSE file.
+ For more information, including possible other licensing options, contact
+ FasterXML.com (http://fasterxml.com).
+
+ ## Credits
+
+ A list of contributors may be found from CREDITS file, which is included
+ in some artifacts (usually source distributions); but is always available
+ from the source code management (SCM) system project uses.
+
+---
+
name: vertx
license_category: binary
module: extensions-core/kubernetes-overlord-extensions