Skip to content

Commit 2311a4e

Browse files
committed
Address review comments
1 parent 0029a40 commit 2311a4e

File tree

4 files changed

+67
-96
lines changed

4 files changed

+67
-96
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 3 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@
9797
import com.linkedin.venice.meta.Version;
9898
import com.linkedin.venice.offsets.OffsetRecord;
9999
import com.linkedin.venice.pubsub.PubSubContext;
100-
import com.linkedin.venice.pubsub.PubSubPositionDeserializer;
101100
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
102101
import com.linkedin.venice.pubsub.PubSubTopicRepository;
103102
import com.linkedin.venice.pubsub.PubSubUtil;
@@ -4890,11 +4889,10 @@ protected PubSubPosition extractUpstreamPosition(DefaultPubSubMessage consumerRe
48904889

48914890
// Check config to determine whether to use upstreamPubSubPosition with fallback or just upstreamOffset
48924891
if (storeVersionConfig.isUseUpstreamPubSubPositionWithFallbackEnabled()) {
4893-
return deserializePositionWithOffsetFallback(
4894-
pubSubContext.getPubSubPositionDeserializer(),
4895-
consumerRecord.getTopicPartition(),
4892+
return PubSubUtil.deserializePositionWithOffsetFallback(
48964893
leaderMetadataFooter.upstreamPubSubPosition,
4897-
leaderMetadataFooter.upstreamOffset);
4894+
leaderMetadataFooter.upstreamOffset,
4895+
pubSubContext.getPubSubPositionDeserializer());
48984896
} else {
48994897
// Directly use upstreamOffset without attempting position deserialization
49004898
return PubSubUtil.fromKafkaOffset(leaderMetadataFooter.upstreamOffset);
@@ -5204,55 +5202,6 @@ PubSubContext getPubSubContext() {
52045202
return pubSubContext;
52055203
}
52065204

5207-
PubSubPosition deserializePositionWithOffsetFallback(
5208-
PubSubPositionDeserializer pubSubPositionDeserializer,
5209-
PubSubTopicPartition topicPartition,
5210-
ByteBuffer wireFormatBytes,
5211-
long offset) {
5212-
// Fast path: nothing to deserialize
5213-
if (wireFormatBytes == null || !wireFormatBytes.hasRemaining()) {
5214-
return PubSubUtil.fromKafkaOffset(offset);
5215-
}
5216-
5217-
try {
5218-
final PubSubPosition position = pubSubPositionDeserializer.toPosition(wireFormatBytes);
5219-
5220-
// Symbolic positions (EARLIEST, LATEST) should always be returned as-is
5221-
if (position == PubSubSymbolicPosition.EARLIEST || position == PubSubSymbolicPosition.LATEST) {
5222-
return position;
5223-
}
5224-
5225-
// Guard against regressions: honor the caller-provided minimum offset.
5226-
if (position.getNumericOffset() < offset) {
5227-
String context = String.format(" for: %s/%s", topicPartition, versionTopic);
5228-
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(context)) {
5229-
LOGGER.warn(
5230-
"Deserialized position: {} is behind the provided offset: {}. Using offset-based position for: {}/{}",
5231-
position,
5232-
offset,
5233-
topicPartition,
5234-
versionTopic);
5235-
}
5236-
return PubSubUtil.fromKafkaOffset(offset);
5237-
}
5238-
5239-
return position;
5240-
} catch (Exception e) {
5241-
String context = String.format("%s/%s - %s", topicPartition, versionTopic, e.getMessage());
5242-
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(context)) {
5243-
LOGGER.warn(
5244-
"Failed to deserialize PubSubPosition for: {}/{}. Using offset-based position (offset={}, bufferRem={}, bufferCap={}).",
5245-
topicPartition,
5246-
versionTopic,
5247-
offset,
5248-
wireFormatBytes.remaining(),
5249-
wireFormatBytes.capacity(),
5250-
e);
5251-
}
5252-
return PubSubUtil.fromKafkaOffset(offset);
5253-
}
5254-
}
5255-
52565205
/**
52575206
* Validates that END_OF_PUSH has been received before processing TOPIC_SWITCH.
52585207
*

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import com.linkedin.venice.offsets.OffsetRecord;
6666
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
6767
import com.linkedin.venice.pubsub.PubSubContext;
68-
import com.linkedin.venice.pubsub.PubSubPositionDeserializer;
6968
import com.linkedin.venice.pubsub.PubSubTopicImpl;
7069
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
7170
import com.linkedin.venice.pubsub.PubSubTopicRepository;
@@ -867,47 +866,6 @@ public void testIsLocalVersionTopicPartitionFullyConsumed(
867866
}
868867
}
869868

870-
@Test(timeOut = 60_000)
871-
public void testDeserializePositionWithOffsetFallback() throws InterruptedException {
872-
setUp();
873-
874-
// Use DEFAULT_DESERIALIZER instead of mocking
875-
PubSubPositionDeserializer deserializer = PubSubPositionDeserializer.DEFAULT_DESERIALIZER;
876-
PubSubTopicPartition mockTopicPartition = mock(PubSubTopicPartition.class);
877-
when(mockTopicPartition.toString()).thenReturn("test-topic_v1-0");
878-
879-
// Use doCallRealMethod to test the actual implementation
880-
doCallRealMethod().when(leaderFollowerStoreIngestionTask)
881-
.deserializePositionWithOffsetFallback(any(), any(), any(), anyLong());
882-
// Case 1: Null ByteBuffer - should return offset-based position
883-
PubSubPosition actualPosition = leaderFollowerStoreIngestionTask
884-
.deserializePositionWithOffsetFallback(deserializer, mockTopicPartition, null, 100L);
885-
assertEquals(actualPosition.getNumericOffset(), 100L, "Null ByteBuffer should return offset-based position");
886-
887-
// Case 2: Empty ByteBuffer - should return offset-based position
888-
actualPosition = leaderFollowerStoreIngestionTask
889-
.deserializePositionWithOffsetFallback(deserializer, mockTopicPartition, ByteBuffer.allocate(0), 200L);
890-
assertEquals(actualPosition.getNumericOffset(), 200L, "Empty ByteBuffer should return offset-based position");
891-
892-
// Case 3: Invalid ByteBuffer - should return offset-based position
893-
ByteBuffer invalidBuffer = ByteBuffer.wrap(new byte[] { 0x01, 0x02, 0x03 }); // Random invalid bytes
894-
actualPosition = leaderFollowerStoreIngestionTask
895-
.deserializePositionWithOffsetFallback(deserializer, mockTopicPartition, invalidBuffer, 300L);
896-
assertEquals(actualPosition.getNumericOffset(), 300L, "Invalid ByteBuffer should return offset-based position");
897-
898-
// Case 4: Valid ByteBuffer - should return deserialized position
899-
ByteBuffer validBuffer = ApacheKafkaOffsetPosition.of(400L).toWireFormatBuffer();
900-
actualPosition = leaderFollowerStoreIngestionTask
901-
.deserializePositionWithOffsetFallback(deserializer, mockTopicPartition, validBuffer, 0L);
902-
assertEquals(actualPosition.getNumericOffset(), 400L, "Valid ByteBuffer should return deserialized position");
903-
904-
// Case 5: ByteBuffer has EARLIEST symbolic position and offset is 0
905-
ByteBuffer earliestBuffer = PubSubSymbolicPosition.EARLIEST.toWireFormatBuffer();
906-
actualPosition = leaderFollowerStoreIngestionTask
907-
.deserializePositionWithOffsetFallback(deserializer, mockTopicPartition, earliestBuffer, 0L);
908-
assertEquals(actualPosition, PubSubSymbolicPosition.EARLIEST, "Should return EARLIEST symbolic position");
909-
}
910-
911869
@Test(timeOut = 60_000)
912870
public void testExtractUpstreamClusterId() throws InterruptedException {
913871
setUp();

internal/venice-common/src/main/java/com/linkedin/venice/pubsub/PubSubUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,11 @@ public static PubSubPosition deserializePositionWithOffsetFallback(
374374

375375
try {
376376
PubSubPosition position = pubSubPositionDeserializer.toPosition(wireFormatBytes);
377+
// Symbolic positions (EARLIEST, LATEST) should always be returned as-is
378+
if (position == PubSubSymbolicPosition.EARLIEST || position == PubSubSymbolicPosition.LATEST) {
379+
return position;
380+
}
381+
377382
// Guard against regressions: honor the caller-provided minimum offset.
378383
// This applies to both symbolic and concrete positions.
379384
if (position.getNumericOffset() >= offset) {

internal/venice-common/src/test/java/com/linkedin/venice/pubsub/PubSubUtilTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,4 +563,63 @@ public void testParsePositionWireFormatInvalidInputs() {
563563
() -> PubSubUtil.parsePositionWireFormat("1:invalid_base64!", deserializer));
564564
expectThrows(IllegalArgumentException.class, () -> PubSubUtil.parsePositionWireFormat("1:", deserializer));
565565
}
566+
567+
@Test
568+
public void testDeserializePositionWithOffsetFallback() {
569+
PubSubPositionDeserializer deserializer = PubSubPositionDeserializer.DEFAULT_DESERIALIZER;
570+
571+
// Case 1: Null ByteBuffer - should return offset-based position
572+
PubSubPosition actualPosition = PubSubUtil.deserializePositionWithOffsetFallback(null, 100L, deserializer);
573+
assertEquals(actualPosition.getNumericOffset(), 100L, "Null ByteBuffer should return offset-based position");
574+
575+
// Case 2: Empty ByteBuffer - should return offset-based position
576+
actualPosition = PubSubUtil.deserializePositionWithOffsetFallback(ByteBuffer.allocate(0), 200L, deserializer);
577+
assertEquals(actualPosition.getNumericOffset(), 200L, "Empty ByteBuffer should return offset-based position");
578+
579+
// Case 3: Invalid ByteBuffer - should return offset-based position
580+
ByteBuffer invalidBuffer = ByteBuffer.wrap(new byte[] { 0x01, 0x02, 0x03 }); // Random invalid bytes
581+
actualPosition = PubSubUtil.deserializePositionWithOffsetFallback(invalidBuffer, 300L, deserializer);
582+
assertEquals(actualPosition.getNumericOffset(), 300L, "Invalid ByteBuffer should return offset-based position");
583+
584+
// Case 4: Valid ByteBuffer with position ahead of offset - should return deserialized position
585+
ByteBuffer validBuffer = ApacheKafkaOffsetPosition.of(400L).toWireFormatBuffer();
586+
actualPosition = PubSubUtil.deserializePositionWithOffsetFallback(validBuffer, 0L, deserializer);
587+
assertEquals(actualPosition.getNumericOffset(), 400L, "Valid ByteBuffer should return deserialized position");
588+
589+
// Case 5: Valid ByteBuffer with position equal to offset - should return deserialized position
590+
ByteBuffer equalBuffer = ApacheKafkaOffsetPosition.of(500L).toWireFormatBuffer();
591+
actualPosition = PubSubUtil.deserializePositionWithOffsetFallback(equalBuffer, 500L, deserializer);
592+
assertEquals(
593+
actualPosition.getNumericOffset(),
594+
500L,
595+
"Position equal to offset should return deserialized position");
596+
597+
// Case 6: Valid ByteBuffer with position behind offset - should return offset-based position
598+
ByteBuffer behindBuffer = ApacheKafkaOffsetPosition.of(100L).toWireFormatBuffer();
599+
actualPosition = PubSubUtil.deserializePositionWithOffsetFallback(behindBuffer, 200L, deserializer);
600+
assertEquals(actualPosition.getNumericOffset(), 200L, "Position behind offset should return offset-based position");
601+
602+
// Case 7: ByteBuffer has EARLIEST symbolic position - should return EARLIEST as-is
603+
ByteBuffer earliestBuffer = PubSubSymbolicPosition.EARLIEST.toWireFormatBuffer();
604+
actualPosition = PubSubUtil.deserializePositionWithOffsetFallback(earliestBuffer, 100L, deserializer);
605+
assertEquals(actualPosition, PubSubSymbolicPosition.EARLIEST, "Should return EARLIEST symbolic position as-is");
606+
607+
// Case 8: ByteBuffer has LATEST symbolic position - should return LATEST as-is
608+
ByteBuffer latestBuffer = PubSubSymbolicPosition.LATEST.toWireFormatBuffer();
609+
actualPosition = PubSubUtil.deserializePositionWithOffsetFallback(latestBuffer, 100L, deserializer);
610+
assertEquals(actualPosition, PubSubSymbolicPosition.LATEST, "Should return LATEST symbolic position as-is");
611+
612+
// Case 9: Large offset values
613+
ByteBuffer largeBuffer = ApacheKafkaOffsetPosition.of(Long.MAX_VALUE - 1000L).toWireFormatBuffer();
614+
actualPosition = PubSubUtil.deserializePositionWithOffsetFallback(largeBuffer, 0L, deserializer);
615+
assertEquals(
616+
actualPosition.getNumericOffset(),
617+
Long.MAX_VALUE - 1000L,
618+
"Should handle large offset values correctly");
619+
620+
// Case 10: Zero offset
621+
ByteBuffer zeroBuffer = ApacheKafkaOffsetPosition.of(0L).toWireFormatBuffer();
622+
actualPosition = PubSubUtil.deserializePositionWithOffsetFallback(zeroBuffer, 0L, deserializer);
623+
assertEquals(actualPosition.getNumericOffset(), 0L, "Should handle zero offset correctly");
624+
}
566625
}

0 commit comments

Comments
 (0)