Skip to content

Commit 9157589

Browse files
authored
HOTFIX: RocksDBMetricsRecorder#init should null check taskId (#18151)
Appears to be a typo in the code, since the error message indicates this check is for taskId being null, but instead we accidentally check the streams metrics twice Reviewers: Matthias Sax <mjsax@apache.org>, runo Cadonna <cadonna@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
1 parent f2f19b7 commit 9157589

2 files changed

Lines changed: 17 additions & 1 deletion

File tree

streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public TaskId taskId() {
135135
public void init(final StreamsMetricsImpl streamsMetrics,
136136
final TaskId taskId) {
137137
Objects.requireNonNull(streamsMetrics, "Streams metrics must not be null");
138-
Objects.requireNonNull(streamsMetrics, "task ID must not be null");
138+
Objects.requireNonNull(taskId, "task ID must not be null");
139139
if (this.taskId != null && !this.taskId.equals(taskId)) {
140140
throw new IllegalStateException("Metrics recorder is re-initialised with different task: previous task is " +
141141
this.taskId + " whereas current task is " + taskId + ". This is a bug in Kafka Streams. " +

streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,22 @@ public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentTask() {
173173
);
174174
}
175175

176+
@Test
177+
public void shouldThrowIfMetricRecorderIsInitialisedWithNullMetrics() {
178+
assertThrows(
179+
NullPointerException.class,
180+
() -> recorder.init(null, TASK_ID1)
181+
);
182+
}
183+
184+
@Test
185+
public void shouldThrowIfMetricRecorderIsInitialisedWithNullTaskId() {
186+
assertThrows(
187+
NullPointerException.class,
188+
() -> recorder.init(streamsMetrics, null)
189+
);
190+
}
191+
176192
@Test
177193
public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentStreamsMetrics() {
178194
assertThrows(

0 commit comments

Comments
 (0)