Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.query.Position;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -39,6 +41,7 @@ abstract class AbstractColumnFamilyAccessor implements RocksDBStore.ColumnFamily
private final StringSerializer stringSerializer = new StringSerializer();
private final Serdes.LongSerde longSerde = new Serdes.LongSerde();
private final byte[] statusKey = stringSerializer.serialize(null, "status");
private final byte[] positionKey = stringSerializer.serialize(null, "position");
private final byte[] openState = longSerde.serializer().serialize(null, 1L);
private final byte[] closedState = longSerde.serializer().serialize(null, 0L);
private final AtomicBoolean storeOpen;
Expand All @@ -62,12 +65,23 @@ public final void commit(final RocksDBStore.DBAccessor accessor, final Map<Topic
}

@Override
public final void open(final RocksDBStore.DBAccessor accessor, final boolean ignoreInvalidState) throws RocksDBException {
public final void commit(final RocksDBStore.DBAccessor accessor, final Position storePosition) throws RocksDBException {
    // Persist the store's Position into the offsets column family so that it can be
    // restored on the next open() instead of from a sidecar checkpoint file.
    final byte[] serializedPosition = PositionSerde.serialize(storePosition).array();
    accessor.put(offsetColumnFamilyHandle, positionKey, serializedPosition);
}

@Override
public final Position open(final RocksDBStore.DBAccessor accessor, final boolean ignoreInvalidState) throws RocksDBException {
final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle, statusKey);
if (ignoreInvalidState || (valueBytes == null || Arrays.equals(valueBytes, closedState))) {
// If the status key is not present, we initialize it to "OPEN"
accessor.put(offsetColumnFamilyHandle, statusKey, openState);
storeOpen.set(true);
final byte[] positionBytes = accessor.get(offsetColumnFamilyHandle, positionKey);
if (positionBytes != null) {
return PositionSerde.deserialize(ByteBuffer.wrap(positionBytes));
} else {
return Position.emptyPosition();
}
} else {
throw new ProcessorStateException("Invalid state during store open. Expected state to be either empty or closed");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -254,18 +253,14 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
metrics
);

final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
segments.setPosition(this.position);

segments.openExisting(internalProcessorContext, observedStreamTime);
this.position = segments.position;

// register and possibly restore the state from the logs
stateStoreContext.register(
root,
(RecordBatchingStateRestoreCallback) this::restoreAllInternal,
() -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
() -> { } // Nothing to do?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Write the position here, since on open/read we merge positions for all segments. Plus the cost is small since it's small entry. Same for the other callbacks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @bbejeck I addressed this.

At first, I was confused about this because the underlying store is supposed to persist the position after commit, but that's not true for segmented stores.

);

open = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -296,17 +295,14 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
metrics
);

final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
segments.setPosition(position);
segments.openExisting(internalProcessorContext, observedStreamTime);
this.position = segments.position;

// register and possibly restore the state from the logs
stateStoreContext.register(
root,
(RecordBatchingStateRestoreCallback) this::restoreAllInternal,
() -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
() -> { } // Nothing to do?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

);

open = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
private final long retentionPeriod;
private final long segmentInterval;
private final SimpleDateFormat formatter;
Position position;
protected final Position position = Position.emptyPosition();

