Skip to content

Commit 552c7a1

Browse files
authored
Modified Fate TransactionRunner to exit normally when shut down (#6021)
Modified the Fate TransactionRunner threads to allow them to exit their run loop normally when an InterruptedException is raised due to Fate being shut down. Closes #6014
1 parent f407e27 commit 552c7a1

2 files changed

Lines changed: 125 additions & 7 deletions

File tree

  • core/src/main/java/org/apache/accumulo/core/fate
  • test/src/main/java/org/apache/accumulo/test/fate/zookeeper

core/src/main/java/org/apache/accumulo/core/fate/Fate.java

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,24 @@ public enum TxInfo {
7474

7575
private class TransactionRunner implements Runnable {
7676

77+
private boolean isInterruptedException(Throwable e) {
78+
if (e == null) {
79+
return false;
80+
}
81+
82+
if (e instanceof InterruptedException) {
83+
return true;
84+
}
85+
86+
for (Throwable suppressed : e.getSuppressed()) {
87+
if (isInterruptedException(suppressed)) {
88+
return true;
89+
}
90+
}
91+
92+
return isInterruptedException(e.getCause());
93+
}
94+
7795
@Override
7896
public void run() {
7997
while (keepRunning.get()) {
@@ -99,14 +117,31 @@ public void run() {
99117
store.setStatus(tid, IN_PROGRESS);
100118
}
101119
op = executeCall(tid, op);
120+
// It's possible that a Fate operation impl
121+
// may not do the right thing with an
122+
// InterruptedException.
123+
if (Thread.currentThread().isInterrupted()) {
124+
throw new InterruptedException("Fate Transaction Runner thread interrupted");
125+
}
102126
} else {
103127
continue;
104128
}
105-
106129
} catch (Exception e) {
107-
blockIfHadoopShutdown(tid, e);
108-
transitionToFailed(tid, e);
109-
continue;
130+
if (!isInterruptedException(e)) {
131+
blockIfHadoopShutdown(tid, e);
132+
transitionToFailed(tid, e);
133+
continue;
134+
} else {
135+
if (keepRunning.get()) {
136+
throw e;
137+
} else {
138+
// If we are shutting down then Fate.shutdown was called
139+
// and ExecutorService.shutdownNow was called resulting
140+
// in this exception. We will exit at the top of the loop.
141+
Thread.interrupted();
142+
continue;
143+
}
144+
}
110145
}
111146

112147
if (op == null) {
@@ -130,7 +165,19 @@ public void run() {
130165
}
131166
}
132167
} catch (Exception e) {
133-
runnerLog.error("Uncaught exception in FATE runner thread.", e);
168+
if (isInterruptedException(e)) {
169+
if (keepRunning.get()) {
170+
runnerLog.error("Uncaught InterruptedException in FATE runner thread.", e);
171+
} else {
172+
// If we are shutting down then Fate.shutdown was called
173+
// and ExecutorService.shutdownNow was called resulting
174+
// in this exception. We will exit at the top of the loop,
175+
// so continue this loop iteration normally.
176+
Thread.interrupted();
177+
}
178+
} else {
179+
runnerLog.error("Uncaught exception in FATE runner thread.", e);
180+
}
134181
} finally {
135182
if (tid != null) {
136183
store.unreserve(tid, deferTime, TimeUnit.MILLISECONDS);
@@ -198,6 +245,7 @@ private void processFailed(long tid, Repo<T> op) {
198245
}
199246

200247
private void doCleanUp(long tid) {
248+
log.debug("Cleaning up {}", tid);
201249
Boolean autoClean = (Boolean) store.getTransactionInfo(tid, TxInfo.AUTO_CLEAN);
202250
if (autoClean != null && autoClean) {
203251
store.delete(tid);
@@ -261,6 +309,7 @@ public void startTransactionRunners(AccumuloConfiguration conf,
261309
ScheduledThreadPoolExecutor serverGeneralScheduledThreadPool) {
262310
final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
263311
Property.MANAGER_FATE_THREADPOOL_SIZE, true);
312+
log.debug("Starting Fate Transaction Runner pool with {} threads", pool.getCorePoolSize());
264313
ThreadPools
265314
.watchCriticalScheduledTask(serverGeneralScheduledThreadPool.scheduleWithFixedDelay(() -> {
266315
// resize the pool if the property changed
@@ -421,6 +470,9 @@ public Exception getException(long tid) {
421470
* Flags the FATE thread pool to clear out and end. Does not actively stop running FATE processes.
422471
*/
423472
public void shutdown(boolean wait) {
473+
log.info("Shutdown called on Fate, waiting: {}", wait);
474+
// important this is set before shutdownNow is called as the background
475+
// threads will check this to see if shutdown related errors should be ignored.
424476
keepRunning.set(false);
425477
if (executor == null) {
426478
return;

test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.easymock.EasyMock.replay;
3232
import static org.junit.jupiter.api.Assertions.assertEquals;
3333
import static org.junit.jupiter.api.Assertions.assertFalse;
34+
import static org.junit.jupiter.api.Assertions.assertNull;
3435
import static org.junit.jupiter.api.Assertions.assertThrows;
3536
import static org.junit.jupiter.api.Assertions.assertTrue;
3637
import static org.junit.jupiter.api.Assertions.fail;
@@ -42,6 +43,7 @@
4243
import java.util.concurrent.CountDownLatch;
4344
import java.util.concurrent.ScheduledThreadPoolExecutor;
4445
import java.util.concurrent.atomic.AtomicBoolean;
46+
import java.util.concurrent.atomic.AtomicReference;
4547

4648
import org.apache.accumulo.core.Constants;
4749
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
@@ -190,6 +192,7 @@ public Repo<Manager> call(long tid, Manager environment) throws Exception {
190192
private static CountDownLatch callStarted;
191193
private static CountDownLatch finishCall;
192194
private static CountDownLatch undoLatch;
195+
private static AtomicReference<Throwable> interruptedException = new AtomicReference<>();
193196

194197
Fate<Manager> fate;
195198

@@ -211,6 +214,7 @@ public static void setup() throws Exception {
211214

212215
@BeforeEach
213216
public void beforeEach() throws Exception {
217+
interruptedException.set(null);
214218
final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk, zc);
215219
final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000, System::currentTimeMillis);
216220

@@ -448,11 +452,73 @@ public void testRepoFails() throws Exception {
448452
assertTrue(fate.getException(txid).getMessage().contains("isReady() failed"));
449453
}
450454

455+
/**
 * Checks that shutting Fate down while an operation is blocked inside call() interrupts the
 * operation but leaves the transaction IN_PROGRESS rather than failing it, and that the same
 * transaction then runs to completion (its node is removed from ZooKeeper) once beforeEach()
 * has been re-run and the transaction runners are started again.
 */
@Test
456+
public void testShutdownDoesNotFailTx() throws Exception {
457+
ConfigurationCopy config = new ConfigurationCopy();
458+
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
459+
config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
460+
461+
// Wait for the transaction runner to be scheduled.
// NOTE(review): no transaction runners have been started by this method at this point
// (startTransactionRunners is called further down) - confirm this sleep is still needed.
462+
UtilWaitThread.sleep(3000);
463+
464+
callStarted = new CountDownLatch(1);
465+
finishCall = new CountDownLatch(1);
466+
467+
long txid = fate.startTransaction();
468+
assertEquals(TStatus.NEW, getTxStatus(zk, txid));
469+
fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op");
470+
assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid));
471+
472+
fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
473+
474+
// wait for call() to be called
475+
callStarted.await();
476+
assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
477+
478+
// shutdown fate
479+
fate.shutdown(true);
480+
481+
// Wait until the blocked op has recorded the InterruptedException it received (inCall()
// stores it in interruptedException), then clear the reference so the final assertNull
// only reflects what happens after this point.
482+
Wait.waitFor(() -> interruptedException.get() != null);
483+
interruptedException.set(null);
484+
485+
// restart fate
486+
beforeEach();
487+
assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
488+
489+
// Restarting the transaction runners will retry the in-progress
490+
// transaction. Reset the CountDownLatches to confirm.
491+
callStarted = new CountDownLatch(1);
492+
finishCall = new CountDownLatch(1);
493+
fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
494+
callStarted.await();
495+
assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
496+
finishCall.countDown();
497+
498+
// This should complete normally, cleaning up the tx and deleting it from ZK
// Poll every 100 ms until the status lookup throws NoNodeException.
// NOTE(review): this loop has no upper bound; if the node is never deleted the test
// will not terminate on its own.
499+
while (true) {
500+
try {
501+
getTxStatus(zk, txid);
502+
Thread.sleep(100);
503+
continue;
504+
} catch (KeeperException.NoNodeException e) {
505+
break;
506+
}
507+
}
// No interrupt should have been recorded by inCall() since the reference was cleared above.
508+
assertNull(interruptedException.get());
509+
}
510+
451511
/**
 * Test hook invoked from within an operation's call: counts down callStarted to signal that
 * the call has begun, then blocks on finishCall until the test releases it.
 *
 * @throws InterruptedException if the thread is interrupted while waiting on finishCall; the
 *         exception is stored in interruptedException before being rethrown
 */
private static void inCall() throws InterruptedException {
452512
// signal that call started
453513
callStarted.countDown();
454-
// wait for the signal to exit the method
455-
finishCall.await();
514+
try {
515+
// wait for the signal to exit the method
516+
finishCall.await();
517+
} catch (InterruptedException e) {
518+
// Record the interrupt so tests can observe it through interruptedException, then
// propagate it unchanged to the caller.
LOG.debug("InterruptedException occurred inCall.");
519+
interruptedException.set(e);
520+
throw e;
521+
}
456522
}
457523

458524
/*

0 commit comments

Comments
 (0)