Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 130 additions & 3 deletions docs/development/extensions-core/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ If you are running multiple Druid clusters and would like to have a dedicated na

Warning: When `druid.indexer.runner.overlordNamespace` and `druid.indexer.runner.k8sTaskPodNamePrefix` is configured, users should ensure that all running tasks are stopped when changing these values. Failure to do so will cause the Overlord to lose track of running tasks, and re-launch them. This may lead to duplicate data and possibly metadata inconsistency issues.

Druid will tag Kubernetes jobs with a `druid.overlord.namespace` label. This label helps Druid filter out Kubernetes jobs belonging to other namespaces. Should you need to deploy a Druid cluster on a namespace `N1` that is already running tasks from another namespace `N2`, take note to set `druid.indexer.runner.overlordNamespace` to `druid.indexer.runner.namespace` (which is `N1`). Failure to do so will result in the cluster in `N1` detecting task pods created from both `N1` and `N2`.
When using the custom template pod adapter, Druid will tag Kubernetes jobs with a `druid.overlord.namespace` label. This label helps Druid filter out Kubernetes jobs belonging to other namespaces. Should you need to deploy a Druid cluster on a namespace `N1` that is already running tasks from another namespace `N2`, take note to set `druid.indexer.runner.overlordNamespace` to `druid.indexer.runner.namespace` (which is `N1`). Failure to do so will result in the cluster in `N1` detecting task pods created from both `N1` and `N2`.

##### Differentiating Task Pods Created From Multiple Namespaces

Expand All @@ -807,11 +807,132 @@ Ensure that when you are running task pods in another namespace, your task pods

Should you require the needed permissions for interacting across Kubernetes namespaces, you can specify a kubeconfig file, and provided the necessary permissions. You can then use the `KUBECONFIG` environment variable to allow your Overlord deployment to find your kubeconfig file. Refer to the [Kubernetes documentation](https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/) for more information.


### Running tasks on multiple Kubernetes clusters

You can configure the Kubernetes task runner to launch tasks across multiple Kubernetes clusters. This is useful when
you want one Druid Overlord to schedule tasks into several Kubernetes clusters while keeping one Druid metadata store
and one Druid ingestion control plane.

To enable multi-cluster scheduling, set `druid.indexer.runner.type` to `multik8s` and configure at least one
`druid.indexer.runner.clusters` entry. The multi-cluster runner uses the same task pod adapters, dynamic config,
capacity, labels, annotations, shared informer mode, and pod template selection settings as the single-cluster
Kubernetes task runner.

```properties
druid.indexer.runner.type=multik8s
druid.indexer.task.encapsulatedTask=true

druid.indexer.runner.capacity=20
druid.indexer.runner.namespace=druid-overlord
druid.indexer.runner.clusterSelector.type=leastTask

druid.indexer.runner.clusters[0].name=cluster-a
druid.indexer.runner.clusters[0].taskNamespace=druid-tasks-a
druid.indexer.runner.clusters[0].kubeconfigPath=/etc/druid/kubeconfigs/cluster-a.yaml

druid.indexer.runner.clusters[1].name=cluster-b
druid.indexer.runner.clusters[1].taskNamespace=druid-tasks-b
druid.indexer.runner.clusters[1].kubeconfigPath=/etc/druid/kubeconfigs/cluster-b.yaml
```

The runner creates one Kubernetes client and one underlying Kubernetes task runner per configured cluster. The
configured `capacity` is a global limit for all clusters combined, not a per-cluster limit.

When using the default `overlordSingleContainer` adapter or the `overlordMultiContainer` adapter, Druid reads the local
Overlord pod spec from the Kubernetes cluster where the Overlord is running, then creates the task job in the selected
target cluster's `taskNamespace`. Set `druid.indexer.runner.overlordNamespace`, or `druid.indexer.runner.namespace`, to
the namespace that contains the Overlord pod. If neither is set, Druid can only fall back to `clusters[0].taskNamespace`
when there is a single enabled cluster entry that does not specify `kubeconfigPath`. If your target clusters do not have
the same service accounts, secrets, ConfigMaps, volumes, images, or sidecars referenced by the local Overlord pod, use
the `customTemplateAdapter` instead and provide pod templates that are valid for each target cluster.