AbstractSegments(final String name, final long retentionPeriod, final long segmentInterval) {
this.name = name;
Expand All @@ -61,10 +61,6 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {

protected abstract void openSegmentDB(final S segment, final StateStoreContext context);

public void setPosition(final Position position) {
this.position = position;
}

@Override
public long segmentId(final long timestamp) {
return timestamp / segmentInterval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ protected KeyValueSegment createSegment(final long segmentId, final String segme
@Override
protected void openSegmentDB(final KeyValueSegment segmentToOpen, final StateStoreContext stateStoreContext) {
    // The segment's persisted position is only readable once its RocksDB is open,
    // so opening must happen first.
    segmentToOpen.openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());
    // Fold this segment's restored position into the shared, segments-wide position.
    position.merge(segmentToOpen.getPosition());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;

import java.util.HashMap;
Expand Down Expand Up @@ -56,11 +55,6 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSeg
this.physicalStore = new RocksDBStore(name, parentDir, metricsRecorder, false);
}

@Override
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove setPosition? I think we need to add this back as LogicalKeyValueSegments.physicalStore is a standalone RocksDBStore whose position is set independently during openDB().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm removing this because we are already merging the position from the physicalStore into the segments' position. Basically, the physicalStore reads the position offsets during openDB() and AbstractSegments merges that position into its own. The position defined in AbstractSegments is shared among all the LogicalKeyValueSegments. See line 97 in this class.

    public void openExisting(final StateStoreContext context, final long streamTime) {
        metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
        physicalStore.openDB(context.appConfigs(), context.stateDir());
        position.merge(physicalStore.getPosition());
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack thanks for explaining

public void setPosition(final Position position) {
this.physicalStore.position = position;
}

@Override
protected LogicalKeyValueSegment createSegment(final long segmentId, final String segmentName) {
if (segmentId < 0) {
Expand Down Expand Up @@ -100,6 +94,7 @@ LogicalKeyValueSegment getReservedSegment(final long segmentId) {
public void openExisting(final StateStoreContext stateStoreContext, final long streamTime) {
    metricsRecorder.init(ProcessorContextUtils.metricsImpl(stateStoreContext), stateStoreContext.taskId());
    // All logical segments share a single physical RocksDB instance; open it once here.
    physicalStore.openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());
    // The physical store restores its own position during openDB(); merge it into the
    // shared segments-level position.
    position.merge(physicalStore.getPosition());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS

protected StateStoreContext context;
protected Position position;
private OffsetCheckpoint positionCheckpoint;

public RocksDBStore(final String name,
final String metricsScope) {
Expand Down Expand Up @@ -170,17 +169,13 @@ public void init(final StateStoreContext stateStoreContext,
metricsRecorder.init(metricsImpl(stateStoreContext), stateStoreContext.taskId());
openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());

final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);

// value getter should always read directly from rocksDB
// since it is only for values that are already flushed
this.context = stateStoreContext;
stateStoreContext.register(
root,
(RecordBatchingStateRestoreCallback) this::restoreBatch,
() -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
this::writePosition
);
consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
stateStoreContext.appConfigs(),
Expand Down Expand Up @@ -252,7 +247,7 @@ void openDB(final Map<String, Object> configs, final File stateDir) {
openRocksDB(dbOptions, columnFamilyOptions);
dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
try {
cfAccessor.open(dbAccessor, !eosEnabled);
position = cfAccessor.open(dbAccessor, !eosEnabled);
} catch (final StreamsException fatal) {
final String fatalMessage = "State store " + name + " didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores";
throw new ProcessorStateException(fatalMessage, fatal);
Expand Down Expand Up @@ -396,6 +391,16 @@ private List<ColumnFamilyHandle> mergeColumnFamilyHandleLists(final List<ColumnF
return columnFamilies;
}

private void writePosition() {
validateStoreOpen();
try {
cfAccessor.commit(dbAccessor, position);
} catch (final RocksDBException e) {
// TODO: fatal error?
throw new RuntimeException(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say since the position is for IQ only and doesn't affect data correctness I'd say log a WARN

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me.

}
}

@Override
public String name() {
return name;
Expand Down Expand Up @@ -906,6 +911,8 @@ ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,

void commit(final DBAccessor accessor, final Map<TopicPartition, Long> changelogOffsets) throws RocksDBException;

void commit(final DBAccessor accessor, final Position storePosition) throws RocksDBException;

void addToBatch(final byte[] key,
final byte[] value,
final WriteBatchInterface batch) throws RocksDBException;
Expand All @@ -916,7 +923,7 @@ void addToBatch(final byte[] key,
* Initializes the ColumnFamily.
* @throws StreamsException if an invalid state is found and ignoreInvalidState is false
*/
void open(final RocksDBStore.DBAccessor accessor, final boolean ignoreInvalidState) throws RocksDBException, StreamsException;
Position open(final RocksDBStore.DBAccessor accessor, final boolean ignoreInvalidState) throws RocksDBException, StreamsException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to update JavaDoc that open returns a Position


Long getCommittedOffset(final RocksDBStore.DBAccessor accessor, final TopicPartition partition) throws RocksDBException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -366,17 +365,14 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo

metricsRecorder.init(ProcessorContextUtils.metricsImpl(stateStoreContext), stateStoreContext.taskId());

final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
segmentStores.setPosition(position);
segmentStores.openExisting(internalProcessorContext, observedStreamTime);
this.position = segmentStores.position;

// register and possibly restore the state from the logs
stateStoreContext.register(
root,
(RecordBatchingStateRestoreCallback) RocksDBVersionedStore.this::restoreBatch,
() -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
() -> { } // Nothing to do?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

);

open = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ protected SessionSegmentWithHeaders createSegment(final long segmentId, final St
@Override
protected void openSegmentDB(final SessionSegmentWithHeaders segmentToOpen, final StateStoreContext stateStoreContext) {
    // The segment's persisted position is only readable once its RocksDB is open,
    // so opening must happen first.
    segmentToOpen.openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());
    // Fold this segment's restored position into the shared, segments-wide position.
    position.merge(segmentToOpen.getPosition());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ protected TimestampedSegment createSegment(final long segmentId, final String se
@Override
protected void openSegmentDB(final TimestampedSegment segmentToOpen, final StateStoreContext stateStoreContext) {
    // The segment's persisted position is only readable once its RocksDB is open,
    // so opening must happen first.
    segmentToOpen.openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());
    // Fold this segment's restored position into the shared, segments-wide position.
    position.merge(segmentToOpen.getPosition());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ protected TimestampedSegmentWithHeaders createSegment(final long segmentId, fina
@Override
protected void openSegmentDB(final TimestampedSegmentWithHeaders segmentToOpen, final StateStoreContext stateStoreContext) {
    // The segment's persisted position is only readable once its RocksDB is open,
    // so opening must happen first.
    segmentToOpen.openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());
    // Fold this segment's restored position into the shared, segments-wide position.
    position.merge(segmentToOpen.getPosition());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.test.InternalMockProcessorContext;
Expand Down Expand Up @@ -72,7 +71,6 @@ public void setUp() {
SEGMENT_INTERVAL,
new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
);
segments.setPosition(Position.emptyPosition());
segments.openExisting(context, 0L);
}

Expand Down
Loading