Skip to content

Issue 1898: Implement isEnsembleAdheringToPlacementPolicy in RegionAwareEnsemblePlacementPolicy #4133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -360,8 +361,12 @@ public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int writeQu
effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum);
TopologyAwareEnsemblePlacementPolicy nextPolicy = perRegionPlacement.get(
availableRegions.iterator().next());
return nextPolicy.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize,

PlacementResult<List<BookieId>> placementResult = nextPolicy.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize,
comprehensiveExclusionBookiesSet, ensemble, ensemble);
return PlacementResult.of(placementResult.getResult(),
isEnsembleAdheringToPlacementPolicy(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we can simplify the logic. We already know there is only one region.
Just need check the result of nextPolicy.newEnsemble.

  1. If the result is PlacementPolicyAdherence.FAIL, return directly.
  2. If the result is PlacementPolicyAdherence.MEETS_STRICT, then check whether minNumRacksPerWriteQuorum is more than 1. If it is, return PlacementPolicyAdherence.SOFT, otherwise return PlacementPolicyAdherence.MEETS_STRICT.

placementResult.getResult(), writeQuorumSize, ackQuorumSize));
}

int remainingEnsemble = ensembleSize;
Expand Down Expand Up @@ -644,12 +649,73 @@ public final DistributionSchedule.WriteSet reorderReadLACSequence(
@Override
public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieId> ensembleList,
int writeQuorumSize, int ackQuorumSize) {
/**
* TODO: have to implement actual logic for this method for
* RegionAwareEnsemblePlacementPolicy. For now return true value.
*
* - https://github.com/apache/bookkeeper/issues/1898
*/
return PlacementPolicyAdherence.MEETS_STRICT;
if (CollectionUtils.isEmpty(ensembleList)) {
return PlacementPolicyAdherence.FAIL;
}

int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
PlacementPolicyAdherence placementPolicyAdherence = PlacementPolicyAdherence.MEETS_STRICT;

int ensembleSize = ensembleList.size();
// region -> bookie set
Map<String, Set<BookieId>> regionsInQuorum = new HashMap<>();
BookieId bookie;
for (int i = 0; i < ensembleList.size(); i++) {
regionsInQuorum.clear();
for (int j = 0; j < writeQuorumSize; j++) {
bookie = ensembleList.get((i + j) % ensembleSize);
BookieNode bookieNode = knownBookies.get(bookie);
if (bookieNode != null) {
String region = getLocalRegion(bookieNode);
if (regionsInQuorum.containsKey(region)) {
regionsInQuorum.get(region).add(bookie);
} else {
Set<BookieId> bookieSet = new HashSet<>();
bookieSet.add(bookie);
regionsInQuorum.put(region, bookieSet);
}
} else if (LOG.isDebugEnabled()) {
LOG.debug("bookie {} is not in the list of knownBookies", bookie);
}
}

if (regionsInQuorum.size() < effectiveMinRegionsForDurability) {
if (LOG.isDebugEnabled()) {
LOG.debug("For ensemble {}, write set starting at {} are from {} regions, "
+ "less than effectiveMinRegionsForDurability: {}.",
ensembleList, i, regionsInQuorum.size(), effectiveMinRegionsForDurability);
}
return PlacementPolicyAdherence.FAIL;
}

if (regionsInQuorum.size() < writeQuorumSize) {
// not enough regions for each writeQuorum
if (LOG.isDebugEnabled()) {
LOG.debug("For ensemble: {}, write set starting at {} are from {} regions, "
+ "less than writeQuorumSize {}.",
ensembleList, i, regionsInQuorum.size(), writeQuorumSize);
}

// try to check whether each region matches RackawareEnsemblePlacementPolicy MEETS_STRICT
for (String region : regionsInQuorum.keySet()) {
Set<BookieId> bookieIds = regionsInQuorum.get(region);

TopologyAwareEnsemblePlacementPolicy policyWithinRegion = perRegionPlacement.get(region);
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = policyWithinRegion
.isEnsembleAdheringToPlacementPolicy(new ArrayList<>(bookieIds), bookieIds.size(), 1);
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
if (LOG.isDebugEnabled()) {
LOG.debug("For ensemble {}, write set starting at {} are all from one region, "
+ "fall back to RackawareEnsemblePlacementPolicy and fail.", ensembleList, i);
}
return PlacementPolicyAdherence.FAIL;
}
}

placementPolicyAdherence = PlacementPolicyAdherence.MEETS_SOFT;
}
}

return placementPolicyAdherence;
}
}
Loading