Skip to content

KAFKA-19713: Persist Position into offsets CF#21750

Draft
eduwercamacaro wants to merge 1 commit intoapache:trunkfrom
eduwercamacaro:19713-migrate-position-offsets-to-cf
Draft

KAFKA-19713: Persist Position into offsets CF#21750
eduwercamacaro wants to merge 1 commit intoapache:trunkfrom
eduwercamacaro:19713-migrate-position-offsets-to-cf

Conversation

@eduwercamacaro
Copy link
Contributor

As part of the implementation of KIP-1035, this PR adds the ability to store the position offsets into the Offsets ColumnFamily.

Also, it aims to migrate any pre-existing position file into the offsets CF.

@github-actions github-actions bot added triage PRs from the community streams small Small PRs labels Mar 13, 2026
@eduwercamacaro eduwercamacaro changed the title Store position into offsets CF KAFKA-19713: Persist Position into offsets CF Mar 13, 2026
Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @eduwercamacaro for the PR, I have a few comments

root,
(RecordBatchingStateRestoreCallback) this::restoreAllInternal,
() -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
() -> { } // Nothing to do?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Write the position here, since on open/read we merge positions for all segments. Plus the cost is small since it's small entry. Same for the other callbacks

root,
(RecordBatchingStateRestoreCallback) this::restoreAllInternal,
() -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
() -> { } // Nothing to do?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

cfAccessor.commit(dbAccessor, position);
} catch (final RocksDBException e) {
// TODO: fatal error?
throw new RuntimeException(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say since the position is for IQ only and doesn't affect data correctness I'd say log a WARN

* @throws StreamsException if an invalid state is found and ignoreInvalidState is false
*/
void open(final RocksDBStore.DBAccessor accessor, final boolean ignoreInvalidState) throws RocksDBException, StreamsException;
Position open(final RocksDBStore.DBAccessor accessor, final boolean ignoreInvalidState) throws RocksDBException, StreamsException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to update JavaDoc that open returns a Position

root,
(RecordBatchingStateRestoreCallback) RocksDBVersionedStore.this::restoreBatch,
() -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
() -> { } // Nothing to do?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

@github-actions github-actions bot removed the triage PRs from the community label Mar 14, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants