Skip to content

Commit 2318a18

Browse files
[Broker] Fix NPE when subscription is already removed (#14363)
* [Broker] Fix NPE when subscription is already removed * Cover same case for NonPersistentTopic Master Issue: #14362 ### Motivation There is current a race condition when we remove a subscription. The race and how to reproduce it is described in the #14362. One of the consequences of the race is that there is a chance we try to remove the subscription from the topic twice. This leads to an NPE, as described in the issue. ### Modifications * Verify that the `sub` is not null before getting its stats. ### Verifying this change This is a trivial change. (cherry picked from commit aee1e7d)
1 parent 766e5fe commit 2318a18

File tree

2 files changed

+12
-8
lines changed

2 files changed

+12
-8
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -1063,10 +1063,12 @@ public CompletableFuture<Void> unsubscribe(String subscriptionName) {
10631063
// That creates deadlock. so, execute remove it in different thread.
10641064
return CompletableFuture.runAsync(() -> {
10651065
NonPersistentSubscription sub = subscriptions.remove(subscriptionName);
1066-
// preserve accumulative stats form removed subscription
1067-
SubscriptionStatsImpl stats = sub.getStats();
1068-
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
1069-
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
1066+
if (sub != null) {
1067+
// preserve accumulative stats form removed subscription
1068+
SubscriptionStatsImpl stats = sub.getStats();
1069+
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
1070+
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
1071+
}
10701072
}, brokerService.executor());
10711073
}
10721074

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -1050,10 +1050,12 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
10501050

10511051
void removeSubscription(String subscriptionName) {
10521052
PersistentSubscription sub = subscriptions.remove(subscriptionName);
1053-
// preserve accumulative stats form removed subscription
1054-
SubscriptionStatsImpl stats = sub.getStats(false, false, false);
1055-
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
1056-
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
1053+
if (sub != null) {
1054+
// preserve accumulative stats form removed subscription
1055+
SubscriptionStatsImpl stats = sub.getStats(false, false, false);
1056+
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
1057+
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
1058+
}
10571059
}
10581060

10591061
/**

0 commit comments

Comments
 (0)