Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.internals.AbstractDualSchemaRocksDBSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.InMemorySessionStore;
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

import org.slf4j.Logger;

Expand Down Expand Up @@ -101,6 +106,8 @@ public static class StateStoreMetadata {
// corrupted state store should not be included in checkpointing
private boolean corrupted;

private final long retentionPeriod;


private StateStoreMetadata(final StateStore stateStore,
final CommitCallback commitCallback) {
Expand All @@ -111,6 +118,7 @@ private StateStoreMetadata(final StateStore stateStore,
this.changelogPartition = null;
this.corrupted = false;
this.offset = null;
this.retentionPeriod = -1L;
}

private StateStoreMetadata(final StateStore stateStore,
Expand All @@ -128,12 +136,36 @@ private StateStoreMetadata(final StateStore stateStore,
this.commitCallback = commitCallback;
this.recordConverter = recordConverter;
this.offset = null;
this.retentionPeriod = extractRetentionPeriod(stateStore);
}

// Records the latest restored offset for this store's changelog partition;
// a null value means the current offset is unknown (e.g. no checkpoint found).
private void setOffset(final Long offset) {
this.offset = offset;
}

private static long extractRetentionPeriod(final StateStore stateStore) {
// Peel off the wrapping layers one by one
StateStore current = stateStore;
while (current instanceof WrappedStateStore) {
current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
}
// Now 'current' is the innermost store. Check what type it is.
if (current instanceof AbstractRocksDBSegmentedBytesStore) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thinking instead of the mulitple instanceof checks we could introduce an interface WithRetentionPeriod (the name is up for debate) containing the single method retentionPeriod and have the store types here implement it. Since all of the instances here are internal API this is possble without requiring a KIP. So the multiple checks would go to one

if (current instanceof WithRetentionPeriod) {
        return ((WithRetentionPeriod) current).retentionPeriod();
}

This would also provide the added benefit of automatically picking up any store needing/using a retention period.

\cc @mjsax @aliehsaeedii @lucasbru @frankvicky

return ((AbstractRocksDBSegmentedBytesStore<?>) current).retentionPeriod();
}
if (current instanceof AbstractDualSchemaRocksDBSegmentedBytesStore) {
return ((AbstractDualSchemaRocksDBSegmentedBytesStore<?>) current).retentionPeriod();
}
if (current instanceof InMemoryWindowStore) {
return ((InMemoryWindowStore) current).retentionPeriod();
}
if (current instanceof InMemorySessionStore) {
return ((InMemorySessionStore) current).retentionPeriod();
}
// Not a windowed/session store
return -1L;
}

// the offset is exposed to the changelog reader to determine if restoration is completed
Long offset() {
return this.offset;
Expand All @@ -142,6 +174,11 @@ Long offset() {
// the end offset of the changelog partition; exposed so callers can tell
// how far restoration still has to go
Long endOffset() {
return this.endOffset;
}

// the retentionPeriod is exposed to the changelog reader for window restoration;
// -1 means the underlying store has no retention period (not a windowed/session store)
long retentionPeriod() {
return retentionPeriod;
}

public void setEndOffset(final Long endOffset) {
this.endOffset = endOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -970,6 +971,7 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
final Set<ChangelogMetadata> newPartitionsToRestore) {
// separate those who do not have the current offset loaded from checkpoint
final Set<TopicPartition> newPartitionsWithoutStartOffset = new HashSet<>();
final Map<TopicPartition, Long> newPartitionsWithTimestampSeek = new HashMap<>();

for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
Expand All @@ -986,18 +988,21 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
log.debug("Start restoring changelog partition {} from current offset {} to end offset {}.",
partition, currentOffset, recordEndOffset(endOffset));
} else {
log.debug("Start restoring changelog partition {} from the beginning offset to end offset {} " +
"since we cannot find current offset.", partition, recordEndOffset(endOffset));

newPartitionsWithoutStartOffset.add(partition);
final long retentionPeriod = storeMetadata.retentionPeriod();
if (retentionPeriod > 0) {
final long seekTimestamp = time.milliseconds() - retentionPeriod;
newPartitionsWithTimestampSeek.put(partition, seekTimestamp);
log.debug("Start restoring windowed changelog partition {} from timestamp {} to end offset {}.",
partition, seekTimestamp, recordEndOffset(endOffset));
} else {
log.debug("Start restoring changelog partition {} from the beginning offset to end offset {} " +
"since we cannot find current offset.", partition, recordEndOffset(endOffset));
newPartitionsWithoutStartOffset.add(partition);
}
}
}

// optimization: batch all seek-to-beginning offsets in a single request
// seek is not a blocking call so there's nothing to capture
if (!newPartitionsWithoutStartOffset.isEmpty()) {
restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
}
seekToTimestampOrBeginning(newPartitionsWithTimestampSeek, newPartitionsWithoutStartOffset);

for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
Expand Down Expand Up @@ -1039,6 +1044,29 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
}
}

/**
 * Positions the restore consumer for partitions that have no checkpointed offset.
 *
 * <p>Partitions backing windowed stores are seeked to the offset matching the
 * retention-based timestamp (skipping records that are already expired); all
 * remaining partitions — including those for which the broker returned no
 * offset for the timestamp — are seeked to the beginning.
 *
 * @param partitionsWithTimestampSeek  partitions mapped to the timestamp to seek by
 * @param partitionsWithoutStartOffset partitions to seek to the beginning; fallback
 *                                     partitions from the timestamp lookup are added here
 */
private void seekToTimestampOrBeginning(final Map<TopicPartition, Long> partitionsWithTimestampSeek,
                                        final Set<TopicPartition> partitionsWithoutStartOffset) {
    // optimization: seek windowed stores by timestamp to skip expired data
    if (!partitionsWithTimestampSeek.isEmpty()) {
        restoreConsumer.offsetsForTimes(partitionsWithTimestampSeek)
            .forEach((partition, offsetAndTimestamp) -> {
                if (offsetAndTimestamp == null) {
                    // no offset found for the timestamp, fall back to seeking to the beginning
                    partitionsWithoutStartOffset.add(partition);
                } else {
                    restoreConsumer.seek(partition, offsetAndTimestamp.offset());
                }
            });
    }

    // optimization: batch all seek-to-beginning offsets in a single request
    // seek is not a blocking call so there's nothing to capture
    if (!partitionsWithoutStartOffset.isEmpty()) {
        restoreConsumer.seekToBeginning(partitionsWithoutStartOffset);
    }
}

@Override
public void unregister(final Collection<TopicPartition> revokedChangelogs) {
unregister(revokedChangelogs, StandbyUpdateListener.SuspendReason.MIGRATED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
this.retentionPeriod = retentionPeriod;
}

// Exposes this segmented store's configured retention period (milliseconds).
public long retentionPeriod() {
return retentionPeriod;
}

@Override
public KeyValueIterator<Bytes, byte[]> all() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
this.segments = segments;
}

// Exposes this segmented store's configured retention period (milliseconds).
public long retentionPeriod() {
return retentionPeriod;
}

@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
final long from,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
this.metricScope = metricScope;
this.position = Position.emptyPosition();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Add a space here

// Exposes this session store's configured retention period (milliseconds).
public long retentionPeriod() {
return retentionPeriod;
}

@Override
public String name() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public InMemoryWindowStore(final String name,
this.position = Position.emptyPosition();
}

// Exposes this window store's configured retention period (milliseconds).
public long retentionPeriod() {
return retentionPeriod;
}

@Override
public String name() {
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
Expand Down Expand Up @@ -1430,6 +1431,51 @@ private void addRecords(final long messages, final TopicPartition topicPartition
}
}

@Test
public void shouldSeekByTimestampForWindowedStoreWithoutCheckpoint() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add tests for negative test cases, broker doesn't have info and returns null for offsetsForTimes or for the non-windowed store case

final long retentionMs = Duration.ofHours(2).toMillis();
final long offsetForTimestamp = 42L;

// Use a MockConsumer subclass that supports offsetsForTimes
final MockConsumer<byte[], byte[]> timestampConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
@Override
public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
final Map<TopicPartition, OffsetAndTimestamp> result = new java.util.HashMap<>();
for (final Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
result.put(entry.getKey(), new OffsetAndTimestamp(offsetForTimestamp, entry.getValue()));
}
return result;
}
};

// Set up mocks - storeMetadata returns null offset (no checkpoint) and positive retentionPeriod
final StateStoreMetadata windowStoreMetadata = mock(StateStoreMetadata.class);
final ProcessorStateManager windowStateManager = mock(ProcessorStateManager.class);
final StateStore windowStore = mock(StateStore.class);
when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
when(windowStoreMetadata.store()).thenReturn(windowStore);
when(windowStoreMetadata.offset()).thenReturn(null);
when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
when(windowStore.name()).thenReturn(storeName);
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
when(windowStateManager.taskType()).thenReturn(ACTIVE);

final TaskId taskId = new TaskId(0, 0);
when(windowStateManager.taskId()).thenReturn(taskId);

timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));

final StoreChangelogReader reader =
new StoreChangelogReader(time, config, logContext, adminClient, timestampConsumer, callback, standbyListener);

reader.register(tp, windowStateManager);
reader.restore(Collections.singletonMap(taskId, mock(Task.class)));

// The consumer should be seeked to the offset returned by offsetsForTimes, not to the beginning
assertEquals(offsetForTimestamp, timestampConsumer.position(tp));
}

private void assignPartition(final long messages,
final TopicPartition topicPartition) {
consumer.updatePartitions(
Expand Down
Loading