
Commit 18c4566

Merge remote-tracking branch 'upstream/master' into processing-junit5-pt7

2 parents: 483d721 + b442161

50 files changed: 1497 additions & 198 deletions


docs/configuration/index.md

Lines changed: 2 additions & 1 deletion
@@ -749,6 +749,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti
 |`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs, the current ZooKeeper interaction code, however, is written in a way that doesn’t allow the Coordinator to know for a fact that it’s done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|`PT300S`|
 |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical service.|`PT15M`|
 |`druid.coordinator.balancer.strategy`|The [balancing strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the Coordinator to distribute segments among the Historical servers in a tier. The `cost` strategy distributes segments by minimizing a cost function, `diskNormalized` weights these costs with the disk usage ratios of the servers and `random` distributes segments randomly.|`cost`|
+|`druid.coordinator.balancer.diskNormalized.moveCostSavingsThreshold`|Only used when `druid.coordinator.balancer.strategy` is `diskNormalized`. Minimum fractional cost reduction required before a segment is moved off a server that already holds it. A value of `0.05` requires the destination to be at least 5% cheaper than the source, which prevents oscillation between servers with similar disk utilization. Must be in `[0.0, 1.0)`; `0.0` disables the anti-oscillation discount.|`0.05`|
 |`druid.coordinator.loadqueuepeon.http.repeatDelay`|The start and repeat delay (in milliseconds) for the load queue peon, which manages the load/drop queue of segments for any server.|1 minute|
 |`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than or equal to the `druid.segmentCache.numLoadingThreads` config on Historical service. If this value is not configured, the coordinator uses the value of the `numLoadingThreads` for the respective server.|`druid.segmentCache.numLoadingThreads`|
 |`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator service should act like an Overlord as well. This configuration allows users to simplify a Druid cluster by not having to deploy any standalone Overlord services. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also.|false|
@@ -2023,7 +2024,7 @@ log4j config to route these logs to different sources based on the feed of the e
 |`druid.emitter.logging.loggerClass`|The class used for logging.|`org.apache.druid.java.util.emitter.core.LoggingEmitter`|
 |`druid.emitter.logging.logLevel`|Choices: debug, info, warn, error. The log level at which messages are logged.|info|
 |`druid.emitter.logging.shouldFilterMetrics`|When true, only metrics listed in the allow list are emitted; non-metric events (e.g. alerts) are always emitted. When false, all events are logged (backward-compatible).|false|
-|`druid.emitter.logging.allowedMetricsPath`|Path to a JSON file whose keys are the allowed metric names. Only used when `shouldFilterMetrics` is true. If null or empty, the bundled classpath resource `defaultMetrics.json` is used. If a path is set but the file is missing, a warning is logged and the emitter falls back to the default classpath resource.|null|
+|`druid.emitter.logging.allowedMetricsPath`|Path to a JSON file whose keys are the allowed metric names. Only used when `shouldFilterMetrics` is true. If null or empty, the bundled classpath resource `loggingEmitterAllowedMetrics.json` is used. If a path is set but the file is missing, a warning is logged and the emitter falls back to the default classpath resource.|null|

 #### HTTP emitter module

docs/design/coordinator.md

Lines changed: 1 addition & 1 deletion
@@ -88,7 +88,7 @@ But in a tier with several Historicals (or a low replication factor), segment re
 Thus, the Coordinator constantly monitors the set of segments present on each Historical in a tier and employs one of the following strategies to identify segments that may be moved from one Historical to another to retain balance.

 - `cost` (default): For a given segment in a tier, this strategy picks the server with the minimum "cost" of placing that segment. The cost is a function of the data interval of the segment and the data intervals of all the segments already present on the candidate server. In essence, this strategy tries to avoid placing segments with adjacent or overlapping data intervals on the same server. This is based on the premise that adjacent-interval segments are more likely to be used together in a query and placing them on the same server may lead to skewed CPU usages of Historicals.
-- `diskNormalized`: A derivative of the `cost` strategy that weights the cost of placing a segment on a server with the disk usage ratio of the server. There are known issues with this strategy and is not recommended for a production cluster.
+- `diskNormalized`: A derivative of the `cost` strategy that multiplies the cost of placing a segment on a server by the server's disk usage ratio (`diskUsed / maxSize`). This penalizes fuller servers and drives disk utilization to equalize across the tier, which is useful when historicals within a tier hold segments of widely varying sizes. To prevent oscillation when servers have similar utilization, a segment that is already placed on a server receives a cost discount; a move only fires when the destination saves at least `druid.coordinator.balancer.diskNormalized.moveCostSavingsThreshold` (default `0.05`, i.e. 5%) of the source's cost.
 - `random`: Distributes segments randomly across servers. This is an experimental strategy and is not recommended for a production cluster.

 All of the above strategies prioritize moving segments from the Historical with the least available disk space.
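The weighted-cost arithmetic and the anti-oscillation threshold described above can be sketched as follows. This is a simplified illustration of the documented behavior, not Druid's actual implementation; the method names are hypothetical.

```java
// Hypothetical sketch of the diskNormalized move decision: placement cost is
// weighted by the server's disk usage ratio, and a move only fires when the
// destination's weighted cost undercuts the source's by more than the
// configured savings threshold (default 0.05, i.e. 5%).
public class DiskNormalizedMoveSketch
{
  static double weightedCost(double rawCost, double diskUsed, double maxSize)
  {
    return rawCost * (diskUsed / maxSize);
  }

  static boolean shouldMove(double sourceCost, double destinationCost, double savingsThreshold)
  {
    // Destination must be cheaper than the source by at least the threshold
    // fraction of the source's cost; otherwise the segment stays put.
    return destinationCost < sourceCost * (1.0 - savingsThreshold);
  }

  public static void main(String[] args)
  {
    double source = weightedCost(10.0, 900.0, 1000.0);      // 9.0 on a 90%-full server
    double destination = weightedCost(10.0, 870.0, 1000.0); // 8.7 on an 87%-full server
    // 8.7 is only ~3.3% cheaper than 9.0, below the 5% threshold: no move.
    System.out.println(shouldMove(source, destination, 0.05)); // false
    destination = weightedCost(10.0, 500.0, 1000.0);         // 5.0 on a half-full server
    System.out.println(shouldMove(source, destination, 0.05)); // true
  }
}
```

With the threshold at `0.0`, the second server (8.7 vs 9.0) would have triggered a move, and the segment could ping-pong back once the utilizations crossed again; the 5% discount is what damps that oscillation.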

docs/ingestion/kafka-ingestion.md

Lines changed: 2 additions & 1 deletion
@@ -235,7 +235,8 @@ The following example shows a supervisor spec with idle configuration enabled:
 "enableTaskAutoScaler": true,
 "taskCountMax": 6,
 "taskCountMin": 2,
-"minTriggerScaleActionFrequencyMillis": 600000,
+"minScaleUpDelay": "PT10M",
+"minScaleDownDelay": "PT10M",
 "autoScalerStrategy": "lagBased",
 "lagCollectionIntervalMillis": 30000,
 "lagCollectionRangeMillis": 600000,
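The `"PT10M"` values in the updated spec are ISO-8601 duration strings; a quick sketch confirming they denote the same span as the old millisecond default of `600000`:

```java
import java.time.Duration;

// The new delay fields take ISO-8601 durations. "PT10M" is exactly the span
// the replaced minTriggerScaleActionFrequencyMillis expressed as 600000 ms.
public class DelayEquivalence
{
  public static void main(String[] args)
  {
    Duration scaleUpDelay = Duration.parse("PT10M");
    System.out.println(scaleUpDelay.toMillis()); // 600000
  }
}
```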

docs/ingestion/supervisor.md

Lines changed: 11 additions & 6 deletions
@@ -79,7 +79,9 @@ The following table outlines the configuration properties for `autoScalerConfig`
 |`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka partitions or Kinesis shards, Druid sets the maximum number of reading tasks to the number of Kafka partitions or Kinesis shards and ignores `taskCountMax`.|Yes||
 |`taskCountMin`|The minimum number of ingestion tasks. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|Yes||
 |`taskCountStart`|Optional config to specify the number of ingestion tasks to start with. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|No|`taskCount` or `taskCountMin`|
-|`minTriggerScaleActionFrequencyMillis`|The minimum time interval between two scale actions.|No|600000|
+|`minScaleUpDelay`|Minimum cooldown duration between scale-up actions, specified as an ISO-8601 duration string. Falls back to `minTriggerScaleActionFrequencyMillis` if not set.|No||
+|`minScaleDownDelay`|Minimum cooldown duration between scale-down actions, specified as an ISO-8601 duration string. Falls back to `minTriggerScaleActionFrequencyMillis` if not set.|No||
+|`minTriggerScaleActionFrequencyMillis`|**Deprecated.** Use `minScaleUpDelay` and `minScaleDownDelay` instead. Minimum time interval in milliseconds between scale actions, used as the fallback when the Duration-based fields are not set.|No|600000|
 |`autoScalerStrategy`|The algorithm of autoscaler. Druid only supports the `lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more information.|No|`lagBased`|
 |`stopTaskCountRatio`|A variable version of `ioConfig.stopTaskCount` with a valid range of (0.0, 1.0]. Allows the maximum number of stoppable tasks in steady state to be proportional to the number of tasks currently running.|No||
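The fallback behavior described in the table, where the deprecated millisecond field backstops the new Duration-based fields, can be sketched like this (hypothetical method name, not Druid's actual code):

```java
import java.time.Duration;

// Sketch of the documented fallback: use the ISO-8601 delay when configured,
// otherwise fall back to the deprecated minTriggerScaleActionFrequencyMillis.
public class CooldownFallbackSketch
{
  static Duration resolveDelay(Duration configured, long fallbackMillis)
  {
    return configured != null ? configured : Duration.ofMillis(fallbackMillis);
  }

  public static void main(String[] args)
  {
    // Explicit minScaleUpDelay wins over the legacy field.
    System.out.println(resolveDelay(Duration.parse("PT20M"), 600000L)); // PT20M
    // Unset: fall back to the legacy default of 600000 ms (10 minutes).
    System.out.println(resolveDelay(null, 600000L)); // PT10M
  }
}
```

This is why existing specs that only set `minTriggerScaleActionFrequencyMillis` keep working unchanged after the upgrade.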

@@ -161,7 +163,8 @@ The following example shows a supervisor spec with `lagBased` autoscaler:
 "enableTaskAutoScaler": true,
 "taskCountMax": 6,
 "taskCountMin": 2,
-"minTriggerScaleActionFrequencyMillis": 600000,
+"minScaleUpDelay": "PT10M",
+"minScaleDownDelay": "PT10M",
 "autoScalerStrategy": "lagBased",
 "lagCollectionIntervalMillis": 30000,
 "lagCollectionRangeMillis": 600000,
@@ -210,10 +213,11 @@ The following table outlines the configuration properties related to the `costBa
 |`idleWeight`|The weight of extracted poll idle value in cost function.|No|0.75|
 |`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when selecting task counts.|No|`false`|
 |`highLagThreshold`|Average partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1|
-|`minScaleDownDelay`|Minimum duration between successful scale actions, specified as an ISO-8601 duration string.|No|`PT30M`|
+|`minScaleUpDelay`|Minimum cooldown duration after a scale-up action before the next scale-up is allowed, specified as an ISO-8601 duration string.|No||
+|`minScaleDownDelay`|Minimum cooldown duration after a scale-down action before the next scale-down is allowed, specified as an ISO-8601 duration string.|No|`PT30M`|
 |`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is limited to periods during task rollovers only.|No|`false`|

-The following example shows a supervisor spec with `lagBased` autoscaler:
+The following example shows a supervisor spec with `costBased` autoscaler:

 <details>
 <summary>Click to view the example</summary>
@@ -227,9 +231,10 @@ The following example shows a supervisor spec with `lagBased` autoscaler:
 "autoScalerStrategy": "costBased",
 "taskCountMin": 1,
 "taskCountMax": 10,
-"minTriggerScaleActionFrequencyMillis": 600000,
+"minScaleUpDelay": "PT10M",
+"minScaleDownDelay": "PT30M",
 "lagWeight": 0.1,
-"idleWeight": 0.9,
+"idleWeight": 0.9
 }
 }
 }

extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json

Lines changed: 1 addition & 0 deletions
@@ -97,6 +97,7 @@
 "kafka/consumer/recordsPerRequestAvg" : { "dimensions" : ["topic"], "type" : "gauge", "help": "Average records per fetch request as seen by the consumer of a Kafka indexing task (per topic)."},
 "kafka/consumer/outgoingBytes" : { "dimensions" : ["node_id"], "type" : "count", "help": "Bytes sent to Kafka brokers by the consumer of a Kafka indexing task (per node)."},
 "kafka/consumer/incomingBytes" : { "dimensions" : ["node_id"], "type" : "count", "help": "Bytes received from Kafka brokers by the consumer of a Kafka indexing task (per node)."},
+"kafka/consumer/pollIdleRatio" : { "dimensions" : [], "type" : "gauge", "help": "Average fraction of time the consumer of a Kafka indexing task spent idle (not in poll). 0 means never idle, 1 means always idle."},

 "ingest/count" : { "dimensions" : ["dataSource", "taskType"], "type" : "count", "help": "Count of 1 every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions." },
 "ingest/segments/count" : { "dimensions" : ["dataSource", "taskType"], "type" : "count", "help": "Count of final segments created by job (includes tombstones)." },
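For intuition on the new gauge's range as stated in its help text, the ratio is just idle time over total observed time (a hypothetical helper, not the Kafka client's own computation):

```java
// Sketch of the pollIdleRatio semantics: fraction of the observation window
// the consumer spent idle. 0 means never idle, 1 means always idle.
public class PollIdleRatioSketch
{
  static double pollIdleRatio(long idleNanos, long totalNanos)
  {
    return (double) idleNanos / totalNanos;
  }

  public static void main(String[] args)
  {
    // Idle for 750 ms out of a 1 s window: ratio 0.75.
    System.out.println(pollIdleRatio(750_000_000L, 1_000_000_000L)); // 0.75
  }
}
```

A ratio near 1 suggests the task is starved for records (or over-provisioned), while a ratio near 0 suggests the consumer is saturated.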

extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java

Lines changed: 6 additions & 0 deletions
@@ -127,6 +127,12 @@ public class KafkaConsumerMonitor extends AbstractMonitor
         "kafka/consumer/incomingBytes",
         Set.of(CLIENT_ID_TAG, NODE_ID_TAG),
         KafkaConsumerMetric.MetricType.COUNTER
+    ),
+    new KafkaConsumerMetric(
+        POLL_IDLE_RATIO_METRIC_NAME,
+        "kafka/consumer/pollIdleRatio",
+        Set.of(CLIENT_ID_TAG),
+        KafkaConsumerMetric.MetricType.GAUGE
     )
 ).collect(Collectors.toMap(KafkaConsumerMetric::getKafkaMetricName, Function.identity()));

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java

Lines changed: 1 addition & 0 deletions
@@ -484,6 +484,7 @@ public void testPoll() throws InterruptedException, ExecutionException
 emitter.verifyEmitted("kafka/consumer/recordsPerRequestAvg", 1);
 emitter.verifyEmitted("kafka/consumer/incomingBytes", 2);
 emitter.verifyEmitted("kafka/consumer/outgoingBytes", 2);
+emitter.verifyEmitted("kafka/consumer/pollIdleRatio", 1);

 recordSupplier.close();
 Assert.assertFalse(monitor.monitor(emitter));

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java

Lines changed: 2 additions & 0 deletions
@@ -315,6 +315,8 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException
 autoScalerConfig.put("scaleInStep", 1);
 autoScalerConfig.put("scaleOutStep", 2);
 autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+autoScalerConfig.put("minScaleUpDelay", "PT20M");
+autoScalerConfig.put("minScaleDownDelay", "PT20M");

 final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
 consumerProperties.put("bootstrap.servers", "localhost:8082");

indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java

Lines changed: 9 additions & 5 deletions
@@ -214,7 +214,8 @@ public Response specGetAll(
 Set<String> authorizedSupervisorIds = filterAuthorizedSupervisorIds(
     req,
     manager,
-    manager.getSupervisorIds()
+    manager.getSupervisorIds(),
+    AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR
 );
 final boolean includeFull = full != null;
 final boolean includeState = state != null && state;
@@ -509,7 +510,8 @@ public Response terminateAll(@Context final HttpServletRequest req)
 Set<String> authorizedSupervisorIds = filterAuthorizedSupervisorIds(
     req,
     manager,
-    manager.getSupervisorIds()
+    manager.getSupervisorIds(),
+    AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR
 );

 for (final String supervisorId : authorizedSupervisorIds) {
@@ -652,15 +654,16 @@ private Response asLeaderWithSupervisorManager(Function<SupervisorManager, Respo
 private Set<String> filterAuthorizedSupervisorIds(
     final HttpServletRequest req,
     SupervisorManager manager,
-    Collection<String> supervisorIds
+    Collection<String> supervisorIds,
+    Function<String, ResourceAction> authorizationFn
 )
 {
   Function<String, Iterable<ResourceAction>> raGenerator = supervisorId -> {
     Optional<SupervisorSpec> supervisorSpecOptional = manager.getSupervisorSpec(supervisorId);
     if (supervisorSpecOptional.isPresent()) {
       return Iterables.transform(
           supervisorSpecOptional.get().getDataSources(),
-          AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR
+          authorizationFn
       );
     } else {
       return null;
@@ -710,7 +713,8 @@ private Response suspendOrResumeAll(final HttpServletRequest req, final boolean
 Set<String> authorizedSupervisorIds = filterAuthorizedSupervisorIds(
     req,
     manager,
-    manager.getSupervisorIds()
+    manager.getSupervisorIds(),
+    AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR
 );

 for (final String supervisorId : authorizedSupervisorIds) {
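The pattern in this change, parameterizing the authorization filter by a resource-action generator so read-only endpoints check READ while mutating endpoints keep WRITE, can be sketched in miniature with hypothetical stand-in types (the real code uses Druid's `AuthorizationUtils` generators and `SupervisorSpec`):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Minimal sketch: the required action is no longer hard-coded to WRITE but
// supplied by the caller, mirroring the new authorizationFn parameter.
public class AuthFilterSketch
{
  enum Action { READ, WRITE }

  record ResourceAction(String dataSource, Action action) {}

  // supervisor id -> datasources it operates on (stand-in for SupervisorSpec#getDataSources)
  static final Map<String, List<String>> SUPERVISORS = Map.of(
      "sup-a", List.of("ds-a"),
      "sup-b", List.of("ds-b")
  );

  static Set<String> filterAuthorized(
      Set<ResourceAction> callerGrants,
      Function<String, ResourceAction> raGenerator
  )
  {
    return SUPERVISORS.entrySet().stream()
        .filter(e -> e.getValue().stream()
            .map(raGenerator)
            .allMatch(callerGrants::contains))
        .map(Map.Entry::getKey)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args)
  {
    // Caller may read ds-a but write nothing.
    Set<ResourceAction> grants = Set.of(new ResourceAction("ds-a", Action.READ));

    // Listing supervisors now only needs READ: sup-a is visible.
    System.out.println(filterAuthorized(grants, ds -> new ResourceAction(ds, Action.READ)));
    // Terminating still needs WRITE: nothing is authorized.
    System.out.println(filterAuthorized(grants, ds -> new ResourceAction(ds, Action.WRITE)));
  }
}
```

Before this change, a caller with only read permission on a datasource could not even list its supervisor via `GET /specs`; passing `DATASOURCE_READ_RA_GENERATOR` for the read-only endpoint fixes that without loosening the write-gated endpoints.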
