Skip to content

Conversation

@sushantmane
Copy link
Contributor

[controller] Add flag to block first leader transition until init routines complete

Cluster init routines run concurrently and asynchronously during leader
transition. If a routine fails or stalls, the controller can still become leader
and start serving traffic, leaving initialization issues unnoticed until later
failures occur.

The verification path in the test suite does not capture all init issues since it
only waits for store availability and not other steps executed as part of init
routines. In addition, the verification relies on a hard two minute timeout.

As more init routines have been added over time, this timeout is no longer
sufficient. Routines can legitimately take longer under load or during cold
starts, leading to consistent failures and test flakiness even though the
routines eventually succeed.

Introduce a block.until.init.routines.complete config to block the first leader
transition until all init routines complete. This provides a deterministic
signal that initialization has finished, rather than relying on a hard timeout.
Subsequent leader transitions remain async to preserve fast failover behavior.

Code changes

  • Added new code behind a config. If so list the config names and their default values in the PR description.
  • Introduced new log lines.
    • Confirmed if logs need to be rate limited to avoid excessive logging.

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues.
  • Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
  • No blocking calls inside critical sections that could lead to deadlocks or performance degradation.
  • Verified thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
  • Validated proper exception handling in multi-threaded code to avoid silent thread termination.

How was this PR tested?

  • New unit tests added.
  • New integration tests added.
  • Modified or extended existing tests.
  • Verified backward compatibility (if applicable).

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes. Clearly explain the behavior change and its impact.

Copilot AI review requested due to automatic review settings December 12, 2025 06:35
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new configuration flag block.until.init.routines.complete to address test flakiness and initialization issues in the Venice controller. The PR enables blocking the first leader transition until all initialization routines complete, providing a deterministic signal that initialization has finished rather than relying on hard timeouts. Subsequent leader transitions remain asynchronous to preserve fast failover behavior.

Key changes:

  • Added a new boolean flag blockUntilComplete to ClusterLeaderInitializationManager that blocks the first execute() call until all initialization routines complete via CompletableFuture.join().
  • Improved logging throughout the initialization process to include elapsed times and clearer state information.
  • Increased test verification timeout from 2 to 3 minutes to accommodate longer initialization times under load.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java Adds the BLOCK_UNTIL_INIT_ROUTINES_COMPLETE configuration key with documentation.
services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java Adds configuration property and getter for the new blocking flag.
services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java Passes the blocking configuration to ClusterLeaderInitializationManager constructor.
services/venice-controller/src/main/java/com/linkedin/venice/controller/init/ClusterLeaderInitializationManager.java Implements blocking logic for first execution with hasExecutedOnce flag, improves logging with timing information, and properly collects all CompletableFuture instances for blocking.
services/venice-controller/src/test/java/com/linkedin/venice/controller/init/ClusterLeaderInitializationManagerTest.java Adds comprehensive unit tests covering blocking behavior, execution modes, failure handling, and edge cases.
internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java Enables the new blocking flag in test configuration.
internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java Increases verification timeout from 2 to 3 minutes to accommodate longer initialization times.
internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybrid.java Adds producer.flush() calls before stopping producers to ensure all records are sent.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

initializedClusters.computeIfAbsent(clusterToInit, k -> new VeniceConcurrentHashMap<>());

// Check if this is the first time execute() has been called on this manager instance
boolean isFirstExecution = !hasExecutedOnce;
Copy link

Copilot AI Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hasExecutedOnce flag is read without synchronization at line 62, then potentially updated at lines 106 and 110. This creates a race condition where multiple threads calling execute() concurrently could both read false for isFirstExecution, causing both to block. This violates the stated intent that only the first execution should block. Consider using AtomicBoolean.compareAndSet() to atomically check and set the flag, ensuring only one thread can observe isFirstExecution as true.

Copilot uses AI. Check for mistakes.
Comment on lines +109 to +111
if (!hasExecutedOnce) {
hasExecutedOnce = true;
}
Copy link

