116 changes: 116 additions & 0 deletions docs/development/extensions-core/k8s-jobs.md
Original file line number Diff line number Diff line change
@@ -807,6 +807,116 @@ Ensure that when you are running task pods in another namespace, your task pods

If you need permissions for interacting across Kubernetes namespaces, you can specify a kubeconfig file that provides the necessary permissions. You can then use the `KUBECONFIG` environment variable to allow your Overlord deployment to find your kubeconfig file. Refer to the [Kubernetes documentation](https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/) for more information.


### Running tasks on multiple Kubernetes clusters

You can configure the Kubernetes task runner to launch tasks across multiple Kubernetes clusters. This is useful when
you want one Druid Overlord to schedule tasks into several Kubernetes clusters while keeping one Druid metadata store
and one Druid ingestion control plane.

To enable multi-cluster scheduling, set `druid.indexer.runner.type` to `multik8s` and configure at least one
`druid.indexer.runner.clusters` entry. The multi-cluster runner uses the same task pod adapters, dynamic config,
capacity, labels, annotations, shared informer mode, and pod template selection settings as the single-cluster
Kubernetes task runner.

```properties
druid.indexer.runner.type=multik8s
druid.indexer.task.encapsulatedTask=true

druid.indexer.runner.capacity=20
druid.indexer.runner.clusterSelector.type=leastTask

druid.indexer.runner.clusters[0].name=cluster-a
druid.indexer.runner.clusters[0].taskNamespace=druid-tasks-a
druid.indexer.runner.clusters[0].kubeconfigPath=/etc/druid/kubeconfigs/cluster-a.yaml

druid.indexer.runner.clusters[1].name=cluster-b
druid.indexer.runner.clusters[1].taskNamespace=druid-tasks-b
druid.indexer.runner.clusters[1].kubeconfigPath=/etc/druid/kubeconfigs/cluster-b.yaml
```

The runner creates one Kubernetes client and one underlying Kubernetes task runner per configured cluster. The
configured `capacity` is a global limit for all clusters combined, not a per-cluster limit.

If you use the custom template pod adapter and need an explicit job owner label, set `overlordIdentifier` to the same
stable value for each cluster entry that belongs to the same Druid deployment:

```properties
druid.indexer.runner.clusters[0].overlordIdentifier=druid-overlord-prod
druid.indexer.runner.clusters[1].overlordIdentifier=druid-overlord-prod
```

#### Cluster configuration

Each `druid.indexer.runner.clusters[N]` entry supports the following properties:

| Property | Possible Values | Description | Default | Required |
| --- | --- | --- | --- | --- |
| `name` | `String` | Human-readable name for the cluster. When set, Druid adds this value to task context tags as `k8s_cluster`. Use unique names so pod template selection and operational logs are unambiguous. | `null` | No |
| `taskNamespace` | `String` | Kubernetes namespace where task pods and jobs run for this cluster. | - | Yes |
| `kubeconfigPath` | `String` | Path to the kubeconfig file that Druid uses to connect to this cluster. If omitted, Druid uses the default Kubernetes client configuration available to the Overlord process. | `null` | No |
| `overlordIdentifier` | Valid Kubernetes label value | Logical owner identifier for jobs created through the custom template pod adapter. Druid writes it to the `druid.overlord.namespace` Kubernetes job label and uses it when listing jobs, so an underlying runner only sees jobs with the same identifier. Set this when multiple Druid clusters or multiple configured runners may share the same Kubernetes task namespace. | `null` | No |
| `disabled` | `boolean` | Prevents Druid from scheduling new tasks on this cluster. Druid still starts the underlying runner so it can monitor tasks that already exist in the cluster. At least one configured cluster must be enabled. | `false` | No |
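
The constraints in the table above (a required `taskNamespace` and at least one enabled cluster) can be sketched as a small validation routine. This is only an illustration: `ClusterEntry` and `ClusterConfigValidator` are hypothetical names that do not come from the Druid extension, and the real runner may validate its configuration differently.

```java
import java.util.List;

/** Hypothetical stand-in for one druid.indexer.runner.clusters[N] entry. */
final class ClusterEntry
{
  final String name;           // optional, may be null
  final String taskNamespace;  // required
  final boolean disabled;      // false unless configured otherwise

  ClusterEntry(String name, String taskNamespace, boolean disabled)
  {
    this.name = name;
    this.taskNamespace = taskNamespace;
    this.disabled = disabled;
  }
}

/** Hypothetical validator mirroring the rules stated in the table. */
final class ClusterConfigValidator
{
  private ClusterConfigValidator() {}

  static void validate(List<ClusterEntry> clusters)
  {
    if (clusters == null || clusters.isEmpty()) {
      throw new IllegalArgumentException("At least one cluster must be configured");
    }
    boolean anyEnabled = false;
    for (ClusterEntry cluster : clusters) {
      // taskNamespace is the only required property of an entry.
      if (cluster.taskNamespace == null || cluster.taskNamespace.trim().isEmpty()) {
        throw new IllegalArgumentException("taskNamespace is required for cluster [" + cluster.name + "]");
      }
      anyEnabled |= !cluster.disabled;
    }
    // Disabled clusters are still monitored, but something must accept new tasks.
    if (!anyEnabled) {
      throw new IllegalArgumentException("At least one configured cluster must be enabled");
    }
  }
}
```

Failing fast at startup in this way would surface a configuration where every cluster is disabled before any task is submitted.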

`overlordIdentifier` is a user-defined value. It is not the Kubernetes cluster name and does not need to match the
Overlord service name. Treat it as a stable owner label for task jobs. Because Druid writes the value directly to a
Kubernetes label, it must satisfy Kubernetes label value rules: at most 63 characters, beginning and ending with an
alphanumeric character, with only alphanumeric characters, `-`, `_`, and `.` in between. You can usually omit it when
each configured cluster uses a separate Kubernetes cluster or a separate `taskNamespace`. Configure it when jobs from
different Druid deployments, or different multi-cluster runner entries, could otherwise appear in the same namespace
with the same `druid.k8s.peons` label. For one Druid deployment spanning multiple Kubernetes clusters, use the same
`overlordIdentifier` value for each cluster entry.
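
The label value rules described above can be checked before deployment with a short helper. The following is a minimal sketch: `LabelValueValidator` and `isValidOverlordIdentifier` are hypothetical names, not part of Druid or the Kubernetes client. Kubernetes itself also accepts an empty label value; this sketch assumes an empty identifier is not useful and rejects it.

```java
import java.util.regex.Pattern;

/** Hypothetical helper that checks a candidate overlordIdentifier. */
final class LabelValueValidator
{
  // One alphanumeric character, optionally followed by up to 61 characters
  // from [A-Za-z0-9_.-] and a final alphanumeric character: 63 characters at most.
  private static final Pattern LABEL_VALUE =
      Pattern.compile("[A-Za-z0-9]([A-Za-z0-9_.-]{0,61}[A-Za-z0-9])?");

  private LabelValueValidator() {}

  static boolean isValidOverlordIdentifier(String value)
  {
    // matches() anchors the pattern to the whole string.
    return value != null && LABEL_VALUE.matcher(value).matches();
  }

  public static void main(String[] args)
  {
    System.out.println(isValidOverlordIdentifier("druid-overlord-prod"));
    System.out.println(isValidOverlordIdentifier("-starts-with-dash"));
  }
}
```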

#### Cluster selection

The multi-cluster runner selects among enabled clusters using `druid.indexer.runner.clusterSelector.type`.

| Value | Description |
| --- | --- |
| `roundrobin` | Selects enabled clusters in order. This is the default. |
| `random` | Selects a random enabled cluster. |
| `leastTask` | Selects from the enabled clusters with the fewest known tasks. If more than one cluster has the same count, Druid selects one of them randomly. |
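
To make the three strategies in the table concrete, here is a rough sketch of how each could pick a cluster. It is not the implementation in this PR: `ClusterSelectors` and its method names are hypothetical, clusters are represented only by their names, and the caller is assumed to pass a non-empty list of enabled clusters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of the roundrobin, random, and leastTask strategies. */
final class ClusterSelectors
{
  private final AtomicInteger nextIndex = new AtomicInteger();
  private final Random rng;

  ClusterSelectors(Random rng)
  {
    this.rng = rng;
  }

  /** roundrobin: walk the enabled clusters in order, wrapping around. */
  String roundRobin(List<String> enabled)
  {
    // floorMod keeps the index non-negative even after the counter overflows.
    return enabled.get(Math.floorMod(nextIndex.getAndIncrement(), enabled.size()));
  }

  /** random: pick any enabled cluster uniformly. */
  String pickRandom(List<String> enabled)
  {
    return enabled.get(rng.nextInt(enabled.size()));
  }

  /** leastTask: pick among the clusters with the fewest known tasks; ties are broken randomly. */
  String leastTask(List<String> enabled, Map<String, Integer> knownTaskCounts)
  {
    int min = Integer.MAX_VALUE;
    List<String> candidates = new ArrayList<>();
    for (String cluster : enabled) {
      int count = knownTaskCounts.getOrDefault(cluster, 0);
      if (count < min) {
        min = count;
        candidates.clear();
      }
      if (count == min) {
        candidates.add(cluster);
      }
    }
    return candidates.get(rng.nextInt(candidates.size()));
  }
}
```

In this sketch a cluster with no recorded count is treated as having zero tasks, which is an assumption rather than documented behavior.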

#### Pod template selection by cluster

When a selected cluster has a configured `name`, Druid adds the cluster name to the task context tags as
`k8s_cluster`. You can use this tag with the `selectorBased` [pod template selection](#pod-template-selection) strategy
to choose different pod templates per Kubernetes cluster.

```json
{
"type": "default",
"podTemplateSelectStrategy": {
"type": "selectorBased",
"selectors": [
{
"selectionKey": "cluster-a-template",
"context.tags": { "k8s_cluster": ["cluster-a"] }
},
{
"selectionKey": "cluster-b-template",
"context.tags": { "k8s_cluster": ["cluster-b"] }
}
]
}
}
```

Configure the corresponding pod templates with the same `selectionKey` values:

```properties
druid.indexer.runner.k8s.podTemplate.base=/path/to/basePodSpec.yaml
druid.indexer.runner.k8s.podTemplate.cluster-a-template=/path/to/cluster-a-podSpec.yaml
druid.indexer.runner.k8s.podTemplate.cluster-b-template=/path/to/cluster-b-podSpec.yaml
```
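
The lookup from `k8s_cluster` tag to template key can be pictured with the following sketch. It assumes that selectors are evaluated in order, that the first match wins, and that the `base` template is the fallback. `TemplateKeyResolver` is a hypothetical class, and it models only the `context.tags` condition from the example above.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical resolver: maps task context tags to a pod template selectionKey. */
final class TemplateKeyResolver
{
  private TemplateKeyResolver() {}

  /**
   * @param selectors selectionKey -> (tag name -> accepted tag values); pass an
   *                  insertion-ordered map (for example a LinkedHashMap) so that
   *                  the selectors are checked in their configured order
   * @param taskTags  context tags of the task, including k8s_cluster when the
   *                  selected cluster has a name
   */
  static String resolve(Map<String, Map<String, Set<String>>> selectors, Map<String, String> taskTags)
  {
    for (Map.Entry<String, Map<String, Set<String>>> selector : selectors.entrySet()) {
      boolean matches = true;
      for (Map.Entry<String, Set<String>> condition : selector.getValue().entrySet()) {
        String actual = taskTags.get(condition.getKey());
        // Every listed tag must be present with one of the accepted values.
        if (actual == null || !condition.getValue().contains(actual)) {
          matches = false;
          break;
        }
      }
      if (matches) {
        return selector.getKey();
      }
    }
    // Assumed fallback when no selector matches.
    return "base";
  }
}
```

With the configuration above, a task tagged `k8s_cluster=cluster-b` would resolve to `cluster-b-template` in this model, and a task scheduled on an unnamed cluster would fall back to `base`.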

#### Permissions

The Overlord process must be able to read every configured kubeconfig file. Each kubeconfig must grant access to create,
list, watch, get, and delete jobs and pods in that cluster's `taskNamespace`. If you enable
`druid.indexer.runner.useK8sSharedInformers`, each configured cluster also starts shared informers against its
`taskNamespace`.

### Properties
| Property | Possible Values | Description | Default | Required |
| --- | --- | --- | --- | --- |
@@ -833,6 +943,12 @@ Should you require the needed permissions for interacting across Kubernetes name
| `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the ingestion task makes a best effort to persist the pod logs from `k8s` to persistent task log storage. The timeout ensures that `k8s` connection issues do not cause the pod to hang indefinitely thereby blocking Overlord operations. If the timeout occurs before the logs are saved, those logs will not be available in Druid. | `PT300S` | No |
| `druid.indexer.runner.useK8sSharedInformers` | `boolean` | Whether to use shared informers to watch for pod/job changes. This is more efficient on the Kubernetes API server, but may use more memory in the Overlord. | `false` | No |
| `druid.indexer.runner.k8sSharedInformerResyncPeriod` | `Duration` | When using shared informers, controls how frequently the informers resync with the Kubernetes API server. This prevents change events from being missed, keeping the informer cache clean and accurate. | `PT300S` | No |
| `druid.indexer.runner.clusterSelector.type` | `String` (`roundrobin`, `random`, `leastTask`) | Only applicable when `druid.indexer.runner.type` is `multik8s`. Strategy used to select which enabled Kubernetes cluster receives the next task. | `roundrobin` | No |
| `druid.indexer.runner.clusters[N].name` | `String` | Only applicable when `druid.indexer.runner.type` is `multik8s`. Human-readable name for cluster `N`. When set, Druid adds this value to task context tags as `k8s_cluster`. | `null` | No |
| `druid.indexer.runner.clusters[N].taskNamespace` | `String` | Only applicable when `druid.indexer.runner.type` is `multik8s`. Kubernetes namespace where task pods and jobs run for cluster `N`. | - | Yes |
| `druid.indexer.runner.clusters[N].kubeconfigPath` | `String` | Only applicable when `druid.indexer.runner.type` is `multik8s`. Path to the kubeconfig file for cluster `N`. If omitted, Druid uses the default Kubernetes client configuration available to the Overlord process. | `null` | No |
| `druid.indexer.runner.clusters[N].overlordIdentifier` | Valid Kubernetes label value | Only applicable when `druid.indexer.runner.type` is `multik8s`. Logical owner identifier for jobs created through the custom template pod adapter. Druid writes it to the `druid.overlord.namespace` Kubernetes job label and uses it when listing jobs. Use the same value for cluster entries that belong to the same Druid deployment. Set this when multiple Druid clusters or multiple configured runners may share the same Kubernetes task namespace. | `null` | No |
| `druid.indexer.runner.clusters[N].disabled` | `boolean` | Only applicable when `druid.indexer.runner.type` is `multik8s`. Prevents Druid from scheduling new tasks on cluster `N` while continuing to monitor existing tasks there. | `false` | No |

### Metrics added

4 changes: 4 additions & 0 deletions extensions-core/kubernetes-overlord-extensions/pom.xml
Original file line number Diff line number Diff line change
@@ -77,6 +77,10 @@
<artifactId>commons-lang3</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-properties</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord;


import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;

import javax.annotation.Nullable;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AutoscalableThreadPoolExecutor extends ThreadPoolExecutor
{
private static final EmittingLogger log = new EmittingLogger(AutoscalableThreadPoolExecutor.class);

@Nullable
private final ConfigManager configManager;
private final String listenerKey;
private final Consumer<KubernetesTaskRunnerDynamicConfig> configListener;

public AutoscalableThreadPoolExecutor(int initialCapacity, @Nullable ConfigManager configManager)
{
super(
initialCapacity,
initialCapacity,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
Execs.makeThreadFactory("k8s-task-runner-%d", null)
);

this.configManager = configManager;
this.listenerKey = StringUtils.format("AutoscalableThreadPoolExecutor-%d", System.identityHashCode(this));
this.configListener = this::onConfigurationChange;

// Monitor the configuration change
if (configManager != null && !configManager.addListener(
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
listenerKey,
configListener)) {
log.error("Failed to add configuration listener for AutoscalableThreadPoolExecutor");
}
}

@Override
public void shutdown()
{
removeConfigListener();
super.shutdown();
}

@Override
public List<Runnable> shutdownNow()
{
removeConfigListener();
return super.shutdownNow();
}

private void removeConfigListener()
{
if (configManager == null) {
return;
}

if (!configManager.removeListener(
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
listenerKey,
configListener
)) {
log.warn("Failed to remove configuration listener[%s] for AutoscalableThreadPoolExecutor", listenerKey);
}
}

private void onConfigurationChange(KubernetesTaskRunnerDynamicConfig config)
{
if (config.getCapacity() == null) {
return;
}

final int curCapacity = this.getCorePoolSize();
final int newCapacity = config.getCapacity();
if (newCapacity == curCapacity) {
return;
}

log.info("Adjusting k8s task runner capacity from [%d] to [%d]", curCapacity, newCapacity);
// maximum pool size must always be greater than or equal to the core pool size
if (newCapacity < curCapacity) {
// decrease capacity
setCorePoolSize(newCapacity);
setMaximumPoolSize(newCapacity);
} else {
// increase capacity
setMaximumPoolSize(newCapacity);
setCorePoolSize(newCapacity);
}
}
}
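
The ordering of the two setters in `onConfigurationChange` is deliberate: `ThreadPoolExecutor.setMaximumPoolSize` throws `IllegalArgumentException` when the new maximum is below the current core size, and recent JDKs also reject a core size above the current maximum. The standalone sketch below reproduces just that ordering. `PoolResizeDemo` is a hypothetical name and the class is not part of this PR.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of resizing a fixed-size pool without violating core <= max. */
final class PoolResizeDemo
{
  private PoolResizeDemo() {}

  /** Resizes the pool to newSize threads; newSize must be positive. */
  static void resize(ThreadPoolExecutor pool, int newSize)
  {
    if (newSize < pool.getCorePoolSize()) {
      // Shrinking: lower the core size first so the maximum never drops below it.
      pool.setCorePoolSize(newSize);
      pool.setMaximumPoolSize(newSize);
    } else {
      // Growing: raise the maximum first so the core size never exceeds it.
      pool.setMaximumPoolSize(newSize);
      pool.setCorePoolSize(newSize);
    }
  }

  public static void main(String[] args)
  {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    resize(pool, 8);
    resize(pool, 2);
    System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
    pool.shutdown();
  }
}
```
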
Original file line number Diff line number Diff line change
@@ -119,8 +119,12 @@ public void configure(Binder binder)
biddy.addBinding(KubernetesAndWorkerTaskRunnerFactory.TYPE_NAME)
.to(KubernetesAndWorkerTaskRunnerFactory.class)
.in(LazySingleton.class);
biddy.addBinding(MultipleKubernetesTaskRunnerFactory.TYPE_NAME)
.to(MultipleKubernetesTaskRunnerFactory.class)
.in(LazySingleton.class);
binder.bind(KubernetesTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(KubernetesAndWorkerTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(MultipleKubernetesTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RunnerStrategy.class)
.toProvider(RunnerStrategyProvider.class)
.in(LazySingleton.class);
Original file line number Diff line number Diff line change
@@ -122,6 +122,7 @@
private final HttpClient httpClient;
private final PeonLifecycleFactory peonLifecycleFactory;
private final ServiceEmitter emitter;
private final boolean ownsExecutor;
// currently worker categories aren't supported, so it's hardcoded.
protected static final String WORKER_CATEGORY = "_k8s_worker_category";

@@ -136,6 +137,20 @@
ServiceEmitter emitter,
ConfigManager configManager
)
{
this(adapter, config, client, httpClient, peonLifecycleFactory, emitter, null, configManager);
}

public KubernetesTaskRunner(
TaskAdapter adapter,
KubernetesTaskRunnerConfig config,
KubernetesPeonClient client,
HttpClient httpClient,
PeonLifecycleFactory peonLifecycleFactory,
ServiceEmitter emitter,
@Nullable ThreadPoolExecutor sharedExecutor,
@Nullable ConfigManager configManager
)
{
this.adapter = adapter;
this.config = config;
@@ -146,9 +161,28 @@
this.emitter = emitter;

this.currentCapacity = new AtomicInteger(config.getCapacity());
this.tpe = new ThreadPoolExecutor(currentCapacity.get(), currentCapacity.get(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Execs.makeThreadFactory("k8s-task-runner-%d", null));
if (sharedExecutor == null) {
this.tpe = new ThreadPoolExecutor(
currentCapacity.get(),
currentCapacity.get(),
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
Execs.makeThreadFactory("k8s-task-runner-%d", null)
);
this.ownsExecutor = true;
} else {
this.tpe = sharedExecutor;
this.ownsExecutor = false;
}
this.exec = MoreExecutors.listeningDecorator(this.tpe);
configManager.addListener(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, StringUtils.format(OBSERVER_KEY, Thread.currentThread().getId()), this::syncCapacityWithDynamicConfig);
if (ownsExecutor && configManager != null) {
configManager.addListener(
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
StringUtils.format(OBSERVER_KEY, Thread.currentThread().getId()),

Check notice: Code scanning / CodeQL

Deprecated method or constructor invocation (Note): invoking `Thread.getId` should be avoided because it has been deprecated.

Reply from the PR author (Contributor):

The replacement, `Thread.threadId`, was introduced in Java 19. I haven't seen a PR that drops JDK 17 support yet (correct me if I'm wrong), so we should keep using this deprecated method for now.

this::syncCapacityWithDynamicConfig
);
}
}

@Override
@@ -194,7 +228,7 @@

private void syncCapacityWithDynamicConfig(KubernetesTaskRunnerDynamicConfig config)
{
int newCapacity = config.getCapacity();
final int newCapacity = config.getCapacity();
if (newCapacity == currentCapacity.get()) {
return;
}
@@ -210,6 +244,12 @@
currentCapacity.set(newCapacity);
}

@VisibleForTesting
KubernetesPeonClient getPeonClient()
{
return client;
}

private TaskStatus runTask(Task task)
{
return doTask(task, true);
@@ -443,7 +483,9 @@
{
log.info("Stopping KubernetesTaskRunner");
// Stop managing the running k8s jobs
exec.shutdownNow();
if (ownsExecutor) {
exec.shutdownNow();
}
cleanupExecutor.shutdownNow();
log.debug("Stopped KubernetesTaskRunner");
}