Skip to content

Commit 153a458

Browse files
authored
KAFKA-19711: Add commit-rate metric to metered state stores (#21853)
Add `commit-rate`, `commit-latency-avg`, and `commit-latency-max` metrics to replace the `flush-*` metrics, which are now deprecated. The `flush-*` metrics will be removed in the next major release. Reviewers: Eduwer Camacaro <eduwerc@gmail.com>, Chriso Lolov <clolov@apache.org>, Alieh Saeedii <asaeedi@confluent.io>
1 parent b9882a0 commit 153a458

11 files changed

Lines changed: 130 additions & 36 deletions

File tree

docs/operations/monitoring.md

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5316,25 +5316,51 @@ kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[s
53165316
<tr>
53175317
<td>
53185318

5319-
flush-latency-avg
5320-
</td>
5319+
flush-latency-avg (deprecated)
5320+
</td>
53215321
<td>
53225322

5323-
The average flush execution time in ns.
5324-
</td>
5323+
The average flush execution time in ns. Deprecated: use commit-latency-avg instead.
5324+
</td>
53255325
<td>
53265326

53275327
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
5328-
</td> </tr>
5329-
<tr>
5328+
</td> </tr>
5329+
<tr>
53305330
<td>
53315331

5332-
flush-latency-max
5333-
</td>
5332+
flush-latency-max (deprecated)
5333+
</td>
53345334
<td>
53355335

5336-
The maximum flush execution time in ns.
5337-
</td>
5336+
The maximum flush execution time in ns. Deprecated: use commit-latency-max instead.
5337+
</td>
5338+
<td>
5339+
5340+
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
5341+
</td> </tr>
5342+
<tr>
5343+
<td>
5344+
5345+
commit-latency-avg
5346+
</td>
5347+
<td>
5348+
5349+
The average commit execution time in ns.
5350+
</td>
5351+
<td>
5352+
5353+
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
5354+
</td> </tr>
5355+
<tr>
5356+
<td>
5357+
5358+
commit-latency-max
5359+
</td>
5360+
<td>
5361+
5362+
The maximum commit execution time in ns.
5363+
</td>
53385364
<td>
53395365

53405366
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
@@ -5472,17 +5498,30 @@ kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[s
54725498
<tr>
54735499
<td>
54745500

5475-
flush-rate
5476-
</td>
5501+
flush-rate (deprecated)
5502+
</td>
54775503
<td>
54785504

5479-
The average flush rate for this store.
5480-
</td>
5505+
The average flush rate for this store. Deprecated: use commit-rate instead.
5506+
</td>
54815507
<td>
54825508

54835509
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
5484-
</td> </tr>
5485-
<tr>
5510+
</td> </tr>
5511+
<tr>
5512+
<td>
5513+
5514+
commit-rate
5515+
</td>
5516+
<td>
5517+
5518+
The average commit rate for this store.
5519+
</td>
5520+
<td>
5521+
5522+
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
5523+
</td> </tr>
5524+
<tr>
54865525
<td>
54875526

54885527
restore-rate

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public class MeteredKeyValueStore<K, V>
9191
protected Sensor allSensor;
9292
protected Sensor rangeSensor;
9393
protected Sensor prefixScanSensor;
94-
private Sensor flushSensor;
94+
private Sensor commitSensor;
9595
private Sensor e2eLatencySensor;
9696
protected Sensor iteratorDurationSensor;
9797
protected InternalProcessorContext<?, ?> internalContext;
@@ -143,6 +143,7 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
143143
super.init(stateStoreContext, root);
144144
}
145145

146+
@SuppressWarnings("deprecation")
146147
private void registerMetrics() {
147148
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
148149
putIfAbsentSensor = StateStoreMetrics.putIfAbsentSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
@@ -151,7 +152,10 @@ private void registerMetrics() {
151152
allSensor = StateStoreMetrics.allSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
152153
rangeSensor = StateStoreMetrics.rangeSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
153154
prefixScanSensor = StateStoreMetrics.prefixScanSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
154-
flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
155+
// flush metrics are deprecated per KIP-1035 and will be removed in the next major release.
156+
// Here we just register the sensor without recording
157+
StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
158+
commitSensor = StateStoreMetrics.commitSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
155159
deleteSensor = StateStoreMetrics.deleteSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
156160
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
157161
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
@@ -416,7 +420,7 @@ public KeyValueIterator<K, V> reverseAll() {
416420

417421
@Override
418422
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
419-
maybeMeasureLatency(() -> super.commit(changelogOffsets), time, flushSensor);
423+
maybeMeasureLatency(() -> super.commit(changelogOffsets), time, commitSensor);
420424
}
421425

422426
@Override

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public class MeteredSessionStore<K, V>
7373
protected StreamsMetricsImpl streamsMetrics;
7474
protected Sensor putSensor;
7575
protected Sensor fetchSensor;
76-
protected Sensor flushSensor;
76+
protected Sensor commitSensor;
7777
protected Sensor removeSensor;
7878
protected Sensor e2eLatencySensor;
7979
protected Sensor iteratorDurationSensor;
@@ -119,10 +119,14 @@ public void init(final StateStoreContext stateStoreContext,
119119
super.init(stateStoreContext, root);
120120
}
121121

122+
@SuppressWarnings("deprecation")
122123
private void registerMetrics() {
123124
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
124125
fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
125-
flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
126+
// flushSensor is deprecated per KIP-1035 and will be removed in the next major release.
127+
// Here we just register the sensor without recording
128+
StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
129+
commitSensor = StateStoreMetrics.commitSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
126130
removeSensor = StateStoreMetrics.removeSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
127131
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
128132
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
@@ -430,7 +434,7 @@ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K keyFrom,
430434

431435
@Override
432436
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
433-
maybeMeasureLatency(() -> super.commit(changelogOffsets), time, flushSensor);
437+
maybeMeasureLatency(() -> super.commit(changelogOffsets), time, commitSensor);
434438
}
435439

436440
@Override

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public class MeteredWindowStore<K, V>
7676
protected StreamsMetricsImpl streamsMetrics;
7777
protected Sensor putSensor;
7878
protected Sensor fetchSensor;
79-
private Sensor flushSensor;
79+
private Sensor commitSensor;
8080
private Sensor e2eLatencySensor;
8181
protected Sensor iteratorDurationSensor;
8282
protected InternalProcessorContext<?, ?> internalContext;
@@ -141,10 +141,14 @@ protected Serde<V> prepareValueSerde(final Serde<V> valueSerde, final SerdeGette
141141
return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
142142
}
143143

144+
@SuppressWarnings("deprecation")
144145
private void registerMetrics() {
145146
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
146147
fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
147-
flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
148+
// flushSensor is deprecated per KIP-1035 and will be removed in the next major release.
149+
// Here we just register the sensor without recording
150+
StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
151+
commitSensor = StateStoreMetrics.commitSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
148152
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
149153
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
150154
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
@@ -384,7 +388,7 @@ public KeyValueIterator<Windowed<K>, V> backwardAll() {
384388

385389
@Override
386390
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
387-
maybeMeasureLatency(() -> super.commit(changelogOffsets), time, flushSensor);
391+
maybeMeasureLatency(() -> super.commit(changelogOffsets), time, commitSensor);
388392
}
389393

390394
@Override

streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,13 @@ private StateStoreMetrics() {}
111111
private static final String FLUSH_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + FLUSH_DESCRIPTION;
112112
private static final String FLUSH_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + FLUSH_DESCRIPTION;
113113

114+
private static final String COMMIT = "commit";
115+
private static final String COMMIT_DESCRIPTION = "calls to commit";
116+
private static final String COMMIT_RATE_DESCRIPTION =
117+
RATE_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
118+
private static final String COMMIT_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION;
119+
private static final String COMMIT_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION;
120+
114121
private static final String DELETE = "delete";
115122
private static final String DELETE_DESCRIPTION = "calls to delete";
116123
private static final String DELETE_RATE_DESCRIPTION =
@@ -309,6 +316,10 @@ public static Sensor prefixScanSensor(final String taskId,
309316
return sensor;
310317
}
311318

319+
/**
320+
* @deprecated since 4.3. Use {@link #commitSensor(String, String, String, StreamsMetricsImpl)} instead.
321+
*/
322+
@Deprecated
312323
public static Sensor flushSensor(final String taskId,
313324
final String storeType,
314325
final String storeName,
@@ -326,6 +337,23 @@ public static Sensor flushSensor(final String taskId,
326337
);
327338
}
328339

340+
public static Sensor commitSensor(final String taskId,
341+
final String storeType,
342+
final String storeName,
343+
final StreamsMetricsImpl streamsMetrics) {
344+
return throughputAndLatencySensor(
345+
taskId,
346+
storeType,
347+
storeName,
348+
COMMIT,
349+
COMMIT_RATE_DESCRIPTION,
350+
COMMIT_AVG_LATENCY_DESCRIPTION,
351+
COMMIT_MAX_LATENCY_DESCRIPTION,
352+
RecordingLevel.DEBUG,
353+
streamsMetrics
354+
);
355+
}
356+
329357
public static Sensor deleteSensor(final String taskId,
330358
final String storeType,
331359
final String storeName,

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,8 @@ public void shouldFlushInnerWhenCommitTimeRecords() {
326326

327327
metered.commit(Map.of());
328328

329-
final KafkaMetric metric = metric("flush-rate");
330-
assertTrue((Double) metric.metricValue() > 0);
329+
final KafkaMetric commitMetric = metric("commit-rate");
330+
assertTrue((Double) commitMetric.metricValue() > 0);
331331
}
332332

333333
private interface CachedKeyValueStore extends KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> { }

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,8 @@ public void shouldCommitInnerWhenCommitTimeRecords() {
351351

352352
metered.commit(Map.of());
353353

354-
final KafkaMetric metric = metric("flush-rate");
355-
assertTrue((Double) metric.metricValue() > 0);
354+
final KafkaMetric commitMetric = metric("commit-rate");
355+
assertTrue((Double) commitMetric.metricValue() > 0);
356356
}
357357

358358
private interface CachedKeyValueStore extends KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> { }

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,8 @@ public void shouldCommitInnerWhenCommitTimeRecords() {
292292

293293
metered.commit(Map.of());
294294

295-
final KafkaMetric metric = metric("flush-rate");
296-
assertTrue((Double) metric.metricValue() > 0);
295+
final KafkaMetric commitMetric = metric("commit-rate");
296+
assertTrue((Double) commitMetric.metricValue() > 0);
297297
}
298298

299299
private interface CachedKeyValueStore extends KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> { }

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ public void shouldDelegateAndRecordMetricsOnCommit() {
233233
store.commit(Map.of());
234234

235235
verify(inner).commit(Map.of());
236-
assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0));
236+
assertThat((Double) getMetric("commit-rate").metricValue(), greaterThan(0.0));
237237
}
238238

239239
@Test

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -356,10 +356,8 @@ public void shouldRecordCommitLatency() {
356356
store.init(context, store);
357357
store.commit(Map.of());
358358

359-
// it suffices to verify one flush metric since all flush metrics are recorded by the same sensor
360-
// and the sensor is tested elsewhere
361-
final KafkaMetric metric = metric("flush-rate");
362-
assertTrue((Double) metric.metricValue() > 0);
359+
final KafkaMetric commitMetric = metric("commit-rate");
360+
assertTrue((Double) commitMetric.metricValue() > 0);
363361
}
364362

365363
@Test

0 commit comments

Comments
 (0)