Copilot AI Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The else branch updates hasExecutedOnce redundantly. If hasExecutedOnce is already true, setting it to true again is unnecessary. More importantly, this update at line 110 is not thread-safe and can race with the update in the finally block at line 106 if multiple threads execute this code concurrently. Consider removing lines 109-111 since the flag will be set appropriately by the finally block in the blocking path or should be set atomically before any async execution starts.

Suggested change
if (!hasExecutedOnce) {
hasExecutedOnce = true;
}

Copilot uses AI. Check for mistakes.
Comment on lines +24 to +312
public class ClusterLeaderInitializationManagerTest {
private static final String TEST_CLUSTER = "test-cluster";
private static final long TIMEOUT_MS = 5000;
private static final LogContext TEST_LOG_CONTEXT = new LogContext.Builder().build();

/**
* Tests blocking behavior for first and subsequent executions.
*/
@Test(timeOut = TIMEOUT_MS)
public void testBlockingBehavior() throws InterruptedException {
// Case 1: Async execution when blocking is disabled
CountDownLatch routineStarted = new CountDownLatch(1);
CountDownLatch routineCanFinish = new CountDownLatch(1);
ClusterLeaderInitializationRoutine slowRoutine = mock(ClusterLeaderInitializationRoutine.class);
doAnswer(invocation -> {
routineStarted.countDown();
routineCanFinish.await(TIMEOUT_MS, TimeUnit.MILLISECONDS);
return null;
}).when(slowRoutine).execute(anyString());

ClusterLeaderInitializationManager asyncManager = new ClusterLeaderInitializationManager(
Collections.singletonList(slowRoutine),
false, // concurrentInit
false, // blockUntilComplete
TEST_LOG_CONTEXT);

long startTime = System.currentTimeMillis();
asyncManager.execute(TEST_CLUSTER);
long executionTime = System.currentTimeMillis() - startTime;

assertTrue(executionTime < 500, "Execute should return immediately when blocking is disabled");
assertTrue(routineStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS), "Routine should start executing");
routineCanFinish.countDown();
Thread.sleep(100);
verify(slowRoutine, times(1)).execute(TEST_CLUSTER);

// Case 2: Blocking on first execution when flag is enabled
AtomicBoolean routineCompleted = new AtomicBoolean(false);
ClusterLeaderInitializationRoutine blockingRoutine = mock(ClusterLeaderInitializationRoutine.class);
doAnswer(invocation -> {
Thread.sleep(200);
routineCompleted.set(true);
return null;
}).when(blockingRoutine).execute(anyString());

ClusterLeaderInitializationManager blockingManager = new ClusterLeaderInitializationManager(
Collections.singletonList(blockingRoutine),
false, // concurrentInit
true, // blockUntilComplete
TEST_LOG_CONTEXT);

blockingManager.execute(TEST_CLUSTER);
assertTrue(routineCompleted.get(), "Routine should be completed after execute returns");
verify(blockingRoutine, times(1)).execute(TEST_CLUSTER);

// Case 3: Subsequent executions are async even when blockUntilComplete is true
AtomicInteger executionCount = new AtomicInteger(0);
ClusterLeaderInitializationRoutine subsequentRoutine = mock(ClusterLeaderInitializationRoutine.class);
doAnswer(invocation -> {
executionCount.incrementAndGet();
return null;
}).when(subsequentRoutine).execute(anyString());

ClusterLeaderInitializationManager subsequentManager = new ClusterLeaderInitializationManager(
Collections.singletonList(subsequentRoutine),
false, // concurrentInit
true, // blockUntilComplete
TEST_LOG_CONTEXT);

// First execute() - blocks
subsequentManager.execute(TEST_CLUSTER);
assertEquals(executionCount.get(), 1, "First execution should have run");

// Second execute() - async (doesn't block)
startTime = System.currentTimeMillis();
subsequentManager.execute(TEST_CLUSTER);
executionTime = System.currentTimeMillis() - startTime;

assertTrue(executionTime < 500, "Second execution should return immediately (async)");
Thread.sleep(100); // Wait for async execution
assertEquals(executionCount.get(), 1, "Routine should not execute again (already initialized)");
}

