Skip to content

Commit aa5fe6f

Browse files
authored
YARN-10058. Handle uncaught exception for async-scheduling threads to prevent scheduler hangs (#7129). Contributed by Tao Yang.
Reviewed-by: Syed Shameerur Rahman <[email protected]> Signed-off-by: Shilun Fan <[email protected]> Signed-off-by: He Xiaoqiao <[email protected]>
1 parent 58896f0 commit aa5fe6f

File tree

3 files changed

+90
-1
lines changed

3 files changed

+90
-1
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.atomic.AtomicBoolean;
3737

3838
import org.apache.commons.lang3.StringUtils;
39+
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
3940
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
4041
import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule;
4142
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
@@ -3543,7 +3544,10 @@ static class AsyncSchedulingConfiguration {
35433544

35443545
this.asyncSchedulerThreads = new ArrayList<>();
35453546
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
3546-
asyncSchedulerThreads.add(new AsyncScheduleThread(cs));
3547+
AsyncScheduleThread ast = new AsyncScheduleThread(cs);
3548+
ast.setUncaughtExceptionHandler(
3549+
new RMCriticalThreadUncaughtExceptionHandler(cs.rmContext));
3550+
asyncSchedulerThreads.add(ast);
35473551
}
35483552
this.resourceCommitterService = new ResourceCommitterService(cs);
35493553
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java

+52
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.hadoop.ha.HAServiceProtocol;
2222
import org.apache.hadoop.security.UserGroupInformation;
23+
import org.apache.hadoop.test.GenericTestUtils;
2324
import org.apache.hadoop.yarn.conf.YarnConfiguration;
2425
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
2526
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -135,6 +136,57 @@ public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
135136
rm2.stop();
136137
}
137138

139+
@Test(timeout = 30000)
140+
public void testAsyncScheduleThreadExit() throws Exception {
141+
// start two RMs, and transit rm1 to active, rm2 to standby
142+
startRMs();
143+
// register NM
144+
rm1.registerNode("192.1.1.1:1234", 8192, 8);
145+
rm1.drainEvents();
146+
147+
// make sure async-scheduling thread is correct at beginning
148+
checkAsyncSchedulerThreads(Thread.currentThread());
149+
150+
// test async-scheduling thread exit
151+
try{
152+
// set resource calculator to be null to simulate
153+
// NPE in async-scheduling thread
154+
CapacityScheduler cs =
155+
(CapacityScheduler) rm1.getRMContext().getScheduler();
156+
cs.setResourceCalculator(null);
157+
158+
// wait for rm1 to be transitioned to standby
159+
GenericTestUtils.waitFor(() -> rm1.getRMContext().getHAServiceState()
160+
== HAServiceProtocol.HAServiceState.STANDBY, 100, 5000);
161+
162+
// failover rm2 to rm1
163+
HAServiceProtocol.StateChangeRequestInfo requestInfo =
164+
new HAServiceProtocol.StateChangeRequestInfo(
165+
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
166+
rm2.adminService.transitionToStandby(requestInfo);
167+
GenericTestUtils.waitFor(() -> {
168+
try {
169+
// this call may fail when rm1 is still initializing
170+
// in StandByTransitionRunnable thread
171+
rm1.adminService.transitionToActive(requestInfo);
172+
return true;
173+
} catch (Exception e) {
174+
return false;
175+
}
176+
}, 100, 3000);
177+
178+
// wait for rm1 to be transitioned to active again
179+
GenericTestUtils.waitFor(() -> rm1.getRMContext().getHAServiceState()
180+
== HAServiceProtocol.HAServiceState.ACTIVE, 100, 5000);
181+
182+
// make sure async-scheduling thread is correct after failover
183+
checkAsyncSchedulerThreads(Thread.currentThread());
184+
} finally {
185+
rm1.stop();
186+
rm2.stop();
187+
}
188+
}
189+
138190
private RMApp submitAppAndCheckLaunched(MockRM rm) throws Exception {
139191
MockRMAppSubmissionData data =
140192
MockRMAppSubmissionData.Builder.createWithMemory(200, rm)

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java

+33
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
2020

21+
import org.apache.hadoop.test.GenericTestUtils;
2122
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
2223
import org.apache.hadoop.conf.Configuration;
2324
import org.apache.hadoop.yarn.api.records.Container;
@@ -67,6 +68,7 @@
6768
import org.junit.Assert;
6869
import org.junit.Before;
6970
import org.junit.Test;
71+
import org.junit.contrib.java.lang.system.internal.NoExitSecurityManager;
7072
import org.mockito.Mockito;
7173
import org.mockito.invocation.InvocationOnMock;
7274
import org.mockito.stubbing.Answer;
@@ -1072,6 +1074,37 @@ public Boolean answer(InvocationOnMock invocation) throws Exception {
10721074
rm.stop();
10731075
}
10741076

1077+
@Test(timeout = 30000)
1078+
public void testAsyncScheduleThreadExit() throws Exception {
1079+
// init RM & NM
1080+
final MockRM rm = new MockRM(conf);
1081+
rm.start();
1082+
rm.registerNode("192.168.0.1:1234", 8 * GB);
1083+
rm.drainEvents();
1084+
1085+
// Set no exit security manager to catch System.exit
1086+
SecurityManager originalSecurityManager = System.getSecurityManager();
1087+
NoExitSecurityManager noExitSecurityManager =
1088+
new NoExitSecurityManager(originalSecurityManager);
1089+
System.setSecurityManager(noExitSecurityManager);
1090+
1091+
// test async-scheduling thread exit
1092+
try{
1093+
// set resource calculator to be null to simulate
1094+
// NPE in async-scheduling thread
1095+
CapacityScheduler cs =
1096+
(CapacityScheduler) rm.getRMContext().getScheduler();
1097+
cs.setResourceCalculator(null);
1098+
1099+
// wait for RM to be shutdown until timeout
1100+
GenericTestUtils.waitFor(noExitSecurityManager::isCheckExitCalled,
1101+
100, 5000);
1102+
} finally {
1103+
System.setSecurityManager(originalSecurityManager);
1104+
rm.stop();
1105+
}
1106+
}
1107+
10751108
private ResourceCommitRequest createAllocateFromReservedProposal(
10761109
int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp,
10771110
SchedulerNode allocateNode, SchedulerNode reservedNode,

0 commit comments

Comments
 (0)