@@ -13,6 +13,7 @@
import com.linkedin.venice.exceptions.VeniceNoClusterException;
import com.linkedin.venice.hooks.StoreLifecycleHooks;
import com.linkedin.venice.hooks.StoreVersionLifecycleEventOutcome;
import com.linkedin.venice.meta.ConcurrentPushDetectionStrategy;
import com.linkedin.venice.meta.LifecycleHooksRecord;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
import com.linkedin.venice.meta.Store;
@@ -1225,7 +1226,10 @@ public void updateStore(String clusterName, String storeName, VersionStatus stat
// For jobs that stop polling early or for pushes that don't poll (empty push), we need to truncate the parent
// VT here to unblock the next push
String kafkaTopicName = Version.composeKafkaTopic(storeName, targetVersionNum);
if (!veniceParentHelixAdmin.isTopicTruncated(kafkaTopicName)) {
ConcurrentPushDetectionStrategy strategy =
veniceControllerMultiClusterConfig.getControllerConfig(clusterName).getConcurrentPushDetectionStrategy();
// skip truncating if the topic was not created based on ConcurrentPushDetectionStrategy

Copilot AI Dec 18, 2025

The comment could be more precise. It currently says "skip truncating if the topic was not created" but the logic checks if isTopicWriteNeeded() returns true. Consider revising to: "skip truncating if the topic was not created (when strategy doesn't require topic writes)" to better explain the relationship between the strategy and topic creation.

Suggested change
// skip truncating if the topic was not created based on ConcurrentPushDetectionStrategy
// Skip truncating if the topic was not created, i.e., when the configured strategy does not require topic writes.

if (strategy.isTopicWriteNeeded() && !veniceParentHelixAdmin.isTopicTruncated(kafkaTopicName)) {
LOGGER.info("Truncating parent VT for {}", kafkaTopicName);
veniceParentHelixAdmin.truncateKafkaTopic(Version.composeKafkaTopic(storeName, targetVersionNum));

Copilot AI Dec 18, 2025

The Kafka topic name is being recomputed using Version.composeKafkaTopic(storeName, targetVersionNum) even though it was already computed on line 1228 and stored in the kafkaTopicName variable. Consider using the kafkaTopicName variable instead to avoid redundant computation and improve code maintainability.

Suggested change
veniceParentHelixAdmin.truncateKafkaTopic(Version.composeKafkaTopic(storeName, targetVersionNum));
veniceParentHelixAdmin.truncateKafkaTopic(kafkaTopicName);

}
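
For context on the new guard above: truncation now only runs when the configured strategy actually requires writes to the parent version topic. A minimal sketch of the shape such an enum could take, inferred from its usage in this diff (the real com.linkedin.venice.meta.ConcurrentPushDetectionStrategy may carry additional values and logic):

public enum ConcurrentPushDetectionStrategy {
  // Concurrent pushes are detected from the parent version status alone; no parent VT writes are needed.
  PARENT_VERSION_STATUS_ONLY(false),
  // Concurrent pushes are detected via the parent version topic, so the topic is created and must be truncated.
  TOPIC_BASED_ONLY(true);

  private final boolean topicWriteNeeded;

  ConcurrentPushDetectionStrategy(boolean topicWriteNeeded) {
    this.topicWriteNeeded = topicWriteNeeded;
  }

  public boolean isTopicWriteNeeded() {
    return topicWriteNeeded;
  }
}

Under this reading, PARENT_VERSION_STATUS_ONLY skips both topic creation and truncation, which is why the updated tests below stub that strategy and assert that truncateKafkaTopic is never called.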
@@ -17,6 +17,7 @@
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hooks.StoreVersionLifecycleEventOutcome;
import com.linkedin.venice.meta.ConcurrentPushDetectionStrategy;
import com.linkedin.venice.meta.LifecycleHooksRecord;
import com.linkedin.venice.meta.LifecycleHooksRecordImpl;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
@@ -82,6 +83,8 @@ public void setUp() {
doReturn("").when(clusterConfig).getDeferredVersionSwapRegionRollforwardOrder();
doReturn(clusterConfig).when(veniceControllerMultiClusterConfig).getControllerConfig(clusterName);
doReturn(1).when(clusterConfig).getDeferredVersionSwapThreadPoolSize();
doReturn(ConcurrentPushDetectionStrategy.PARENT_VERSION_STATUS_ONLY).when(clusterConfig)
.getConcurrentPushDetectionStrategy();

childDatacenterToUrl.put(region1, "test");
childDatacenterToUrl.put(region2, "test");
@@ -264,6 +267,7 @@ public void testDeferredVersionSwap() throws Exception {
TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> {
verify(admin, atLeast(1)).rollForwardToFutureVersion(clusterName, storeName, region2 + "," + region3);
verify(store, atLeast(1)).updateVersionStatus(versionTwo, VersionStatus.ONLINE);
verify(admin, never()).truncateKafkaTopic(anyString());
});
}

@@ -527,6 +531,7 @@ public void testDeferredVersionSwapFailedRollForward() throws Exception {
TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> {
verify(admin, atLeast(1)).rollForwardToFutureVersion(clusterName, storeName, region2 + "," + region3);
verify(store, atLeast(1)).updateVersionStatus(versionTwo, VersionStatus.PARTIALLY_ONLINE);
verify(admin, never()).truncateKafkaTopic(anyString());
});
}

@@ -645,6 +650,7 @@ public void testDeferredVersionSwapWithOnlineChild() throws Exception {

TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> {
verify(store, atLeast(1)).updateVersionStatus(2, VersionStatus.ONLINE);
verify(admin, never()).truncateKafkaTopic(anyString());
});
}

@@ -18,6 +18,7 @@
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hooks.StoreVersionLifecycleEventOutcome;
import com.linkedin.venice.meta.ConcurrentPushDetectionStrategy;
import com.linkedin.venice.meta.LifecycleHooksRecord;
import com.linkedin.venice.meta.LifecycleHooksRecordImpl;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
@@ -417,7 +418,9 @@ public void testSequentialRolloutFailurePath() throws Exception {

// Simulate failure on region2 rollout by making rollForwardToFutureVersion throw an exception when region2 appears
doThrow(new VeniceException()).when(admin).rollForwardToFutureVersion(clusterName, storeName, region2);

doReturn(clusterConfig).when(veniceControllerMultiClusterConfig).getControllerConfig(clusterName);
doReturn(ConcurrentPushDetectionStrategy.PARENT_VERSION_STATUS_ONLY).when(clusterConfig)
.getConcurrentPushDetectionStrategy();
// Create service
DeferredVersionSwapService deferredVersionSwapService =
new DeferredVersionSwapService(admin, veniceControllerMultiClusterConfig, stats, metricsRepository);
@@ -430,6 +433,7 @@ public void testSequentialRolloutFailurePath() throws Exception {
// Verify error recording was called due to the failure
verify(store, atLeastOnce()).updateVersionStatus(2, VersionStatus.PARTIALLY_ONLINE);
verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName, region3);
verify(admin, never()).truncateKafkaTopic(anyString(), anyInt());

Copilot AI Dec 18, 2025

The verify call is checking for the wrong method signature. The actual code in DeferredVersionSwapService.java line 1234 calls truncateKafkaTopic(String topicName) with one parameter, but this test is verifying the two-parameter overload truncateKafkaTopic(anyString(), anyInt()). This means the test would not detect if the one-parameter version is called. Change this to verify(admin, never()).truncateKafkaTopic(anyString()); to match the actual method being called.

Suggested change
verify(admin, never()).truncateKafkaTopic(anyString(), anyInt());
verify(admin, never()).truncateKafkaTopic(anyString());

});
}
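
Regarding the overload mix-up flagged above: Mockito verifies each overload independently, so a never() check on the two-argument truncateKafkaTopic says nothing about calls to the one-argument version. A self-contained sketch of the behavior (the Admin interface and parameter names here are illustrative, not the real Venice Admin API):

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

public class TruncateVerifySketch {
  interface Admin {
    void truncateKafkaTopic(String topicName);
    void truncateKafkaTopic(String topicName, int retentionMs);
  }

  public static void main(String[] args) {
    Admin admin = mock(Admin.class);
    admin.truncateKafkaTopic("store_v2"); // only the one-argument overload is invoked

    // Passes even though truncation happened, because only the two-argument overload is checked.
    verify(admin, never()).truncateKafkaTopic(anyString(), anyInt());

    // Fails with NeverWantedButInvoked, which is the assertion the test actually needs.
    verify(admin, never()).truncateKafkaTopic(anyString());
  }
}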

@@ -595,6 +599,10 @@ public void testSequentialRolloutFinalRegionCompletion() throws Exception {
String kafkaTopicName = Version.composeKafkaTopic(storeName, versionTwo);
doReturn(offlinePushStatusInfoWithCompletedPush).when(admin).getOffLinePushStatus(clusterName, kafkaTopicName);

doReturn(clusterConfig).when(veniceControllerMultiClusterConfig).getControllerConfig(clusterName);
doReturn(ConcurrentPushDetectionStrategy.TOPIC_BASED_ONLY).when(clusterConfig).getConcurrentPushDetectionStrategy();
doReturn(false).when(admin).isTopicTruncated(anyString());
Comment on lines +603 to +604

Copilot AI Dec 18, 2025

The test sets ConcurrentPushDetectionStrategy to TOPIC_BASED_ONLY (line 603) and mocks isTopicTruncated to return false (line 604), which according to the code logic in DeferredVersionSwapService.java line 1232 would trigger truncation (since isTopicWriteNeeded() returns true for TOPIC_BASED_ONLY and the topic is not truncated). However, line 616 expects that truncateKafkaTopic is never called. This test may not be properly validating the intended behavior. Consider either changing the strategy to PARENT_VERSION_STATUS_ONLY if the intent is to test that truncation doesn't happen, or adjusting the expectation to verify that truncation does happen with TOPIC_BASED_ONLY strategy.

// Create service
DeferredVersionSwapService deferredVersionSwapService =
new DeferredVersionSwapService(admin, veniceControllerMultiClusterConfig, stats, metricsRepository);
@@ -605,6 +613,7 @@ public void testSequentialRolloutFinalRegionCompletion() throws Exception {
TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> {
// Verify that updateStore was called to mark parent version as ONLINE
verify(store, atLeastOnce()).updateVersionStatus(versionTwo, VersionStatus.ONLINE);
verify(admin, never()).truncateKafkaTopic(anyString(), anyInt());

Copilot AI Dec 18, 2025

The verify call is checking for the wrong method signature. The actual code in DeferredVersionSwapService.java line 1234 calls truncateKafkaTopic(String topicName) with one parameter, but this test is verifying the two-parameter overload truncateKafkaTopic(anyString(), anyInt()). This means the test would not detect if the one-parameter version is called. Change this to verify(admin, never()).truncateKafkaTopic(anyString()); to match the actual method being called.

Suggested change
verify(admin, never()).truncateKafkaTopic(anyString(), anyInt());
verify(admin, never()).truncateKafkaTopic(anyString());

});

// Verify error recording was not called
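
To make the concern about the TOPIC_BASED_ONLY setup concrete, here is a minimal, self-contained sketch of the truth table behind the guard added in this PR (the boolean parameters stand in for strategy.isTopicWriteNeeded() and isTopicTruncated(); the real check lives in DeferredVersionSwapService):

public class ParentVtTruncationGuardSketch {
  // Mirrors the new condition: truncate only when the strategy writes to the parent VT
  // and the topic has not been truncated yet.
  static boolean shouldTruncateParentVT(boolean topicWriteNeeded, boolean topicAlreadyTruncated) {
    return topicWriteNeeded && !topicAlreadyTruncated;
  }

  public static void main(String[] args) {
    // TOPIC_BASED_ONLY with a non-truncated topic: truncation runs, so a never() expectation is suspect.
    System.out.println(shouldTruncateParentVT(true, false));  // true
    // PARENT_VERSION_STATUS_ONLY: truncation is skipped regardless of topic state.
    System.out.println(shouldTruncateParentVT(false, false)); // false
    System.out.println(shouldTruncateParentVT(false, true));  // false
  }
}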