/**
* Tests concurrent and sequential execution modes.
*/
@Test(timeOut = TIMEOUT_MS)
public void testExecutionModes() throws InterruptedException {
// Case 1: Concurrent execution - routines run in parallel
int routineCount = 3;
CountDownLatch allRoutinesStarted = new CountDownLatch(routineCount);
List<ClusterLeaderInitializationRoutine> concurrentRoutines = new ArrayList<>();

for (int i = 0; i < routineCount; i++) {
ClusterLeaderInitializationRoutine routine = mock(ClusterLeaderInitializationRoutine.class);
doAnswer(invocation -> {
allRoutinesStarted.countDown();
Thread.sleep(100);
return null;
}).when(routine).execute(anyString());
concurrentRoutines.add(routine);
}

ClusterLeaderInitializationManager concurrentManager = new ClusterLeaderInitializationManager(
concurrentRoutines,
true, // concurrentInit
true, // blockUntilComplete
TEST_LOG_CONTEXT);

long startTime = System.currentTimeMillis();
concurrentManager.execute(TEST_CLUSTER);
long executionTime = System.currentTimeMillis() - startTime;

assertTrue(allRoutinesStarted.await(100, TimeUnit.MILLISECONDS), "All routines should start concurrently");
assertTrue(executionTime < 250, "Concurrent execution should be faster than sequential");
for (ClusterLeaderInitializationRoutine routine: concurrentRoutines) {
verify(routine, times(1)).execute(TEST_CLUSTER);
}

// Case 2: Sequential execution - routines run in order
List<Integer> executionSequence = Collections.synchronizedList(new ArrayList<>());
List<ClusterLeaderInitializationRoutine> sequentialRoutines = new ArrayList<>();

for (int i = 0; i < routineCount; i++) {
final int routineId = i;
ClusterLeaderInitializationRoutine routine = mock(ClusterLeaderInitializationRoutine.class);
doAnswer(invocation -> {
executionSequence.add(routineId);
Thread.sleep(50);
return null;
}).when(routine).execute(anyString());
sequentialRoutines.add(routine);
}

ClusterLeaderInitializationManager sequentialManager = new ClusterLeaderInitializationManager(
sequentialRoutines,
false, // concurrentInit (sequential)
true, // blockUntilComplete
TEST_LOG_CONTEXT);

sequentialManager.execute(TEST_CLUSTER);

assertEquals(executionSequence.size(), routineCount, "All routines should execute");
assertEquals(executionSequence, Arrays.asList(0, 1, 2), "Routines should execute in order");
}

