Skip to content

Commit 244e848

Browse files
KAFKA-20211: Add group coordinator background metrics (apache#21616)
Add the background-queue-time and background-processing-time histogram metrics. Add the background-thread-idle-ratio-avg metric. Unlike the event processor, we cannot measure idle time when using an ExecutorService. Instead, we measure busy time and invert it when reporting the metric. All three background metrics are only reported by the group coordinator and not the share coordinator. Reviewers: David Jacot <djacot@confluent.io>
1 parent 7ed35ab commit 244e848

8 files changed

Lines changed: 395 additions & 23 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.coordinator.common.runtime;
18+
19+
import org.apache.kafka.common.utils.ThreadUtils;
20+
import org.apache.kafka.common.utils.Time;
21+
22+
import java.util.Objects;
23+
import java.util.concurrent.LinkedBlockingQueue;
24+
import java.util.concurrent.ThreadPoolExecutor;
25+
import java.util.concurrent.TimeUnit;
26+
27+
/**
28+
* A {@link ThreadPoolExecutor} that reports metrics.
29+
*/
30+
public class CoordinatorBackgroundThreadPoolExecutor extends ThreadPoolExecutor {
31+
32+
/**
33+
* The time.
34+
*/
35+
private final Time time;
36+
37+
/**
38+
* The coordinator runtime metrics.
39+
*/
40+
private final CoordinatorRuntimeMetrics metrics;
41+
42+
public CoordinatorBackgroundThreadPoolExecutor(
43+
String threadPrefix,
44+
int numThreads,
45+
Time time,
46+
CoordinatorRuntimeMetrics metrics
47+
) {
48+
super(
49+
numThreads,
50+
numThreads,
51+
0L,
52+
TimeUnit.MILLISECONDS,
53+
new LinkedBlockingQueue<>(),
54+
ThreadUtils.createThreadFactory(Objects.requireNonNull(threadPrefix) + "%d", false)
55+
);
56+
this.time = Objects.requireNonNull(time);
57+
this.metrics = Objects.requireNonNull(metrics);
58+
}
59+
60+
@Override
61+
public void execute(Runnable command) {
62+
var queuedTimeMs = time.milliseconds();
63+
64+
super.execute(() -> {
65+
var dequeuedTimeMs = time.milliseconds();
66+
metrics.recordBackgroundQueueTime(dequeuedTimeMs - queuedTimeMs);
67+
68+
try {
69+
command.run();
70+
} finally {
71+
long processingTimeMs = time.milliseconds() - dequeuedTimeMs;
72+
metrics.recordBackgroundProcessingTime(processingTimeMs);
73+
metrics.recordBackgroundThreadBusyTime((double) processingTimeMs / getCorePoolSize());
74+
}
75+
});
76+
}
77+
}

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,26 @@ public interface CoordinatorRuntimeMetrics extends AutoCloseable {
8080
*/
8181
void recordThreadIdleTime(double idleTimeMs);
8282

83+
/**
84+
* Update the background queue time.
85+
*
86+
* @param durationMs The queue time.
87+
*/
88+
void recordBackgroundQueueTime(long durationMs);
89+
90+
/**
91+
* Update the background processing time.
92+
*
93+
* @param durationMs The background processing time.
94+
*/
95+
void recordBackgroundProcessingTime(long durationMs);
96+
97+
/**
98+
* Record the background thread busy time.
99+
* @param busyTimeMs The busy time in milliseconds.
100+
*/
101+
void recordBackgroundThreadBusyTime(double busyTimeMs);
102+
83103
/**
84104
* Register the event queue size gauge.
85105
*

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.common.MetricName;
2020
import org.apache.kafka.common.metrics.Gauge;
21+
import org.apache.kafka.common.metrics.MetricConfig;
2122
import org.apache.kafka.common.metrics.Metrics;
2223
import org.apache.kafka.common.metrics.Sensor;
2324
import org.apache.kafka.common.metrics.stats.Avg;
@@ -79,6 +80,16 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
7980
*/
8081
public static final String BATCH_BUFFER_CACHE_DISCARD_COUNT_METRIC_NAME = "batch-buffer-cache-discard-count";
8182

83+
/**
84+
* The background queue time metric name.
85+
*/
86+
public static final String BACKGROUND_QUEUE_TIME_METRIC_NAME = "background-queue-time-ms";
87+
88+
/**
89+
* The background processing time metric name.
90+
*/
91+
public static final String BACKGROUND_PROCESSING_TIME_METRIC_NAME = "background-processing-time-ms";
92+
8293
/**
8394
* Metric to count the number of partitions in Loading state.
8495
*/
@@ -153,7 +164,22 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
153164
*/
154165
private final Sensor flushSensor;
155166

156-
public CoordinatorRuntimeMetricsImpl(Metrics metrics, String metricsGroup) {
167+
/**
168+
* The background thread busy sensor. Null when background metrics are not enabled.
169+
*/
170+
private final Sensor backgroundThreadBusySensor;
171+
172+
/**
173+
* The background queue time sensor. Null when background metrics are not enabled.
174+
*/
175+
private final Sensor backgroundQueueTimeSensor;
176+
177+
/**
178+
* The background processing time sensor. Null when background metrics are not enabled.
179+
*/
180+
private final Sensor backgroundProcessingTimeSensor;
181+
182+
public CoordinatorRuntimeMetricsImpl(Metrics metrics, String metricsGroup, boolean enableBackgroundMetrics) {
157183
this.metrics = Objects.requireNonNull(metrics);
158184
this.metricsGroup = Objects.requireNonNull(metricsGroup);
159185

@@ -265,6 +291,44 @@ public CoordinatorRuntimeMetricsImpl(Metrics metrics, String metricsGroup) {
265291
this.metricsGroup,
266292
"The flushes per second."),
267293
new Rate(TimeUnit.SECONDS, new WindowedCount()));
294+
295+
if (enableBackgroundMetrics) {
296+
this.backgroundThreadBusySensor = metrics.sensor(this.metricsGroup + "-BackgroundThreadBusyRatio");
297+
this.backgroundThreadBusySensor.add(
298+
metrics.metricName(
299+
"background-thread-idle-ratio-avg",
300+
this.metricsGroup,
301+
"The fraction of time the background threads are idle. This is an average across " +
302+
"all coordinator background threads."),
303+
new Rate(TimeUnit.MILLISECONDS) {
304+
@Override
305+
public double measure(MetricConfig config, long now) {
306+
return 1.0 - super.measure(config, now);
307+
}
308+
});
309+
310+
KafkaMetricHistogram backgroundQueueTimeHistogram = KafkaMetricHistogram.newLatencyHistogram(
311+
suffix -> kafkaMetricName(
312+
BACKGROUND_QUEUE_TIME_METRIC_NAME + "-" + suffix,
313+
"The " + suffix + " background queue time in milliseconds"
314+
)
315+
);
316+
this.backgroundQueueTimeSensor = metrics.sensor(this.metricsGroup + "-BackgroundQueueTime");
317+
this.backgroundQueueTimeSensor.add(backgroundQueueTimeHistogram);
318+
319+
KafkaMetricHistogram backgroundProcessingTimeHistogram = KafkaMetricHistogram.newLatencyHistogram(
320+
suffix -> kafkaMetricName(
321+
BACKGROUND_PROCESSING_TIME_METRIC_NAME + "-" + suffix,
322+
"The " + suffix + " background processing time in milliseconds"
323+
)
324+
);
325+
this.backgroundProcessingTimeSensor = metrics.sensor(this.metricsGroup + "-BackgroundProcessingTime");
326+
this.backgroundProcessingTimeSensor.add(backgroundProcessingTimeHistogram);
327+
} else {
328+
this.backgroundThreadBusySensor = null;
329+
this.backgroundQueueTimeSensor = null;
330+
this.backgroundProcessingTimeSensor = null;
331+
}
268332
}
269333

270334
/**
@@ -298,6 +362,15 @@ public void close() {
298362
metrics.removeSensor(eventPurgatoryTimeSensor.name());
299363
metrics.removeSensor(lingerTimeSensor.name());
300364
metrics.removeSensor(flushSensor.name());
365+
if (backgroundThreadBusySensor != null) {
366+
metrics.removeSensor(backgroundThreadBusySensor.name());
367+
}
368+
if (backgroundQueueTimeSensor != null) {
369+
metrics.removeSensor(backgroundQueueTimeSensor.name());
370+
}
371+
if (backgroundProcessingTimeSensor != null) {
372+
metrics.removeSensor(backgroundProcessingTimeSensor.name());
373+
}
301374
}
302375

303376
/**
@@ -372,6 +445,27 @@ public void recordThreadIdleTime(double idleTimeMs) {
372445
threadIdleSensor.record(idleTimeMs);
373446
}
374447

448+
@Override
449+
public void recordBackgroundThreadBusyTime(double busyTimeMs) {
450+
if (backgroundThreadBusySensor != null) {
451+
backgroundThreadBusySensor.record(busyTimeMs);
452+
}
453+
}
454+
455+
@Override
456+
public void recordBackgroundQueueTime(long durationMs) {
457+
if (backgroundQueueTimeSensor != null) {
458+
backgroundQueueTimeSensor.record(durationMs);
459+
}
460+
}
461+
462+
@Override
463+
public void recordBackgroundProcessingTime(long durationMs) {
464+
if (backgroundProcessingTimeSensor != null) {
465+
backgroundProcessingTimeSensor.record(durationMs);
466+
}
467+
}
468+
375469
@Override
376470
public void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier) {
377471
metrics.addMetric(eventQueueSize, (Gauge<Long>) (config, now) -> (long) sizeSupplier.get());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.coordinator.common.runtime;
18+
19+
import org.apache.kafka.common.utils.MockTime;
20+
import org.apache.kafka.common.utils.Time;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.Future;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.TimeoutException;
29+
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.times;
32+
import static org.mockito.Mockito.verify;
33+
34+
public class CoordinatorBackgroundThreadPoolExecutorTest {
35+
36+
@Test
37+
public void testMetrics() throws ExecutionException, InterruptedException, TimeoutException {
38+
CoordinatorRuntimeMetrics metrics = mock(CoordinatorRuntimeMetrics.class);
39+
Time mockTime = new MockTime();
40+
CoordinatorBackgroundThreadPoolExecutor threadPoolExecutor = new CoordinatorBackgroundThreadPoolExecutor(
41+
"threaad-pool-",
42+
2,
43+
mockTime,
44+
metrics
45+
);
46+
47+
try {
48+
CountDownLatch task1Started = new CountDownLatch(1);
49+
CountDownLatch task2Started = new CountDownLatch(1);
50+
CountDownLatch task3Started = new CountDownLatch(1);
51+
CountDownLatch task1Unblocked = new CountDownLatch(1);
52+
CountDownLatch task2Unblocked = new CountDownLatch(1);
53+
CountDownLatch task3Unblocked = new CountDownLatch(1);
54+
55+
Future<?> task1 = threadPoolExecutor.submit(() -> {
56+
task1Started.countDown();
57+
try {
58+
task1Unblocked.await();
59+
} catch (InterruptedException e) { }
60+
});
61+
Future<?> task2 = threadPoolExecutor.submit(() -> {
62+
task2Started.countDown();
63+
try {
64+
task2Unblocked.await();
65+
} catch (InterruptedException e) { }
66+
});
67+
Future<?> task3 = threadPoolExecutor.submit(() -> {
68+
task3Started.countDown();
69+
try {
70+
task3Unblocked.await();
71+
} catch (InterruptedException e) { }
72+
});
73+
74+
// Task 1 and task 2 start.
75+
task1Started.await();
76+
task2Started.await();
77+
78+
// Task 1 takes 100 ms.
79+
mockTime.sleep(100);
80+
task1Unblocked.countDown();
81+
task1.get(5, TimeUnit.SECONDS);
82+
83+
// Task 3 starts.
84+
task3Started.await();
85+
86+
// Task 2 takes 500 ms.
87+
mockTime.sleep(400);
88+
task2Unblocked.countDown();
89+
task2.get(5, TimeUnit.SECONDS);
90+
91+
// Task 3 takes 500 ms.
92+
mockTime.sleep(100);
93+
task3Unblocked.countDown();
94+
task3.get(5, TimeUnit.SECONDS);
95+
96+
verify(metrics, times(2)).recordBackgroundQueueTime(0);
97+
verify(metrics, times(1)).recordBackgroundQueueTime(100);
98+
verify(metrics, times(1)).recordBackgroundProcessingTime(100);
99+
verify(metrics, times(2)).recordBackgroundProcessingTime(500);
100+
verify(metrics, times(1)).recordBackgroundThreadBusyTime(50.0);
101+
verify(metrics, times(2)).recordBackgroundThreadBusyTime(250.0);
102+
} finally {
103+
threadPoolExecutor.shutdown();
104+
threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS);
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)