Skip to content

Commit e86b854

Browse files
committed
[ingestion] Fix deserializePositionWithOffsetFallback to preserve symbolic positions
The deserializePositionWithOffsetFallback method was incorrectly treating symbolic positions (EARLIEST, LATEST) as regular positions and comparing their numeric offsets against the provided minimum offset. This caused EARLIEST (numeric offset -1) to be replaced with an offset-based position when the minimum offset was >= 0. Added a guard to detect and preserve symbolic positions before performing numeric offset comparison, ensuring they are returned as-is regardless of the minimum offset parameter. This fix resolves the failing testDeserializePositionWithOffsetFallback test in LeaderFollowerStoreIngestionTaskTest. Test: ./gradlew :clients:da-vinci-client:test --tests "com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTaskTest"
1 parent adbeca6 commit e86b854

File tree

1 file changed

+6
-0
lines changed

1 file changed

+6
-0
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5179,6 +5179,12 @@ PubSubPosition deserializePositionWithOffsetFallback(
51795179

51805180
try {
51815181
final PubSubPosition position = pubSubPositionDeserializer.toPosition(wireFormatBytes);
5182+
5183+
// Symbolic positions (EARLIEST, LATEST) should always be returned as-is
5184+
if (position == PubSubSymbolicPosition.EARLIEST || position == PubSubSymbolicPosition.LATEST) {
5185+
return position;
5186+
}
5187+
51825188
// Guard against regressions: honor the caller-provided minimum offset.
51835189
if (position.getNumericOffset() < offset) {
51845190
String context = String.format(" for: %s/%s", topicPartition, versionTopic);

0 commit comments

Comments
 (0)