Skip to content

Commit 0930e73

Browse files
eolivelli authored and nicoloboschi committed
KeyShared stickyHashRange subscription: prevent message loss in case of consumer restart
(cherry picked from commit 202ad3d)
1 parent 879c7ed commit 0930e73

File tree

2 files changed

+130
-0
lines changed

2 files changed

+130
-0
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java

+1
Original file line number | Diff line number | Diff line change
@@ -181,6 +181,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
181181
groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
182182
consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
183183
} else {
184+
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
184185
entry.release();
185186
}
186187
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java

+129
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,8 @@
4040
import java.util.Set;
4141
import java.util.UUID;
4242
import java.util.concurrent.CompletableFuture;
43+
import java.util.concurrent.ConcurrentSkipListSet;
44+
import java.util.concurrent.CountDownLatch;
4345
import java.util.concurrent.ExecutionException;
4446
import java.util.concurrent.TimeUnit;
4547
import java.util.concurrent.atomic.AtomicInteger;
@@ -1303,4 +1305,131 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
13031305
return null;
13041306
}
13051307
}
1308+
1309+
@Test
1310+
public void testStickyKeyRangesRestartConsumers() throws PulsarClientException, InterruptedException {
1311+
final String topic = TopicName.get("persistent", "public", "default",
1312+
"testStickyKeyRangesRestartConsumers" + UUID.randomUUID()).toString();
1313+
1314+
final String subscriptionName = "my-sub";
1315+
1316+
final int numMessages = 100;
1317+
// start 2 consumers
1318+
Set<String> sentMessages = new ConcurrentSkipListSet<>();
1319+
1320+
CountDownLatch count1 = new CountDownLatch(2);
1321+
CountDownLatch count2 = new CountDownLatch(13); // consumer 2 usually receive the fix messages
1322+
CountDownLatch count3 = new CountDownLatch(numMessages);
1323+
Consumer<String> consumer1 = pulsarClient.newConsumer(
1324+
Schema.STRING)
1325+
.topic(topic)
1326+
.subscriptionName(subscriptionName)
1327+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
1328+
.subscriptionType(SubscriptionType.Key_Shared)
1329+
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65536 / 2)))
1330+
.messageListener((consumer, msg) -> {
1331+
consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
1332+
if (e != null) {
1333+
log.error("error", e);
1334+
} else {
1335+
sentMessages.remove(msg.getKey());
1336+
count1.countDown();
1337+
count3.countDown();
1338+
}
1339+
});
1340+
})
1341+
.subscribe();
1342+
1343+
Consumer<String> consumer2 = pulsarClient.newConsumer(
1344+
Schema.STRING)
1345+
.topic(topic)
1346+
.subscriptionName(subscriptionName)
1347+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
1348+
.subscriptionType(SubscriptionType.Key_Shared)
1349+
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(65536 / 2 + 1, 65535)))
1350+
.messageListener((consumer, msg) -> {
1351+
consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
1352+
if (e != null) {
1353+
log.error("error", e);
1354+
} else {
1355+
sentMessages.remove(msg.getKey());
1356+
count2.countDown();
1357+
count3.countDown();
1358+
}
1359+
});
1360+
})
1361+
.subscribe();
1362+
1363+
pulsar.getExecutor().submit(() -> {
1364+
try
1365+
{
1366+
try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
1367+
.topic(topic)
1368+
.enableBatching(false)
1369+
.create();) {
1370+
for (int i = 0; i < numMessages; i++)
1371+
{
1372+
String key = "test" + i;
1373+
sentMessages.add(key);
1374+
producer.newMessage()
1375+
.key(key)
1376+
.value("test" + i).
1377+
send();
1378+
Thread.sleep(100);
1379+
}
1380+
}
1381+
} catch (Throwable t) {
1382+
log.error("error", t);
1383+
}});
1384+
1385+
// wait for some messages to be received by both of the consumers
1386+
count1.await();
1387+
count2.await();
1388+
consumer1.close();
1389+
consumer2.close();
1390+
1391+
// this sleep is to trigger a race condition that happens
1392+
// when there are some messages that cannot be dispatched while consuming
1393+
Thread.sleep(3000);
1394+
1395+
// start consuming again...
1396+
1397+
pulsarClient.newConsumer(Schema.STRING)
1398+
.topic(topic)
1399+
.subscriptionName(subscriptionName)
1400+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
1401+
.subscriptionType(SubscriptionType.Key_Shared)
1402+
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65536 / 2)))
1403+
.messageListener((consumer, msg) -> {
1404+
consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
1405+
if (e != null) {
1406+
log.error("error", e);
1407+
} else {
1408+
sentMessages.remove(msg.getKey());
1409+
count3.countDown();
1410+
}
1411+
});
1412+
})
1413+
.subscribe();
1414+
pulsarClient.newConsumer(Schema.STRING)
1415+
.topic(topic)
1416+
.subscriptionName(subscriptionName)
1417+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
1418+
.subscriptionType(SubscriptionType.Key_Shared)
1419+
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(65536 / 2 + 1, 65535)))
1420+
.messageListener((consumer, msg) -> {
1421+
consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
1422+
if (e != null) {
1423+
log.error("error", e);
1424+
} else {
1425+
sentMessages.remove(msg.getKey());
1426+
count3.countDown();
1427+
}
1428+
});
1429+
})
1430+
.subscribe();
1431+
// wait for all the messages to be delivered
1432+
count3.await();
1433+
assertTrue(sentMessages.isEmpty(), "didn't receive " + sentMessages);
1434+
}
13061435
}

0 commit comments

Comments
 (0)