/**
* Tests failure handling in both concurrent and sequential modes, plus retry logic.
*/
@Test(timeOut = TIMEOUT_MS)
public void testFailureHandling() throws InterruptedException {
// Case 1: Concurrent mode - failures don't prevent other routines
ClusterLeaderInitializationRoutine failingRoutine1 = mock(ClusterLeaderInitializationRoutine.class);
doThrow(new RuntimeException("Test failure")).when(failingRoutine1).execute(anyString());

AtomicBoolean successRoutineExecuted1 = new AtomicBoolean(false);
ClusterLeaderInitializationRoutine successRoutine1 = mock(ClusterLeaderInitializationRoutine.class);
doAnswer(invocation -> {
Thread.sleep(50);
successRoutineExecuted1.set(true);
return null;
}).when(successRoutine1).execute(anyString());

ClusterLeaderInitializationManager concurrentManager = new ClusterLeaderInitializationManager(
Arrays.asList(failingRoutine1, successRoutine1),
true, // concurrentInit
true, // blockUntilComplete
TEST_LOG_CONTEXT);

concurrentManager.execute(TEST_CLUSTER);
assertTrue(successRoutineExecuted1.get(), "Successful routine should execute despite other routine failing");
verify(failingRoutine1, times(1)).execute(TEST_CLUSTER);
verify(successRoutine1, times(1)).execute(TEST_CLUSTER);

// Case 2: Sequential mode - failures don't prevent subsequent routines
ClusterLeaderInitializationRoutine failingRoutine2 = mock(ClusterLeaderInitializationRoutine.class);
doThrow(new RuntimeException("Test failure")).when(failingRoutine2).execute(anyString());

AtomicBoolean successRoutineExecuted2 = new AtomicBoolean(false);
ClusterLeaderInitializationRoutine successRoutine2 = mock(ClusterLeaderInitializationRoutine.class);
doAnswer(invocation -> {
successRoutineExecuted2.set(true);
return null;
}).when(successRoutine2).execute(anyString());

ClusterLeaderInitializationManager sequentialManager = new ClusterLeaderInitializationManager(
Arrays.asList(failingRoutine2, successRoutine2),
false, // concurrentInit (sequential)
true, // blockUntilComplete
TEST_LOG_CONTEXT);

sequentialManager.execute(TEST_CLUSTER);
assertTrue(successRoutineExecuted2.get(), "Subsequent routine should execute despite previous failure");
verify(failingRoutine2, times(1)).execute(TEST_CLUSTER);
verify(successRoutine2, times(1)).execute(TEST_CLUSTER);

// Case 3: Failed routines can be retried on subsequent execute() calls (async after first)
AtomicInteger attemptCount = new AtomicInteger(0);
ClusterLeaderInitializationRoutine flakyRoutine = mock(ClusterLeaderInitializationRoutine.class);
doAnswer(invocation -> {
int attempt = attemptCount.incrementAndGet();
if (attempt == 1) {
throw new RuntimeException("First attempt fails");
}
return null;
}).when(flakyRoutine).execute(anyString());

ClusterLeaderInitializationManager retryManager = new ClusterLeaderInitializationManager(
Collections.singletonList(flakyRoutine),
false, // concurrentInit
true, // blockUntilComplete
TEST_LOG_CONTEXT);

// First execute() - blocks and routine fails
retryManager.execute(TEST_CLUSTER);
assertEquals(attemptCount.get(), 1, "First attempt should have been made");

// Second execute() - async, routine retries and succeeds
retryManager.execute(TEST_CLUSTER);
Thread.sleep(100); // Wait for async execution
assertEquals(attemptCount.get(), 2, "Second attempt should have been made");

// Third execute() - async, routine should not execute again (already succeeded)
retryManager.execute(TEST_CLUSTER);
Thread.sleep(100);
assertEquals(attemptCount.get(), 2, "No third attempt should be made (already succeeded)");
}

