-
Notifications
You must be signed in to change notification settings - Fork 58
[ASSIGNOR] Implement CostAwareAssignor #1524
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
harryteng9527
wants to merge
67
commits into
opensource4you:main
Choose a base branch
from
harryteng9527:impl-assignor
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 5 commits
Commits
Show all changes
67 commits
Select commit
Hold shift + click to select a range
011ccb3
Implement NetworkIngressAssignor
harryteng9527 f77a8c7
Add a test for checking greedyAssign
harryteng9527 34f8492
Add round-robin assign when the cost equals zero
harryteng9527 17fa82c
spotless
harryteng9527 2f6c7e7
Merge branch 'main' into impl-assignor
harryteng9527 ed52d24
add throwing exception when there is no mbeanObjects
harryteng9527 a9938e8
Merge branch 'main' into impl-assignor
harryteng9527 9a1f57e
spotless
harryteng9527 152e5bb
add more condition to verify whether there are sufficient metrics or not
harryteng9527 1e5cb45
Merge branch 'main' into impl-assignor
harryteng9527 a3e99b8
Add a parameter to set the waiting time that wait for fetch beanObject
harryteng9527 18a98ce
mask ClusterInfo with subscribed topics
harryteng9527 748259e
Add calculating the traffic interval to the score of cost
harryteng9527 03afbb6
Merge branch 'main' into impl-assignor
harryteng9527 25f84c8
tweak and add comment
harryteng9527 29c569c
rename and add comment
harryteng9527 e92a050
rename and add more comment
harryteng9527 9d85e8e
Move Kafka configuration to ours
harryteng9527 c2a8293
Add test
harryteng9527 9c514d3
Merge branch 'main' into impl-assignor
harryteng9527 5f389f1
Add ClusterInfo masked
harryteng9527 25ea0ee
Merge branch 'main' into impl-assignor
harryteng9527 b1ae99a
remove masked
harryteng9527 1e0b6bd
add new assign methods
harryteng9527 f1ec882
add comment and modify greedyAssign
harryteng9527 3690aa9
Merge branch 'main' into impl-assignor
harryteng9527 84b6ed4
spotless
harryteng9527 3e4146d
Add throw exception and change field type
harryteng9527 e0ccc02
Move the fields to sub-class
harryteng9527 b1e0acc
Merge branch 'main' into impl-assignor
harryteng9527 9316267
modify retry machanism
harryteng9527 574abf3
Fix coding style
harryteng9527 349da0b
Merge branch 'main' into impl-assignor
harryteng9527 897d4e8
Merge branch 'main' into impl-assignor
harryteng9527 c2d73b5
Reference feedback to assign partitions
harryteng9527 4d4f0b4
Add test for greedyAssign
harryteng9527 1407b6d
Pass config into NetworkIngressCost
harryteng9527 52553a5
Replace flatMap to map
harryteng9527 5f2eb0d
Revise lambda to avoid creating unnecessary object
harryteng9527 e594ce8
Add comment for greedyAssign
harryteng9527 38a7d83
Merge branch 'main' into impl-assignor
harryteng9527 e5aaa8a
Modify object name
harryteng9527 279128e
Separate assign and check incompatibility
harryteng9527 489b071
Merge branch 'main' into impl-assignor
harryteng9527 03517c7
Fix style
harryteng9527 8406bf6
Add Assign interface to move greedy impl to it
harryteng9527 534d7cf
Add the interface to reassign based on incompatible
harryteng9527 cc6582f
Change name
harryteng9527 10a87de
Fix test
harryteng9527 8e925cc
Rename interfaces
harryteng9527 9db68d2
revise shuffle
harryteng9527 7462966
Reduce the complexity of shuffle
harryteng9527 2fade96
Add test
harryteng9527 a6b9b3b
Merge branch 'origin/main' into impl-assignor
harryteng9527 74f24cf
Merge branch 'origin/main' into impl-assignor
harryteng9527 af35031
Spotless
harryteng9527 3e4b57c
Add wait
harryteng9527 f842f00
Revise wait
harryteng9527 3b0258a
Remove retry and test
harryteng9527 353eca7
Merge branch 'origin/main' into impl-assignor
harryteng9527 80219e4
Merge branch 'main' into impl-assignor
harryteng9527 0d5820e
Use new shuffler
harryteng9527 7e5aac6
Add filter to avoid Null pointer and make skewCostLimiter more strict
harryteng9527 98be5ee
Modify randomShuffler signature, replace config to shuffleTime
harryteng9527 9fb2b11
Add GeneratorTest and modify Hint
harryteng9527 43a25f9
Fix Hint
harryteng9527 d4f55a8
Merge branch 'main' into impl-assignor
harryteng9527 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
154 changes: 154 additions & 0 deletions
154
common/src/main/java/org/astraea/common/assignor/NetworkIngressAssignor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,154 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.astraea.common.assignor; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.LinkedHashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.function.Supplier; | ||
| import java.util.stream.Collectors; | ||
| import org.astraea.common.Utils; | ||
| import org.astraea.common.admin.ClusterInfo; | ||
| import org.astraea.common.admin.Replica; | ||
| import org.astraea.common.admin.TopicPartition; | ||
|
|
||
| public class NetworkIngressAssignor extends Assignor { | ||
|
|
||
| @Override | ||
| protected Map<String, List<TopicPartition>> assign( | ||
| Map<String, org.astraea.common.assignor.Subscription> subscriptions, | ||
| ClusterInfo clusterInfo) { | ||
| var consumers = subscriptions.keySet(); | ||
| // 1. check unregister node. if there are unregister nodes, register them | ||
| registerUnregisterNode(clusterInfo); | ||
| // wait for clusterBean | ||
|
harryteng9527 marked this conversation as resolved.
Outdated
|
||
| Utils.sleep(Duration.ofSeconds(1)); | ||
| var clusterBean = metricCollector.clusterBean(); | ||
|
|
||
| // 2. parse subscription , get all topic consumer subscribe | ||
| var topics = topics(subscriptions); | ||
| var networkCost = costFunction.partitionCost(clusterInfo, clusterBean).value(); | ||
|
|
||
| // key = broker id, value = partition and its cost | ||
| var tpCostPerBroker = | ||
| clusterInfo | ||
| .replicaStream() | ||
| .filter(Replica::isLeader) | ||
| .filter(Replica::isOnline) | ||
| .filter(replica -> topics.contains(replica.topic())) | ||
| .collect(Collectors.groupingBy(replica -> replica.nodeInfo().id())) | ||
| .entrySet() | ||
| .stream() | ||
| .map( | ||
| e -> | ||
| Map.entry( | ||
| e.getKey(), | ||
| e.getValue().stream() | ||
| .map( | ||
| replica -> | ||
| Map.entry( | ||
| replica.topicPartition(), | ||
| networkCost.get(replica.topicPartition()))) | ||
| .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))) | ||
| .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); | ||
|
|
||
| return greedyAssign(tpCostPerBroker, consumers); | ||
| } | ||
|
|
||
| /** | ||
| * register unregistered nodes if present. if we didn't register unregistered nodes, we would miss | ||
| * the beanObjects from the nodes | ||
| * | ||
| * @param clusterInfo Currently cluster information. | ||
| */ | ||
| private void registerUnregisterNode(ClusterInfo clusterInfo) { | ||
| var unregister = checkUnregister(clusterInfo.nodes()); | ||
| if (!unregister.isEmpty()) registerJMX(unregister); | ||
| } | ||
|
|
||
| /** | ||
| * perform assign algorithm to get balanced assignment and ensure that 1. each consumer would | ||
| * receive the cost that are as close as possible to each other. 2. similar loads within a node | ||
| * would be assigned to the same consumer. | ||
| * | ||
| * @param costs the tp and their cost within a node | ||
| * @param consumers consumers' name | ||
| * @return the assignment | ||
| */ | ||
| Map<String, List<TopicPartition>> greedyAssign( | ||
| Map<Integer, Map<TopicPartition, Double>> costs, Set<String> consumers) { | ||
| // initial | ||
| var assignment = new HashMap<String, List<TopicPartition>>(); | ||
| for (var consumer : consumers) { | ||
| assignment.put(consumer, new ArrayList<>()); | ||
| } | ||
| var costPerConsumer = | ||
| assignment.keySet().stream() | ||
| .map(c -> Map.entry(c, (double) 0)) | ||
| .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); | ||
| costs | ||
| .values() | ||
| .forEach( | ||
| costPerBroker -> { | ||
| if (costPerBroker.values().stream().mapToDouble(x -> x).sum() == 0) { | ||
| // if there are no cost, round-robin assign per node | ||
| var iter = consumers.iterator(); | ||
| for (var tp : costPerBroker.keySet()) { | ||
| assignment.get(iter.next()).add(tp); | ||
| if (!iter.hasNext()) iter = consumers.iterator(); | ||
| } | ||
| } else { | ||
| var sortedCost = new LinkedHashMap<TopicPartition, Double>(); | ||
| costPerBroker.entrySet().stream() | ||
| .sorted(Map.Entry.comparingByValue()) | ||
| .forEach(entry -> sortedCost.put(entry.getKey(), entry.getValue())); | ||
| var tmpCostPerConsumer = new HashMap<>(costPerConsumer); | ||
| Supplier<String> largestCostConsumer = | ||
| () -> | ||
| Collections.max(tmpCostPerConsumer.entrySet(), Map.Entry.comparingByValue()) | ||
| .getKey(); | ||
| var consumer = largestCostConsumer.get(); | ||
| var lastValue = Collections.min(sortedCost.values()); | ||
|
|
||
| for (var e : sortedCost.entrySet()) { | ||
| var tp = e.getKey(); | ||
| var cost = e.getValue(); | ||
| // TODO: threshold need to be set an appropriate value | ||
| if (cost - lastValue > 0.05) { | ||
| tmpCostPerConsumer.remove(consumer); | ||
| consumer = largestCostConsumer.get(); | ||
| } | ||
|
|
||
| assignment.get(consumer).add(tp); | ||
| costPerConsumer.computeIfPresent(consumer, (ignore, c) -> c + cost); | ||
| lastValue = cost; | ||
| } | ||
| } | ||
| }); | ||
| return assignment; | ||
| } | ||
|
|
||
| @Override | ||
| public String name() { | ||
| return "networkIngress"; | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.