From c18ba01295bc2c2f8d08b0033348fd37394905bf Mon Sep 17 00:00:00 2001 From: yazgoo Date: Fri, 28 Jun 2024 11:47:39 +0200 Subject: [PATCH 1/8] fix KAFKA-17049 --- .../IncrementalCooperativeAssignor.java | 18 ++++++++++----- .../IncrementalCooperativeAssignorTest.java | 22 +++++++++++++++++++ 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index 676ae78753002..e35b211e1679f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -736,10 +736,20 @@ private Map> loadBalancingRevocations( Set revokedFromWorker = new LinkedHashSet<>(); result.put(worker.worker(), revokedFromWorker); - Iterator currentWorkerAllocation = workerAllocation.apply(worker).iterator(); + Map> currentWorkerAllocationByPrefix = workerAllocation.apply(worker).stream().collect( + Collectors.groupingBy(item -> item.toString().replaceAll("-.*", ""))); + List keys = new ArrayList<>(currentWorkerAllocationByPrefix.keySet()); + Map> currentWokrerAllocationByPrefixIterator = currentWorkerAllocationByPrefix.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().iterator())); // Revoke resources from the worker until it isn't allocated any more than it should be - for (int numRevoked = 0; currentAllocationSizeForWorker - numRevoked > maxAllocationForWorker; numRevoked++) { - if (!currentWorkerAllocation.hasNext()) { + int numRevoked = 0; + for (int i = 0; currentAllocationSizeForWorker - numRevoked > maxAllocationForWorker; i++) { + Iterator currentWorkerAllocation = currentWokrerAllocationByPrefixIterator.get(keys.get(i % keys.size())); + if (currentWorkerAllocation.hasNext()) { + E revocation = currentWorkerAllocation.next(); + revokedFromWorker.add(revocation); + numRevoked++; + } else if (i == keys.size() - 1) { // Should never happen, but better to log a warning and move on than die and fail the whole rebalance if it does log.warn( "Unexpectedly ran out of {}s to revoke from worker {} while performing load-balancing revocations; " + @@ -751,8 +761,6 @@ private Map> loadBalancingRevocations( ); break; } - E revocation = currentWorkerAllocation.next(); - revokedFromWorker.add(revocation); } } return result; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java index b59c1863a9179..32079a05c3dee 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java @@ -131,6 +131,28 @@ public void testTaskAssignmentWhenWorkerJoins() { assertEmptyAssignment(); } + @Test + public void checkIndividualConnectorBalance() { + connectors.clear(); + addNewConnector("connector1", 12); + performStandardRebalance(); + addNewConnector("connector2", 12); + performStandardRebalance(); + addNewEmptyWorkers("worker2"); + performStandardRebalance(); + performStandardRebalance(); + addNewEmptyWorkers("worker3"); + performStandardRebalance(); + performStandardRebalance(); + System.out.println("====================="); + memberAssignments.forEach((k, v) -> { + System.out.println((" " + k + " ->\tc=" + v.connectors() + "\tt=" + v.tasks()).replaceAll("connector", "c")); + Map> countsByConnector = v.tasks().stream().collect(Collectors.groupingBy(ConnectorTaskId::connector)); + assertEquals(countsByConnector.size(), 2); + countsByConnector.forEach((k2, v2) -> assertEquals(v2.size(), 4)); + }); + } + @Test public void testAssignmentsWhenWorkersJoinAfterRevocations() { // Customize assignor for this test case From f1341672de1f244870e702db74d5d61ea722b26e Mon Sep 17 00:00:00 2001 From: yazgoo Date: Fri, 28 Jun 2024 12:05:15 +0200 Subject: [PATCH 2/8] generalization --- .../IncrementalCooperativeAssignor.java | 17 ++++++++++++----- .../IncrementalCooperativeAssignorTest.java | 3 +-- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index e35b211e1679f..60486c5539a57 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -657,13 +657,15 @@ private Map performLoadBalancingRevocations( "connector", configured.connectors().size(), workers, - WorkerLoad::connectors + WorkerLoad::connectors, + x -> x.toString() ); Map> taskRevocations = loadBalancingRevocations( "task", configured.tasks().size(), workers, - WorkerLoad::tasks + WorkerLoad::tasks, + x -> x.connector() ); connectorRevocations.forEach((worker, revoked) -> @@ -680,7 +682,8 @@ private Map> loadBalancingRevocations( String allocatedResourceName, int totalToAllocate, Collection workers, - Function> workerAllocation + Function> workerAllocation, + Function allocationGrouper ) { int totalWorkers = workers.size(); // The minimum instances of this resource that should be assigned to each worker @@ -737,10 +740,11 @@ private Map> loadBalancingRevocations( result.put(worker.worker(), revokedFromWorker); Map> currentWorkerAllocationByPrefix = workerAllocation.apply(worker).stream().collect( - Collectors.groupingBy(item -> item.toString().replaceAll("-.*", ""))); + Collectors.groupingBy(item -> allocationGrouper.apply(item))); List keys = new ArrayList<>(currentWorkerAllocationByPrefix.keySet()); Map> currentWokrerAllocationByPrefixIterator = currentWorkerAllocationByPrefix.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().iterator())); + Map exhausted = currentWorkerAllocationByPrefix.keySet().stream().collect(Collectors.toMap(Function.identity(), k -> false)); // Revoke resources from the worker until it isn't allocated any more than it should be int numRevoked = 0; for (int i = 0; currentAllocationSizeForWorker - numRevoked > maxAllocationForWorker; i++) { @@ -749,7 +753,10 @@ private Map> loadBalancingRevocations( E revocation = currentWorkerAllocation.next(); revokedFromWorker.add(revocation); numRevoked++; - } else if (i == keys.size() - 1) { + } else { + exhausted.put(keys.get(i % keys.size()), true); + } + if (exhausted.values().stream().allMatch(v -> v)) { // Should never happen, but better to log a warning and move on than die and fail the whole rebalance if it does log.warn( "Unexpectedly ran out of {}s to revoke from worker {} while performing load-balancing revocations; " + diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java index 32079a05c3dee..6fb4aa4e321b5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java @@ -144,9 +144,8 @@ public void checkIndividualConnectorBalance() { addNewEmptyWorkers("worker3"); performStandardRebalance(); performStandardRebalance(); - System.out.println("====================="); + assertEquals(3, memberAssignments.size()); memberAssignments.forEach((k, v) -> { - System.out.println((" " + k + " ->\tc=" + v.connectors() + "\tt=" + v.tasks()).replaceAll("connector", "c")); Map> countsByConnector = v.tasks().stream().collect(Collectors.groupingBy(ConnectorTaskId::connector)); assertEquals(countsByConnector.size(), 2); countsByConnector.forEach((k2, v2) -> assertEquals(v2.size(), 4)); From 4ed6a852193dfa0283a530a09368416c79892883 Mon Sep 17 00:00:00 2001 From: yazgoo Date: Mon, 1 Jul 2024 15:08:44 +0200 Subject: [PATCH 3/8] refacto + handle assignTask --- .../IncrementalCooperativeAssignor.java | 58 +++++++++++++------ .../IncrementalCooperativeAssignorTest.java | 1 + 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index 60486c5539a57..97d9d177ea382 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -739,24 +739,10 @@ private Map> loadBalancingRevocations( Set revokedFromWorker = new LinkedHashSet<>(); result.put(worker.worker(), revokedFromWorker); - Map> currentWorkerAllocationByPrefix = workerAllocation.apply(worker).stream().collect( - Collectors.groupingBy(item -> allocationGrouper.apply(item))); - List keys = new ArrayList<>(currentWorkerAllocationByPrefix.keySet()); - Map> currentWokrerAllocationByPrefixIterator = currentWorkerAllocationByPrefix.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().iterator())); - Map exhausted = currentWorkerAllocationByPrefix.keySet().stream().collect(Collectors.toMap(Function.identity(), k -> false)); + Iterator currentWorkerAllocation = new BalancedIterator(workerAllocation.apply(worker), allocationGrouper); // Revoke resources from the worker until it isn't allocated any more than it should be - int numRevoked = 0; - for (int i = 0; currentAllocationSizeForWorker - numRevoked > maxAllocationForWorker; i++) { - Iterator currentWorkerAllocation = currentWokrerAllocationByPrefixIterator.get(keys.get(i % keys.size())); - if (currentWorkerAllocation.hasNext()) { - E revocation = currentWorkerAllocation.next(); - revokedFromWorker.add(revocation); - numRevoked++; - } else { - exhausted.put(keys.get(i % keys.size()), true); - } - if (exhausted.values().stream().allMatch(v -> v)) { + for (int numRevoked = 0; currentAllocationSizeForWorker - numRevoked > maxAllocationForWorker; numRevoked++) { + if (!currentWorkerAllocation.hasNext()) { // Should never happen, but better to log a warning and move on than die and fail the whole rebalance if it does log.warn( "Unexpectedly ran out of {}s to revoke from worker {} while performing load-balancing revocations; " + @@ -768,6 +754,8 @@ private Map> loadBalancingRevocations( ); break; } + E revocation = currentWorkerAllocation.next(); + revokedFromWorker.add(revocation); } } return result; @@ -808,6 +796,38 @@ protected void assignConnectors(List workerAssignment, Collection implements Iterator { + + Map> grouped; + List keys; + int k; + + public BalancedIterator(Collection collection, Function allocationGrouper) { + this.k = 0; + this.grouped = collection.stream() + .collect(Collectors.groupingBy(item -> allocationGrouper.apply(item))) + .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().iterator())); + this.keys = new ArrayList<>(grouped.keySet()); + } + + @Override + public boolean hasNext() { + return grouped.values().stream().anyMatch(Iterator::hasNext); + } + + @Override + public E next() { + for (; k < k + this.keys.size(); ) { + Iterator iterator = grouped.get(this.keys.get(k % this.keys.size())); + k++; + if (iterator.hasNext()) { + return iterator.next(); + } + } + return null; + } + } + /** * Perform a round-robin assignment of tasks to workers with existing worker load. This * assignment tries to balance the load between workers, by assigning tasks to workers that @@ -817,10 +837,10 @@ protected void assignConnectors(List workerAssignment, Collection workerAssignment, Collection tasks) { - workerAssignment.sort(WorkerLoad.taskComparator()); + WorkerLoad first = workerAssignment.get(0); - Iterator load = tasks.iterator(); + Iterator load = new BalancedIterator(tasks, ConnectorTaskId::connector); while (load.hasNext()) { int firstLoad = first.tasksSize(); int upTo = IntStream.range(0, workerAssignment.size()) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java index 6fb4aa4e321b5..996af881b9161 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java @@ -150,6 +150,7 @@ public void checkIndividualConnectorBalance() { assertEquals(countsByConnector.size(), 2); countsByConnector.forEach((k2, v2) -> assertEquals(v2.size(), 4)); }); + assertBalancedAndCompleteAllocation(); } @Test From a3a858830a81569a3ee81e323d5ea0ef43af7276 Mon Sep 17 00:00:00 2001 From: yazgoo Date: Mon, 1 Jul 2024 17:45:09 +0200 Subject: [PATCH 4/8] Update connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java Co-authored-by: Chris Egerton --- .../runtime/distributed/IncrementalCooperativeAssignor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index 97d9d177ea382..913da4b21fb2b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -658,7 +658,7 @@ private Map performLoadBalancingRevocations( configured.connectors().size(), workers, WorkerLoad::connectors, - x -> x.toString() + Function.identity() ); Map> taskRevocations = loadBalancingRevocations( "task", From 601250771e6e3b849b785a6fe8eedd790551dbbd Mon Sep 17 00:00:00 2001 From: yazgoo Date: Mon, 1 Jul 2024 17:45:19 +0200 Subject: [PATCH 5/8] Update connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java Co-authored-by: Chris Egerton --- .../runtime/distributed/IncrementalCooperativeAssignor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index 913da4b21fb2b..504db83f5b7d6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -665,7 +665,7 @@ private Map performLoadBalancingRevocations( configured.tasks().size(), workers, WorkerLoad::tasks, - x -> x.connector() + ConnectorTaskId::connector ); connectorRevocations.forEach((worker, revoked) -> From 976711d2282c3aabde36cef74dae4315bfb6f7d5 Mon Sep 17 00:00:00 2001 From: yazgoo Date: Mon, 1 Jul 2024 17:46:28 +0200 Subject: [PATCH 6/8] Apply suggestions from code review Co-authored-by: Chris Egerton --- .../IncrementalCooperativeAssignor.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index 504db83f5b7d6..7ced8f9578ad3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -798,15 +798,20 @@ protected void assignConnectors(List workerAssignment, Collection implements Iterator { - Map> grouped; - List keys; - int k; + private final Map> grouped; + private final List keys; + + private int k; public BalancedIterator(Collection collection, Function allocationGrouper) { this.k = 0; - this.grouped = collection.stream() - .collect(Collectors.groupingBy(item -> allocationGrouper.apply(item))) - .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().iterator())); + this.grouped = collection.stream().collect(Collectors.groupingBy( + allocationGrouper, + Collectors.collectingAndThen( + Collectors.toList(), + List::iterator + ) + )); this.keys = new ArrayList<>(grouped.keySet()); } @@ -837,10 +842,9 @@ public E next() { * @param tasks the tasks to be assigned */ protected void assignTasks(List workerAssignment, Collection tasks) { - WorkerLoad first = workerAssignment.get(0); - Iterator load = new BalancedIterator(tasks, ConnectorTaskId::connector); + Iterator load = new BalancedIterator<>(tasks, ConnectorTaskId::connector); while (load.hasNext()) { int firstLoad = first.tasksSize(); int upTo = IntStream.range(0, workerAssignment.size()) From dcf5e7ed1177fb1a6e935fed2aeae84152386c24 Mon Sep 17 00:00:00 2001 From: yazgoo Date: Tue, 2 Jul 2024 09:10:51 +0200 Subject: [PATCH 7/8] =?UTF-8?q?=F0=9F=91=8C=20review?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IncrementalCooperativeAssignor.java | 2 +- .../IncrementalCooperativeAssignorTest.java | 20 ++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index 7ced8f9578ad3..c74c78f6f2d69 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -822,7 +822,7 @@ public boolean hasNext() { @Override public E next() { - for (; k < k + this.keys.size(); ) { + while (!this.keys.isEmpty() && k < k + this.keys.size()) { Iterator iterator = grouped.get(this.keys.get(k % this.keys.size())); k++; if (iterator.hasNext()) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java index 996af881b9161..ea6ca0ae77558 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java @@ -132,18 +132,36 @@ public void testTaskAssignmentWhenWorkerJoins() { } @Test - public void checkIndividualConnectorBalance() { + public void testConnectorWellBalancedOnScaleOut() { + // Customize assignor for this test case + time = new MockTime(); + initAssignor(); connectors.clear(); + + // Add first connector addNewConnector("connector1", 12); performStandardRebalance(); + assertDelay(0); + + // add second connector addNewConnector("connector2", 12); performStandardRebalance(); + assertDelay(0); + + // add second worker addNewEmptyWorkers("worker2"); performStandardRebalance(); performStandardRebalance(); + assertDelay(0); + + // add third worker addNewEmptyWorkers("worker3"); performStandardRebalance(); performStandardRebalance(); + assertDelay(0); + + // assert the connectors are well balanced + // over the workers assertEquals(3, memberAssignments.size()); memberAssignments.forEach((k, v) -> { Map> countsByConnector = v.tasks().stream().collect(Collectors.groupingBy(ConnectorTaskId::connector)); From 73bece1a4b8e0a3caecf1fc2b5a83a34817eee18 Mon Sep 17 00:00:00 2001 From: yazgoo Date: Wed, 10 Jul 2024 15:01:02 +0200 Subject: [PATCH 8/8] fix testWorkerJoiningDuringDelayedRebalance --- .../runtime/distributed/IncrementalCooperativeAssignor.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index c74c78f6f2d69..21c84a434b3ff 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -812,7 +812,10 @@ public BalancedIterator(Collection collection, Function allocation List::iterator ) )); - this.keys = new ArrayList<>(grouped.keySet()); + this.keys = collection.stream() + .map(allocationGrouper) + .distinct() + .collect(Collectors.toList()); } @Override @@ -842,6 +845,7 @@ public E next() { * @param tasks the tasks to be assigned */ protected void assignTasks(List workerAssignment, Collection tasks) { + workerAssignment.sort(WorkerLoad.taskComparator()); WorkerLoad first = workerAssignment.get(0); Iterator load = new BalancedIterator<>(tasks, ConnectorTaskId::connector);