diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 02ffe83a6df7c..11c508414d047 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -240,6 +240,8 @@ public Configuration getConf() { private CSMaxRunningAppsEnforcer maxRunningEnforcer; + private RequestsHandler requestsHandler; + public CapacityScheduler() { super(CapacityScheduler.class.getName()); this.maxRunningEnforcer = new CSMaxRunningAppsEnforcer(this); @@ -328,6 +330,7 @@ void initScheduler(Configuration configuration) throws offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); initMultiNodePlacement(); + initRequestsHandler(); printSchedulerInitialized(); } finally { writeLock.unlock(); @@ -455,8 +458,15 @@ public void reinitialize(Configuration newConf, RMContext rmContext, refreshMaximumAllocation( ResourceUtils.fetchMaximumAllocationFromConfig(this.conf)); reinitializeQueues(this.conf); + reinitRequestsHandler(this.conf); } catch (Throwable t) { this.conf = oldConf; + try { + reinitRequestsHandler(this.conf); + } catch (Throwable innerT) { + LOG.error("Failed to re-init requests handler : {}", + innerT.getMessage(), innerT); + } reinitializeQueues(this.conf); refreshMaximumAllocation( ResourceUtils.fetchMaximumAllocationFromConfig(this.conf)); @@ -1337,6 +1347,26 @@ public Allocation 
allocate(ApplicationAttemptId applicationAttemptId, return EMPTY_ALLOCATION; } + // Handle requests + long requestsHandleStartTime = System.currentTimeMillis(); + RequestsHandleResponse handleResponse = + handleRequests(application, ask, schedulingRequests); + long requestsHandleElapsedMs = + System.currentTimeMillis() - requestsHandleStartTime; + CapacitySchedulerMetrics.getMetrics().addRequestsHandle( + requestsHandleElapsedMs); + if (handleResponse != null && handleResponse.isUpdated()) { + LOG.info("Updated requests: appId={}, elapsedMs={}\n" + + "ResourceRequests: origin={}, updated={}\n" + + "SchedulingRequests: origin={}, updated={}", + applicationAttemptId.getApplicationId(), + requestsHandleElapsedMs, + ask, handleResponse.getResourceRequests(), + schedulingRequests, handleResponse.getSchedulingRequests()); + ask = handleResponse.getResourceRequests(); + schedulingRequests = handleResponse.getSchedulingRequests(); + } + // Handle all container updates handleContainerUpdates(application, updateRequests); @@ -3642,4 +3672,23 @@ public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) { } } } + + /** + * initialize / reinitialize / handleRequests methods for RequestsHandler. 
+ */ + private void initRequestsHandler() throws IOException, YarnException { + requestsHandler = new RequestsHandler(rmContext); + reinitRequestsHandler(this.conf); + } + + private void reinitRequestsHandler(Configuration newConf) + throws IOException, YarnException { + requestsHandler.initialize(newConf); + } + + protected RequestsHandleResponse handleRequests(FiCaSchedulerApp app, + List resourceRequests, + List schedulingRequests) { + return requestsHandler.handle(app, resourceRequests, schedulingRequests); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index ea5c892ce3e5b..c9442f37afd49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -430,6 +430,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final String MAPPING_RULE_FORMAT_DEFAULT = MAPPING_RULE_FORMAT_LEGACY; + public static final String REQUEST_HANDLER_PREFIX = + PREFIX + "request-handler."; + + public static final String REQUEST_HANDLER_ENABLED = + REQUEST_HANDLER_PREFIX + "enabled"; + + public static final boolean DEFAULT_REQUEST_HANDLER_ENABLED = false; + + public static final String REQUEST_HANDLER_UPDATES = + REQUEST_HANDLER_PREFIX + "updates"; + private static final QueueCapacityConfigParser 
queueCapacityConfigParser = new QueueCapacityConfigParser(); private static final String LEGACY_QUEUE_MODE_ENABLED = PREFIX + "legacy-queue-mode.enabled"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java index 6277290246b38..d500e2739c1d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java @@ -52,6 +52,8 @@ public class CapacitySchedulerMetrics { @Metric("Scheduler node update") MutableRate nodeUpdate; @Metric("Scheduler node heartbeat interval") MutableQuantiles schedulerNodeHBInterval; + @Metric("Requests handle") + private MutableRate requestsHandle; private static volatile CapacitySchedulerMetrics INSTANCE = null; private static MetricsRegistry registry; @@ -128,4 +130,13 @@ public void addSchedulerNodeHBInterval(long heartbeatInterval) { public long getNumOfSchedulerNodeHBInterval() { return this.schedulerNodeHBInterval.getEstimator().getCount(); } + + public void addRequestsHandle(long latency) { + this.requestsHandle.add(latency); + } + + @VisibleForTesting + public MutableRate getRequestsHandle() { + return requestsHandle; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandleResponse.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandleResponse.java new file mode 100644 index 0000000000000..38941521a3e46 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandleResponse.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; + +import java.util.List; + +/** + * Response of the handle method in {@link RequestsHandler}. 
+ */ +public class RequestsHandleResponse { + + private final boolean isUpdated; + private final List resourceRequests; + private final List schedulingRequests; + + public RequestsHandleResponse(boolean isUpdated, + List resourceRequests, + List schedulingRequests) { + this.isUpdated = isUpdated; + this.resourceRequests = resourceRequests; + this.schedulingRequests = schedulingRequests; + } + + public List getResourceRequests() { + return resourceRequests; + } + + public List getSchedulingRequests() { + return schedulingRequests; + } + + public boolean isUpdated() { + return isUpdated; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandler.java new file mode 100644 index 0000000000000..1137ea5ee0b97 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandler.java @@ -0,0 +1,558 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; + +import javax.script.Bindings; +import javax.script.Compilable; +import javax.script.CompiledScript; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException; +import 
org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE; + +/** + * RequestHandler is used to handle requests from applications, + * It handles requests at the beginning of CapacityScheduler#allocate, + * and manages multiple update items which define which requests + * should be chosen and how to update them. based on the capacity-scheduler + * configuration and can be updated dynamically without restarting the RM. + */ +public class RequestsHandler { + + protected static final Logger LOG = + LoggerFactory.getLogger(RequestsHandler.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final ScriptEngine SCRIPT_ENGINE = + new ScriptEngineManager().getEngineByName("JavaScript"); + + private final RMContext rmContext; + private final ReentrantReadWriteLock.WriteLock writeLock; + private final ReentrantReadWriteLock.ReadLock readLock; + + private boolean enabled = false; + + private List updateItems; + + // current updates conf value for comparing + private String updatesConfV; + + public RequestsHandler(RMContext rmContext) { + this.rmContext = rmContext; + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + writeLock = lock.writeLock(); + readLock = lock.readLock(); + } + + public void initialize(Configuration conf) + throws IOException, YarnException { + if (SCRIPT_ENGINE == null) { + // disabled if script engine is not found + LOG.warn("Disabled RequestsHandler since script engine not found"); + return; + } + boolean newEnabled = + conf.getBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED, + CapacitySchedulerConfiguration.DEFAULT_REQUEST_HANDLER_ENABLED); + List newUpdateItems = null; + String newUpdatesConfV = null; + if (newEnabled) { + newUpdatesConfV = + conf.get(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES); + UpdatesConf 
newUpdatesConf = null; + if (newUpdatesConfV != null && !newUpdatesConfV.isEmpty()) { + newUpdatesConf = + OBJECT_MAPPER.readValue(newUpdatesConfV, UpdatesConf.class); + } + if (newUpdatesConf != null && newUpdatesConf.getItems() != null && + !newUpdatesConf.getItems().isEmpty()) { + newUpdateItems = new ArrayList<>(); + for (UpdateItemConf updateItemConf : newUpdatesConf.getItems()) { + newUpdateItems.add(new UpdateItem(updateItemConf)); + } + } + } + // update + writeLock.lock(); + try{ + if (enabled == newEnabled && + StringUtils.equals(newUpdatesConfV, updatesConfV)) { + LOG.info("No changes detected in RequestsHandler configuration," + + " enabled={}, updatesConf={}", enabled, updatesConfV); + return; + } + enabled = newEnabled; + updateItems = newUpdateItems; + updatesConfV = newUpdatesConfV; + LOG.info("Initialized request updater, enabled={}, updatesConf={}", + enabled, updatesConfV); + } finally { + writeLock.unlock(); + } + } + + public RequestsHandleResponse handle(FiCaSchedulerApp app, + List resourceRequests, + List schedulingRequests) { + readLock.lock(); + try { + if (!enabled || updateItems == null || updateItems.isEmpty()) { + return null; + } + RMApp rmApp = rmContext.getRMApps().get(app.getApplicationId()); + if (rmApp == null) { + LOG.warn("Failed to handle request for not-found app {}", + app.getApplicationId()); + return null; + } + return updateRequests(app, rmApp, resourceRequests, schedulingRequests); + } finally { + readLock.unlock(); + } + } + + /** + * UpdatesConf is the root object of the configuration. 
+ */ + public static class UpdatesConf { + + @JsonProperty("items") + private List items; + + public List getItems() { + return items; + } + + public void setItems(List items) { + this.items = items; + } + } + + public static class UpdateItemConf { + + @JsonProperty("appMatchExpr") + private String appMatchExpr; + + @JsonProperty("requestMatchExpr") + private String requestMatchExpr; + + // whether to convert ResourceRequest to SchedulingRequest + @JsonProperty("isRRToSR") + private boolean isRRToSR; + + @JsonProperty("partition") + private String partition; + + @JsonProperty("executionType") + private String executionType; + + @JsonProperty("allocationTags") + private Set allocationTags; + + @JsonProperty("placementConstraint") + private String placementConstraint; + + public String getAppMatchExpr() { + return appMatchExpr; + } + + public void setAppMatchExpr(String appMatchExpr) { + this.appMatchExpr = appMatchExpr; + } + + public String getRequestMatchExpr() { + return requestMatchExpr; + } + + public void setRequestMatchExpr(String requestMatchExpr) { + this.requestMatchExpr = requestMatchExpr; + } + + public boolean isRRToSR() { + return isRRToSR; + } + + public void setIsRRToSR(boolean isRRToSR) { + this.isRRToSR = isRRToSR; + } + + public String getPartition() { + return partition; + } + + public void setPartition(String partition) { + this.partition = partition; + } + + public Set getAllocationTags() { + return allocationTags; + } + + public void setAllocationTags(Set allocationTags) { + this.allocationTags = allocationTags; + } + + public String getPlacementConstraint() { + return placementConstraint; + } + + public void setPlacementConstraint(String placementConstraint) { + this.placementConstraint = placementConstraint; + } + + public String getExecutionType() { + return executionType; + } + + public void setExecutionType(String executionType) { + this.executionType = executionType; + } + + public String toString() { + return "{" + + "appMatchExpr='" + 
appMatchExpr + '\'' + + ", requestMatchExpr='" + requestMatchExpr + '\'' + + ", isRRToSR=" + isRRToSR + + ", partition='" + partition + '\'' + + ", executionType='" + executionType + '\'' + + ", allocationTags=" + allocationTags + + ", placementConstraint='" + placementConstraint + '\'' + + '}'; + } + } + + public RequestsHandleResponse updateRequests( + FiCaSchedulerApp app, RMApp rmApp, + List resourceRequests, + List schedulingRequests) { + boolean isUpdated = false; + for (UpdateItem updateItem : updateItems) { + if (!updateItem.isAppMatch(app.getApplicationId(), () -> convertToAppInfo(app, rmApp))) { + continue; + } + RequestsHandleResponse resp = updateItem.updateRequests( + app.getApplicationId(), resourceRequests, schedulingRequests); + if (resp.isUpdated()) { + isUpdated = true; + if (LOG.isDebugEnabled()) { + LOG.debug( + "Updated requests: appId={}, updateItemConf={}, RR={}, SR={}", + app.getApplicationId(), updateItem.updateItemConf.toString(), + resp.getResourceRequests(), resp.getSchedulingRequests()); + } + } + resourceRequests = resp.getResourceRequests(); + schedulingRequests = resp.getSchedulingRequests(); + } + return new RequestsHandleResponse(isUpdated, resourceRequests, + schedulingRequests); + } + + @VisibleForTesting + public boolean isEnabled() { + return enabled; + } + + @VisibleForTesting + public List getUpdateItems() { + return updateItems; + } + + public static class UpdateItem { + + private CompiledScript appMatchScript; + private CompiledScript requestMatchScript; + private PlacementConstraint placementConstraint; + private ExecutionType executionType; + private final UpdateItemConf updateItemConf; + + public UpdateItem(UpdateItemConf updateItemConf) throws YarnException { + // compile app/request match-scripts + if (updateItemConf.getAppMatchExpr() != null) { + try { + appMatchScript = ((Compilable) SCRIPT_ENGINE).compile( + updateItemConf.getAppMatchExpr()); + } catch (ScriptException e) { + throw new YarnException("Failed to 
compile app match expression: " + + updateItemConf.getAppMatchExpr(), e); + } + } + if (updateItemConf.getRequestMatchExpr() != null) { + try { + requestMatchScript = ((Compilable) SCRIPT_ENGINE).compile( + updateItemConf.getRequestMatchExpr()); + } catch (ScriptException e) { + throw new YarnException("Failed to compile request match expression: " + + updateItemConf.getRequestMatchExpr(), e); + } + } + // parse execution type + if (updateItemConf.getExecutionType() != null) { + try{ + executionType = ExecutionType.valueOf(updateItemConf.getExecutionType()); + } catch (IllegalArgumentException e) { + throw new YarnException("Failed to parse execution-type: " + + updateItemConf.getExecutionType(), e); + } + } + // parse placement constraint + if (updateItemConf.getPlacementConstraint() != null) { + try { + PlacementConstraint.AbstractConstraint absConstraint = + PlacementConstraintParser.parseExpression( + updateItemConf.getPlacementConstraint()); + placementConstraint = new PlacementConstraint(absConstraint); + } catch (PlacementConstraintParseException e) { + throw new YarnException("Failed to parse placement-constraint: " + + updateItemConf.getPlacementConstraint(), e); + } + } + // include updateItemConf + this.updateItemConf = updateItemConf; + } + + public boolean isAppMatch(ApplicationId appId, + Supplier> infoSupplier) { + if (appMatchScript == null) { + return true; + } + Map info = infoSupplier.get(); + try { + Bindings bindings = SCRIPT_ENGINE.createBindings(); + bindings.putAll(info); + Boolean isMatched = (Boolean) appMatchScript.eval(bindings); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Check app: appId={}, isMatched={}, appInfo={}, appMatchExpr={}", + appId, isMatched, info, updateItemConf.getAppMatchExpr()); + } + return isMatched; + } catch (Exception e) { + LOG.error( + "Failed to evaluate app-match-expr: appId={}, appMatchExpr={}", + appId, updateItemConf.getAppMatchExpr(), e); + return false; + } + } + + public boolean 
isRequestMatch(ApplicationId appId, + Supplier> infoSupplier) { + if (requestMatchScript == null) { + return true; + } + Map info = infoSupplier.get(); + try { + Bindings bindings = SCRIPT_ENGINE.createBindings(); + bindings.putAll(info); + Boolean isMatched = (Boolean) requestMatchScript.eval(bindings); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Check request: appId={}, isMatched={}, reqInfo={}, requestMatchExpr={}", + appId, isMatched, info, updateItemConf.getRequestMatchExpr()); + } + return isMatched; + } catch (Exception e) { + LOG.error("Failed to evaluate request-filter-expression: {}", + updateItemConf.getRequestMatchExpr(), e); + return false; + } + } + + private RequestsHandleResponse updateRequests( + ApplicationId appId, + List resourceRequests, + List schedulingRequests) { + boolean isUpdated = false; + // when isRRToSR is true, convert to SR at first + if (resourceRequests != null && !resourceRequests.isEmpty() && + updateItemConf.isRRToSR) { + schedulingRequests = resourceRequests.stream() + .map(UpdateItem::convertToSchedulingRequest) + .collect(Collectors.toList()); + resourceRequests = null; + if (LOG.isDebugEnabled()) { + LOG.debug("Converted to scheduling requests: appId={}, sr={}", + appId, schedulingRequests); + } + isUpdated = true; + } + // update resource requests + if (resourceRequests != null) { + for (ResourceRequest rr: resourceRequests) { + if (!isRequestMatch(appId, () -> convertToRequestInfo(rr))) { + continue; + } + updateResourceRequest(appId, rr); + isUpdated = true; + } + } + // update scheduling requests + if (schedulingRequests != null) { + for (SchedulingRequest sr: schedulingRequests) { + if (!isRequestMatch(appId, () -> convertToRequestInfo(sr))) { + continue; + } + updateSchedulingRequest(appId, sr); + isUpdated = true; + } + } + return new RequestsHandleResponse(isUpdated, resourceRequests, + schedulingRequests); + } + + /** + * Covert resource request to scheduling request. 
+ * @param resourceRequest - resource request + * @return scheduling request + */ + public static SchedulingRequest convertToSchedulingRequest( + ResourceRequest resourceRequest) { + if (resourceRequest == null) { + return SchedulingRequest.newBuilder().build(); + } + // Compatible with Hadoop2.x + // whose default value of execution-type-request is null + ExecutionTypeRequest executionTypeRequest = + resourceRequest.getExecutionTypeRequest(); + if (executionTypeRequest == null) { + executionTypeRequest = ExecutionTypeRequest.newInstance(); + } + SchedulingRequest sr = SchedulingRequest.newBuilder() + .executionType(executionTypeRequest) + .allocationRequestId(resourceRequest.getAllocationRequestId()) + .priority(resourceRequest.getPriority()) + .resourceSizing(ResourceSizing.newInstance( + resourceRequest.getNumContainers(), + resourceRequest.getCapability())).build(); + if (resourceRequest.getNodeLabelExpression() != null) { + PlacementConstraint constraint = + PlacementConstraints.targetNodeAttribute(NODE, + NodeAttributeOpCode.EQ, + PlacementConstraints.PlacementTargets.nodePartition( + resourceRequest.getNodeLabelExpression())).build(); + sr.setPlacementConstraint(constraint); + } + return sr; + } + + private void updateResourceRequest(ApplicationId appId, + ResourceRequest rr) { + if (LOG.isDebugEnabled()) { + LOG.debug("Before updating resource request, appId={}, RR={}, conf={}", + appId, rr, updateItemConf.toString()); + } + if (updateItemConf.partition != null) { + rr.setNodeLabelExpression(updateItemConf.partition); + } + if (executionType != null) { + rr.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance(executionType)); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Done updating resource request, appId={}, RR={}", appId, rr); + } + } + + private void updateSchedulingRequest(ApplicationId appId, + SchedulingRequest sr) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Before updating scheduling request, appId={}, SR={}, conf={}", + appId, sr, 
updateItemConf.toString()); + } + if (executionType != null) { + sr.setExecutionType(ExecutionTypeRequest.newInstance(executionType)); + } + if (placementConstraint != null) { + sr.setPlacementConstraint(placementConstraint); + } + if (updateItemConf.allocationTags != null) { + sr.setAllocationTags(updateItemConf.allocationTags); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Done updating scheduling request, appId={}, SR={}", appId, + sr); + } + } + } + + private Map convertToAppInfo(FiCaSchedulerApp app, RMApp rmApp) { + return ImmutableMap.of("queue", app.getQueueName(), + "user", app.getUser(), + "priority", app.getPriority() == null ? + 0 : app.getPriority().getPriority(), + "name", rmApp.getName(), + "type", rmApp.getApplicationType(), + "tags", rmApp.getApplicationTags(), + "isWaitingForAM", app.isWaitingForAMContainer()); + } + + private static Map convertToRequestInfo(ResourceRequest rr) { + return ImmutableMap.of("priority", + rr.getPriority() == null ? 0 : rr.getPriority().getPriority(), + "resourceName", rr.getResourceName(), + "relaxLocality", rr.getRelaxLocality()); + } + + private static Map convertToRequestInfo(SchedulingRequest sr) { + return ImmutableMap.of("priority", + sr.getPriority() == null ? 0 : sr.getPriority().getPriority(), + "executionType", sr.getExecutionType() == null ? + "" : + (sr.getExecutionType().getExecutionType() == null ? 
+ "" : + sr.getExecutionType().getExecutionType().name()), + "allocationTags", sr.getAllocationTags()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java index 99b3983f863eb..057ae455aec73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java @@ -80,6 +80,7 @@ public RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals(0, csMetrics.getNumOfAllocates()); Assert.assertEquals(0, csMetrics.getNumOfCommitSuccess()); + Assert.assertEquals(0, csMetrics.getRequestsHandle().lastStat().numSamples()); RMApp rmApp = MockRMAppSubmitter.submit(rm, MockRMAppSubmissionData.Builder.createWithMemory(1024, rm) @@ -97,6 +98,9 @@ public RMNodeLabelsManager createNodeLabelManager() { am.registerAppAttempt(); am.allocate("*", 1024, 1, new ArrayList<>()); + Assert.assertTrue( + csMetrics.getRequestsHandle().lastStat().numSamples() > 0); + nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerRequestsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerRequestsHandler.java new file mode 100644 
index 0000000000000..0e3667bd062d7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerRequestsHandler.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.toSet;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.getConfigurationWithQueueLabels;
+import static org.junit.Assert.assertTrue;
+
+public class TestCapacitySchedulerRequestsHandler {
+
+  /**
+   * Simple e2e verification for requests-handler.
+   * - requests-handler can be enabled dynamically via reinitializing scheduler
+   * - partition will be updated for the matched app and request
+   *
+   * Each wait for a container state is asserted, so the test fails if a
+   * container does not reach ALLOCATED through the expected node manager.
+   */
+  @Test
+  public void testResourcesHandlerSimpleCase() throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    conf = getConfigurationWithQueueLabels(conf);
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(
+        NodeId.newInstance("h1", 0), toSet("x")));
+
+    MockRM rm = new MockRM(conf) {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm.start();
+
+    // Stop the RM even when an assertion fails, so that a failing run does
+    // not leave a started RM behind.
+    try {
+      MockNM nm1 = // label = x
+          new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+      nm1.registerNode();
+      MockNM nm2 = // label = ""
+          new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
+      nm2.registerNode();
+
+      // Launch app1 in queue=a1
+      MockRMAppSubmissionData data1 =
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("app1").withUser("root").withAcls(null)
+              .withQueue("a1").withUnmanagedAM(false).build();
+      RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+      // am1 asks for a container with no label which will be allocated on nm2
+      am1.allocate("*", GB, 1, 10, new ArrayList<>(), "");
+      ContainerId containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+      assertTrue("Container 2 should be allocated on nm2",
+          rm.waitForState(nm2, containerId, RMContainerState.ALLOCATED));
+
+      // refresh conf with requests-handler enabled
+      // partition will be updated to 'x' for apps in a1 queue
+      conf.setBoolean(
+          CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED, true);
+      conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+          "{\"items\":[{\"appMatchExpr\":\"queue=='a1'\","
+              + "\"requestMatchExpr\":\"priority>5\","
+              + " \"partition\":\"x\"}]}");
+      rm.getResourceScheduler().reinitialize(conf, rm.getRMContext());
+
+      // am1 asks for another container with no label
+      // request matched, partition will be updated to 'x' for this request
+      // so that it will be allocated on nm1
+      am1.allocate("*", GB, 1, 10, new ArrayList<>(), "");
+      containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+      assertTrue("Container 3 should be allocated on nm1",
+          rm.waitForState(nm1, containerId, RMContainerState.ALLOCATED));
+
+      // am1 asks for another container with no label and without the extra
+      // argument passed above; request not matched, partition won't be
+      // updated to 'x' for this request so that it will be allocated on nm2
+      am1.allocate("*", GB, 1, new ArrayList<>(), "");
+      containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 4);
+      assertTrue("Container 4 should be allocated on nm2",
+          rm.waitForState(nm2, containerId, RMContainerState.ALLOCATED));
+    } finally {
+      rm.stop();
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestRequestsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestRequestsHandler.java
new file mode 100644
index 0000000000000..6341bd03a362b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestRequestsHandler.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.util.Sets;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/** Unit tests for {@link RequestsHandler}. */
+public class TestRequestsHandler {
+
+  private Configuration conf;
+  private RMContext rmContext;
+  private RequestsHandler requestsHandler;
+
+  @Before
+  public void setUp() {
+    rmContext = mock(RMContext.class);
+    requestsHandler = new RequestsHandler(rmContext);
+    conf = new Configuration();
+    LogManager.getLogger(RequestsHandler.LOG.getName()).setLevel(Level.DEBUG);
+  }
+
+  /**
+   * Checks isEnabled() and getUpdateItems() across invalid, disabled and enabled configs.
+   */
+  @Test
+  public void testInitialize() throws IOException, YarnException {
+    // invalid conf
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED, true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES, "{");
+    assertThrows(IOException.class, () -> requestsHandler.initialize(conf));
+
+    // disabled
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED, false);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES, "{\"items\":[]}");
+    requestsHandler.initialize(conf);
+    assertFalse(requestsHandler.isEnabled());
+    assertNull(requestsHandler.getUpdateItems());
+
+    // enabled without items
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED, true);
+    conf.unset(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES);
+    requestsHandler.initialize(conf);
+    assertTrue(requestsHandler.isEnabled());
+    assertNull(requestsHandler.getUpdateItems());
+
+    // enabled with 1 item
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED, true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"appMatchExpr\":\"queue=='test'\"}]}");
+    requestsHandler.initialize(conf);
+    assertTrue(requestsHandler.isEnabled());
+    assertNotNull(requestsHandler.getUpdateItems());
+    assertEquals(1, requestsHandler.getUpdateItems().size());
+
+    // turned to disabled
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED, false);
+    requestsHandler.initialize(conf);
+    assertFalse(requestsHandler.isEnabled());
+    assertNull(requestsHandler.getUpdateItems());
+
+    // turned to enabled
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED, true);
+    requestsHandler.initialize(conf);
+    assertTrue(requestsHandler.isEnabled());
+    assertNotNull(requestsHandler.getUpdateItems());
+    assertEquals(1, requestsHandler.getUpdateItems().size());
+  }
+
+  /**
+   * Checks RR-to-SR conversion and per-request updates for matched and unmatched apps.
+   */
+  @Test
+  public void testHandleRequests() throws IOException, YarnException {
+    // mock app1: id=1, queue=test1, user=user1, name=app1,
+    // type=MapReduce, tags=[tag1, tag2], waitingForAMContainer=true
+    ApplicationId app1Id = ApplicationId.newInstance(1, 1);
+    FiCaSchedulerApp app1 = mock(FiCaSchedulerApp.class);
+    when(app1.getApplicationId()).thenReturn(app1Id);
+    when(app1.isWaitingForAMContainer()).thenReturn(true);
+    when(app1.getQueueName()).thenReturn("test1");
+    when(app1.getUser()).thenReturn("user1");
+    when(app1.getPriority()).thenReturn(Priority.newInstance(1));
+    RMApp rmApp1 = mock(RMApp.class);
+    when(rmApp1.getApplicationId()).thenReturn(app1Id);
+    when(rmApp1.getName()).thenReturn("app1");
+    when(rmApp1.getApplicationType()).thenReturn("MapReduce");
+    when(rmApp1.getApplicationTags()).thenReturn(ImmutableSet.of("tag1", "tag2"));
+
+    ConcurrentHashMap<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
+    rmApps.put(app1Id, rmApp1);
+    when(rmContext.getRMApps()).thenReturn(rmApps);
+
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED, true);
+
+    ResourceRequest rr1 = ResourceRequest.newInstance(Priority.newInstance(3),
+        "*", Resource.newInstance(4096, 2), 1, true);
+    rr1.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
+    rr1.setNodeLabelExpression("x");
+
+    /*
+     * check choosing by app, converting RR to SR
+     */
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"appMatchExpr\":\"queue=='test'\", \"isRRToSR\":true}]}");
+    requestsHandler.initialize(conf);
+
+    // not-matched, won't convert to scheduling request
+    when(app1.getQueueName()).thenReturn("test1");
+    RequestsHandleResponse response =
+        requestsHandler.handle(app1, Lists.newArrayList(rr1), null);
+    assertFalse(response.isUpdated());
+    assertEquals(1, response.getResourceRequests().size());
+    assertNull(response.getSchedulingRequests());
+
+    // app matched, will be converted to scheduling request
+    when(app1.getQueueName()).thenReturn("test");
+    response = requestsHandler.handle(app1, Lists.newArrayList(rr1), null);
+    assertTrue(response.isUpdated());
+    assertEquals(1, response.getSchedulingRequests().size());
+    assertNull(response.getResourceRequests());
+    SchedulingRequest gotSR1 = response.getSchedulingRequests().get(0);
+    assertEquals(rr1.getPriority(), gotSR1.getPriority());
+    assertEquals(rr1.getCapability(), gotSR1.getResourceSizing().getResources());
+    assertEquals(rr1.getNumContainers(), gotSR1.getResourceSizing().getNumAllocations());
+    assertEquals(rr1.getExecutionTypeRequest(), gotSR1.getExecutionType());
+    assertEquals("node,EQ,yarn_node_partition/=[x]", gotSR1.getPlacementConstraint().toString());
+
+    /*
+     * check choosing by app and request, converting RR to SR,
+     * then updating placement-constraint, execution-type, and allocation-tags
+     */
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"appMatchExpr\":\"queue=='test'\", "
+            + "\"requestMatchExpr\":\"priority>10\", \"isRRToSR\":true,"
+            + " \"placementConstraint\":\"and(in,rack,hbase:notin,node,zk)\","
+            + " \"executionType\":\"OPPORTUNISTIC\","
+            + " \"allocationTags\":[\"tag1\", \"tag2\"]}]}");
+    requestsHandler.initialize(conf);
+
+    ResourceRequest rr2 =
+        ResourceRequest.newInstance(Priority.newInstance(20), "*",
+            Resource.newInstance(1024, 1), 5, true);
+    response = requestsHandler.handle(app1, Lists.newArrayList(rr1, rr2), null);
+    assertTrue(response.isUpdated());
+    assertEquals(2, response.getSchedulingRequests().size());
+
+    // both rr1 and rr2 should be converted to scheduling requests
+    // rr1 not matched (its priority 3 is not > 10), so it is only converted
+    gotSR1 = response.getSchedulingRequests().get(0);
+    assertEquals(rr1.getPriority(), gotSR1.getPriority());
+    assertEquals(rr1.getCapability(), gotSR1.getResourceSizing().getResources());
+    assertEquals(rr1.getNumContainers(), gotSR1.getResourceSizing().getNumAllocations());
+    assertEquals(rr1.getExecutionTypeRequest(), gotSR1.getExecutionType());
+    assertEquals("node,EQ,yarn_node_partition/=[x]", gotSR1.getPlacementConstraint().toString());
+
+    // rr2 matched (its priority 20 is > 10), should be updated
+    SchedulingRequest gotSR2 = response.getSchedulingRequests().get(1);
+    assertEquals(rr2.getCapability(), gotSR2.getResourceSizing().getResources());
+    assertEquals(rr2.getNumContainers(), gotSR2.getResourceSizing().getNumAllocations());
+    assertEquals(ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC),
+        gotSR2.getExecutionType());
+    assertEquals(Sets.newHashSet("tag1", "tag2"), gotSR2.getAllocationTags());
+    assertEquals("and(in,rack,hbase:notin,node,zk)", gotSR2.getPlacementConstraint().toString());
+  }
+}