Skip to content

Commit b7b9239

Browse files
authored
[fix][flaky-test]DispatcherBlockConsumerTest.testBrokerSubscriptionRecovery (#18228)
1 parent 069ac87 commit b7b9239

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -618,12 +618,13 @@ public void testBrokerSubscriptionRecovery(boolean unloadBundleGracefully) throw
618618
}
619619
latch.await();
620620
// (2) consume all messages except: unackMessages-set
621-
Set<Integer> unackMessages = Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320);
621+
Set<Integer> unackMsgIndex = Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320);
622+
Set<String> unackMsgs = unackMsgIndex.stream().map(i -> "my-message-" + i).collect(Collectors.toSet());
622623
int receivedMsgCount = 0;
623624
for (int i = 0; i < totalProducedMsgs; i++) {
624625
Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
625626
assertNotNull(msg);
626-
if (!unackMessages.contains(i)) {
627+
if (!unackMsgs.contains(new String(msg.getData()))) {
627628
consumer.acknowledge(msg);
628629
}
629630
receivedMsgCount++;
@@ -646,7 +647,6 @@ public void testBrokerSubscriptionRecovery(boolean unloadBundleGracefully) throw
646647
.subscriptionType(SubscriptionType.Shared).subscribe();
647648

648649
// consumer should only receive unakced messages
649-
Set<String> unackMsgs = unackMessages.stream().map(i -> "my-message-" + i).collect(Collectors.toSet());
650650
Set<String> receivedMsgs = new HashSet<>();
651651
for (int i = 0; i < totalProducedMsgs; i++) {
652652
Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);

0 commit comments

Comments
 (0)