diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 6a7eb8c25cf4d..003550427b049 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -31,10 +31,15 @@
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
+import org.apache.kafka.streams.state.internals.AbstractDualSchemaRocksDBSegmentedBytesStore;
+import org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore;
 import org.apache.kafka.streams.state.internals.CachedStateStore;
+import org.apache.kafka.streams.state.internals.InMemorySessionStore;
+import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
 import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.slf4j.Logger;
@@ -101,6 +106,8 @@ public static class StateStoreMetadata {
         // corrupted state store should not be included in checkpointing
         private boolean corrupted;
 
+        // retention period of the innermost windowed/session store, or -1 if not applicable
+        private final long retentionPeriod;
+
         private StateStoreMetadata(final StateStore stateStore,
                                    final CommitCallback commitCallback) {
@@ -111,6 +118,7 @@ private StateStoreMetadata(final StateStore stateStore,
             this.changelogPartition = null;
             this.corrupted = false;
             this.offset = null;
+            this.retentionPeriod = -1L;
         }
 
         private StateStoreMetadata(final StateStore stateStore,
@@ -128,12 +136,37 @@ private StateStoreMetadata(final StateStore stateStore,
             this.commitCallback = commitCallback;
             this.recordConverter = recordConverter;
             this.offset = null;
+            this.retentionPeriod = extractRetentionPeriod(stateStore);
         }
 
         private void setOffset(final Long offset) {
             this.offset = offset;
         }
 
+        // Peels off the wrapping layers (caching / metering / logging) one by one and
+        // returns the retention period of the innermost store if it is a windowed or
+        // session store, or -1 otherwise.
+        private static long extractRetentionPeriod(final StateStore stateStore) {
+            StateStore current = stateStore;
+            while (current instanceof WrappedStateStore) {
+                current = ((WrappedStateStore<?, ?>) current).wrapped();
+            }
+            // Now 'current' is the innermost store. Check what type it is.
+            if (current instanceof AbstractRocksDBSegmentedBytesStore) {
+                return ((AbstractRocksDBSegmentedBytesStore<?>) current).retentionPeriod();
+            }
+            if (current instanceof AbstractDualSchemaRocksDBSegmentedBytesStore) {
+                return ((AbstractDualSchemaRocksDBSegmentedBytesStore<?>) current).retentionPeriod();
+            }
+            if (current instanceof InMemoryWindowStore) {
+                return ((InMemoryWindowStore) current).retentionPeriod();
+            }
+            if (current instanceof InMemorySessionStore) {
+                return ((InMemorySessionStore) current).retentionPeriod();
+            }
+            // Not a windowed/session store
+            return -1L;
+        }
+
         // the offset is exposed to the changelog reader to determine if restoration is completed
         Long offset() {
             return this.offset;
@@ -142,6 +175,12 @@ Long offset() {
         Long endOffset() {
             return this.endOffset;
         }
+
+        // the retentionPeriod is exposed to the changelog reader so that restoration of
+        // windowed stores can skip already-expired records; -1 means "not a windowed store"
+        long retentionPeriod() {
+            return retentionPeriod;
+        }
 
         public void setEndOffset(final Long endOffset) {
             this.endOffset = endOffset;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index eab7da800d882..2821152f4771e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -970,6 +971,7 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
                                    final Set<ChangelogMetadata> newPartitionsToRestore) {
         // separate those who do not have the current offset loaded from checkpoint
         final Set<TopicPartition> newPartitionsWithoutStartOffset = new HashSet<>();
+        final Map<TopicPartition, Long> newPartitionsWithTimestampSeek = new HashMap<>();
 
         for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
             final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
@@ -986,18 +988,24 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
                 log.debug("Start restoring changelog partition {} from current offset {} to end offset {}.",
                     partition, currentOffset, recordEndOffset(endOffset));
             } else {
-                log.debug("Start restoring changelog partition {} from the beginning offset to end offset {} " +
-                    "since we cannot find current offset.", partition, recordEndOffset(endOffset));
-
-                newPartitionsWithoutStartOffset.add(partition);
+                final long retentionPeriod = storeMetadata.retentionPeriod();
+                if (retentionPeriod > 0) {
+                    // clamp at 0: an effectively-infinite retention would produce a negative
+                    // timestamp, which Consumer#offsetsForTimes rejects with an IllegalArgumentException
+                    final long seekTimestamp = Math.max(0L, time.milliseconds() - retentionPeriod);
+                    newPartitionsWithTimestampSeek.put(partition, seekTimestamp);
+                    log.debug("Start restoring windowed changelog partition {} from timestamp {} to end offset {}.",
+                        partition, seekTimestamp, recordEndOffset(endOffset));
+                } else {
+                    log.debug("Start restoring changelog partition {} from the beginning offset to end offset {} " +
+                        "since we cannot find current offset.", partition, recordEndOffset(endOffset));
+                    newPartitionsWithoutStartOffset.add(partition);
+                }
             }
         }
 
-        // optimization: batch all seek-to-beginning offsets in a single request
-        // seek is not a blocking call so there's nothing to capture
-        if (!newPartitionsWithoutStartOffset.isEmpty()) {
-            restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
-        }
+        seekToTimestampOrBeginning(newPartitionsWithTimestampSeek, newPartitionsWithoutStartOffset);
 
         for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
             final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
@@ -1039,6 +1047,38 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
         }
     }
 
+    // Seeks the restore consumer for each new partition: windowed-store partitions are
+    // positioned at the first offset whose record timestamp is within the retention period
+    // (skipping already-expired data), everything else at the beginning.
+    private void seekToTimestampOrBeginning(final Map<TopicPartition, Long> partitionsWithTimestampSeek,
+                                            final Set<TopicPartition> partitionsWithoutStartOffset) {
+        // optimization: seek windowed stores by timestamp to skip expired data;
+        // unlike seek(), offsetsForTimes() is a blocking call and may time out
+        if (!partitionsWithTimestampSeek.isEmpty()) {
+            try {
+                final Map<TopicPartition, OffsetAndTimestamp> offsetsByTimestamp =
+                    restoreConsumer.offsetsForTimes(partitionsWithTimestampSeek);
+                for (final Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsByTimestamp.entrySet()) {
+                    if (entry.getValue() != null) {
+                        restoreConsumer.seek(entry.getKey(), entry.getValue().offset());
+                    } else {
+                        // no offset found for the timestamp, fall back to seeking to the beginning
+                        partitionsWithoutStartOffset.add(entry.getKey());
+                    }
+                }
+            } catch (final TimeoutException timeoutException) {
+                // NOTE(review): assumes org.apache.kafka.common.errors.TimeoutException is already
+                // imported by this class for the endOffsets handling — verify. If the lookup cannot
+                // complete in time, restoring from the beginning is always a safe fallback.
+                log.debug("Could not fetch offsets for timestamps {}; falling back to seekToBeginning. Reason: {}",
+                    partitionsWithTimestampSeek, timeoutException.toString());
+                partitionsWithoutStartOffset.addAll(partitionsWithTimestampSeek.keySet());
+            }
+        }
+
+        // optimization: batch all seek-to-beginning offsets in a single request
+        // seek is not a blocking call so there's nothing to capture
+        if (!partitionsWithoutStartOffset.isEmpty()) {
+            restoreConsumer.seekToBeginning(partitionsWithoutStartOffset);
+        }
+    }
+
     @Override
     public void unregister(final Collection<TopicPartition> revokedChangelogs) {
         unregister(revokedChangelogs, StandbyUpdateListener.SuspendReason.MIGRATED);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index 85b85855a36be..3502dbf7dfb18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -76,6 +76,10 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment>
 
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 2baae916e3433..bc2587773657f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -75,6 +75,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore
         this.segments = segments;
     }
 
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+
     @Override
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
                                                  final long from,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 8bd7185139ae6..4a68bc51f4c7c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -88,6 +88,10 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
         this.metricScope = metricScope;
         this.position = Position.emptyPosition();
     }
+
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
 
     @Override
     public String name() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 8d2228db5c3d5..40a4db2797f20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -95,6 +95,10 @@ public InMemoryWindowStore(final String name,
         this.position = Position.emptyPosition();
     }
 
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+
     @Override
     public String name() {
         return name;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 72954175f5b4a..1dc2a4c0b20b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
@@ -1430,6 +1431,53 @@ private void addRecords(final long messages, final TopicPartition topicPartition
     }
 }
 
+    @Test
+    public void shouldSeekByTimestampForWindowedStoreWithoutCheckpoint() {
+        final long retentionMs = Duration.ofHours(2).toMillis();
+        final long offsetForTimestamp = 42L;
+
+        // Use a MockConsumer subclass that supports offsetsForTimes
+        final MockConsumer<byte[], byte[]> timestampConsumer =
+            new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+                @Override
+                public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
+                        final Map<TopicPartition, Long> timestampsToSearch) {
+                    final Map<TopicPartition, OffsetAndTimestamp> result = new HashMap<>();
+                    for (final Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
+                        result.put(entry.getKey(), new OffsetAndTimestamp(offsetForTimestamp, entry.getValue()));
+                    }
+                    return result;
+                }
+            };
+
+        // Set up mocks - storeMetadata returns null offset (no checkpoint) and positive retentionPeriod
+        final StateStoreMetadata windowStoreMetadata = mock(StateStoreMetadata.class);
+        final ProcessorStateManager windowStateManager = mock(ProcessorStateManager.class);
+        final StateStore windowStore = mock(StateStore.class);
+        when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
+        when(windowStoreMetadata.store()).thenReturn(windowStore);
+        when(windowStoreMetadata.offset()).thenReturn(null);
+        when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
+        when(windowStore.name()).thenReturn(storeName);
+        when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
+        when(windowStateManager.taskType()).thenReturn(ACTIVE);
+
+        final TaskId taskId = new TaskId(0, 0);
+        when(windowStateManager.taskId()).thenReturn(taskId);
+
+        timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+        final StoreChangelogReader reader =
+            new StoreChangelogReader(time, config, logContext, adminClient, timestampConsumer, callback, standbyListener);
+
+        reader.register(tp, windowStateManager);
+        reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+        // The consumer should be seeked to the offset returned by offsetsForTimes, not to the beginning
+        assertEquals(offsetForTimestamp, timestampConsumer.position(tp));
+    }
+
     private void assignPartition(final long messages, final TopicPartition topicPartition) {
         consumer.updatePartitions(