@@ -210,8 +210,10 @@
 import static com.linkedin.venice.ConfigKeys.SERVER_THROTTLER_FACTORS_FOR_NON_CURRENT_VERSION_NON_AA_WC_LEADER;
 import static com.linkedin.venice.ConfigKeys.SERVER_THROTTLER_FACTORS_FOR_SEP_RT_LEADER;
 import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH;
+import static com.linkedin.venice.ConfigKeys.SERVER_USE_CHECKPOINTED_PUBSUB_POSITIONS;
 import static com.linkedin.venice.ConfigKeys.SERVER_USE_HEARTBEAT_LAG_FOR_READY_TO_SERVE_CHECK_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_USE_METRICS_BASED_POSITION_IN_LAG_COMPUTATION;
+import static com.linkedin.venice.ConfigKeys.SERVER_USE_UPSTREAM_PUBSUB_POSITIONS;
 import static com.linkedin.venice.ConfigKeys.SERVER_ZSTD_DICT_COMPRESSION_LEVEL;
 import static com.linkedin.venice.ConfigKeys.SEVER_CALCULATE_QUOTA_USAGE_BASED_ON_PARTITIONS_ASSIGNMENT_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SORTED_INPUT_DRAINER_SIZE;
@@ -675,6 +677,8 @@ public class VeniceServerConfig extends VeniceClusterConfig {
 
   private final boolean validateSpecificSchemaEnabled;
   private final boolean useMetricsBasedPositionInLagComputation;
+  private final boolean useUpstreamPubSubPositionWithFallback;
+  private final boolean useCheckpointedPubSubPositionWithFallback;
   private final LogContext logContext;
   private final IngestionTaskReusableObjects.Strategy ingestionTaskReusableObjectsStrategy;
   private final boolean keyUrnCompressionEnabled;
@@ -1174,6 +1178,10 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
         serverProperties.getInt(SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_MAX_REPLICA_COUNT, 3);
     this.useMetricsBasedPositionInLagComputation =
         serverProperties.getBoolean(SERVER_USE_METRICS_BASED_POSITION_IN_LAG_COMPUTATION, false);
+    this.useUpstreamPubSubPositionWithFallback =
+        serverProperties.getBoolean(SERVER_USE_UPSTREAM_PUBSUB_POSITIONS, true);
+    this.useCheckpointedPubSubPositionWithFallback =
+        serverProperties.getBoolean(SERVER_USE_CHECKPOINTED_PUBSUB_POSITIONS, true);
     this.serverIngestionInfoLogLineLimit = serverProperties.getInt(SERVER_INGESTION_INFO_LOG_LINE_LIMIT, 20);
     this.parallelResourceShutdownEnabled =
         serverProperties.getBoolean(SERVER_PARALLEL_RESOURCE_SHUTDOWN_ENABLED, false);
@@ -2124,6 +2132,14 @@ public boolean isUseMetricsBasedPositionInLagComputationEnabled() {
     return this.useMetricsBasedPositionInLagComputation;
   }
 
+  public boolean isUseUpstreamPubSubPositionWithFallbackEnabled() {
+    return this.useUpstreamPubSubPositionWithFallback;
+  }
+
+  public boolean isUseCheckpointedPubSubPositionWithFallbackEnabled() {
+    return this.useCheckpointedPubSubPositionWithFallback;
+  }
+
   public int getServerIngestionInfoLogLineLimit() {
     return this.serverIngestionInfoLogLineLimit;
   }
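Both new flags default to true, so position-based resolution is active unless explicitly disabled. A minimal sketch of setting and reading them the way the constructor above does, assuming the standard VeniceProperties wrapper over java.util.Properties (this bootstrap path is illustrative, not the real server startup):

    import static com.linkedin.venice.ConfigKeys.SERVER_USE_CHECKPOINTED_PUBSUB_POSITIONS;
    import static com.linkedin.venice.ConfigKeys.SERVER_USE_UPSTREAM_PUBSUB_POSITIONS;

    import com.linkedin.venice.utils.VeniceProperties;
    import java.util.Properties;

    public class PubSubPositionFlagSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Opt out of position-based resolution, reverting to the legacy numeric-offset path.
        props.setProperty(SERVER_USE_UPSTREAM_PUBSUB_POSITIONS, "false");
        props.setProperty(SERVER_USE_CHECKPOINTED_PUBSUB_POSITIONS, "false");
        VeniceProperties serverProperties = new VeniceProperties(props);
        // Same lookups and defaults as the VeniceServerConfig constructor above.
        System.out.println(serverProperties.getBoolean(SERVER_USE_UPSTREAM_PUBSUB_POSITIONS, true)); // false
        System.out.println(serverProperties.getBoolean(SERVER_USE_CHECKPOINTED_PUBSUB_POSITIONS, true)); // false
      }
    }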
@@ -50,6 +50,7 @@
 import com.linkedin.venice.pubsub.PubSubPositionDeserializer;
 import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
 import com.linkedin.venice.pubsub.PubSubTopicRepository;
+import com.linkedin.venice.pubsub.PubSubUtil;
 import com.linkedin.venice.pubsub.api.DefaultPubSubMessage;
 import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
 import com.linkedin.venice.pubsub.api.PubSubMessage;
@@ -1529,17 +1530,27 @@ protected boolean handleVersionSwapControlMessage(
     // client should never go backwards.
     List<Long> localOffset = (List<Long>) currentVersionHighWatermarks
         .getOrDefault(pubSubTopicPartition.getPartitionNumber(), Collections.EMPTY_MAP)
-        .getOrDefault(upstreamPartition, Collections.EMPTY_LIST);
-    // safety checks
-    if (localOffset == null) {
-      localOffset = new ArrayList<>();
-    }
+        .getOrDefault(upstreamPartition, new ArrayList<>(4));
+
+    // Prefer position-based high watermarks if available, otherwise use legacy offset-based
+    List<ByteBuffer> positions = versionSwap.getLocalHighWatermarkPubSubPositions();
+    List<Long> highWatermarkOffsets;
+
+    if (positions != null && !positions.isEmpty()) {
+      List<Long> legacyOffsets = versionSwap.getLocalHighWatermarks();
+      highWatermarkOffsets = new ArrayList<>(positions.size());
+      for (int i = 0; i < positions.size(); i++) {
+        long fallbackOffset = (legacyOffsets != null && i < legacyOffsets.size()) ? legacyOffsets.get(i) : -1L;
+        PubSubPosition position = PubSubUtil
+            .deserializePositionWithOffsetFallback(positions.get(i), fallbackOffset, pubSubPositionDeserializer);
+        highWatermarkOffsets.add(position.getNumericOffset());
+      }
+    } else {
+      highWatermarkOffsets = (versionSwap.getLocalHighWatermarks() != null)
+          ? versionSwap.getLocalHighWatermarks()
+          : Collections.emptyList();
+    }
-    List<Long> highWatermarkOffsets = versionSwap.localHighWatermarkPubSubPositions == null
-        ? new ArrayList<>()
-        : versionSwap.getLocalHighWatermarkPubSubPositions()
-            .stream()
-            .map(bb -> pubSubPositionDeserializer.toPosition(bb).getNumericOffset())
-            .collect(Collectors.toList());
+
     if (RmdUtils.hasOffsetAdvanced(localOffset, highWatermarkOffsets)) {
       currentVersionHighWatermarks
           .putIfAbsent(pubSubTopicPartition.getPartitionNumber(), new ConcurrentHashMap<>());
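PubSubUtil.deserializePositionWithOffsetFallback is the centralized replacement for the per-ingestion-task helper this PR deletes further down. A minimal sketch of its contract, inferred from that removed implementation (the class and method names below are illustrative, not the actual PubSubUtil source):

    import java.nio.ByteBuffer;

    import com.linkedin.venice.pubsub.PubSubPositionDeserializer;
    import com.linkedin.venice.pubsub.PubSubUtil;
    import com.linkedin.venice.pubsub.api.PubSubPosition;

    final class PositionFallbackContractSketch {
      // Prefer the serialized position; fall back to an offset-based position when the
      // buffer is absent, empty, or fails to deserialize.
      static PubSubPosition decodeOrFallback(
          ByteBuffer wireFormatBytes,
          long fallbackOffset,
          PubSubPositionDeserializer deserializer) {
        if (wireFormatBytes == null || !wireFormatBytes.hasRemaining()) {
          return PubSubUtil.fromKafkaOffset(fallbackOffset);
        }
        try {
          return deserializer.toPosition(wireFormatBytes);
        } catch (Exception e) {
          return PubSubUtil.fromKafkaOffset(fallbackOffset);
        }
      }
    }

Note that the loop above passes -1L as the fallback when no legacy offset exists for an index, so a missing watermark stays a sentinel instead of becoming a fabricated offset.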
@@ -377,6 +377,7 @@ public KafkaStoreIngestionService(
         .setStoreChangeNotifier(asyncStoreChangeNotifier)
         .setPubSubMessageDeserializer(pubSubDeserializer)
         .setPubSubClientsFactory(pubSubClientsFactory)
+        .setUseCheckpointedPubSubPositionWithFallback(serverConfig.isUseCheckpointedPubSubPositionWithFallbackEnabled())
         .build();
 
     VeniceNotifier notifier = new LogNotifier();
@@ -1114,13 +1114,19 @@ protected void syncConsumedUpstreamRTOffsetMapIfNeeded(
       Map<String, PubSubPosition> upstreamStartPositionByPubSubUrl) {
     // Update in-memory consumedUpstreamRTOffsetMap in case no RT record is consumed after the subscription
     final PubSubTopic leaderTopic = pcs.getOffsetRecord().getLeaderTopic(pubSubTopicRepository);
+    final PubSubTopicPartition leaderTopicPartition = pcs.getSourceTopicPartition(leaderTopic);
     if (leaderTopic != null && leaderTopic.isRealTime()) {
       upstreamStartPositionByPubSubUrl.forEach((kafkaURL, upstreamStartPosition) -> {
         PubSubPosition latestConsumedRtPosition =
             getLatestConsumedUpstreamPositionForHybridOffsetLagMeasurement(pcs, kafkaURL);
         // update latest consumed RT position if incoming upstream start position is greater
         // than the latest consumed RT position
-        if (upstreamStartPosition.getNumericOffset() > latestConsumedRtPosition.getNumericOffset()) {
+        if ((PubSubSymbolicPosition.EARLIEST.equals(latestConsumedRtPosition)
+            && !PubSubSymbolicPosition.EARLIEST.equals(upstreamStartPosition))
+            || getTopicManager(kafkaURL).diffPosition(
+                Utils.createPubSubTopicPartitionFromLeaderTopicPartition(kafkaURL, leaderTopicPartition),
+                upstreamStartPosition,
+                latestConsumedRtPosition) > 0) {
           updateLatestConsumedRtPositions(pcs, kafkaURL, upstreamStartPosition);
         }
       });
@@ -1600,7 +1606,8 @@ protected static void checkAndHandleUpstreamOffsetRewind(
       final PubSubPosition newUpstreamPosition,
       final PubSubPosition previousUpstreamPosition,
       LeaderFollowerStoreIngestionTask ingestionTask) {
-    if (newUpstreamPosition.getNumericOffset() >= previousUpstreamPosition.getNumericOffset()) {
+    if (PubSubSymbolicPosition.EARLIEST.equals(previousUpstreamPosition)
+        || topicManager.diffPosition(pubSubTopicPartition, newUpstreamPosition, previousUpstreamPosition) >= 0) {
       return; // Rewind did not happen
     }
     if (!ingestionTask.isHybridMode()) {
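The pattern in this hunk repeats across the file: comparing getNumericOffset values is only meaningful when both sides are real Kafka offsets, so the PR short-circuits the symbolic EARLIEST sentinel and delegates genuine comparisons to TopicManager.diffPosition. A condensed sketch of the guard (the helper name and import paths are assumptions for illustration):

    import com.linkedin.venice.pubsub.api.PubSubPosition;
    import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition;
    import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
    import com.linkedin.venice.pubsub.manager.TopicManager;

    final class PositionComparisonSketch {
      // True when 'candidate' is at or ahead of 'recorded' on the given partition.
      // EARLIEST is checked first because a symbolic sentinel carries no
      // broker-comparable value to hand to diffPosition.
      static boolean isAtOrAhead(
          TopicManager topicManager,
          PubSubTopicPartition partition,
          PubSubPosition candidate,
          PubSubPosition recorded) {
        if (PubSubSymbolicPosition.EARLIEST.equals(recorded)) {
          return true; // nothing concrete recorded yet, so any candidate counts as ahead
        }
        return topicManager.diffPosition(partition, candidate, recorded) >= 0;
      }
    }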
@@ -2013,7 +2020,8 @@ protected boolean shouldProcessRecord(DefaultPubSubMessage record) {
     }
 
     PubSubPosition lastProcessedVtPos = partitionConsumptionState.getLatestProcessedVtPosition();
-    if (lastProcessedVtPos.getNumericOffset() >= record.getPosition().getNumericOffset()) {
+    if (!PubSubSymbolicPosition.EARLIEST.equals(lastProcessedVtPos) && topicManagerRepository.getLocalTopicManager()
+        .diffPosition(record.getTopicPartition(), lastProcessedVtPos, record.getPosition()) >= 0) {
       String message = partitionConsumptionState.getLeaderFollowerState() + " replica: "
           + partitionConsumptionState.getReplicaId() + " had already processed the record";
       if (!REDUNDANT_LOGGING_FILTER.isRedundantException(message)) {
@@ -3781,7 +3789,7 @@ void loadGlobalRtDiv(int partition, String brokerUrl) {
     // If the GlobalRtDivState is not present, it could be acceptable if this could be the first leader to be elected
     // Object not existing could be problematic if this isn't the first leader (detected via nonzero leaderPosition)
     PubSubPosition leaderPosition = pcs.getLeaderPosition(brokerUrl, false);
-    if (leaderPosition.getNumericOffset() > 0) {
+    if (!PubSubSymbolicPosition.EARLIEST.equals(leaderPosition)) {
       LOGGER.warn(
           "Unable to retrieve Global RT DIV from storage engine for topic-partition: {} brokerUrl: {} leaderPosition: {}",
           topicPartition,
@@ -97,7 +97,6 @@
 import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.offsets.OffsetRecord;
 import com.linkedin.venice.pubsub.PubSubContext;
-import com.linkedin.venice.pubsub.PubSubPositionDeserializer;
 import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
 import com.linkedin.venice.pubsub.PubSubTopicRepository;
 import com.linkedin.venice.pubsub.PubSubUtil;
@@ -4888,9 +4887,16 @@ protected PubSubPosition extractUpstreamPosition(DefaultPubSubMessage consumerRe
     }
     LeaderMetadata leaderMetadataFooter = consumerRecord.getValue().leaderMetadataFooter;
 
-    // always return upstreamOffset instead of upstreamPubSubPosition
-    // till we fix all the issues in offset to pubsubPosition migration
-    return PubSubUtil.fromKafkaOffset(leaderMetadataFooter.upstreamOffset);
+    // Check config to determine whether to use upstreamPubSubPosition with fallback or just upstreamOffset
+    if (storeVersionConfig.isUseUpstreamPubSubPositionWithFallbackEnabled()) {
+      return PubSubUtil.deserializePositionWithOffsetFallback(
+          leaderMetadataFooter.upstreamPubSubPosition,
+          leaderMetadataFooter.upstreamOffset,
+          pubSubContext.getPubSubPositionDeserializer());
+    } else {
+      // Directly use upstreamOffset without attempting position deserialization
+      return PubSubUtil.fromKafkaOffset(leaderMetadataFooter.upstreamOffset);
+    }
   }
 
   // extract the upstream cluster id from the given consumer record's leader metadata.
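A usage sketch for the centralized helper, built from the wire-format utilities exercised by the test this PR removes at the bottom of the diff (import paths are assumptions):

    import java.nio.ByteBuffer;

    import com.linkedin.venice.pubsub.PubSubPositionDeserializer;
    import com.linkedin.venice.pubsub.PubSubUtil;
    import com.linkedin.venice.pubsub.adapter.kafka.common.ApacheKafkaOffsetPosition;
    import com.linkedin.venice.pubsub.api.PubSubPosition;

    public class UpstreamPositionFallbackExample {
      public static void main(String[] args) {
        PubSubPositionDeserializer deserializer = PubSubPositionDeserializer.DEFAULT_DESERIALIZER;

        // A valid wire-format buffer wins over the fallback offset.
        ByteBuffer valid = ApacheKafkaOffsetPosition.of(400L).toWireFormatBuffer();
        PubSubPosition p1 = PubSubUtil.deserializePositionWithOffsetFallback(valid, 100L, deserializer);
        System.out.println(p1.getNumericOffset()); // 400

        // Undecodable bytes fall back to the numeric offset.
        ByteBuffer garbage = ByteBuffer.wrap(new byte[] { 0x01, 0x02, 0x03 });
        PubSubPosition p2 = PubSubUtil.deserializePositionWithOffsetFallback(garbage, 300L, deserializer);
        System.out.println(p2.getNumericOffset()); // 300
      }
    }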
@@ -5196,49 +5202,6 @@ PubSubContext getPubSubContext() {
     return pubSubContext;
   }
 
-  PubSubPosition deserializePositionWithOffsetFallback(
-      PubSubPositionDeserializer pubSubPositionDeserializer,
-      PubSubTopicPartition topicPartition,
-      ByteBuffer wireFormatBytes,
-      long offset) {
-    // Fast path: nothing to deserialize
-    if (wireFormatBytes == null || !wireFormatBytes.hasRemaining()) {
-      return PubSubUtil.fromKafkaOffset(offset);
-    }
-
-    try {
-      final PubSubPosition position = pubSubPositionDeserializer.toPosition(wireFormatBytes);
-      // Guard against regressions: honor the caller-provided minimum offset.
-      if (offset > 0 && position.getNumericOffset() < offset) {
-        String context = String.format(" for: %s/%s", topicPartition, versionTopic);
-        if (!REDUNDANT_LOGGING_FILTER.isRedundantException(context)) {
-          LOGGER.warn(
-              "Deserialized position: {} is behind the provided offset: {}. Using offset-based position for: {}/{}",
-              position,
-              offset,
-              topicPartition,
-              versionTopic);
-        }
-        return PubSubUtil.fromKafkaOffset(offset);
-      }
-
-      return position;
-    } catch (Exception e) {
-      String context = String.format("%s/%s - %s", topicPartition, versionTopic, e.getMessage());
-      if (!REDUNDANT_LOGGING_FILTER.isRedundantException(context)) {
-        LOGGER.warn(
-            "Failed to deserialize PubSubPosition for: {}/{}. Using offset-based position (offset={}, bufferRem={}, bufferCap={}).",
-            topicPartition,
-            versionTopic,
-            offset,
-            wireFormatBytes.remaining(),
-            wireFormatBytes.capacity(),
-            e);
-      }
-      return PubSubUtil.fromKafkaOffset(offset);
-    }
-  }
-
   /**
    * Validates that END_OF_PUSH has been received before processing TOPIC_SWITCH.
    *
@@ -1,7 +1,7 @@
 package com.linkedin.davinci.validation;
 
 import static com.linkedin.davinci.validation.DataIntegrityValidator.DISABLED;
-import static com.linkedin.venice.pubsub.PubSubUtil.getPubSubPositionString;
+import static com.linkedin.venice.pubsub.PubSubUtil.deserializePositionWithOffsetFallback;
 
 import com.linkedin.venice.annotation.Threadsafe;
 import com.linkedin.venice.annotation.VisibleForTesting;
@@ -901,9 +901,10 @@ private String generateMessage(
     if (consumerRecord.getValue().leaderMetadataFooter != null) {
       sb.append("; LeaderMetadata { upstream position: ")
           .append(
-              getPubSubPositionString(
-                  pubSubPositionDeserializer,
-                  consumerRecord.getValue().leaderMetadataFooter.upstreamPubSubPosition))
+              deserializePositionWithOffsetFallback(
+                  consumerRecord.getValue().leaderMetadataFooter.upstreamPubSubPosition,
+                  consumerRecord.getValue().leaderMetadataFooter.upstreamOffset,
+                  pubSubPositionDeserializer))
           .append("; upstream pub sub cluster ID: ")
           .append(consumerRecord.getValue().leaderMetadataFooter.upstreamKafkaClusterId)
           .append("; producer host name: ")
@@ -65,7 +65,6 @@
 import com.linkedin.venice.offsets.OffsetRecord;
 import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
 import com.linkedin.venice.pubsub.PubSubContext;
-import com.linkedin.venice.pubsub.PubSubPositionDeserializer;
 import com.linkedin.venice.pubsub.PubSubTopicImpl;
 import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
 import com.linkedin.venice.pubsub.PubSubTopicRepository;
@@ -867,47 +866,6 @@ public void testIsLocalVersionTopicPartitionFullyConsumed(
     }
   }
 
-  @Test(timeOut = 60_000)
-  public void testDeserializePositionWithOffsetFallback() throws InterruptedException {
-    setUp();
-
-    // Use DEFAULT_DESERIALIZER instead of mocking
-    PubSubPositionDeserializer deserializer = PubSubPositionDeserializer.DEFAULT_DESERIALIZER;
-    PubSubTopicPartition mockTopicPartition = mock(PubSubTopicPartition.class);
-    when(mockTopicPartition.toString()).thenReturn("test-topic_v1-0");
-
-    // Use doCallRealMethod to test the actual implementation
-    doCallRealMethod().when(leaderFollowerStoreIngestionTask)
-        .deserializePositionWithOffsetFallback(any(), any(), any(), anyLong());
-    // Case 1: Null ByteBuffer - should return offset-based position
-    PubSubPosition actualPosition = leaderFollowerStoreIngestionTask
-        .deserializePositionWithOffsetFallback(deserializer, mockTopicPartition, null, 100L);
-    assertEquals(actualPosition.getNumericOffset(), 100L, "Null ByteBuffer should return offset-based position");
-
-    // Case 2: Empty ByteBuffer - should return offset-based position
-    actualPosition = leaderFollowerStoreIngestionTask
-        .deserializePositionWithOffsetFallback(deserializer, mockTopicPartition, ByteBuffer.allocate(0), 200L);
-    assertEquals(actualPosition.getNumericOffset(), 200L, "Empty ByteBuffer should return offset-based position");
-
-    // Case 3: Invalid ByteBuffer - should return offset-based position
-    ByteBuffer invalidBuffer = ByteBuffer.wrap(new byte[] { 0x01, 0x02, 0x03 }); // Random invalid bytes
-    actualPosition = leaderFollowerStoreIngestionTask
-        .deserializePositionWithOffsetFallback(deserializer, mockTopicPartition, invalidBuffer, 300L);
-    assertEquals(actualPosition.getNumericOffset(), 300L, "Invalid ByteBuffer should return offset-based position");
-
-    // Case 4: Valid ByteBuffer - should return deserialized position
-    ByteBuffer validBuffer = ApacheKafkaOffsetPosition.of(400L).toWireFormatBuffer();
-    actualPosition = leaderFollowerStoreIngestionTask
-        .deserializePositionWithOffsetFallback(deserializer, mockTopicPartition, validBuffer, 0L);
-    assertEquals(actualPosition.getNumericOffset(), 400L, "Valid ByteBuffer should return deserialized position");
-
-    // Case 5: ByteBuffer has EARLIEST symbolic position and offset is 0
-    ByteBuffer earliestBuffer = PubSubSymbolicPosition.EARLIEST.toWireFormatBuffer();
-    actualPosition = leaderFollowerStoreIngestionTask
-        .deserializePositionWithOffsetFallback(deserializer, mockTopicPartition, earliestBuffer, 0L);
-    assertEquals(actualPosition, PubSubSymbolicPosition.EARLIEST, "Should return EARLIEST symbolic position");
-  }
-
   @Test(timeOut = 60_000)
   public void testExtractUpstreamClusterId() throws InterruptedException {
     setUp();