diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index d6a7f3b134d39..ec24c066f3605 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -618,12 +618,13 @@ public void testBrokerSubscriptionRecovery(boolean unloadBundleGracefully) throw } latch.await(); // (2) consume all messages except: unackMessages-set - Set unackMessages = Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320); + Set unackMsgIndex = Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320); + Set unackMsgs = unackMsgIndex.stream().map(i -> "my-message-" + i).collect(Collectors.toSet()); int receivedMsgCount = 0; for (int i = 0; i < totalProducedMsgs; i++) { Message msg = consumer.receive(500, TimeUnit.MILLISECONDS); assertNotNull(msg); - if (!unackMessages.contains(i)) { + if (!unackMsgs.contains(new String(msg.getData()))) { consumer.acknowledge(msg); } receivedMsgCount++; @@ -646,7 +647,6 @@ public void testBrokerSubscriptionRecovery(boolean unloadBundleGracefully) throw .subscriptionType(SubscriptionType.Shared).subscribe(); // consumer should only receive unakced messages - Set unackMsgs = unackMessages.stream().map(i -> "my-message-" + i).collect(Collectors.toSet()); Set receivedMsgs = new HashSet<>(); for (int i = 0; i < totalProducedMsgs; i++) { Message msg = consumer.receive(500, TimeUnit.MILLISECONDS);