If you use the custom template pod adapter and need an explicit job owner label, set `overlordIdentifier` to the same
stable value for each cluster entry that belongs to the same Druid deployment:

```properties
druid.indexer.runner.clusters[0].overlordIdentifier=druid-overlord-prod
druid.indexer.runner.clusters[1].overlordIdentifier=druid-overlord-prod
```

#### Cluster configuration

Each `druid.indexer.runner.clusters[N]` entry supports the following properties:

| Property | Possible Values | Description | Default | Required |
| --- | --- | --- | --- | --- |
| `name` | `String` | Human-readable name for the cluster. When set, Druid adds this value to task context tags as `k8s_cluster`. Use unique names so pod template selection and operational logs are unambiguous. | `null` | No |
| `taskNamespace` | `String` | Kubernetes namespace where task pods and jobs run for this cluster. | - | Yes |
| `kubeconfigPath` | `String` | Path to the kubeconfig file that Druid uses to connect to this cluster. If omitted, Druid uses the default Kubernetes client configuration available to the Overlord process. | `null` | No |
| `overlordIdentifier` | Valid Kubernetes label value | Logical owner identifier for jobs created through the custom template pod adapter. Druid writes it to the `druid.overlord.namespace` Kubernetes job label and uses it when listing jobs, so an underlying runner only sees jobs with the same identifier. Set this when multiple Druid clusters or multiple configured runners may share the same Kubernetes task namespace. | `null` | No |
| `disabled` | `boolean` | Prevents Druid from scheduling new tasks on this cluster. Druid still starts the underlying runner so it can monitor tasks that already exist in the cluster. At least one configured cluster must be enabled. | `false` | No |

`overlordIdentifier` is a user-defined value. It is not the Kubernetes cluster name and does not need to match the
Overlord service name. Treat it as a stable owner label for task jobs. Because Druid writes the value directly to a
Kubernetes label, it must satisfy Kubernetes label value rules: at most 63 characters, beginning and ending with an
alphanumeric character, with only alphanumeric characters, `-`, `_`, and `.` in between. You can usually omit it when
each configured cluster uses a separate Kubernetes cluster or a separate `taskNamespace`. Configure it when jobs from
different Druid deployments, or different multi-cluster runner entries, could otherwise appear in the same namespace
with the same `druid.k8s.peons` label. For one Druid deployment spanning multiple Kubernetes clusters, use the same
`overlordIdentifier` value for each cluster entry.

#### Cluster selection

The multi-cluster runner selects among enabled clusters using `druid.indexer.runner.clusterSelector.type`.

| Value | Description |
| --- | --- |
| `roundrobin` | Selects enabled clusters in order. This is the default. |
| `random` | Selects a random enabled cluster. |
| `leastTask` | Selects from the enabled clusters with the fewest known tasks. If more than one cluster has the same count, Druid selects one of them randomly. |

#### Pod template selection by cluster

