Skip to content

Commit 11a4020

Browse files
horizonzy authored and hangc0276 committed
Fix auditor elector executor block problem. (#4165)
### Motivation Currently, the auditor elector is shut down by submitting a shutdown task to its executor. In some cases, the follower's auditor elector executor is blocked indefinitely while waiting for leader election, so the shutdown task sits in the task queue forever and never gets a chance to execute. This has been observed in Pulsar; see apache/pulsar#21797 (comment). Therefore, during auditor elector shutdown, if the remaining task cannot finish in time, we should invoke shutdownNow to interrupt the blocked thread. (cherry picked from commit c3748dd)
1 parent 7875f15 commit 11a4020

File tree

2 files changed

+141
-44
lines changed

2 files changed

+141
-44
lines changed

Diff for: bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java

+35-20
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@
2525
import com.google.common.annotations.VisibleForTesting;
2626
import java.io.IOException;
2727
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.ExecutionException;
2829
import java.util.concurrent.ExecutorService;
2930
import java.util.concurrent.Executors;
3031
import java.util.concurrent.Future;
3132
import java.util.concurrent.RejectedExecutionException;
3233
import java.util.concurrent.ThreadFactory;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.TimeoutException;
3336
import java.util.concurrent.atomic.AtomicBoolean;
3437
import org.apache.bookkeeper.client.BKException;
3538
import org.apache.bookkeeper.client.BookKeeper;
@@ -147,26 +150,28 @@ public Future<?> start() {
147150
/**
148151
* Run cleanup operations for the auditor elector.
149152
*/
150-
private void submitShutdownTask() {
151-
executor.submit(new Runnable() {
152-
@Override
153-
public void run() {
154-
if (!running.compareAndSet(true, false)) {
155-
return;
156-
}
157-
158-
try {
159-
ledgerAuditorManager.close();
160-
} catch (InterruptedException ie) {
161-
Thread.currentThread().interrupt();
162-
LOG.warn("InterruptedException while closing ledger auditor manager", ie);
163-
} catch (Exception ke) {
164-
LOG.error("Exception while closing ledger auditor manager", ke);
165-
}
166-
}
167-
});
153+
private Future<?> submitShutdownTask() {
154+
return executor.submit(shutdownTask);
168155
}
169156

157+
Runnable shutdownTask = new Runnable() {
158+
@Override
159+
public void run() {
160+
if (!running.compareAndSet(true, false)) {
161+
return;
162+
}
163+
164+
try {
165+
ledgerAuditorManager.close();
166+
} catch (InterruptedException ie) {
167+
Thread.currentThread().interrupt();
168+
LOG.warn("InterruptedException while closing ledger auditor manager", ie);
169+
} catch (Exception ke) {
170+
LOG.error("Exception while closing ledger auditor manager", ke);
171+
}
172+
}
173+
};
174+
170175
/**
171176
* Performing the auditor election using the ZooKeeper ephemeral sequential
172177
* znode. The bookie which has created the least sequential will be elected as
@@ -238,8 +243,18 @@ public void shutdown() throws InterruptedException {
238243
return;
239244
}
240245
// close auditor manager
241-
submitShutdownTask();
242-
executor.shutdown();
246+
try {
247+
submitShutdownTask().get(10, TimeUnit.SECONDS);
248+
executor.shutdown();
249+
} catch (ExecutionException e) {
250+
LOG.warn("Failed to close auditor manager", e);
251+
executor.shutdownNow();
252+
shutdownTask.run();
253+
} catch (TimeoutException e) {
254+
LOG.warn("Failed to close auditor manager in 10 seconds", e);
255+
executor.shutdownNow();
256+
shutdownTask.run();
257+
}
243258
}
244259

245260
if (auditor != null) {

Diff for: bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java

+106-24
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.concurrent.TimeUnit;
4646
import java.util.concurrent.TimeoutException;
4747
import java.util.concurrent.atomic.AtomicInteger;
48+
import java.util.concurrent.atomic.AtomicReference;
4849
import java.util.stream.Collectors;
4950
import lombok.Cleanup;
5051
import org.apache.bookkeeper.bookie.BookieImpl;
@@ -409,7 +410,16 @@ public void testInnerDelayedAuditOfLostBookies() throws Exception {
409410
urLedgerMgr.setLostBookieRecoveryDelay(5);
410411

411412
// shutdown a non auditor bookie; choosing non-auditor to avoid another election
412-
String shutdownBookie = shutDownNonAuditorBookie();
413+
AtomicReference<String> shutdownBookieRef = new AtomicReference<>();
414+
CountDownLatch shutdownLatch = new CountDownLatch(1);
415+
new Thread(() -> {
416+
try {
417+
String shutdownBookie = shutDownNonAuditorBookie();
418+
shutdownBookieRef.set(shutdownBookie);
419+
shutdownLatch.countDown();
420+
} catch (Exception ignore) {
421+
}
422+
}).start();
413423

414424
if (LOG.isDebugEnabled()) {
415425
LOG.debug("Waiting for ledgers to be marked as under replicated");
@@ -425,9 +435,10 @@ public void testInnerDelayedAuditOfLostBookies() throws Exception {
425435
urLedgerList.contains(ledgerId));
426436
Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
427437
String data = urLedgerData.get(ledgerId);
428-
assertTrue("Bookie " + shutdownBookie
438+
shutdownLatch.await();
439+
assertTrue("Bookie " + shutdownBookieRef.get()
429440
+ "is not listed in the ledger as missing replica :" + data,
430-
data.contains(shutdownBookie));
441+
data.contains(shutdownBookieRef.get()));
431442
}
432443

433444
/**
@@ -486,7 +497,16 @@ public void testRescheduleOfDelayedAuditOfLostBookiesToStartImmediately() throws
486497
urLedgerMgr.setLostBookieRecoveryDelay(50);
487498

488499
// shutdown a non auditor bookie; choosing non-auditor to avoid another election
489-
String shutdownBookie = shutDownNonAuditorBookie();
500+
AtomicReference<String> shutdownBookieRef = new AtomicReference<>();
501+
CountDownLatch shutdownLatch = new CountDownLatch(1);
502+
new Thread(() -> {
503+
try {
504+
String shutdownBookie = shutDownNonAuditorBookie();
505+
shutdownBookieRef.set(shutdownBookie);
506+
shutdownLatch.countDown();
507+
} catch (Exception ignore) {
508+
}
509+
}).start();
490510

491511
if (LOG.isDebugEnabled()) {
492512
LOG.debug("Waiting for ledgers to be marked as under replicated");
@@ -505,9 +525,10 @@ public void testRescheduleOfDelayedAuditOfLostBookiesToStartImmediately() throws
505525
urLedgerList.contains(ledgerId));
506526
Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
507527
String data = urLedgerData.get(ledgerId);
508-
assertTrue("Bookie " + shutdownBookie
528+
shutdownLatch.await();
529+
assertTrue("Bookie " + shutdownBookieRef.get()
509530
+ "is not listed in the ledger as missing replica :" + data,
510-
data.contains(shutdownBookie));
531+
data.contains(shutdownBookieRef.get()));
511532
}
512533

513534
@Test
@@ -530,7 +551,16 @@ public void testRescheduleOfDelayedAuditOfLostBookiesToStartLater() throws Excep
530551
urLedgerMgr.setLostBookieRecoveryDelay(3);
531552

532553
// shutdown a non auditor bookie; choosing non-auditor to avoid another election
533-
String shutdownBookie = shutDownNonAuditorBookie();
554+
AtomicReference<String> shutdownBookieRef = new AtomicReference<>();
555+
CountDownLatch shutdownLatch = new CountDownLatch(1);
556+
new Thread(() -> {
557+
try {
558+
String shutdownBookie = shutDownNonAuditorBookie();
559+
shutdownBookieRef.set(shutdownBookie);
560+
shutdownLatch.countDown();
561+
} catch (Exception ignore) {
562+
}
563+
}).start();
534564

535565
if (LOG.isDebugEnabled()) {
536566
LOG.debug("Waiting for ledgers to be marked as under replicated");
@@ -556,9 +586,10 @@ public void testRescheduleOfDelayedAuditOfLostBookiesToStartLater() throws Excep
556586
urLedgerList.contains(ledgerId));
557587
Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
558588
String data = urLedgerData.get(ledgerId);
559-
assertTrue("Bookie " + shutdownBookie
589+
shutdownLatch.await();
590+
assertTrue("Bookie " + shutdownBookieRef.get()
560591
+ "is not listed in the ledger as missing replica :" + data,
561-
data.contains(shutdownBookie));
592+
data.contains(shutdownBookieRef.get()));
562593
}
563594

564595
@Test
@@ -647,7 +678,12 @@ public void testTriggerAuditorWithPendingAuditTask() throws Exception {
647678
urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
648679

649680
// shutdown a non auditor bookie; choosing non-auditor to avoid another election
650-
String shutdownBookie = shutDownNonAuditorBookie();
681+
new Thread(() -> {
682+
try {
683+
shutDownNonAuditorBookie();
684+
} catch (Exception ignore) {
685+
}
686+
}).start();
651687

652688
if (LOG.isDebugEnabled()) {
653689
LOG.debug("Waiting for ledgers to be marked as under replicated");
@@ -698,7 +734,12 @@ public void testTriggerAuditorBySettingDelayToZeroWithPendingAuditTask() throws
698734
urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
699735

700736
// shutdown a non auditor bookie; choosing non-auditor to avoid another election
701-
String shutdownBookie = shutDownNonAuditorBookie();
737+
new Thread(() -> {
738+
try {
739+
shutDownNonAuditorBookie();
740+
} catch (Exception ignore) {
741+
}
742+
}).start();
702743

703744
if (LOG.isDebugEnabled()) {
704745
LOG.debug("Waiting for ledgers to be marked as under replicated");
@@ -750,8 +791,17 @@ public void testDelayedAuditWithMultipleBookieFailures() throws Exception {
750791
// wait for 10 seconds before starting the recovery work when a bookie fails
751792
urLedgerMgr.setLostBookieRecoveryDelay(10);
752793

753-
// shutdown a non auditor bookie to avoid an election
754-
String shutdownBookie1 = shutDownNonAuditorBookie();
794+
// shutdown a non auditor bookie; choosing non-auditor to avoid another election
795+
AtomicReference<String> shutdownBookieRef1 = new AtomicReference<>();
796+
CountDownLatch shutdownLatch1 = new CountDownLatch(1);
797+
new Thread(() -> {
798+
try {
799+
String shutdownBookie1 = shutDownNonAuditorBookie();
800+
shutdownBookieRef1.set(shutdownBookie1);
801+
shutdownLatch1.countDown();
802+
} catch (Exception ignore) {
803+
}
804+
}).start();
755805

756806
// wait for 3 seconds and there shouldn't be any under replicated ledgers
757807
// because we have delayed the start of audit by 10 seconds
@@ -763,7 +813,16 @@ public void testDelayedAuditWithMultipleBookieFailures() throws Exception {
763813
// the history about having delayed recovery remains. Hence we make sure
764814
// we bring down a non auditor bookie. This should cause the audit to take
765815
// place immediately and not wait for the remaining 7 seconds to elapse
766-
String shutdownBookie2 = shutDownNonAuditorBookie();
816+
AtomicReference<String> shutdownBookieRef2 = new AtomicReference<>();
817+
CountDownLatch shutdownLatch2 = new CountDownLatch(1);
818+
new Thread(() -> {
819+
try {
820+
String shutdownBookie2 = shutDownNonAuditorBookie();
821+
shutdownBookieRef2.set(shutdownBookie2);
822+
shutdownLatch2.countDown();
823+
} catch (Exception ignore) {
824+
}
825+
}).start();
767826

768827
// 2 second grace period for the ledgers to get reported as under replicated
769828
Thread.sleep(2000);
@@ -776,9 +835,11 @@ public void testDelayedAuditWithMultipleBookieFailures() throws Exception {
776835
urLedgerList.contains(ledgerId));
777836
Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
778837
String data = urLedgerData.get(ledgerId);
779-
assertTrue("Bookie " + shutdownBookie1 + shutdownBookie2
838+
shutdownLatch1.await();
839+
shutdownLatch2.await();
840+
assertTrue("Bookie " + shutdownBookieRef1.get() + shutdownBookieRef2.get()
780841
+ " are not listed in the ledger as missing replicas :" + data,
781-
data.contains(shutdownBookie1) && data.contains(shutdownBookie2));
842+
data.contains(shutdownBookieRef1.get()) && data.contains(shutdownBookieRef2.get()));
782843
}
783844

784845
/**
@@ -808,7 +869,17 @@ public void testDelayedAuditWithRollingUpgrade() throws Exception {
808869
// shutdown a non auditor bookie to avoid an election
809870
int idx1 = getShutDownNonAuditorBookieIdx("");
810871
ServerConfiguration conf1 = confByIndex(idx1);
811-
String shutdownBookie1 = shutdownBookie(idx1);
872+
873+
AtomicReference<String> shutdownBookieRef1 = new AtomicReference<>();
874+
CountDownLatch shutdownLatch1 = new CountDownLatch(1);
875+
new Thread(() -> {
876+
try {
877+
String shutdownBookie1 = shutdownBookie(idx1);
878+
shutdownBookieRef1.set(shutdownBookie1);
879+
shutdownLatch1.countDown();
880+
} catch (Exception ignore) {
881+
}
882+
}).start();
812883

813884
// wait for 2 seconds and there shouldn't be any under replicated ledgers
814885
// because we have delayed the start of audit by 5 seconds
@@ -821,8 +892,17 @@ public void testDelayedAuditWithRollingUpgrade() throws Exception {
821892

822893
// Now to simulate the rolling upgrade, bring down a bookie different from
823894
// the one we brought down/up above.
824-
String shutdownBookie2 = shutDownNonAuditorBookie(shutdownBookie1);
825-
895+
// shutdown a non auditor bookie; choosing non-auditor to avoid another election
896+
AtomicReference<String> shutdownBookieRef2 = new AtomicReference<>();
897+
CountDownLatch shutdownLatch2 = new CountDownLatch(1);
898+
new Thread(() -> {
899+
try {
900+
String shutdownBookie2 = shutDownNonAuditorBookie();
901+
shutdownBookieRef2.set(shutdownBookie2);
902+
shutdownLatch2.countDown();
903+
} catch (Exception ignore) {
904+
}
905+
}).start();
826906
// since the first bookie that was brought down/up has come up, there is only
827907
// one bookie down at this time. Hence the lost bookie check shouldn't start
828908
// immediately; it will start 5 seconds after the second bookie went down
@@ -839,11 +919,13 @@ public void testDelayedAuditWithRollingUpgrade() throws Exception {
839919
urLedgerList.contains(ledgerId));
840920
Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
841921
String data = urLedgerData.get(ledgerId);
842-
assertTrue("Bookie " + shutdownBookie1 + "wrongly listed as missing the ledger: " + data,
843-
!data.contains(shutdownBookie1));
844-
assertTrue("Bookie " + shutdownBookie2
922+
shutdownLatch1.await();
923+
shutdownLatch2.await();
924+
assertTrue("Bookie " + shutdownBookieRef1.get() + "wrongly listed as missing the ledger: " + data,
925+
!data.contains(shutdownBookieRef1.get()));
926+
assertTrue("Bookie " + shutdownBookieRef2.get()
845927
+ " is not listed in the ledger as missing replicas :" + data,
846-
data.contains(shutdownBookie2));
928+
data.contains(shutdownBookieRef2.get()));
847929
LOG.info("*****************Test Complete");
848930
}
849931

@@ -1008,7 +1090,7 @@ private Auditor getAuditorBookiesAuditor() throws Exception {
10081090
return auditorElectors.get(bookieAddr).auditor;
10091091
}
10101092

1011-
private String shutDownNonAuditorBookie() throws Exception {
1093+
private String shutDownNonAuditorBookie() throws Exception {
10121094
// shutdown bookie which is not an auditor
10131095
int indexOf = indexOfServer(getAuditorBookie());
10141096
int bkIndexDownBookie;

0 commit comments

Comments
 (0)