diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index 676ae78753002..21c84a434b3ff 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -657,13 +657,15 @@ private Map performLoadBalancingRevocations( "connector", configured.connectors().size(), workers, - WorkerLoad::connectors + WorkerLoad::connectors, + Function.identity() ); Map> taskRevocations = loadBalancingRevocations( "task", configured.tasks().size(), workers, - WorkerLoad::tasks + WorkerLoad::tasks, + ConnectorTaskId::connector ); connectorRevocations.forEach((worker, revoked) -> @@ -680,7 +682,8 @@ private Map> loadBalancingRevocations( String allocatedResourceName, int totalToAllocate, Collection workers, - Function> workerAllocation + Function> workerAllocation, + Function allocationGrouper ) { int totalWorkers = workers.size(); // The minimum instances of this resource that should be assigned to each worker @@ -736,7 +739,7 @@ private Map> loadBalancingRevocations( Set revokedFromWorker = new LinkedHashSet<>(); result.put(worker.worker(), revokedFromWorker); - Iterator currentWorkerAllocation = workerAllocation.apply(worker).iterator(); + Iterator currentWorkerAllocation = new BalancedIterator(workerAllocation.apply(worker), allocationGrouper); // Revoke resources from the worker until it isn't allocated any more than it should be for (int numRevoked = 0; currentAllocationSizeForWorker - numRevoked > maxAllocationForWorker; numRevoked++) { if (!currentWorkerAllocation.hasNext()) { @@ -793,6 +796,46 @@ protected void assignConnectors(List workerAssignment, Collection implements Iterator { + + private final Map> grouped; + private final List keys; + + private int k; + + public BalancedIterator(Collection collection, Function allocationGrouper) { + this.k = 0; + this.grouped = collection.stream().collect(Collectors.groupingBy( + allocationGrouper, + Collectors.collectingAndThen( + Collectors.toList(), + List::iterator + ) + )); + this.keys = collection.stream() + .map(allocationGrouper) + .distinct() + .collect(Collectors.toList()); + } + + @Override + public boolean hasNext() { + return grouped.values().stream().anyMatch(Iterator::hasNext); + } + + @Override + public E next() { + while (!this.keys.isEmpty() && k < k + this.keys.size()) { + Iterator iterator = grouped.get(this.keys.get(k % this.keys.size())); + k++; + if (iterator.hasNext()) { + return iterator.next(); + } + } + return null; + } + } + /** * Perform a round-robin assignment of tasks to workers with existing worker load. This * assignment tries to balance the load between workers, by assigning tasks to workers that @@ -805,7 +848,7 @@ protected void assignTasks(List workerAssignment, Collection load = tasks.iterator(); + Iterator load = new BalancedIterator<>(tasks, ConnectorTaskId::connector); while (load.hasNext()) { int firstLoad = first.tasksSize(); int upTo = IntStream.range(0, workerAssignment.size()) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java index b59c1863a9179..ea6ca0ae77558 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java @@ -131,6 +131,46 @@ public void testTaskAssignmentWhenWorkerJoins() { assertEmptyAssignment(); } + @Test + public void testConnectorWellBalancedOnScaleOut() { + // Customize assignor for this test case + time = new MockTime(); + initAssignor(); + connectors.clear(); + + // Add first connector + addNewConnector("connector1", 12); + performStandardRebalance(); + assertDelay(0); + + // add second connector + addNewConnector("connector2", 12); + performStandardRebalance(); + assertDelay(0); + + // add second worker + addNewEmptyWorkers("worker2"); + performStandardRebalance(); + performStandardRebalance(); + assertDelay(0); + + // add third worker + addNewEmptyWorkers("worker3"); + performStandardRebalance(); + performStandardRebalance(); + assertDelay(0); + + // assert the connectors are well balanced + // over the workers + assertEquals(3, memberAssignments.size()); + memberAssignments.forEach((k, v) -> { + Map> countsByConnector = v.tasks().stream().collect(Collectors.groupingBy(ConnectorTaskId::connector)); + assertEquals(countsByConnector.size(), 2); + countsByConnector.forEach((k2, v2) -> assertEquals(v2.size(), 4)); + }); + assertBalancedAndCompleteAllocation(); + } + @Test public void testAssignmentsWhenWorkersJoinAfterRevocations() { // Customize assignor for this test case