When a selected cluster has a configured `name`, Druid adds the cluster name to the task context tags as
`k8s_cluster`. You can use this tag with the `selectorBased` [pod template selection](#pod-template-selection) strategy
to choose different pod templates per Kubernetes cluster.

```json
{
"type": "default",
"podTemplateSelectStrategy": {
"type": "selectorBased",
"selectors": [
{
"selectionKey": "cluster-a-template",
"context.tags": { "k8s_cluster": ["cluster-a"] }
},
{
"selectionKey": "cluster-b-template",
"context.tags": { "k8s_cluster": ["cluster-b"] }
}
]
}
}
```

Configure the corresponding pod templates with the same `selectionKey` values:

```properties
druid.indexer.runner.k8s.podTemplate.base=/path/to/basePodSpec.yaml
druid.indexer.runner.k8s.podTemplate.cluster-a-template=/path/to/cluster-a-podSpec.yaml
druid.indexer.runner.k8s.podTemplate.cluster-b-template=/path/to/cluster-b-podSpec.yaml
```

#### Permissions

The Overlord process must be able to read every configured kubeconfig file. Each kubeconfig must grant access to create,
list, watch, get, and delete jobs and pods in that cluster's `taskNamespace`. If you use `overlordSingleContainer` or
`overlordMultiContainer`, the local Kubernetes credentials available to the Overlord process must also be able to get
the Overlord pod from `druid.indexer.runner.overlordNamespace` or `druid.indexer.runner.namespace`. If you enable
`druid.indexer.runner.useK8sSharedInformers`, each configured cluster also starts shared informers against its
`taskNamespace`.

### Properties
| Property | Possible Values | Description | Default | Required |
| --- | --- | --- | --- | --- |
| `druid.indexer.runner.namespace` | `String` | If Overlord and task pods are running in different namespaces, specify the Overlord namespace. | - | Yes |
| `druid.indexer.runner.overlordNamespace` | `String` | Only applicable when using Custom Template Pod Adapter. If Overlord and task pods are running in different namespaces, specify the Overlord namespace. <br /> Warning: You need to stop all running tasks in Druid to change this property. Failure to do so will lead to duplicate data and metadata inconsistencies. | `""` | No |
| `druid.indexer.runner.namespace` | `String` | For the single-cluster `k8s` runner, the namespace where task pods run. For `multik8s`, `clusters[N].taskNamespace` controls where task jobs run, and this property is only used as a fallback namespace for reading the local Overlord pod spec with `overlordSingleContainer` or `overlordMultiContainer`. | - | Yes for `k8s`; No for `multik8s` |
| `druid.indexer.runner.overlordNamespace` | `String` | If Overlord and task pods are running in different namespaces, specify the Overlord namespace. For `multik8s` with `overlordSingleContainer` or `overlordMultiContainer`, Druid uses this namespace to read the local Overlord pod spec. When using Custom Template Pod Adapter, Druid uses this namespace for job ownership labels and filtering. <br /> Warning: You need to stop all running tasks in Druid to change this property. Failure to do so will lead to duplicate data and metadata inconsistencies. | `""` | No |
| `druid.indexer.runner.k8sTaskPodNamePrefix` | `String` | Use this if you want to change your task name to contain a more human-readable prefix. Maximum 30 characters. Special characters `: - . _` will be ignored. <br /> Warning: You need to stop all running tasks in Druid to change this property. Failure to do so will lead to duplicate data and metadata inconsistencies. | `""` | No |
| `druid.indexer.runner.debugJobs` | `boolean` | Boolean flag used to disable clean up of K8s jobs after tasks complete. | False | No |
| `druid.indexer.runner.sidecarSupport` | `boolean` | Deprecated, specify adapter type as runtime property `druid.indexer.runner.k8s.adapter.type: overlordMultiContainer` instead. If your overlord pod has sidecars, this will attempt to start the task with the same sidecars as the overlord pod. | False | No |
Expand All @@ -833,6 +954,12 @@ Should you require the needed permissions for interacting across Kubernetes name
| `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the ingestion task makes a best effort to persist the pod logs from `k8s` to persistent task log storage. The timeout ensures that `k8s` connection issues do not cause the pod to hang indefinitely thereby blocking Overlord operations. If the timeout occurs before the logs are saved, those logs will not be available in Druid. | `PT300S` | NO |
| `druid.indexer.runner.useK8sSharedInformers` | `boolean` | Whether to use shared informers to watch for pod/job changes. This is more efficient on the Kubernetes API server, but may use more memory in the Overlord. | `false` | No |
| `druid.indexer.runner.k8sSharedInformerResyncPeriod` | `Duration` | When using shared informers, controls how frequently the informers resync with the Kubernetes API server. This prevents change events from being missed, keeping the informer cache clean and accurate. | `PT300S` | No |
| `druid.indexer.runner.clusterSelector.type` | `String` (`roundrobin`, `random`, `leastTask`) | Only applicable when `druid.indexer.runner.type` is `multik8s`. Strategy used to select which enabled Kubernetes cluster receives the next task. | `roundrobin` | No |
| `druid.indexer.runner.clusters[N].name` | `String` | Only applicable when `druid.indexer.runner.type` is `multik8s`. Human-readable name for cluster `N`. When set, Druid adds this value to task context tags as `k8s_cluster`. | `null` | No |
| `druid.indexer.runner.clusters[N].taskNamespace` | `String` | Only applicable when `druid.indexer.runner.type` is `multik8s`. Kubernetes namespace where task pods and jobs run for cluster `N`. | - | Yes |
| `druid.indexer.runner.clusters[N].kubeconfigPath` | `String` | Only applicable when `druid.indexer.runner.type` is `multik8s`. Path to the kubeconfig file for cluster `N`. If omitted, Druid uses the default Kubernetes client configuration available to the Overlord process. | `null` | No |
| `druid.indexer.runner.clusters[N].overlordIdentifier` | Valid Kubernetes label value | Only applicable when `druid.indexer.runner.type` is `multik8s`. Logical owner identifier for jobs created through the custom template pod adapter. Druid writes it to the `druid.overlord.namespace` Kubernetes job label and uses it when listing jobs. Use the same value for cluster entries that belong to the same Druid deployment. Set this when multiple Druid clusters or multiple configured runners may share the same Kubernetes task namespace. | `null` | No |
| `druid.indexer.runner.clusters[N].disabled` | `boolean` | Only applicable when `druid.indexer.runner.type` is `multik8s`. Prevents Druid from scheduling new tasks on cluster `N` while continuing to monitor existing tasks there. | `false` | No |

### Metrics added

Expand Down
4 changes: 4 additions & 0 deletions extensions-core/kubernetes-overlord-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@
<artifactId>commons-lang3</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-properties</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord;


import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;

import javax.annotation.Nullable;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AutoscalableThreadPoolExecutor extends ThreadPoolExecutor
{
private static final EmittingLogger log = new EmittingLogger(AutoscalableThreadPoolExecutor.class);

@Nullable
private final ConfigManager configManager;
private final String listenerKey;
private final Consumer<KubernetesTaskRunnerDynamicConfig> configListener;

public AutoscalableThreadPoolExecutor(int initialCapacity, @Nullable ConfigManager configManager)
{
super(
initialCapacity,
initialCapacity,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
Execs.makeThreadFactory("k8s-task-runner-%d", null)
);

this.configManager = configManager;
this.listenerKey = StringUtils.format("AutoscalableThreadPoolExecutor-%d", System.identityHashCode(this));
this.configListener = this::onConfigurationChange;

// Monitor the configuration change
if (configManager != null && !configManager.addListener(
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
listenerKey,
configListener)) {
log.error("Failed to add configuration listener for AutoscalableThreadPoolExecutor");
}
}

@Override
public void shutdown()
{
removeConfigListener();
super.shutdown();
}

@Override
public List<Runnable> shutdownNow()
{
removeConfigListener();
return super.shutdownNow();
}

private void removeConfigListener()
{
if (configManager == null) {
return;
}

if (!configManager.removeListener(
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
listenerKey,
configListener
)) {
log.warn("Failed to remove configuration listener[%s] for AutoscalableThreadPoolExecutor", listenerKey);
}
}

private void onConfigurationChange(KubernetesTaskRunnerDynamicConfig config)
{
if (config.getCapacity() == null) {
return;
}

final int curCapacity = this.getCorePoolSize();
final int newCapacity = config.getCapacity();
if (newCapacity == curCapacity) {
return;
}

log.info("Adjusting k8s task runner capacity from [%d] to [%d]", curCapacity, newCapacity);
// maximum pool size must always be greater than or equal to the core pool size
if (newCapacity < curCapacity) {
// decrease capacity
setCorePoolSize(newCapacity);
setMaximumPoolSize(newCapacity);
} else {
// increase capacity
setMaximumPoolSize(newCapacity);
setCorePoolSize(newCapacity);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,12 @@ public void configure(Binder binder)
biddy.addBinding(KubernetesAndWorkerTaskRunnerFactory.TYPE_NAME)
.to(KubernetesAndWorkerTaskRunnerFactory.class)
.in(LazySingleton.class);
biddy.addBinding(MultipleKubernetesTaskRunnerFactory.TYPE_NAME)
.to(MultipleKubernetesTaskRunnerFactory.class)
.in(LazySingleton.class);
binder.bind(KubernetesTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(KubernetesAndWorkerTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(MultipleKubernetesTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RunnerStrategy.class)
.toProvider(RunnerStrategyProvider.class)
.in(LazySingleton.class);
Expand Down
Loading
Loading