Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11781. Implement Dynamic Requests Handling in CapacityScheduler #7448

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ public Configuration getConf() {

private CSMaxRunningAppsEnforcer maxRunningEnforcer;

private RequestsHandler requestsHandler;

public CapacityScheduler() {
super(CapacityScheduler.class.getName());
this.maxRunningEnforcer = new CSMaxRunningAppsEnforcer(this);
Expand Down Expand Up @@ -328,6 +330,7 @@ void initScheduler(Configuration configuration) throws
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();

initMultiNodePlacement();
initRequestsHandler();
printSchedulerInitialized();
} finally {
writeLock.unlock();
Expand Down Expand Up @@ -455,8 +458,15 @@ public void reinitialize(Configuration newConf, RMContext rmContext,
refreshMaximumAllocation(
ResourceUtils.fetchMaximumAllocationFromConfig(this.conf));
reinitializeQueues(this.conf);
reinitRequestsHandler(this.conf);
} catch (Throwable t) {
this.conf = oldConf;
try {
reinitRequestsHandler(this.conf);
} catch (Throwable innerT) {
LOG.error("Failed to re-init requests handler : {}",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the basic functions be affected if the initialization fails?

innerT.getMessage(), innerT);
}
reinitializeQueues(this.conf);
refreshMaximumAllocation(
ResourceUtils.fetchMaximumAllocationFromConfig(this.conf));
Expand Down Expand Up @@ -1337,6 +1347,26 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
return EMPTY_ALLOCATION;
}

// Handle requests
long requestsHandleStartTime = System.currentTimeMillis();
RequestsHandleResponse handleResponse =
handleRequests(application, ask, schedulingRequests);
long requestsHandleElapsedMs =
System.currentTimeMillis() - requestsHandleStartTime;
CapacitySchedulerMetrics.getMetrics().addRequestsHandle(
requestsHandleElapsedMs);
if (handleResponse != null && handleResponse.isUpdated()) {
LOG.info("Updated requests: appId={}, elapsedMs={}\n" +
"ResourceRequests: origin={}, updated={}\n" +
"SchedulingRequests: origin={}, updated={}",
applicationAttemptId.getApplicationId(),
requestsHandleElapsedMs,
ask, handleResponse.getResourceRequests(),
schedulingRequests, handleResponse.getSchedulingRequests());
ask = handleResponse.getResourceRequests();
schedulingRequests = handleResponse.getSchedulingRequests();
}

// Handle all container updates
handleContainerUpdates(application, updateRequests);

Expand Down Expand Up @@ -3642,4 +3672,23 @@ public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) {
}
}
}

/**
* initialize / reinitialize / handleRequests methods for RequestsHandler.
*/
private void initRequestsHandler() throws IOException, YarnException {
requestsHandler = new RequestsHandler(rmContext);
reinitRequestsHandler(this.conf);
}

private void reinitRequestsHandler(Configuration newConf)
throws IOException, YarnException {
requestsHandler.initialize(newConf);
}

protected RequestsHandleResponse handleRequests(FiCaSchedulerApp app,
List<ResourceRequest> resourceRequests,
List<SchedulingRequest> schedulingRequests) {
return requestsHandler.handle(app, resourceRequests, schedulingRequests);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String MAPPING_RULE_FORMAT_DEFAULT =
MAPPING_RULE_FORMAT_LEGACY;

public static final String REQUEST_HANDLER_PREFIX =
PREFIX + "request-handler.";

public static final String REQUEST_HANDLER_ENABLED =
REQUEST_HANDLER_PREFIX + "enabled";

public static final boolean DEFAULT_REQUEST_HANDLER_ENABLED = false;

public static final String REQUEST_HANDLER_UPDATES =
REQUEST_HANDLER_PREFIX + "updates";

private static final QueueCapacityConfigParser queueCapacityConfigParser
= new QueueCapacityConfigParser();
private static final String LEGACY_QUEUE_MODE_ENABLED = PREFIX + "legacy-queue-mode.enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class CapacitySchedulerMetrics {
@Metric("Scheduler node update") MutableRate nodeUpdate;
@Metric("Scheduler node heartbeat interval") MutableQuantiles
schedulerNodeHBInterval;
@Metric("Requests handle")
private MutableRate requestsHandle;

private static volatile CapacitySchedulerMetrics INSTANCE = null;
private static MetricsRegistry registry;
Expand Down Expand Up @@ -128,4 +130,13 @@ public void addSchedulerNodeHBInterval(long heartbeatInterval) {
public long getNumOfSchedulerNodeHBInterval() {
return this.schedulerNodeHBInterval.getEstimator().getCount();
}

public void addRequestsHandle(long latency) {
this.requestsHandle.add(latency);
}

@VisibleForTesting
public MutableRate getRequestsHandle() {
return requestsHandle;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;

import java.util.List;

/**
* Response of the handle method in {@link RequestsHandler}.
*/
public class RequestsHandleResponse {

private final boolean isUpdated;
private final List<ResourceRequest> resourceRequests;
private final List<SchedulingRequest> schedulingRequests;

public RequestsHandleResponse(boolean isUpdated,
List<ResourceRequest> resourceRequests,
List<SchedulingRequest> schedulingRequests) {
this.isUpdated = isUpdated;
this.resourceRequests = resourceRequests;
this.schedulingRequests = schedulingRequests;
}

public List<ResourceRequest> getResourceRequests() {
return resourceRequests;
}

public List<SchedulingRequest> getSchedulingRequests() {
return schedulingRequests;
}

public boolean isUpdated() {
return isUpdated;
}
}
Loading