Skip to content

Commit 2501088

Browse files
hangc0276nicoloboschi
authored andcommitted
Fix region/rack aware placement police replace bookie bug (#2642)
(cherry picked from commit 4443c60)
1 parent 79aae04 commit 2501088

File tree

3 files changed

+57
-0
lines changed

3 files changed

+57
-0
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java

+1
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public void handleBookiesThatJoined(Set<BookieId> joinedBookies) {
129129
BookieNode node = createBookieNode(addr);
130130
topology.add(node);
131131
knownBookies.put(addr, node);
132+
historyBookies.put(addr, node);
132133
String region = getLocalRegion(node);
133134
if (null == perRegionPlacement.get(region)) {
134135
perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy()

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java

+8
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
6262
static final Logger LOG = LoggerFactory.getLogger(TopologyAwareEnsemblePlacementPolicy.class);
6363
public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
6464
protected final Map<BookieId, BookieNode> knownBookies = new HashMap<BookieId, BookieNode>();
65+
protected final Map<BookieId, BookieNode> historyBookies = new HashMap<BookieId, BookieNode>();
6566
protected final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
6667
protected Map<BookieNode, WeightedObject> bookieInfoMap = new HashMap<BookieNode, WeightedObject>();
6768
// Initialize to empty set
@@ -717,6 +718,7 @@ public void handleBookiesThatJoined(Set<BookieId> joinedBookies) {
717718
BookieNode node = createBookieNode(addr);
718719
topology.add(node);
719720
knownBookies.put(addr, node);
721+
historyBookies.put(addr, node);
720722
if (this.isWeighted) {
721723
this.bookieInfoMap.putIfAbsent(node, new BookieInfo());
722724
}
@@ -750,6 +752,7 @@ public void onBookieRackChange(List<BookieId> bookieAddressList) {
750752
topology.remove(node);
751753
topology.add(newNode);
752754
knownBookies.put(bookieAddress, newNode);
755+
historyBookies.put(bookieAddress, newNode);
753756
}
754757
} catch (IllegalArgumentException | NetworkTopologyImpl.InvalidTopologyException e) {
755758
LOG.error("Failed to update bookie rack info: {} ", bookieAddress, e);
@@ -798,6 +801,11 @@ protected String resolveNetworkLocation(BookieId addr) {
798801
try {
799802
return NetUtils.resolveNetworkLocation(dnsResolver, bookieAddressResolver.resolve(addr));
800803
} catch (BookieAddressResolver.BookieIdNotResolvedException err) {
804+
BookieNode historyBookie = historyBookies.get(addr);
805+
if (null != historyBookie) {
806+
return historyBookie.getNetworkLocation();
807+
}
808+
801809
LOG.error("Cannot resolve bookieId {} to a network address, resolving as {}", addr,
802810
NetworkTopology.DEFAULT_REGION_AND_RACK, err);
803811
return NetworkTopology.DEFAULT_REGION_AND_RACK;

bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java

+48
Original file line numberDiff line numberDiff line change
@@ -1421,6 +1421,54 @@ private int getNumCoveredRegionsInWriteQuorum(List<BookieId> ensemble, int write
14211421
return numCoveredWriteQuorums;
14221422
}
14231423

1424+
@Test
1425+
public void testRecoveryOnNodeFailure() throws Exception {
1426+
repp.uninitalize();
1427+
repp = new RegionAwareEnsemblePlacementPolicy();
1428+
repp.initialize(conf, Optional.empty(), timer, DISABLE_ALL,
1429+
NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
1430+
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
1431+
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
1432+
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
1433+
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
1434+
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
1435+
BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.7", 3181);
1436+
1437+
// Update dns mapping
1438+
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1");
1439+
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/r1");
1440+
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/r2");
1441+
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/r2");
1442+
StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region3/r3");
1443+
StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/region3/r3");
1444+
1445+
// Update cluster
1446+
Set<BookieId> addrs = new HashSet<>();
1447+
addrs.add(addr1.toBookieId());
1448+
addrs.add(addr2.toBookieId());
1449+
addrs.add(addr3.toBookieId());
1450+
addrs.add(addr4.toBookieId());
1451+
addrs.add(addr5.toBookieId());
1452+
addrs.add(addr6.toBookieId());
1453+
1454+
repp.onClusterChanged(addrs, new HashSet<>());
1455+
1456+
Set<BookieId> bookiesLeftSet = new HashSet<>();
1457+
bookiesLeftSet.add(addr1.toBookieId());
1458+
repp.handleBookiesThatLeft(bookiesLeftSet);
1459+
1460+
List<BookieId> currentEnsemble = new ArrayList<>();
1461+
currentEnsemble.add(addr1.toBookieId());
1462+
currentEnsemble.add(addr3.toBookieId());
1463+
currentEnsemble.add(addr6.toBookieId());
1464+
1465+
EnsemblePlacementPolicy.PlacementResult<BookieId> placementResult = repp.replaceBookie(3,
1466+
3, 2, null,
1467+
currentEnsemble, addr1.toBookieId(), new HashSet<>());
1468+
1469+
assertEquals(placementResult.getResult(), addr2.toBookieId());
1470+
}
1471+
14241472
@Test
14251473
public void testNodeWithFailures() throws Exception {
14261474
repp.uninitalize();

0 commit comments

Comments
 (0)