Skip to content

Commit 8b47aa8

Browse files
authored
KAFKA-17411: Fix issues with new offset management (apache#21795)
We had several issues with offset management that were causing the system tests to fail: 1. An NPE in `RocksDBStore`, caused by a `null` offset being passed to `commit`. 2. Inconsistent behaviour between `RocksDBStore` and `LegacyCheckpointingStateStore` for both `null` offsets and an empty `Map` in `commit`. - This was largely due to these cases not being properly defined in the `commit` method contract; which has now been addressed. 3. No way to wipe offsets from `RocksDBStore` when the store was corrupted. We now interpret an empty `Map` in `commit` as an instruction to wipe all committed offsets. - `ProcessorStateManager` now commits empty offsets to `corrupted` stores to force them to wipe their offsets. 3. `GlobalStateManagerImpl` didn't wipe state under EOS when a store was detected as corrupted. This has now been added, consistent with the behaviour of `ProcessorStateManager` 4. Many `StateStore` implementations that delegate to an internal `RocksDBStore` did not implement either `managesOffsets` or `committedOffset`, despite implementing `commit`. This caused these stores to be incorrectly wrapped in a `LegacyCheckpointingStateStore`, which conflicted with the offsets being tracked in RocksDB. 5. Now that `.checkpoint` files are not used for `RocksDBStore`, some tests no longer make sense. 6. In `GlobalKTableEOSIntegrationTest`, `shouldSkipOverTxMarkersOnRestore` and `shouldSkipOverAbortedMessagesOnRestore` had to be removed, as they depended on the ability to externally preload checkpoint offsets, which is no longer possible now they're stored in RocksDB. 7. `LegacyCheckpointingStateStore` no longer uses the `OFFSET_UNKNOWN` sentinel value, except during migration of old `.checkpoint` files. Reviewers: Bill Bejeck <bbejeck@apache.org>
1 parent f6ca0f6 commit 8b47aa8

23 files changed

Lines changed: 293 additions & 176 deletions

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -923,18 +923,12 @@ public void onRestoreEnd(final TopicPartition topicPartition,
923923
kafkaStreams.close();
924924
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60));
925925

926-
final File checkpointFile = Paths.get(
926+
final File taskDir = Paths.get(
927927
streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG),
928928
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
929-
task00.toString(),
930-
".checkpoint_" + stateStoreName
929+
task00.toString()
931930
).toFile();
932-
assertTrue(checkpointFile.exists());
933-
final Map<TopicPartition, Long> checkpoints = new OffsetCheckpoint(checkpointFile).read();
934-
assertEquals(
935-
Long.valueOf(restoredOffsetsForPartition0.get()),
936-
new ArrayList<>(checkpoints.values()).get(0)
937-
);
931+
assertTrue(taskDir.exists());
938932
}
939933

940934

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java

Lines changed: 0 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import org.apache.kafka.clients.consumer.ConsumerConfig;
2020
import org.apache.kafka.clients.producer.ProducerConfig;
21-
import org.apache.kafka.common.TopicPartition;
2221
import org.apache.kafka.common.serialization.LongSerializer;
2322
import org.apache.kafka.common.serialization.Serdes;
2423
import org.apache.kafka.common.serialization.StringSerializer;
@@ -42,7 +41,6 @@
4241
import org.apache.kafka.streams.state.KeyValueStore;
4342
import org.apache.kafka.streams.state.QueryableStoreTypes;
4443
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
45-
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
4644
import org.apache.kafka.test.TestUtils;
4745

4846
import org.junit.jupiter.api.AfterAll;
@@ -54,19 +52,15 @@
5452
import org.junit.jupiter.api.TestInfo;
5553
import org.junit.jupiter.api.Timeout;
5654

57-
import java.io.File;
5855
import java.io.IOException;
5956
import java.time.Duration;
6057
import java.util.Arrays;
61-
import java.util.Collections;
6258
import java.util.HashMap;
6359
import java.util.Map;
6460
import java.util.Properties;
65-
import java.util.concurrent.atomic.AtomicReference;
6661

6762
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
6863
import static org.junit.jupiter.api.Assertions.assertNotNull;
69-
import static org.junit.jupiter.api.Assertions.assertTrue;
7064

7165
@Tag("integration")
7266
@Timeout(600)
@@ -315,101 +309,6 @@ public void shouldRestoreTransactionalMessages() throws Exception {
315309
);
316310
}
317311

