Skip to content

Commit

Permalink
YARN-11781. Implement Dynamic Requests Handling in CapacityScheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
yangtao.yt committed Mar 1, 2025
1 parent 89f9c5a commit b204aa9
Show file tree
Hide file tree
Showing 8 changed files with 972 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ public Configuration getConf() {

private CSMaxRunningAppsEnforcer maxRunningEnforcer;

private RequestsHandler requestsHandler;

public CapacityScheduler() {
super(CapacityScheduler.class.getName());
this.maxRunningEnforcer = new CSMaxRunningAppsEnforcer(this);
Expand Down Expand Up @@ -328,6 +330,7 @@ void initScheduler(Configuration configuration) throws
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();

initMultiNodePlacement();
initRequestsHandler();
printSchedulerInitialized();
} finally {
writeLock.unlock();
Expand Down Expand Up @@ -455,8 +458,15 @@ public void reinitialize(Configuration newConf, RMContext rmContext,
refreshMaximumAllocation(
ResourceUtils.fetchMaximumAllocationFromConfig(this.conf));
reinitializeQueues(this.conf);
reinitRequestsHandler(this.conf);
} catch (Throwable t) {
this.conf = oldConf;
try {
reinitRequestsHandler(this.conf);
} catch (Throwable innerT) {
LOG.error("Failed to re-init requests handler : {}",
innerT.getMessage(), innerT);
}
reinitializeQueues(this.conf);
refreshMaximumAllocation(
ResourceUtils.fetchMaximumAllocationFromConfig(this.conf));
Expand Down Expand Up @@ -1337,6 +1347,26 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
return EMPTY_ALLOCATION;
}

// Handle requests
long requestsHandleStartTime = System.currentTimeMillis();
RequestsHandleResponse handleResponse =
handleRequests(application, ask, schedulingRequests);
long requestsHandleElapsedMs =
System.currentTimeMillis() - requestsHandleStartTime;
CapacitySchedulerMetrics.getMetrics().addRequestsHandle(
requestsHandleElapsedMs);
if (handleResponse != null && handleResponse.isUpdated()) {
LOG.info("Updated requests: appId={}, elapsedMs={}\n" +
"ResourceRequests: origin={}, updated={}\n" +
"SchedulingRequests: origin={}, updated={}",
applicationAttemptId.getApplicationId(),
requestsHandleElapsedMs,
ask, handleResponse.getResourceRequests(),
schedulingRequests, handleResponse.getSchedulingRequests());
ask = handleResponse.getResourceRequests();
schedulingRequests = handleResponse.getSchedulingRequests();
}

// Handle all container updates
handleContainerUpdates(application, updateRequests);

Expand Down Expand Up @@ -3642,4 +3672,23 @@ public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) {
}
}
}

/**
* initialize / reinitialize / handleRequests methods for RequestsHandler.
*/
private void initRequestsHandler() throws IOException, YarnException {
requestsHandler = new RequestsHandler(rmContext);
reinitRequestsHandler(this.conf);
}

private void reinitRequestsHandler(Configuration conf)
throws IOException, YarnException {
requestsHandler.initialize(conf);
}

protected RequestsHandleResponse handleRequests(FiCaSchedulerApp app,
List<ResourceRequest> resourceRequests,
List<SchedulingRequest> schedulingRequests) {
return requestsHandler.handle(app, resourceRequests, schedulingRequests);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String MAPPING_RULE_FORMAT_DEFAULT =
MAPPING_RULE_FORMAT_LEGACY;

public static final String REQUEST_HANDLER_PREFIX =
PREFIX + "request-handler.";

public static final String REQUEST_HANDLER_ENABLED =
REQUEST_HANDLER_PREFIX + "enabled";

public static final boolean DEFAULT_REQUEST_HANDLER_ENABLED = false;

public static final String REQUEST_HANDLER_UPDATES =
REQUEST_HANDLER_PREFIX + "updates";

private static final QueueCapacityConfigParser queueCapacityConfigParser
= new QueueCapacityConfigParser();
private static final String LEGACY_QUEUE_MODE_ENABLED = PREFIX + "legacy-queue-mode.enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class CapacitySchedulerMetrics {
@Metric("Scheduler node update") MutableRate nodeUpdate;
@Metric("Scheduler node heartbeat interval") MutableQuantiles
schedulerNodeHBInterval;
@Metric("Requests handle") MutableRate requestsHandle;

private static volatile CapacitySchedulerMetrics INSTANCE = null;
private static MetricsRegistry registry;
Expand Down Expand Up @@ -128,4 +129,13 @@ public void addSchedulerNodeHBInterval(long heartbeatInterval) {
public long getNumOfSchedulerNodeHBInterval() {
return this.schedulerNodeHBInterval.getEstimator().getCount();
}

public void addRequestsHandle(long latency) {
this.requestsHandle.add(latency);
}

@VisibleForTesting
public long getNumOfRequestsHandle() {
return this.requestsHandle.lastStat().numSamples();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

Check failure on line 1 in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandleResponse.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandleResponse.java#L1

asflicense: Missing Apache License

import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;

import java.util.List;

/**
* Response of the handle method in {@link RequestsHandler}.
*/
public class RequestsHandleResponse {

private final boolean isUpdated;
private final List<ResourceRequest> resourceRequests;
private final List<SchedulingRequest> schedulingRequests;

public RequestsHandleResponse(boolean isUpdated,
List<ResourceRequest> resourceRequests,
List<SchedulingRequest> schedulingRequests) {
this.isUpdated = isUpdated;
this.resourceRequests = resourceRequests;
this.schedulingRequests = schedulingRequests;
}

public List<ResourceRequest> getResourceRequests() {
return resourceRequests;
}

public List<SchedulingRequest> getSchedulingRequests() {
return schedulingRequests;
}

public boolean isUpdated() {
return isUpdated;
}
}
Loading

0 comments on commit b204aa9

Please sign in to comment.