Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.query.Position;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -39,6 +41,7 @@ abstract class AbstractColumnFamilyAccessor implements RocksDBStore.ColumnFamily
private final StringSerializer stringSerializer = new StringSerializer();
private final Serdes.LongSerde longSerde = new Serdes.LongSerde();
private final byte[] statusKey = stringSerializer.serialize(null, "status");
private final byte[] positionKey = stringSerializer.serialize(null, "position");
private final byte[] openState = longSerde.serializer().serialize(null, 1L);
private final byte[] closedState = longSerde.serializer().serialize(null, 0L);
private final AtomicBoolean storeOpen;
Expand All @@ -62,12 +65,23 @@ public final void commit(final RocksDBStore.DBAccessor accessor, final Map<Topic
}

@Override
public final void open(final RocksDBStore.DBAccessor accessor, final boolean ignoreInvalidState) throws RocksDBException {
public final void commit(final RocksDBStore.DBAccessor accessor, final Position storePosition) throws RocksDBException {
    // Serialize the Position and persist it under the dedicated "position" key in the
    // offset column family, alongside the changelog offsets and the open/closed status flag.
    accessor.put(offsetColumnFamilyHandle, positionKey, PositionSerde.serialize(storePosition).array());
}

@Override
public final Position open(final RocksDBStore.DBAccessor accessor, final boolean ignoreInvalidState) throws RocksDBException {
final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle, statusKey);
if (ignoreInvalidState || (valueBytes == null || Arrays.equals(valueBytes, closedState))) {
// If the status key is not present, we initialize it to "OPEN"
accessor.put(offsetColumnFamilyHandle, statusKey, openState);
storeOpen.set(true);
final byte[] positionBytes = accessor.get(offsetColumnFamilyHandle, positionKey);
if (positionBytes != null) {
return PositionSerde.deserialize(ByteBuffer.wrap(positionBytes));
} else {
return Position.emptyPosition();
}
} else {
throw new ProcessorStateException("Invalid state during store open. Expected state to be either empty or closed");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -61,7 +60,6 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
protected boolean consistencyEnabled = false;
protected Position position;
protected OffsetCheckpoint positionCheckpoint;
private volatile boolean open;

AbstractDualSchemaRocksDBSegmentedBytesStore(final String name,
Expand Down Expand Up @@ -254,18 +252,15 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
metrics
);

final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
segments.setPosition(this.position);

segments.openExisting(internalProcessorContext, observedStreamTime);
this.position = segments.position;
StoreQueryUtils.maybeMigrateExistingPositionFile(stateStoreContext.stateDir(), name(), this.position);

// register and possibly restore the state from the logs
stateStoreContext.register(
root,
(RecordBatchingStateRestoreCallback) this::restoreAllInternal,
() -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
segments::writePosition
);

open = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand All @@ -62,7 +61,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private boolean consistencyEnabled = false;
private Position position;
protected OffsetCheckpoint positionCheckpoint;
private volatile boolean open;

AbstractRocksDBSegmentedBytesStore(final String name,
Expand Down Expand Up @@ -296,17 +294,15 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
metrics
);

final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
segments.setPosition(position);
segments.openExisting(internalProcessorContext, observedStreamTime);
this.position = segments.position;
StoreQueryUtils.maybeMigrateExistingPositionFile(stateStoreContext.stateDir(), name(), this.position);

// register and possibly restore the state from the logs
stateStoreContext.register(
root,
(RecordBatchingStateRestoreCallback) this::restoreAllInternal,
() -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
segments::writePosition
);

open = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
private final long retentionPeriod;
private final long segmentInterval;
private final SimpleDateFormat formatter;
Position position;
protected final Position position = Position.emptyPosition();

AbstractSegments(final String name, final long retentionPeriod, final long segmentInterval) {
this.name = name;
Expand All @@ -57,14 +57,14 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
}

/**
 * Persists the shared position into every currently-open segment's underlying store.
 */
protected final void writePosition() {
    for (final S segment : segments.values()) {
        segment.writePosition();
    }
}

protected abstract S createSegment(long segmentId, String segmentName);

protected abstract void openSegmentDB(final S segment, final StateStoreContext context);

// NOTE(review): this setter assigns to `position`, which conflicts with the
// `protected final Position position = Position.emptyPosition()` field introduced
// in this change — presumably this method is on the removed side of the diff;
// confirm against the applied patch.
public void setPosition(final Position position) {
    this.position = position;
}

@Override
public long segmentId(final long timestamp) {
return timestamp / segmentInterval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public void commit(final Map<TopicPartition, Long> changelogOffsets) {
throw new UnsupportedOperationException("nothing to commit for logical segment");
}

@Override
public void writePosition() {
    // Logical segments share a single physical RocksDB store; delegate so the
    // position is persisted once in that shared store.
    physicalStore.writePosition();
}

@Override
public synchronized void close() {
// close open iterators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;

import java.util.HashMap;
Expand Down Expand Up @@ -56,11 +55,6 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSeg
this.physicalStore = new RocksDBStore(name, parentDir, metricsRecorder, false);
}

@Override
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove setPosition? I think we need to add this back as LogicalKeyValueSegments.physicalStore is a standalone RocksDBStore whose position is set independently during openDB().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I'm removing this is that we already merge the position from the physicalStore into the segments' position. Basically, the physicalStore reads the position offsets during openDB(), and AbstractSegments merges that position into its own. The position defined in AbstractSegments is shared among all the LogicalKeyValueSegments. See line 97 of this class.

    public void openExisting(final StateStoreContext context, final long streamTime) {
        metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
        physicalStore.openDB(context.appConfigs(), context.stateDir());
        position.merge(physicalStore.getPosition());
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack thanks for explaining

public void setPosition(final Position position) {
this.physicalStore.position = position;
}

@Override
protected LogicalKeyValueSegment createSegment(final long segmentId, final String segmentName) {
if (segmentId < 0) {
Expand Down Expand Up @@ -100,6 +94,7 @@ LogicalKeyValueSegment getReservedSegment(final long segmentId) {
public void openExisting(final StateStoreContext context, final long streamTime) {
    metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
    // openDB() reads the position offsets persisted in the store's offset column
    // family; merge them into the segments-level position shared by all logical segments.
    physicalStore.openDB(context.appConfigs(), context.stateDir());
    position.merge(physicalStore.getPosition());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS

protected StateStoreContext context;
protected Position position;
private OffsetCheckpoint positionCheckpoint;

public RocksDBStore(final String name,
final String metricsScope) {
Expand Down Expand Up @@ -169,18 +168,15 @@ public void init(final StateStoreContext stateStoreContext,
// open the DB dir
metricsRecorder.init(metricsImpl(stateStoreContext), stateStoreContext.taskId());
openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());

final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
StoreQueryUtils.maybeMigrateExistingPositionFile(stateStoreContext.stateDir(), name(), position);

// value getter should always read directly from rocksDB
// since it is only for values that are already flushed
this.context = stateStoreContext;
stateStoreContext.register(
root,
(RecordBatchingStateRestoreCallback) this::restoreBatch,
() -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
this::writePosition
);
consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
stateStoreContext.appConfigs(),
Expand Down Expand Up @@ -252,7 +248,13 @@ void openDB(final Map<String, Object> configs, final File stateDir) {
openRocksDB(dbOptions, columnFamilyOptions);
dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
try {
cfAccessor.open(dbAccessor, !eosEnabled);
final Position existingPositionOrEmpty = cfAccessor.open(dbAccessor, !eosEnabled);
if (position == null) {
position = existingPositionOrEmpty;
} else {
// For segmented stores, the overall position is composed of multiple underlying stores, so merge this store's position into it.
position.merge(existingPositionOrEmpty);
}
} catch (final StreamsException fatal) {
final String fatalMessage = "State store " + name + " didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores";
throw new ProcessorStateException(fatalMessage, fatal);
Expand Down Expand Up @@ -396,6 +398,15 @@ private List<ColumnFamilyHandle> mergeColumnFamilyHandleLists(final List<ColumnF
return columnFamilies;
}

/**
 * Persists the store's in-memory position into the offset column family,
 * replacing the legacy {@code <store>.position} checkpoint file.
 */
public final void writePosition() {
    // Guard: the store must still be open before touching RocksDB.
    validateStoreOpen();
    try {
        cfAccessor.commit(dbAccessor, position);
    } catch (final RocksDBException e) {
        // NOTE(review): failures are logged and swallowed, making position
        // persistence best-effort — confirm this matches the fault handling of
        // the previous checkpoint-file-based implementation.
        log.warn("Error while committing position for store {}", name, e);
    }
}

@Override
public String name() {
return name;
Expand Down Expand Up @@ -906,6 +917,8 @@ ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,

void commit(final DBAccessor accessor, final Map<TopicPartition, Long> changelogOffsets) throws RocksDBException;

void commit(final DBAccessor accessor, final Position storePosition) throws RocksDBException;

void addToBatch(final byte[] key,
final byte[] value,
final WriteBatchInterface batch) throws RocksDBException;
Expand All @@ -914,9 +927,10 @@ void addToBatch(final byte[] key,

/**
* Initializes the ColumnFamily.
* @return the position of the store based on the data in the ColumnFamily. If no offset position is found, an empty position is returned.
* @throws StreamsException if an invalid state is found and ignoreInvalidState is false
*/
void open(final RocksDBStore.DBAccessor accessor, final boolean ignoreInvalidState) throws RocksDBException, StreamsException;
Position open(final RocksDBStore.DBAccessor accessor, final boolean ignoreInvalidState) throws RocksDBException, StreamsException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to update JavaDoc that open returns a Position


Long getCommittedOffset(final RocksDBStore.DBAccessor accessor, final TopicPartition partition) throws RocksDBException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -108,7 +107,6 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private boolean consistencyEnabled = false;
private Position position;
private OffsetCheckpoint positionCheckpoint;
private volatile boolean open;

RocksDBVersionedStore(final String name, final String metricsScope, final long historyRetention, final long segmentInterval) {
Expand Down Expand Up @@ -366,17 +364,15 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo

metricsRecorder.init(ProcessorContextUtils.metricsImpl(stateStoreContext), stateStoreContext.taskId());

final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
segmentStores.setPosition(position);
segmentStores.openExisting(internalProcessorContext, observedStreamTime);
this.position = segmentStores.position;
StoreQueryUtils.maybeMigrateExistingPositionFile(stateStoreContext.stateDir(), name(), this.position);

// register and possibly restore the state from the logs
stateStoreContext.register(
root,
(RecordBatchingStateRestoreCallback) RocksDBVersionedStore.this::restoreBatch,
() -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
segmentStores::writePosition
);

open = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public interface Segment extends KeyValueStore<Bytes, byte[]>, BatchWritingStore

void deleteRange(Bytes keyFrom, Bytes keyTo);

void writePosition();

@Override
default int compareTo(final Segment segment) {
return Long.compare(id(), segment.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
Expand Down Expand Up @@ -164,6 +165,14 @@ public static void updatePosition(
}
}

public static void maybeMigrateExistingPositionFile(final File stateDir, final String storeName, final Position position) {
final File positionCheckpointFile = new File(stateDir, storeName + ".position");
if (positionCheckpointFile.exists()) {
final Position existingPosition = readPositionFromCheckpoint(new OffsetCheckpoint(positionCheckpointFile));
position.merge(existingPosition);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we delete the position file here after merging?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think we should delete the position file here, but we will need to recreate it when executing the downgrade process. I didn't do it on this PR because I thought it was part of KAFKA-19710

}
}

public static boolean isPermitted(
final Position position,
final PositionBound positionBound,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1629,6 +1629,20 @@ public void shouldMeasureExpiredRecords() {
bytesStore.close();
}

@Test
public void shouldLoadPositionFromFile() {
    // Write a position using the legacy ".position" checkpoint-file format.
    final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L)))));
    final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new File(stateDir, storeName + ".position"));
    StoreQueryUtils.checkpointPosition(positionCheckpoint, position);

    final AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> bytesStore = getBytesStore();

    // store.init migrates the position from the legacy checkpoint file into the store.
    bytesStore.init(context, bytesStore);
    assertEquals(position, bytesStore.getPosition());
    bytesStore.close();
}

private Set<String> segmentDirs() {
final File windowDir = new File(stateDir, storeName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,21 @@ public void shouldNotThrowWhenRestoringOnMissingHeaders(final SegmentedBytesStor
assertThat(bytesStore.getPosition(), is(Position.emptyPosition()));
}

@ParameterizedTest
@MethodSource("getKeySchemas")
public void shouldLoadPositionFromFile(final SegmentedBytesStore.KeySchema schema) {
    before(schema);
    // Write a position using the legacy ".position" checkpoint-file format.
    final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L)))));
    final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new File(context.stateDir(), storeName + ".position"));
    StoreQueryUtils.checkpointPosition(positionCheckpoint, position);

    final AbstractRocksDBSegmentedBytesStore<S> bytesStore = getBytesStore();

    // store.init migrates the position from the legacy checkpoint file into the store.
    bytesStore.init(context, bytesStore);
    assertEquals(position, bytesStore.getPosition());
    // Release native RocksDB resources; the dual-schema variant of this test closes
    // its store, and omitting close() here leaks the segments opened by init().
    bytesStore.close();
}

private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
final Headers headers = new RecordHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.test.InternalMockProcessorContext;
Expand Down Expand Up @@ -72,7 +71,6 @@ public void setUp() {
SEGMENT_INTERVAL,
new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
);
segments.setPosition(Position.emptyPosition());
segments.openExisting(context, 0L);
}

Expand Down
Loading
Loading