Skip to content

Commit 556024d

Browse files
authored
[fix][broker] Fix some problems in calculate totalAvailableBookies in method getExcludedBookiesWithIsolationGroups when some bookies belongs to multiple isolation groups. (#24091)
1 parent 16598da commit 556024d

File tree

2 files changed

+129
-5
lines changed

2 files changed

+129
-5
lines changed

Diff for: pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,9 @@ Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
206206
return excludedBookies;
207207
}
208208
Set<String> allGroups = allGroupsBookieMapping.keySet();
209+
if (allGroups.isEmpty()) {
210+
return excludedBookies;
211+
}
209212
int totalAvailableBookiesInPrimaryGroup = 0;
210213
Set<String> primaryIsolationGroup = Collections.emptySet();
211214
Set<String> secondaryIsolationGroup = Collections.emptySet();
@@ -222,9 +225,10 @@ Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
222225
}
223226
} else {
224227
for (String groupBookie : bookiesInGroup) {
225-
totalAvailableBookiesInPrimaryGroup += knownBookies
226-
.containsKey(BookieId.parse(groupBookie)) ? 1 : 0;
227-
primaryGroupBookies.add(BookieId.parse(groupBookie));
228+
BookieId bookieId = BookieId.parse(groupBookie);
229+
if (primaryGroupBookies.add(bookieId)) {
230+
totalAvailableBookiesInPrimaryGroup += knownBookies.containsKey(bookieId) ? 1 : 0;
231+
}
228232
}
229233
}
230234
}
@@ -256,8 +260,9 @@ Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
256260
Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
257261
if (bookieGroup != null && !bookieGroup.isEmpty()) {
258262
for (String bookieAddress : bookieGroup.keySet()) {
259-
excludedBookies.remove(BookieId.parse(bookieAddress));
260-
totalAvailableBookiesFromPrimaryAndSecondary += 1;
263+
if (excludedBookies.remove(BookieId.parse(bookieAddress))) {
264+
totalAvailableBookiesFromPrimaryAndSecondary += 1;
265+
}
261266
}
262267
}
263268
}

Diff for: pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java

+119
Original file line numberDiff line numberDiff line change
@@ -699,4 +699,123 @@ public void testDefaultIsolationPolicyNotCovered() throws Exception {
699699
.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult();
700700
assertEquals(BookieIdGroup1.containsAll(defaultBookieList),true);
701701
}
702+
703+
@Test
704+
public void testGetExcludedBookiesWithIsolationGroups() throws Exception {
705+
Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
706+
final String isolationGroup1 = "Group1";
707+
final String isolationGroup2 = "Group2";
708+
final String isolationGroup3 = "Group3";
709+
710+
Map<String, BookieInfo> group1 = new HashMap<>();
711+
group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
712+
group1.put(BOOKIE2, BookieInfo.builder().rack("rack0").build());
713+
714+
Map<String, BookieInfo> group2 = new HashMap<>();
715+
group2.put(BOOKIE3, BookieInfo.builder().rack("rack1").build());
716+
group2.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
717+
718+
bookieMapping.put(isolationGroup1, group1);
719+
bookieMapping.put(isolationGroup2, group2);
720+
721+
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
722+
Optional.empty()).join();
723+
724+
IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy();
725+
ClientConfiguration bkClientConf = new ClientConfiguration();
726+
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
727+
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroup1);
728+
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, isolationGroup2);
729+
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
730+
NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
731+
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
732+
733+
/* Test common cases */
734+
MutablePair<Set<String>, Set<String>> groups = new MutablePair<>();
735+
groups.setLeft(Sets.newHashSet(isolationGroup1));
736+
groups.setRight(Sets.newHashSet(""));
737+
Set<BookieId> blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
738+
assertEquals(blacklist.size(), 2);
739+
740+
groups.setLeft(Sets.newHashSet(isolationGroup1));
741+
groups.setRight(Sets.newHashSet(isolationGroup2));
742+
blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
743+
assertEquals(blacklist.size(), 2);
744+
745+
groups.setLeft(Sets.newHashSet(isolationGroup1));
746+
groups.setRight(Sets.newHashSet(isolationGroup2));
747+
blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(3, groups);
748+
assertTrue(blacklist.isEmpty());
749+
750+
/* Test a bookie belongs to multiple isolation groups */
751+
group1 = new HashMap<>();
752+
group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
753+
754+
group2 = new HashMap<>();
755+
group2.put(BOOKIE2, BookieInfo.builder().rack("rack0").build());
756+
group2.put(BOOKIE3, BookieInfo.builder().rack("rack1").build());
757+
group2.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
758+
759+
Map<String, BookieInfo> group3 = new HashMap<>();
760+
group3.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
761+
762+
bookieMapping.put(isolationGroup1, group1);
763+
bookieMapping.put(isolationGroup2, group2);
764+
bookieMapping.put(isolationGroup3, group3);
765+
766+
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
767+
Optional.empty()).join();
768+
769+
groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3));
770+
groups.setRight(Sets.newHashSet(""));
771+
blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
772+
assertEquals(blacklist.size(), 3);
773+
774+
/* Test a bookie belongs to multiple isolation groups and totalAvailableBookiesInPrimaryGroup < ensembleSize */
775+
groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3));
776+
groups.setRight(Sets.newHashSet(isolationGroup2));
777+
blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
778+
assertTrue(blacklist.isEmpty());
779+
780+
/* Test some bookies not set rack config */
781+
group1 = new HashMap<>();
782+
group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
783+
784+
group2 = new HashMap<>();
785+
group2.put(BOOKIE2, BookieInfo.builder().rack("rack0").build());
786+
787+
bookieMapping.put(isolationGroup1, group1);
788+
bookieMapping.put(isolationGroup2, group2);
789+
790+
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
791+
Optional.empty()).join();
792+
793+
groups.setLeft(Sets.newHashSet(isolationGroup1));
794+
groups.setRight(Sets.newHashSet(isolationGroup2));
795+
blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
796+
assertEquals(blacklist.size(), 2);
797+
798+
groups.setLeft(Sets.newHashSet(isolationGroup1));
799+
groups.setRight(Sets.newHashSet(isolationGroup2));
800+
blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(3, groups);
801+
assertTrue(blacklist.isEmpty());
802+
803+
/* Test some bookies not set rack config and totalAvailableBookiesFromPrimaryAndSecondary < ensembleSize */
804+
group1 = new HashMap<>();
805+
group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
806+
807+
group2 = new HashMap<>();
808+
group2.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
809+
810+
bookieMapping.put(isolationGroup1, group1);
811+
bookieMapping.put(isolationGroup2, group2);
812+
813+
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
814+
Optional.empty()).join();
815+
816+
groups.setLeft(Sets.newHashSet(isolationGroup1));
817+
groups.setRight(Sets.newHashSet(isolationGroup2));
818+
blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
819+
assertTrue(blacklist.isEmpty());
820+
}
702821
}

0 commit comments

Comments
 (0)