Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2333,6 +2333,14 @@ private ConfigKeys() {
*/
public static final String CONCURRENT_INIT_ROUTINES_ENABLED = "concurrent.init.routines.enabled";

/**
* Block the state transition until all initialization routines complete on the first execution. When enabled,
* the controller will wait for all init routines to finish before returning from the STANDBY -> LEADER state
* transition, but only the first time the controller becomes leader for a given cluster. Subsequent leadership
* transitions will execute routines asynchronously.
*/
public static final String BLOCK_UNTIL_INIT_ROUTINES_COMPLETE = "block.until.init.routines.complete";

/**
* A config to control graceful shutdown.
* True: servers will flush all remain data in producers buffers and drainer queues, and persist all data including offset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_ACL_ENABLED;
import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_MANAGER_ENABLED;
import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_SSL_ENABLED;
import static com.linkedin.venice.ConfigKeys.BLOCK_UNTIL_INIT_ROUTINES_COMPLETE;
import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_CLIENT_PORT;
import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_SERVER_PORT;
Expand Down Expand Up @@ -120,6 +121,7 @@ public void setUp() {
clusterConfig.put(PUSH_STATUS_STORE_ENABLED, true);
clusterConfig.put(DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS, 3);
clusterConfig.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false);
clusterConfig.put(BLOCK_UNTIL_INIT_ROUTINES_COMPLETE, true);

VeniceClusterCreateOptions options = new VeniceClusterCreateOptions.Builder().numberOfControllers(1)
.numberOfServers(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES;
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.BLOCK_UNTIL_INIT_ROUTINES_COMPLETE;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.NATIVE_REPLICATION_SOURCE_FABRIC;
import static com.linkedin.venice.ConfigKeys.PARENT_KAFKA_CLUSTER_FABRIC_LIST;
Expand Down Expand Up @@ -126,6 +127,7 @@ public void setUp() {
Properties controllerProps = new Properties();
controllerProps.put(NATIVE_REPLICATION_SOURCE_FABRIC, "dc-0");
controllerProps.put(PARENT_KAFKA_CLUSTER_FABRIC_LIST, DEFAULT_PARENT_DATA_CENTER_REGION_NAME);
controllerProps.put(BLOCK_UNTIL_INIT_ROUTINES_COMPLETE, true);
VeniceMultiRegionClusterCreateOptions.Builder optionsBuilder =
new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(NUMBER_OF_CHILD_DATACENTERS)
.numberOfClusters(NUMBER_OF_CLUSTERS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_BLOB_FILES_ENABLED;
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.BLOCK_UNTIL_INIT_ROUTINES_COMPLETE;
import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS;
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.ConfigKeys.LOG_COMPACTION_ENABLED;
Expand Down Expand Up @@ -1513,6 +1514,7 @@ public void testHybridWithPartitionWiseConsumer() throws Exception {
for (int i = 0; i < keyCount; i++) {
IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, i, i);
}
producer.flush("");
producer.stop();
TestUtils.waitForNonDeterministicAssertion(20, TimeUnit.SECONDS, true, true, () -> {
for (int i = 0; i < keyCount; i++) {
Expand All @@ -1530,6 +1532,7 @@ public void testHybridWithPartitionWiseConsumer() throws Exception {
for (int i = keyCount; i < keyCount * 2; i++) {
IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, i, i);
}
producer.flush("");
producer.stop();
TestUtils.waitForNonDeterministicAssertion(20, TimeUnit.SECONDS, true, true, () -> {
for (int i = 0; i < keyCount * 2; i++) {
Expand Down Expand Up @@ -1700,6 +1703,7 @@ private static VeniceClusterWrapper setUpCluster(boolean enablePartitionWiseShar
extraProperties.setProperty(DEFAULT_MAX_NUMBER_OF_PARTITIONS, "5");

// log compaction controller configs
extraProperties.put(BLOCK_UNTIL_INIT_ROUTINES_COMPLETE, true);
extraProperties.setProperty(REPUSH_ORCHESTRATOR_CLASS_NAME, TestHybrid.TestRepushOrchestratorImpl.class.getName());
extraProperties.setProperty(LOG_COMPACTION_ENABLED, "true");
extraProperties.setProperty(LOG_COMPACTION_SCHEDULING_ENABLED, "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ static ServiceProvider<VeniceClusterWrapper> generateService(VeniceClusterCreate
// VeniceClusterWrapper to tests.
if (!veniceClusterWrapper.getVeniceControllers().isEmpty()) {
final VeniceClusterWrapper finalClusterWrapper = veniceClusterWrapper;
TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MINUTES, true, () -> {
TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.MINUTES, true, () -> {
try {
for (AvroProtocolDefinition avroProtocolDefinition: CLUSTER_LEADER_INITIALIZATION_ROUTINES) {
Store store = finalClusterWrapper.getLeaderVeniceController()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static com.linkedin.venice.ConfigKeys.ADMIN_TOPIC_SOURCE_REGION;
import static com.linkedin.venice.ConfigKeys.AGGREGATE_REAL_TIME_SOURCE_REGION;
import static com.linkedin.venice.ConfigKeys.ALLOW_CLUSTER_WIPE;
import static com.linkedin.venice.ConfigKeys.BLOCK_UNTIL_INIT_ROUTINES_COMPLETE;
import static com.linkedin.venice.ConfigKeys.CHILD_CLUSTER_ALLOWLIST;
import static com.linkedin.venice.ConfigKeys.CHILD_CLUSTER_D2_PREFIX;
import static com.linkedin.venice.ConfigKeys.CHILD_CLUSTER_D2_SERVICE_NAME;
Expand Down Expand Up @@ -419,6 +420,8 @@ public class VeniceControllerClusterConfig {

private final boolean concurrentInitRoutinesEnabled;

private final boolean blockUntilInitRoutinesComplete;

private final boolean controllerClusterHelixCloudEnabled;
private final boolean storageClusterHelixCloudEnabled;
private final CloudConfig helixCloudConfig;
Expand Down Expand Up @@ -1066,6 +1069,7 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
this.emergencySourceRegion = props.getString(EMERGENCY_SOURCE_REGION, "");
this.allowClusterWipe = props.getBoolean(ALLOW_CLUSTER_WIPE, false);
this.concurrentInitRoutinesEnabled = props.getBoolean(CONCURRENT_INIT_ROUTINES_ENABLED, false);
this.blockUntilInitRoutinesComplete = props.getBoolean(BLOCK_UNTIL_INIT_ROUTINES_COMPLETE, false);
this.controllerClusterHelixCloudEnabled = props.getBoolean(CONTROLLER_CLUSTER_HELIX_CLOUD_ENABLED, false);
this.storageClusterHelixCloudEnabled = props.getBoolean(CONTROLLER_STORAGE_CLUSTER_HELIX_CLOUD_ENABLED, false);

Expand Down Expand Up @@ -1963,6 +1967,10 @@ public boolean isConcurrentInitRoutinesEnabled() {
return concurrentInitRoutinesEnabled;
}

/**
 * @return whether the controller should block the STANDBY -> LEADER state transition until all
 *     cluster-leader init routines complete on the first leadership transition for a cluster.
 */
public boolean isBlockUntilInitRoutinesComplete() {
return blockUntilInitRoutinesComplete;
}

/**
 * @return whether Helix cloud configuration is enabled for the controller cluster
 *     (backed by the CONTROLLER_CLUSTER_HELIX_CLOUD_ENABLED config, default false).
 */
public boolean isControllerClusterHelixCloudEnabled() {
return controllerClusterHelixCloudEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ public VeniceHelixAdmin(
ClusterLeaderInitializationManager clusterLeaderInitializationManager = new ClusterLeaderInitializationManager(
initRoutines,
commonConfig.isConcurrentInitRoutinesEnabled(),
commonConfig.isBlockUntilInitRoutinesComplete(),
commonConfig.getLogContext());

// Create the controller cluster if required.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,25 @@ public class ClusterLeaderInitializationManager implements ClusterLeaderInitiali
new VeniceConcurrentHashMap<>();
private final List<ClusterLeaderInitializationRoutine> initRoutines;
private final boolean concurrentInit;
private final boolean blockUntilComplete;
private final LogContext logContext;
private volatile boolean hasExecutedOnce = false;

/**
 * Convenience constructor that defaults {@code blockUntilComplete} to {@code false}, i.e. init
 * routines are always scheduled asynchronously and callers are never blocked.
 *
 * @param initRoutines routines to run when this controller becomes leader of a cluster
 * @param concurrentInit whether routines run concurrently (true) or sequentially (false)
 * @param logContext log context applied on the threads that execute the routines
 */
public ClusterLeaderInitializationManager(
List<ClusterLeaderInitializationRoutine> initRoutines,
boolean concurrentInit,
LogContext logContext) {
this(initRoutines, concurrentInit, false, logContext);
}

/**
 * @param initRoutines routines to run when this controller becomes leader of a cluster
 * @param concurrentInit whether routines run concurrently (true) or sequentially (false)
 * @param blockUntilComplete when true, the first {@code execute} call on this manager instance
 *     blocks until every routine has finished; later calls remain asynchronous
 * @param logContext log context applied on the threads that execute the routines
 */
public ClusterLeaderInitializationManager(
List<ClusterLeaderInitializationRoutine> initRoutines,
boolean concurrentInit,
boolean blockUntilComplete,
LogContext logContext) {
this.initRoutines = initRoutines;
this.concurrentInit = concurrentInit;
this.blockUntilComplete = blockUntilComplete;
this.logContext = logContext;
}

Expand All @@ -47,31 +58,90 @@ public void execute(String clusterToInit) {
Map<ClusterLeaderInitializationRoutine, Object> initializedRoutinesForCluster =
initializedClusters.computeIfAbsent(clusterToInit, k -> new VeniceConcurrentHashMap<>());

// Check if this is the first time execute() has been called on this manager instance
boolean isFirstExecution = !hasExecutedOnce;
Copy link

Copilot AI Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hasExecutedOnce flag is read without synchronization at line 62, then potentially updated at lines 106 and 110. This creates a race condition where multiple threads calling execute() concurrently could both read false for isFirstExecution, causing both to block. This violates the stated intent that only the first execution should block. Consider using AtomicBoolean.compareAndSet() to atomically check and set the flag, ensuring only one thread can observe isFirstExecution as true.

Copilot uses AI. Check for mistakes.

LOGGER.info(
"Starting initialization routines for cluster: {}. Mode: {}, BlockUntilComplete: {}, FirstExecution: {}, RoutineCount: {}",
clusterToInit,
concurrentInit ? "CONCURRENT" : "SEQUENTIAL",
blockUntilComplete,
isFirstExecution,
initRoutines.size());

long startTime = System.currentTimeMillis();
CompletableFuture<Void> allRoutinesFuture;
if (concurrentInit) {
initRoutines.forEach(
routine -> CompletableFuture
.runAsync(() -> initRoutine(clusterToInit, initializedRoutinesForCluster, routine)));
CompletableFuture<?>[] futures = initRoutines.stream()
.map(
routine -> CompletableFuture
.runAsync(() -> initRoutine(clusterToInit, initializedRoutinesForCluster, routine)))
.toArray(CompletableFuture[]::new);
allRoutinesFuture = CompletableFuture.allOf(futures);
} else {
CompletableFuture.runAsync(
allRoutinesFuture = CompletableFuture.runAsync(
() -> initRoutines.forEach(routine -> initRoutine(clusterToInit, initializedRoutinesForCluster, routine)));
}

// Block on the first execution if the flag is enabled
if (blockUntilComplete && isFirstExecution) {
try {
LOGGER.info(
"Blocking until all initialization routines complete for cluster: {} (first time execute called)",
clusterToInit);
allRoutinesFuture.join();
long elapsedMs = System.currentTimeMillis() - startTime;
LOGGER.info(
"All initialization routines completed for cluster: {} (first time execute called). Elapsed time: {} ms",
clusterToInit,
elapsedMs);
} catch (Exception e) {
long elapsedMs = System.currentTimeMillis() - startTime;
LOGGER.error(
"Error while waiting for initialization routines to complete for cluster: {}. Elapsed time: {} ms",
clusterToInit,
elapsedMs,
e);
} finally {
hasExecutedOnce = true;
}
} else {
if (!hasExecutedOnce) {
hasExecutedOnce = true;
}
Comment on lines +109 to +111
Copy link

Copilot AI Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The else branch updates hasExecutedOnce redundantly. If hasExecutedOnce is already true, setting it to true again is unnecessary. More importantly, this update at line 110 is not thread-safe and can race with the update in the finally block at line 106 if multiple threads execute this code concurrently. Consider removing lines 109-111 since the flag will be set appropriately by the finally block in the blocking path or should be set atomically before any async execution starts.

Suggested change
if (!hasExecutedOnce) {
hasExecutedOnce = true;
}

Copilot uses AI. Check for mistakes.
LOGGER.info("Initialization routines started asynchronously for cluster: {}. Blocking: false", clusterToInit);
}
}

private void initRoutine(
String clusterToInit,
Map<ClusterLeaderInitializationRoutine, Object> initializedRoutinesForCluster,
ClusterLeaderInitializationRoutine routine) {
initializedRoutinesForCluster.computeIfAbsent(routine, k -> {
long routineStartTime = System.currentTimeMillis();
try {
LogContext.setLogContext(logContext);
LOGGER.info(logMessage("Starting", routine, clusterToInit));
LOGGER.info(
"Starting execution of initialization routine '{}' for cluster '{}'",
routine.toString(),
clusterToInit);
routine.execute(clusterToInit);
LOGGER.info(logMessage("Finished", routine, clusterToInit));
long routineElapsedMs = System.currentTimeMillis() - routineStartTime;
LOGGER.info(
"Finished execution of initialization routine '{}' for cluster '{}'. Elapsed time: {} ms",
routine.toString(),
clusterToInit,
routineElapsedMs);
} catch (Exception e) {
long routineElapsedMs = System.currentTimeMillis() - routineStartTime;
LOGGER.error(
logMessage("Failed", routine, clusterToInit) + (concurrentInit
? " Other initialization routines are unaffected."
: " Will proceed to the next initialization routine."),
"Failed execution of initialization routine '{}' for cluster '{}'. Elapsed time: {} ms. {}",
routine.toString(),
clusterToInit,
routineElapsedMs,
(concurrentInit
? "Other initialization routines are unaffected."
: "Will proceed to the next initialization routine."),
e);
return null; // Will not populate the inner map...
} finally {
Expand All @@ -80,8 +150,4 @@ private void initRoutine(
return new Object(); // Success
});
}

private String logMessage(String action, ClusterLeaderInitializationRoutine routine, String clusterToInit) {
return action + " execution of '" + routine.toString() + "' for cluster '" + clusterToInit + "'.";
}
}
Loading
Loading