/**
* Tests edge cases: multiple clusters and empty routine list.
*/
@Test(timeOut = TIMEOUT_MS)
public void testEdgeCases() throws InterruptedException {
// Case 1: First execute() blocks, subsequent calls are async
AtomicInteger cluster1Count = new AtomicInteger(0);
AtomicInteger cluster2Count = new AtomicInteger(0);

ClusterLeaderInitializationRoutine routine = mock(ClusterLeaderInitializationRoutine.class);
doAnswer(invocation -> {
String cluster = invocation.getArgument(0);
if ("cluster1".equals(cluster)) {
cluster1Count.incrementAndGet();
} else if ("cluster2".equals(cluster)) {
cluster2Count.incrementAndGet();
}
return null;
}).when(routine).execute(anyString());

ClusterLeaderInitializationManager multiClusterManager = new ClusterLeaderInitializationManager(
Collections.singletonList(routine),
false, // concurrentInit
true, // blockUntilComplete
TEST_LOG_CONTEXT);

// First execute() - blocks
multiClusterManager.execute("cluster1");
assertEquals(cluster1Count.get(), 1, "Cluster1 should be initialized");
assertEquals(cluster2Count.get(), 0, "Cluster2 should not be initialized yet");

// Second execute() - async (doesn't block)
multiClusterManager.execute("cluster2");
Thread.sleep(100); // Wait for async execution
assertEquals(cluster1Count.get(), 1, "Cluster1 count should not change");
assertEquals(cluster2Count.get(), 1, "Cluster2 should be initialized");

// Third execute() - async, cluster1 already initialized
multiClusterManager.execute("cluster1");
Thread.sleep(100);
assertEquals(cluster1Count.get(), 1, "Cluster1 should not re-initialize");

// Fourth execute() - async, cluster2 already initialized
multiClusterManager.execute("cluster2");
Thread.sleep(100);
assertEquals(cluster2Count.get(), 1, "Cluster2 should not re-initialize");

// Case 2: Empty routine list completes immediately
ClusterLeaderInitializationManager emptyManager = new ClusterLeaderInitializationManager(
Collections.emptyList(),
false, // concurrentInit
true, // blockUntilComplete
TEST_LOG_CONTEXT);

long startTime = System.currentTimeMillis();
emptyManager.execute(TEST_CLUSTER);
long executionTime = System.currentTimeMillis() - startTime;

assertTrue(executionTime < 100, "Empty routine list should complete immediately");
}
}
Copy link

Copilot AI Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test suite does not cover the scenario where multiple threads call execute() concurrently on the same ClusterLeaderInitializationManager instance. This is important to test because the hasExecutedOnce flag has thread-safety issues. Add a test that verifies concurrent execute() calls handle the first-execution blocking correctly without race conditions.

Copilot uses AI. Check for mistakes.
*/
@Test(timeOut = TIMEOUT_MS)
public void testEdgeCases() throws InterruptedException {
// Case 1: First execute() blocks, subsequent calls are async
Copy link

Copilot AI Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test comment "First execute() blocks, subsequent calls are async" at line 257 could be misleading. The blocking behavior is per-manager-instance, not per-cluster. After the first execute() call for any cluster, all subsequent execute() calls for any cluster (including different clusters) will be async. Consider clarifying the comment to state "First execute() on the manager blocks (for cluster1), subsequent execute() calls are async (including for cluster2)" to make the manager-instance-wide scope explicit.

Suggested change
// Case 1: First execute() blocks, subsequent calls are async
// Case 1: First execute() on the manager blocks (for cluster1), subsequent execute() calls are async (including for cluster2)

Copilot uses AI. Check for mistakes.
…ines complete on first execution

- Added BLOCK_UNTIL_INIT_ROUTINES_COMPLETE config key to control blocking behavior
- Modified ClusterLeaderInitializationManager to support blocking on first execution only
- Subsequent leadership transitions remain async for fast failover
- Added comprehensive logging with latency measurements:
  * Overall execution logs (mode, blocking status, first execution flag, routine count)
  * Per-routine logs with elapsed time in milliseconds
  * Error logs with timing information
- Added comprehensive unit tests with 4 test methods covering:
  * Blocking behavior (async, first execution blocking, subsequent async)
  * Execution modes (concurrent and sequential)
  * Failure handling (concurrent, sequential, retry logic)
  * Edge cases (multi-cluster, empty routines)
- Fixed test flakiness:
  * Increased timeout for cluster leader initialization routines (2m -> 3m)
  * Added producer flush calls in TestHybrid to ensure records are sent
- All tests pass successfully

The feature ensures critical first-time cluster initialization completes before
the controller starts serving traffic, while maintaining fast failover for
subsequent leadership transitions. Comprehensive logging enables monitoring
and debugging of initialization performance.
@sushantmane sushantmane force-pushed the ClusterLeaderInitializationRoutine-Block branch from baf31fa to 97ef141 Compare December 12, 2025 07:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant