From 31f68e309a3320d2cf2de1ee5b4c586ff9cc085a Mon Sep 17 00:00:00 2001 From: Nick Telford Date: Thu, 12 Mar 2026 16:14:28 +0000 Subject: [PATCH] KAFKA-19712: GlobalStateManagerImpl delegates offset tracking to stores Co-Authored-By: Claude Sonnet 4.6 --- .../GlobalKTableEOSIntegrationTest.java | 2 +- .../internals/GlobalStateManagerImpl.java | 123 ++++++++---------- .../internals/GlobalStateManagerImplTest.java | 123 ++++++++++-------- 3 files changed, 124 insertions(+), 124 deletions(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 640e438103f7d..6e8a345c7e963 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -337,7 +337,7 @@ private void shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(final boolean ap + File.separator + "global"); assertTrue(globalStateDir.mkdirs()); - final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(globalStateDir, ".checkpoint")); + final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(globalStateDir, ".checkpoint_" + globalStore)); // set the checkpointed offset to the commit marker of partition-1 // even if `poll()` won't return any data for partition-1, we should still finish the restore diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index f762338b94746..adc4e808f8dca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.errors.internals.FailedProcessingException; +import org.apache.kafka.streams.internals.StreamsConfigUtils; import org.apache.kafka.streams.processor.CommitCallback; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreListener; @@ -44,13 +45,12 @@ import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.Task.TaskType; -import org.apache.kafka.streams.state.internals.OffsetCheckpoint; +import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore; import org.apache.kafka.streams.state.internals.RecordConverter; import org.slf4j.Logger; import java.io.File; -import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -65,7 +65,6 @@ import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.apache.kafka.streams.processor.internals.RecordDeserializer.handleDeserializationFailure; -import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; @@ -102,19 +101,20 @@ private static class StateStoreMetadata { private final Time time; private final Logger log; + private final String logPrefix; + private final StateDirectory stateDirectory; private final File baseDir; private final long taskTimeoutMs; private final ProcessorTopology topology; - private final OffsetCheckpoint checkpointFile; private final Duration pollMsPlusRequestTimeout; private final Consumer globalConsumer; private final StateRestoreListener stateRestoreListener; - private final Map checkpointFileCache; + private final Map currentOffsets; private final Map storeToChangelogTopic; private final Set globalStoreNames = new HashSet<>(); - private final Set globalNonPersistentStoresTopics = new HashSet<>(); private final FixedOrderMap> globalStores = new FixedOrderMap<>(); private final Map storeMetadata = new HashMap<>(); + private final boolean eosEnabled; private InternalProcessorContext globalProcessorContext; private DeserializationExceptionHandler deserializationExceptionHandler; private ProcessingExceptionHandler processingExceptionHandler; @@ -129,20 +129,18 @@ public GlobalStateManagerImpl(final LogContext logContext, final StreamsConfig config) { this.time = time; this.topology = topology; + this.stateDirectory = stateDirectory; baseDir = stateDirectory.globalStateDir(); storeToChangelogTopic = topology.storeToChangelogTopic(); - checkpointFile = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); - checkpointFileCache = new HashMap<>(); + currentOffsets = new HashMap<>(); // Find non persistent store's topics for (final StateStore store : topology.globalStateStores()) { globalStoreNames.add(store.name()); - if (!store.persistent()) { - globalNonPersistentStoresTopics.add(changelogFor(store.name())); - } } log = logContext.logger(GlobalStateManagerImpl.class); + logPrefix = logContext.logPrefix(); this.globalConsumer = globalConsumer; this.stateRestoreListener = stateRestoreListener; @@ -160,6 +158,7 @@ public GlobalStateManagerImpl(final LogContext logContext, @SuppressWarnings("deprecation") final boolean globalEnabled = config.getBoolean(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG); processingExceptionHandler = globalEnabled ? config.processingExceptionHandler() : null; + eosEnabled = StreamsConfigUtils.eosEnabled(config); } @Override @@ -169,41 +168,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globa @Override public Set initialize() { - try { - checkpointFileCache.putAll(checkpointFile.read()); - } catch (final IOException e) { - throw new StreamsException("Failed to read checkpoints for global state globalStores", e); - } - droppedRecordsSensor = droppedRecordsSensor( Thread.currentThread().getName(), globalProcessorContext.taskId().toString(), globalProcessorContext.metrics() ); - final Set changelogTopics = new HashSet<>(); + final Map wrappedStores = new HashMap<>(); for (final StateStore stateStore : topology.globalStateStores()) { - final String sourceTopic = storeToChangelogTopic.get(stateStore.name()); - changelogTopics.add(sourceTopic); - stateStore.init(globalProcessorContext, stateStore); - } + final List storePartitions = topicPartitionsForStore(stateStore); + final StateStore maybeWrappedStore = LegacyCheckpointingStateStore.maybeWrapStore( + stateStore, eosEnabled, new HashSet<>(storePartitions), stateDirectory, null, logPrefix); + maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore); - // make sure each topic-partition from checkpointFileCache is associated with a global state store - checkpointFileCache.keySet().forEach(tp -> { - if (!changelogTopics.contains(tp.topic())) { - log.error( - "Encountered a topic-partition in the global checkpoint file not associated with any global" + - " state store, topic-partition: {}, checkpoint file: {}. If this topic-partition is no longer valid," + - " an application reset and state store directory cleanup will be required.", - tp.topic(), - checkpointFile - ); - throw new StreamsException("Encountered a topic-partition not associated with any global state store"); + for (final TopicPartition storePartition : storePartitions) { + wrappedStores.put(storePartition, maybeWrappedStore); } - }); + } + + // migrate offsets from legacy checkpoint file into the stores + LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix, stateDirectory, null, wrappedStores); - // restore or reprocess each registered store using the now-populated currentOffsets for (final StateStoreMetadata metadata : storeMetadata.values()) { + // load the committed offsets from the store + final StateStore store = metadata.stateStore; + if (store.persistent()) { + for (final TopicPartition partition : metadata.changelogPartitions) { + final Long offset = store.committedOffset(partition); + if (offset != null) { + currentOffsets.put(partition, offset); + } + } + } + + // restore or reprocess each registered store using the now-populated currentOffsets try { if (metadata.reprocessFactory.isPresent()) { reprocessState(metadata); @@ -219,7 +217,7 @@ public Set initialize() { } public StateStore globalStore(final String name) { - return globalStores.getOrDefault(name, Optional.empty()).orElse(null); + return LegacyCheckpointingStateStore.maybeUnwrapStore(globalStores.getOrDefault(name, Optional.empty()).orElse(null)); } @Override @@ -267,9 +265,9 @@ public void registerStore(final StateStore store, ); final Optional> reprocessFactory = topology - .storeNameToReprocessOnRestore().getOrDefault(store.name(), Optional.empty()); + .storeNameToReprocessOnRestore().getOrDefault(store.name(), Optional.empty()); storeMetadata.put(store.name(), new StateStoreMetadata( - store, topicPartitions, reprocessFactory, stateRestoreCallback, converterForStore(store), highWatermarks)); + store, topicPartitions, reprocessFactory, stateRestoreCallback, converterForStore(store), highWatermarks)); } private List topicPartitionsForStore(final StateStore store) { @@ -310,7 +308,7 @@ private void reprocessState(final StateStoreMetadata storeMetadata) { globalConsumer.assign(Collections.singletonList(topicPartition)); long offset; - final Long checkpoint = checkpointFileCache.get(topicPartition); + final Long checkpoint = currentOffsets.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); offset = checkpoint; @@ -441,7 +439,7 @@ private void reprocessState(final StateStoreMetadata storeMetadata) { stateRestoreListener.onBatchRestored(topicPartition, storeMetadata.stateStore.name(), offset, batchRestoreCount); } stateRestoreListener.onRestoreEnd(topicPartition, storeMetadata.stateStore.name(), restoreCount); - checkpointFileCache.put(topicPartition, offset); + currentOffsets.put(topicPartition, offset); } } @@ -452,7 +450,7 @@ private void restoreState(final StateStoreMetadata storeMetadata) { globalConsumer.assign(Collections.singletonList(topicPartition)); long offset; - final Long checkpoint = checkpointFileCache.get(topicPartition); + final Long checkpoint = currentOffsets.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); offset = checkpoint; @@ -497,7 +495,7 @@ private void restoreState(final StateStoreMetadata storeMetadata) { restoreCount += restoreRecords.size(); } stateRestoreListener.onRestoreEnd(topicPartition, storeMetadata.stateStore.name(), restoreCount); - checkpointFileCache.put(topicPartition, offset); + currentOffsets.put(topicPartition, offset); } } @@ -561,7 +559,17 @@ public void flush() { final StateStore store = entry.getValue().get(); try { log.trace("Committing global store={}", store.name()); - store.commit(Map.of()); + // construct per-store Map of offsets to commit + final List storePartitions = storeMetadata.get(store.name()).changelogPartitions; + final Map storeOffsets = new HashMap<>(storePartitions.size()); + + // only add offsets for persistent stores + if (store.persistent()) { + for (final TopicPartition storePartition : storePartitions) { + storeOffsets.put(storePartition, currentOffsets.get(storePartition)); + } + } + store.commit(storeOffsets); } catch (final RuntimeException e) { throw new ProcessorStateException( String.format("Failed to commit global state store %s", store.name()), @@ -574,6 +582,10 @@ public void flush() { } } + @Override + public void checkpoint() { + } + @Override public void close() { if (globalStores.isEmpty()) { @@ -606,28 +618,7 @@ public void close() { @Override public void updateChangelogOffsets(final Map offsets) { - checkpointFileCache.putAll(offsets); - } - - @Override - public void checkpoint() { - final Map filteredOffsets = new HashMap<>(); - - // Skip non persistent store - for (final Map.Entry topicPartitionOffset : checkpointFileCache.entrySet()) { - final String topic = topicPartitionOffset.getKey().topic(); - if (!globalNonPersistentStoresTopics.contains(topic)) { - filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue()); - } - } - - try { - checkpointFile.write(filteredOffsets); - } catch (final IOException e) { - log.warn("Failed to write offset checkpoint file to {} for global stores." + - " This may occur if OS cleaned the state.dir in case when it is located in the (default) ${java.io.tmpdir}/kafka-streams directory." + - " Changing the location of state.dir may resolve the problem", checkpointFile, e); - } + currentOffsets.putAll(offsets); } @Override @@ -637,7 +628,7 @@ public TaskType taskType() { @Override public Map changelogOffsets() { - return Collections.unmodifiableMap(checkpointFileCache); + return Collections.unmodifiableMap(currentOffsets); } public final String changelogFor(final String storeName) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 5b814b7f63663..6de572c5a326d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.state.TimestampedBytesStore; +import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockStateRestoreListener; @@ -175,70 +176,58 @@ public void before() { @Test public void shouldReadCheckpointOffsets() throws IOException { - final Map expected = writeCheckpoint(); + writeCheckpoint(); + initializeConsumer(0, 0, t1, t2, t3, t4, t5); + processorContext.setStateManger(stateManager); stateManager.initialize(); final Map offsets = stateManager.changelogOffsets(); - assertEquals(expected, offsets); + assertEquals(mkMap( + mkEntry(t1, 1L), + mkEntry(t2, 0L), + mkEntry(t3, 0L), + mkEntry(t4, 0L), + mkEntry(t5, 0L) + ), offsets); } @Test public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException { - final Map offsets = Collections.singletonMap(t1, 25L); + final Map offsets = Collections.singletonMap(t1, 25_000L); + initializeConsumer(0, 0, t1, t2, t3, t4, t5); + processorContext.setStateManger(stateManager); + stateManager.setGlobalProcessorContext(processorContext); stateManager.initialize(); stateManager.updateChangelogOffsets(offsets); + final File storeCheckpointFile = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + "_" + storeName1); + // set readonly to the CHECKPOINT_FILE_NAME.tmp file because we will write data to the .tmp file first // and then swap to CHECKPOINT_FILE_NAME by replacing it - final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp"); + final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + "_" + storeName1 + ".tmp"); Files.createFile(file.toPath()); file.setWritable(false); - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) { - stateManager.checkpoint(); + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(LegacyCheckpointingStateStore.class)) { + stateManager.flush(); assertThat(appender.getMessages(), hasItem(containsString( - "Failed to write offset checkpoint file to " + checkpointFile.getPath() + " for global stores"))); + "Failed to write offset checkpoint file to [" + storeCheckpointFile.getPath() + "]. " + + "This may occur if OS cleaned the state.dir in case when it located in ${java.io.tmpdir} directory. " + + "This may also occur due to running multiple instances on the same machine using the same state dir. " + + "Changing the location of state.dir may resolve the problem."))); } } - @Test - public void shouldThrowStreamsExceptionForOldTopicPartitions() throws IOException { - final HashMap expectedOffsets = new HashMap<>(); - expectedOffsets.put(t1, 1L); - expectedOffsets.put(t2, 1L); - expectedOffsets.put(t3, 1L); - expectedOffsets.put(t4, 1L); - - // add an old topic (a topic not associated with any global state store) - final HashMap startOffsets = new HashMap<>(expectedOffsets); - final TopicPartition tOld = new TopicPartition("oldTopic", 1); - startOffsets.put(tOld, 1L); - - // start with a checkpoint file will all topic-partitions: expected and old (not - // associated with any global state store). - final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile); - checkpoint.write(startOffsets); - - // initialize will throw exception - final StreamsException e = assertThrows(StreamsException.class, () -> stateManager.initialize()); - assertThat(e.getMessage(), equalTo("Encountered a topic-partition not associated with any global state store")); - } - - @Test - public void shouldNotDeleteCheckpointFileAfterLoaded() throws IOException { - writeCheckpoint(); - stateManager.initialize(); - assertTrue(checkpointFile.exists()); - } - @Test public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws IOException { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); writeCorruptCheckpoint(); assertThrows(StreamsException.class, stateManager::initialize); } @Test public void shouldInitializeStateStores() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); stateManager.initialize(); assertTrue(store1.initialized); assertTrue(store2.initialized); @@ -246,12 +235,14 @@ public void shouldInitializeStateStores() { @Test public void shouldReturnInitializedStoreNames() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); final Set storeNames = stateManager.initialize(); assertEquals(Set.of(storeName1, storeName2, storeName3, storeName4, storeName5), storeNames); } @Test public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); stateManager.initialize(); try { @@ -264,6 +255,7 @@ public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() { @Test public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); stateManager.initialize(); initializeConsumer(2, 0, t1); stateManager.registerStore(store1, stateRestoreCallback, null); @@ -277,8 +269,8 @@ public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice( @Test public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() { - stateManager.initialize(); try { + stateManager.initialize(); stateManager.registerStore(store1, stateRestoreCallback, null); fail("Should have raised a StreamsException as there are no partition for the store"); } catch (final StreamsException e) { @@ -402,7 +394,8 @@ public void shouldRestoreRecordsFromCheckpointToHighWatermark() throws IOExcepti @Test - public void shouldFlushStateStores() { + public void shouldCommitStateStores() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); stateManager.initialize(); // register the stores initializeConsumer(1, 0, t1); @@ -417,6 +410,7 @@ public void shouldFlushStateStores() { @Test public void shouldThrowProcessorStateStoreExceptionIfStoreCommitFailed() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); stateManager.initialize(); // register the stores initializeConsumer(1, 0, t1); @@ -431,6 +425,7 @@ public void commit(final Map changelogOffsets) { @Test public void shouldCloseStateStores() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); stateManager.initialize(); // register the stores initializeConsumer(1, 0, t1); @@ -445,6 +440,7 @@ public void shouldCloseStateStores() { @Test public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); stateManager.initialize(); initializeConsumer(1, 0, t1); stateManager.registerStore(new NoOpReadOnlyStore<>(store1.name()) { @@ -459,6 +455,7 @@ public void close() { @Test public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); stateManager.initialize(); try { stateManager.registerStore(store1, null, null); @@ -470,6 +467,7 @@ public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() { @Test public void shouldNotCloseStoresIfCloseAlreadyCalled() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); stateManager.initialize(); initializeConsumer(1, 0, t1); stateManager.registerStore(new NoOpReadOnlyStore<>("t1-store") { @@ -488,8 +486,8 @@ public void close() { @Test public void shouldAttemptToCloseAllStoresEvenWhenSomeException() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); stateManager.initialize(); - initializeConsumer(1, 0, t1); final NoOpReadOnlyStore store = new NoOpReadOnlyStore<>("t1-store") { @Override public void close() { @@ -497,6 +495,7 @@ public void close() { throw new RuntimeException("KABOOM!"); } }; + initializeConsumer(1, 0, t1); stateManager.registerStore(store, stateRestoreCallback, null); initializeConsumer(1, 0, t2); @@ -513,19 +512,28 @@ public void close() { @Test public void shouldCheckpointOffsets() throws IOException { - final Map offsets = Collections.singletonMap(t1, 25L); + initializeConsumer(0, 0, t1, t2, t3, t4, t5); + stateManager.setGlobalProcessorContext(processorContext); + processorContext.setStateManger(stateManager); + final Map offsets = Collections.singletonMap(t1, 25_000L); stateManager.initialize(); stateManager.updateChangelogOffsets(offsets); - stateManager.checkpoint(); + stateManager.flush(); - final Map result = readOffsetsCheckpoint(); - assertThat(result, equalTo(offsets)); - assertThat(stateManager.changelogOffsets(), equalTo(offsets)); + assertThat(readOffsetsCheckpoint(storeName1), equalTo(offsets)); + assertThat(stateManager.changelogOffsets(), equalTo(mkMap( + mkEntry(t1, 25_000L), + mkEntry(t2, 0L), + mkEntry(t3, 0L), + mkEntry(t4, 0L), + mkEntry(t5, 0L) + ))); } @Test public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); stateManager.initialize(); initializeConsumer(10, 0, t1); stateManager.registerStore(store1, stateRestoreCallback, null); @@ -534,7 +542,7 @@ public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() { final Map initialCheckpoint = stateManager.changelogOffsets(); stateManager.updateChangelogOffsets(Collections.singletonMap(t1, 101L)); - stateManager.checkpoint(); + stateManager.flush(); final Map updatedCheckpoint = stateManager.changelogOffsets(); assertThat(updatedCheckpoint.get(t2), equalTo(initialCheckpoint.get(t2))); @@ -566,10 +574,12 @@ public void shouldSkipNullKeysWhenRestoring() { @Test public void shouldCheckpointRestoredOffsetsToFile() throws IOException { initializeConsumer(0, 0, t2, t3, t4, t5); - initializeConsumer(10, 0, t1); processorContext.setStateManger(stateManager); + stateManager.setGlobalProcessorContext(processorContext); + + initializeConsumer(10, 0, t1); stateManager.initialize(); - stateManager.checkpoint(); + stateManager.flush(); stateManager.close(); final Map checkpointMap = stateManager.changelogOffsets(); @@ -582,26 +592,23 @@ public void shouldCheckpointRestoredOffsetsToFile() throws IOException { mkEntry(t5, 0L) ))); - // checkpoint file only contains persistent store offsets - assertThat(readOffsetsCheckpoint(), equalTo(mkMap( - mkEntry(t1, 10L), - mkEntry(t2, 0L) - ))); + assertThat(readOffsetsCheckpoint(storeName1), equalTo(mkMap(mkEntry(t1, 10L)))); + assertThat(readOffsetsCheckpoint(storeName2), equalTo(mkMap(mkEntry(t2, 0L)))); } @Test public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException { - initializeConsumer(0, 0, t1, t3, t4, t5); + initializeConsumer(0, 0, t1, t2, t4, t5); initializeConsumer(10, 0, t3); stateManager.initialize(); stateManager.close(); - assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap())); + assertThat(readOffsetsCheckpoint(storeName3), equalTo(Collections.emptyMap())); } - private Map readOffsetsCheckpoint() throws IOException { + private Map readOffsetsCheckpoint(final String storeName) throws IOException { final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), - StateManagerUtil.CHECKPOINT_FILE_NAME)); + StateManagerUtil.CHECKPOINT_FILE_NAME + "_" + storeName)); return offsetCheckpoint.read(); } @@ -1168,6 +1175,7 @@ private void setUpReprocessing() { @Test public void shouldFailOnDeserializationErrorsWhenReprocessing() { + initializeConsumer(0, 0, t1, t2, t3, t4, t5); setUpReprocessing(); initializeConsumer(0, 0, t1, t2, t3, t4); initializeConsumer(2, 0, t5); @@ -1180,6 +1188,7 @@ public void shouldFailOnDeserializationErrorsWhenReprocessing() { public void shouldSkipOnDeserializationErrorsWhenReprocessing() { stateManager.setDeserializationExceptionHandler(new LogAndContinueExceptionHandler()); setUpReprocessing(); + initializeConsumer(0, 0, t1, t2, t3, t4, t5); initializeConsumer(2, 0, t5); stateManager.initialize();