Skip to content

Commit b679478

Browse files
author
fanjianye
committed
fix concurrent error cause stuck in TransactionBufferHandlerImpl#endTxn
1 parent 570cb44 commit b679478

File tree

1 file changed

+19
-17
lines changed

1 file changed

+19
-17
lines changed

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java

+19-17
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ private boolean checkRequestCredits(OpRequestSend op) {
128128
}
129129
} else {
130130
pendingRequests.add(op);
131+
if (currentPermits != REQUEST_CREDITS_UPDATER.get(this)) {
132+
checkPendingRequests();
133+
}
131134
return false;
132135
}
133136
}
@@ -237,26 +240,25 @@ public void onResponse(OpRequestSend op) {
237240
}
238241

239242
private void checkPendingRequests() {
240-
while (true) {
241-
int permits = REQUEST_CREDITS_UPDATER.get(this);
242-
if (permits > 0 && pendingRequests.peek() != null) {
243-
if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits - 1)) {
244-
OpRequestSend polled = pendingRequests.poll();
245-
if (polled != null) {
246-
CompletableFuture<ClientCnx> clientCnx = getClientCnx(polled.topic);
247-
if (polled.cnx != clientCnx) {
248-
OpRequestSend invalid = polled;
249-
polled = OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
250-
clientCnx);
251-
invalid.recycle();
252-
}
253-
endTxn(polled);
254-
} else {
255-
REQUEST_CREDITS_UPDATER.incrementAndGet(this);
243+
int permits = REQUEST_CREDITS_UPDATER.get(this);
244+
if (permits > 0 && pendingRequests.peek() != null) {
245+
if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits - 1)) {
246+
OpRequestSend polled = pendingRequests.poll();
247+
if (polled != null) {
248+
CompletableFuture<ClientCnx> clientCnx = getClientCnx(polled.topic);
249+
if (polled.cnx != clientCnx) {
250+
OpRequestSend invalid = polled;
251+
polled = OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
252+
clientCnx);
253+
invalid.recycle();
256254
}
255+
endTxn(polled);
256+
} else {
257+
REQUEST_CREDITS_UPDATER.incrementAndGet(this);
258+
checkPendingRequests();
257259
}
258260
} else {
259-
break;
261+
checkPendingRequests();
260262
}
261263
}
262264
}

0 commit comments

Comments
 (0)