318-
@Test
319-
public void shouldSkipOverTxMarkersOnRestore() throws Exception {
320-
shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(false);
321-
}
322-
323-
@Test
324-
public void shouldSkipOverAbortedMessagesOnRestore() throws Exception {
325-
shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(true);
326-
}
327-
328-
private void shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(final boolean appendAbortedMessages) throws Exception {
329-
// records with key 1L, 2L, and 4L are written into partition-0
330-
// record with key 3L is written into partition-1
331-
produceInitialGlobalTableValues();
332-
final String stateDir = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
333-
final File globalStateDir = new File(
334-
stateDir
335-
+ File.separator
336-
+ streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
337-
+ File.separator
338-
+ "global");
339-
assertTrue(globalStateDir.mkdirs());
340-
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(globalStateDir, ".checkpoint_" + globalStore));
341-
342-
// set the checkpointed offset to the commit marker of partition-1
343-
// even if `poll()` won't return any data for partition-1, we should still finish the restore
344-
checkpoint.write(Collections.singletonMap(new TopicPartition(globalTableTopic, 1), 1L));
345-
346-
if (appendAbortedMessages) {
347-
final AtomicReference<Exception> error = new AtomicReference<>();
348-
startStreams(new StateRestoreListener() {
349-
@Override
350-
public void onRestoreStart(final TopicPartition topicPartition,
351-
final String storeName,
352-
final long startingOffset,
353-
final long endingOffset) {
354-
// we need to write aborted messages only after we init the `highWatermark`
355-
// to move the `endOffset` beyond the `highWatermark
356-
//
357-
// we cannot write committed messages because we want to test the case that
358-
// poll() returns no records
359-
//
360-
// cf. GlobalStateManagerImpl#restoreState()
361-
try {
362-
produceAbortedMessages();
363-
} catch (final Exception fatal) {
364-
error.set(fatal);
365-
}
366-
}
367-
368-
@Override
369-
public void onBatchRestored(final TopicPartition topicPartition,
370-
final String storeName,
371-
final long batchEndOffset,
372-
final long numRestored) { }
373-
374-
@Override
375-
public void onRestoreEnd(final TopicPartition topicPartition,
376-
final String storeName,
377-
final long totalRestored) { }
378-
});
379-
final Exception fatal = error.get();
380-
if (fatal != null) {
381-
throw fatal;
382-
}
383-
} else {
384-
startStreams();
385-
}
386-
387-
final Map<Long, String> expected = new HashMap<>();
388-
expected.put(1L, "A");
389-
expected.put(2L, "B");
390-
// skip record <3L, "C"> because we won't read it (cf checkpoint file above)
391-
expected.put(4L, "D");
392-
393-
final ReadOnlyKeyValueStore<Long, String> store = IntegrationTestUtils
394-
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
395-
assertNotNull(store);
396-
397-
final Map<Long, String> storeContent = new HashMap<>();
398-
TestUtils.waitForCondition(
399-
() -> {
400-
storeContent.clear();
401-
try (final KeyValueIterator<Long, String> it = store.all()) {
402-
it.forEachRemaining(kv -> storeContent.put(kv.key, kv.value));
403-
}
404-
return storeContent.equals(expected);
405-
},
406-
30_000L,
407-
() -> "waiting for initial values" +
408-
"\n expected: " + expected +
409-
"\n received: " + storeContent
410-
);
411-
}
412-
413312
@Test
414313
public void shouldNotRestoreAbortedMessages() throws Exception {
415314
produceAbortedMessages();

streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,19 @@ default void flush() {
103103
* <p>
104104
* Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
105105
* records they represent, if possible.
106+
* <p>
107+
* <b>Empty map:</b> If {@code changelogOffsets} is empty, implementations that manage offsets <em>MUST</em>
108+
* remove all previously committed offsets. After an empty commit, {@link #committedOffset(TopicPartition)} should
109+
* return {@code null} for all partitions. This is used during corruption recovery to clear stale offsets so that
110+
* restoration can restart from the beginning.
111+
* <p>
112+
* <b>Null values:</b> If a value in {@code changelogOffsets} is {@code null}, implementations that manage offsets
113+
* <em>MUST</em> remove the committed offset for that partition. After such a commit,
114+
* {@link #committedOffset(TopicPartition)} should return {@code null} for the affected partition.
106115
*
107116
* @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
117+
* An empty map signals that all committed offsets should be cleared.
118+
* A {@code null} value for a partition signals that its committed offset should be removed.
108119
*/
109120
default void commit(final Map<TopicPartition, Long> changelogOffsets) {
110121
flush();

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.common.utils.FixedOrderMap;
2929
import org.apache.kafka.common.utils.LogContext;
3030
import org.apache.kafka.common.utils.Time;
31+
import org.apache.kafka.common.utils.Utils;
3132
import org.apache.kafka.streams.StreamsConfig;
3233
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
3334
import org.apache.kafka.streams.errors.ErrorHandlerContext;
@@ -51,6 +52,7 @@
5152
import org.slf4j.Logger;
5253

5354
import java.io.File;
55+
import java.io.IOException;
5456
import java.time.Duration;
5557
import java.util.ArrayList;
5658
import java.util.Collections;
@@ -179,7 +181,20 @@ public Set<String> initialize() {
179181
final List<TopicPartition> storePartitions = topicPartitionsForStore(stateStore);
180182
final StateStore maybeWrappedStore = LegacyCheckpointingStateStore.maybeWrapStore(
181183
stateStore, eosEnabled, new HashSet<>(storePartitions), stateDirectory, null, logPrefix);
182-
maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore);
184+
try {
185+
maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore);
186+
} catch (final ProcessorStateException e) {
187+
if (eosEnabled) {
188+
log.warn("{}Detected unclean shutdown for global store {}. " +
189+
"Wiping global state directory.", logPrefix, stateStore.name(), e);
190+
try {
191+
Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile());
192+
} catch (final IOException ioe) {
193+
e.addSuppressed(ioe);
194+
}
195+
}
196+
throw e;
197+
}
183198

184199
for (final TopicPartition storePartition : storePartitions) {
185200
wrappedStores.put(storePartition, maybeWrappedStore);
@@ -566,7 +581,10 @@ public void commit() {
566581
// only add offsets for persistent stores
567582
if (store.persistent()) {
568583
for (final TopicPartition storePartition : storePartitions) {
569-
storeOffsets.put(storePartition, currentOffsets.get(storePartition));
584+
final Long offset = currentOffsets.get(storePartition);
585+
if (offset != null) {
586+
storeOffsets.put(storePartition, offset);
587+
}
570588
}
571589
}
572590
store.commit(storeOffsets);

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java

Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -487,45 +487,43 @@ public void commit() {
487487
if (!stores.isEmpty()) {
488488
log.debug("Committing all stores registered in the state manager: {}", stores);
489489
for (final StateStoreMetadata metadata : stores.values()) {
490-
if (!metadata.corrupted) {
491-
final StateStore store = metadata.stateStore;
492-
log.trace("Committing store {}", store.name());
493-
try {
494-
if (metadata.changelogPartition == null || metadata.offset == null || !store.persistent()) {
495-
store.commit(Map.of());
496-
} else {
497-
store.commit(Map.of(metadata.changelogPartition, metadata.offset));
498-
}
490+
final StateStore store = metadata.stateStore;
491+
log.trace("Committing store {}", store.name());
492+
try {
493+
if (metadata.changelogPartition == null || metadata.offset == null || metadata.corrupted || !store.persistent()) {
494+
store.commit(Map.of());
495+
} else {
496+
store.commit(Map.of(metadata.changelogPartition, metadata.offset));
497+
}
499498

500-
if (metadata.commitCallback != null) {
501-
try {
502-
metadata.commitCallback.onCommit();
503-
} catch (final IOException e) {
504-
throw new ProcessorStateException(
505-
format("%sException caught while trying to checkpoint store, " +
506-
"changelog partition %s", logPrefix, metadata.changelogPartition),
507-
e
508-
);
509-
}
510-
}
511-
} catch (final RuntimeException exception) {
512-
if (firstException == null) {
513-
// do NOT wrap the error if it is actually caused by Streams itself
514-
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
515-
if (exception instanceof FailedProcessingException)
516-
firstException = new ProcessorStateException(
517-
format("%sFailed to commit state store %s", logPrefix, store.name()),
518-
exception.getCause());
519-
else if (exception instanceof StreamsException)
520-
firstException = exception;
521-
else
522-
firstException = new ProcessorStateException(
523-
format("%sFailed to commit state store %s", logPrefix, store.name()), exception);
524-
log.error("Failed to commit state store {}: ", store.name(), firstException);
525-
} else {
526-
log.error("Failed to commit state store {}: ", store.name(), exception);
499+
if (!metadata.corrupted && metadata.commitCallback != null) {
500+
try {
501+
metadata.commitCallback.onCommit();
502+
} catch (final IOException e) {
503+
throw new ProcessorStateException(
504+
format("%sException caught while trying to checkpoint store, " +
505+
"changelog partition %s", logPrefix, metadata.changelogPartition),
506+
e
507+
);
527508
}
528509
}
510+
} catch (final RuntimeException exception) {
511+
if (firstException == null) {
512+
// do NOT wrap the error if it is actually caused by Streams itself
513+
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
514+
if (exception instanceof FailedProcessingException)
515+
firstException = new ProcessorStateException(
516+
format("%sFailed to commit state store %s", logPrefix, store.name()),
517+
exception.getCause());
518+
else if (exception instanceof StreamsException)
519+
firstException = exception;
520+
else
521+
firstException = new ProcessorStateException(
522+
format("%sFailed to commit state store %s", logPrefix, store.name()), exception);
523+
log.error("Failed to commit state store {}: ", store.name(), firstException);
524+
} else {
525+
log.error("Failed to commit state store {}: ", store.name(), exception);
526+
}
529527
}
530528
}
531529
}

streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.rocksdb.ColumnFamilyHandle;
2626
import org.rocksdb.RocksDBException;
27+
import org.rocksdb.RocksIterator;
2728

2829
import java.nio.ByteBuffer;
2930
import java.util.Arrays;
@@ -53,12 +54,20 @@ abstract class AbstractColumnFamilyAccessor implements RocksDBStore.ColumnFamily
5354

5455
@Override
5556
public final void commit(final RocksDBStore.DBAccessor accessor, final Map<TopicPartition, Long> changelogOffsets) throws RocksDBException {
56-
for (final Map.Entry<TopicPartition, Long> entry : changelogOffsets.entrySet()) {
57-
final TopicPartition tp = entry.getKey();
58-
final Long offset = entry.getValue();
59-
final byte[] key = stringSerializer.serialize(null, tp.toString());
60-
final byte[] value = longSerde.serializer().serialize(null, offset);
61-
accessor.put(offsetColumnFamilyHandle, key, value);
57+
if (changelogOffsets.isEmpty()) {
58+
wipeOffsets(accessor);
59+
} else {
60+
for (final Map.Entry<TopicPartition, Long> entry : changelogOffsets.entrySet()) {
61+
final TopicPartition tp = entry.getKey();
62+
final Long offset = entry.getValue();
63+
final byte[] key = stringSerializer.serialize(null, tp.toString());
64+
if (offset != null) {
65+
final byte[] value = longSerde.serializer().serialize(null, offset);
66+
accessor.put(offsetColumnFamilyHandle, key, value);
67+
} else {
68+
accessor.delete(offsetColumnFamilyHandle, key);
69+
}
70+
}
6271
}
6372
// We need to remove this flush call when implementing KAFKA-19712
6473
this.flush(accessor, offsetColumnFamilyHandle);
@@ -112,4 +121,17 @@ public final Long getCommittedOffset(final RocksDBStore.DBAccessor accessor, fin
112121
* @throws RocksDBException if an error occurs during the commit operation
113122
*/
114123
protected abstract void flush(final RocksDBStore.DBAccessor accessor, final ColumnFamilyHandle offsetColumnFamilyHandle) throws RocksDBException;
124+
125+
private void wipeOffsets(final RocksDBStore.DBAccessor accessor) throws RocksDBException {
126+
try (final RocksIterator iter = accessor.newIterator(offsetColumnFamilyHandle)) {
127+
iter.seekToFirst();
128+
while (iter.isValid()) {
129+
final byte[] key = iter.key();
130+
if (!Arrays.equals(key, statusKey)) {
131+
accessor.delete(offsetColumnFamilyHandle, key);
132+
}
133+
iter.next();
134+
}
135+
}
136+
}
115137
}

streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,17 @@ public void commit(final Map<TopicPartition, Long> changelogOffsets) {
277277
segments.commit(changelogOffsets);
278278
}
279279

280+
@SuppressWarnings("deprecation")
281+
@Override
282+
public boolean managesOffsets() {
283+
return segments.managesOffsets();
284+
}
285+
286+
@Override
287+
public Long committedOffset(final TopicPartition partition) {
288+
return segments.committedOffset(partition);
289+
}
290+
280291
@Override
281292
public void close() {
282293
open = false;

streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,17 @@ public void commit(final Map<TopicPartition, Long> changelogOffsets) {
318318
segments.commit(changelogOffsets);
319319
}
320320

321+
@SuppressWarnings("deprecation")
322+
@Override
323+
public boolean managesOffsets() {
324+
return segments.managesOffsets();
325+
}
326+
327+
@Override
328+
public Long committedOffset(final TopicPartition partition) {
329+
return segments.committedOffset(partition);
330+
}
331+
321332
@Override
322333
public void close() {
323334
open = false;

0 commit comments

Comments
 (0)