Skip to content

Commit 84529a8

Browse files
hangc0276nicoloboschi
authored andcommitted
Fix region-aware placement policy failing when disk-weight-based placement is enabled (#2981)
### Motivation When all of the following conditions are met: 1. the region-aware placement policy is configured 2. disk-weight-based placement is enabled 3. ensemble selection falls back to random selection, for example because: - there are not enough regions - a region contains fewer than 2 racks then ledger creation fails with the following exception: ``` 12:15:36.459 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/test_v2] Encountered unexpected error when creating ledger java.lang.NullPointerException: null at org.apache.bookkeeper.client.WeightedRandomSelectionImpl.getNextRandom(WeightedRandomSelectionImpl.java:150) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.selectRandomInternal(RackawareEnsemblePlacementPolicyImpl.java:748) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.selectRandom(RackawareEnsemblePlacementPolicyImpl.java:698) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.newEnsembleInternal(RackawareEnsemblePlacementPolicyImpl.java:409) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.newEnsemble(RackawareEnsemblePlacementPolicyImpl.java:372) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.newEnsemble(RackawareEnsemblePlacementPolicy.java:159) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.newEnsemble(RegionAwareEnsemblePlacementPolicy.java:303) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.BookieWatcherImpl.newEnsemble(BookieWatcherImpl.java:270) 
~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.LedgerCreateOp.initiate(LedgerCreateOp.java:161) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.BookKeeper.asyncCreateLedger(BookKeeper.java:860) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncCreateLedger(ManagedLedgerImpl.java:3657) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.initializeBookKeeper(ManagedLedgerImpl.java:460) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.access$400(ManagedLedgerImpl.java:141) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$1.operationComplete(ManagedLedgerImpl.java:396) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$1.operationComplete(ManagedLedgerImpl.java:328) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30] at org.apache.bookkeeper.mledger.impl.MetaStoreImpl.lambda$getManagedLedgerInfo$2(MetaStoreImpl.java:97) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30] at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) [?:?] at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?] at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [io.streamnative-bookkeeper-common-4.14.3.1.jar:4.14.3.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?] at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final] at java.lang.Thread.run(Thread.java:834) [?:?] ``` The root cause is that, in `selectRandomInternal`, `wRSelection` has never been updated with a bookie map, so its fields `randomMax` and `cummulativeMap` are never initialized. ### Modification 1. Always update the `wRSelection` map in `selectRandomInternal`, regardless of whether `wRSelection` was already set. (cherry picked from commit 002725e)
1 parent 2501088 commit 84529a8

File tree

3 files changed

+48
-14
lines changed

3 files changed

+48
-14
lines changed

Diff for: bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java

+13-12
Original file line numberDiff line numberDiff line change
@@ -715,20 +715,21 @@ protected List<BookieNode> selectRandomInternal(List<BookieNode> bookiesToSelect
715715
throw new BKNotEnoughBookiesException();
716716
}
717717
if (wRSelection == null) {
718-
Map<BookieNode, WeightedObject> rackMap = new HashMap<BookieNode, WeightedObject>();
719-
for (BookieNode n : bookiesToSelectFrom) {
720-
if (excludeBookies.contains(n)) {
721-
continue;
722-
}
723-
if (this.bookieInfoMap.containsKey(n)) {
724-
rackMap.put(n, this.bookieInfoMap.get(n));
725-
} else {
726-
rackMap.put(n, new BookieInfo());
727-
}
728-
}
729718
wRSelection = new WeightedRandomSelectionImpl<BookieNode>(this.maxWeightMultiple);
730-
wRSelection.updateMap(rackMap);
731719
}
720+
721+
Map<BookieNode, WeightedObject> rackMap = new HashMap<BookieNode, WeightedObject>();
722+
for (BookieNode n : bookiesToSelectFrom) {
723+
if (excludeBookies.contains(n)) {
724+
continue;
725+
}
726+
if (this.bookieInfoMap.containsKey(n)) {
727+
rackMap.put(n, this.bookieInfoMap.get(n));
728+
} else {
729+
rackMap.put(n, new BookieInfo());
730+
}
731+
}
732+
wRSelection.updateMap(rackMap);
732733
} else {
733734
Collections.shuffle(bookiesToSelectFrom);
734735
}

Diff for: bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java

-2
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,6 @@ public T getNextRandom() {
150150
Double randomNum = randomMax * Math.random();
151151
// find the nearest key in the map corresponding to the randomNum
152152
Double key = cummulativeMap.floorKey(randomNum);
153-
//LOG.info("Random max: {} CummulativeMap size: {} selected key: {}", randomMax, cummulativeMap.size(),
154-
// key);
155153
return cummulativeMap.get(key);
156154
} finally {
157155
rwLock.readLock().unlock();

Diff for: bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java

+35
Original file line numberDiff line numberDiff line change
@@ -1572,4 +1572,39 @@ public void testNewEnsembleSetWithFiveRegions() throws Exception {
15721572
fail("Should not get not enough bookies exception even there is only one rack.");
15731573
}
15741574
}
1575+
1576+
public void testRegionsWithDiskWeight() throws Exception {
1577+
repp.uninitalize();
1578+
repp = new RegionAwareEnsemblePlacementPolicy();
1579+
conf.setProperty(REPP_ENABLE_VALIDATION, false);
1580+
conf.setDiskWeightBasedPlacementEnabled(true);
1581+
repp.initialize(conf, Optional.empty(), timer, DISABLE_ALL,
1582+
NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
1583+
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
1584+
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
1585+
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
1586+
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
1587+
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
1588+
1589+
// update dns mapping
1590+
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1");
1591+
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region2/r3");
1592+
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region3/r11");
1593+
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region4/r13");
1594+
StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region5/r23");
1595+
// Update cluster
1596+
Set<BookieId> addrs = new HashSet<BookieId>();
1597+
addrs.add(addr1.toBookieId());
1598+
addrs.add(addr2.toBookieId());
1599+
addrs.add(addr3.toBookieId());
1600+
addrs.add(addr4.toBookieId());
1601+
addrs.add(addr5.toBookieId());
1602+
1603+
repp.onClusterChanged(addrs, new HashSet<BookieId>());
1604+
1605+
List<BookieId> ensemble = repp.newEnsemble(3, 3, 2, null,
1606+
new HashSet<>()).getResult();
1607+
1608+
assertEquals(3, ensemble.size());
1609+
}
15751610
}

0 commit comments

Comments
 (0)