Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
import static io.vertx.core.http.HttpMethod.DELETE;
import static io.vertx.core.http.HttpMethod.GET;
import static io.vertx.core.http.HttpMethod.POST;
import static io.vertx.core.http.HttpMethod.PUT;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.here.xyz.XyzSerializable;
import com.here.xyz.jobs.Job;
import com.here.xyz.jobs.RuntimeInfo;
import com.here.xyz.jobs.RuntimeInfo.State;
import com.here.xyz.jobs.config.JobConfigClient;
import com.here.xyz.jobs.steps.Step;
import com.here.xyz.jobs.steps.execution.JobExecutor;
import com.here.xyz.jobs.steps.execution.SFNInspector;
Expand All @@ -58,6 +60,7 @@ public class JobAdminApi extends JobApiBase {
private static final String ADMIN_JOB_STEPS = ADMIN_JOB + "/steps";
private static final String ADMIN_JOB_STEP = ADMIN_JOB_STEPS + "/:stepId";
private static final String ADMIN_STATE_MACHINE_EVENTS = "/admin/state/events";
private static final String ADMIN_SERVICE_SCHEDULER_STATE = "/admin/service/scheduler/state";

public JobAdminApi(Router router) {
router.route(GET, ADMIN_JOBS).handler(handleErrors(this::getJobs));
Expand All @@ -66,6 +69,8 @@ public JobAdminApi(Router router) {
router.route(POST, ADMIN_JOB_STEPS).handler(handleErrors(this::postStep));
router.route(GET, ADMIN_JOB_STEP).handler(handleErrors(this::getStep));
router.route(POST, ADMIN_STATE_MACHINE_EVENTS).handler(handleErrors(this::postStateEvent));
router.route(GET, ADMIN_SERVICE_SCHEDULER_STATE).handler(handleErrors(this::getSchedulerState));
router.route(PUT, ADMIN_SERVICE_SCHEDULER_STATE).handler(handleErrors(this::putSchedulerState));
}

private void getJobs(RoutingContext context) {
Expand Down Expand Up @@ -423,6 +428,10 @@ private Job getJobFromBody(RoutingContext context) throws HttpException {
return deserializeFromBody(context, Job.class);
}

/**
 * Reads the body of the incoming request as a {@link SchedulerStateRequest}.
 *
 * @param context The routing context whose body gets deserialized
 * @return The result of deserializing the request body
 * @throws HttpException Propagated from {@code deserializeFromBody(...)}
 */
private SchedulerStateRequest getStateRequestFromBody(RoutingContext context) throws HttpException {
  final SchedulerStateRequest stateRequest = deserializeFromBody(context, SchedulerStateRequest.class);
  return stateRequest;
}

private <T extends XyzSerializable> T deserializeFromBody(RoutingContext context, Class<T> type) throws HttpException {
try {
return XyzSerializable.deserialize(context.body().asString(), type);
Expand All @@ -435,4 +444,94 @@ private <T extends XyzSerializable> T deserializeFromBody(RoutingContext context
/**
 * Reads the value of the "stepId" path parameter of the incoming request.
 *
 * @param context The routing context of the request
 * @return Whatever {@code context.pathParam("stepId")} yields for this request
 */
private static String stepId(RoutingContext context) {
  final String stepIdParam = context.pathParam("stepId");
  return stepIdParam;
}

/**
 * Handles a PUT on the scheduler-state admin endpoint.
 *
 * <p>Each field of the request is optional and applied independently:
 * <ul>
 *   <li>{@code state}: {@code START} resumes scheduling, {@code STOP} pauses it</li>
 *   <li>{@code singleJobAllowedPoliciesEnabled}: toggles the evaluation of the single-job-allowed policies</li>
 *   <li>{@code singleJobPerResourceEnabled}: toggles the "one active job per resource" policy</li>
 * </ul>
 * On success the resulting scheduler state (including job counts) is sent back with status OK.
 *
 * @param context The routing context of the request
 * @throws HttpException With BAD_REQUEST if the body is missing, does not yield a request object,
 *     or contains none of the supported fields
 */
private void putSchedulerState(RoutingContext context) throws HttpException {
  String body = context.body().asString();
  if (body == null)
    throw new HttpException(BAD_REQUEST, "Request body must be a JSON object.");

  SchedulerStateRequest request = getStateRequestFromBody(context);
  //NOTE: A body like the JSON literal "null" presumably deserializes to null instead of failing, guard against it
  if (request == null)
    throw new HttpException(BAD_REQUEST, "Request body must be a JSON object.");

  boolean changeApplied = false;

  if (request.state() != null) {
    switch (request.state()) {
      //START resumes the scheduler's pickup of jobs; STOP pauses it.
      case START -> JobExecutor.resumeScheduling();
      case STOP -> JobExecutor.pauseScheduling();
      default -> throw new HttpException(BAD_REQUEST, "State must be either START or STOP.");
    }
    changeApplied = true;
  }

  if (request.singleJobAllowedPoliciesEnabled() != null) {
    JobExecutor.setSingleJobAllowedPoliciesEnabled(request.singleJobAllowedPoliciesEnabled());
    changeApplied = true;
  }

  if (request.singleJobPerResourceEnabled() != null) {
    JobExecutor.setSingleJobPerResourceEnabled(request.singleJobPerResourceEnabled());
    changeApplied = true;
  }

  //A request that carries none of the supported fields is rejected rather than silently accepted
  if (!changeApplied)
    throw new HttpException(BAD_REQUEST,
        "Nothing to update. Provide 'state', 'singleJobAllowedPoliciesEnabled' and/or 'singleJobPerResourceEnabled'.");

  loadSchedulerState(true)
      .onSuccess(state -> sendResponse(context, OK, state))
      .onFailure(t -> sendErrorResponse(context, t));
}

/**
 * Handles a GET on the scheduler-state admin endpoint: loads the current scheduler state including
 * the job counts and sends it back with status OK, or sends an error response if loading fails.
 *
 * @param context The routing context of the request
 */
private void getSchedulerState(RoutingContext context) {
  final Future<SchedulerStateResponse> stateFuture = loadSchedulerState(true);
  stateFuture
      .onSuccess(schedulerState -> sendResponse(context, OK, schedulerState))
      .onFailure(error -> sendErrorResponse(context, error));
}

/**
 * Builds a snapshot of the scheduler's current state from the static flags of {@link JobExecutor}.
 *
 * @param includeCounts Whether the numbers of RUNNING and PENDING jobs should be loaded as well.
 *     If false, both counts are {@code null} in the response and no jobs are loaded.
 * @return A future holding the snapshot. With counts, the RUNNING jobs are loaded first and the
 *     PENDING jobs afterwards.
 */
private Future<SchedulerStateResponse> loadSchedulerState(boolean includeCounts) {
  final SchedulerRuntimeState runtimeState = JobExecutor.isSchedulingPaused()
      ? SchedulerRuntimeState.PAUSED
      : SchedulerRuntimeState.RUNNING;
  final boolean allowedPoliciesEnabled = JobExecutor.isSingleJobAllowedPoliciesEnabled();
  final boolean perResourceEnabled = JobExecutor.isSingleJobPerResourceEnabled();

  if (includeCounts) {
    return JobConfigClient.getInstance().loadJobs(RUNNING)
        .compose(runningList -> JobConfigClient.getInstance().loadJobs(PENDING)
            .map(pendingList -> new SchedulerStateResponse(
                runtimeState,
                allowedPoliciesEnabled,
                perResourceEnabled,
                runningList.size(),
                pendingList.size())));
  }

  //No counts requested: answer right away with the flags only
  return Future.succeededFuture(
      new SchedulerStateResponse(runtimeState, allowedPoliciesEnabled, perResourceEnabled, null, null));
}

/**
 * Body of a PUT request against the scheduler-state admin endpoint.
 * All components are nullable; {@code putSchedulerState} only applies the ones that are not {@code null}.
 *
 * @param state START to resume scheduling, STOP to pause it
 * @param singleJobAllowedPoliciesEnabled New value for the corresponding {@code JobExecutor} switch
 * @param singleJobPerResourceEnabled New value for the corresponding {@code JobExecutor} switch
 */
private record SchedulerStateRequest(
SchedulerControlState state,
Boolean singleJobAllowedPoliciesEnabled,
Boolean singleJobPerResourceEnabled
) implements XyzSerializable {}

/**
 * The command a client can send to change the scheduler's state.
 */
private enum SchedulerControlState {
//Mapped to JobExecutor.resumeScheduling() by putSchedulerState
START,
//Mapped to JobExecutor.pauseScheduling() by putSchedulerState
STOP
}

/**
 * The scheduler state as reported to clients in a {@code SchedulerStateResponse}.
 */
private enum SchedulerRuntimeState {
//Reported when JobExecutor.isSchedulingPaused() is false
RUNNING,
//Reported when JobExecutor.isSchedulingPaused() is true
PAUSED
}

/**
 * Response payload describing the current scheduler state.
 *
 * @param state PAUSED or RUNNING, derived from {@code JobExecutor.isSchedulingPaused()}
 * @param singleJobAllowedPoliciesEnabled Current value of the corresponding {@code JobExecutor} switch
 * @param singleJobPerResourceEnabled Current value of the corresponding {@code JobExecutor} switch
 * @param runningJobs Number of jobs loaded in state RUNNING, or {@code null} if counts were not requested
 * @param queuedJobs Number of jobs loaded in state PENDING, or {@code null} if counts were not requested
 */
private record SchedulerStateResponse(
SchedulerRuntimeState state,
boolean singleJobAllowedPoliciesEnabled,
boolean singleJobPerResourceEnabled,
Integer runningJobs,
Integer queuedJobs
) implements XyzSerializable {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ public abstract class JobExecutor implements Initializable {
private static volatile boolean running;
private static volatile boolean stopRequested;
private static AtomicBoolean cancellationCheckRunning = new AtomicBoolean();
/**
 * Global switch to pause the scheduler. Toggled through {@link #pauseScheduling()} and {@link #resumeScheduling()}.
 * <p>While set, {@code startExecution(...)} returns without starting the job and {@code checkPendingJobs()}
 * skips its check of pending jobs.
 */
private static final AtomicBoolean schedulerPaused = new AtomicBoolean(false);
/**
 * Global switch to enable/disable the evaluation of the registered single-job-allowed policies.
 * <p>When disabled, {@code executionPoliciesSatisfied(...)} reports the policies as satisfied without evaluating them.
 */
private static final AtomicBoolean singleJobAllowedPoliciesEnabled = new AtomicBoolean(true);
/**
 * Global switch to enable/disable the "one active job per resource" policy.
 * <p>When enabled, a job will not be started (it stays in PENDING) if any other RUNNING job
 * shares at least one resource key (source or target) with it. This prevents concurrent
 * imports writing to the same target or concurrent exports reading from the same source.
 * <p>In addition, a job is held back while an older PENDING job exists for one of its resource keys
 * (see {@code singleJobPerResourceSatisfied(...)}).
 */
private static final AtomicBoolean singleJobPerResourceEnabled = new AtomicBoolean(true);
private static final long CANCELLATION_TIMEOUT = 10 * 60 * 1_000; //10 min
private static final long CANCELLATION_CHECK_RERUN_PERIOD = 10_000; //10 sec
private static final long JOB_START_TIMEOUT = 60_000;
Expand All @@ -75,6 +84,11 @@ public static JobExecutor getInstance() {
}

public final Future<Void> startExecution(Job job, String formerExecutionId) {
if (isSchedulingPaused()) {
logger.info("[{}] Scheduler is paused. Job remains in PENDING and will not be started.", job.getId());
return Future.succeededFuture();
}

//TODO: Care about concurrency between nodes when it comes to resource-load calculation within this thread
return Future.succeededFuture()
.compose(v -> formerExecutionId == null ? reuseExistingJobIfPossible(job) : Future.succeededFuture())
Expand Down Expand Up @@ -127,6 +141,12 @@ private void checkPendingJobs() {
return;
}

if (isSchedulingPaused()) {
running = false;
logger.info("[checkPendingJobs] Scheduler is paused. Skipping check of pending jobs...");
return;
}

try {
JobConfigClient.getInstance().loadJobs(PENDING)
.compose(pendingJobs -> {
Expand Down Expand Up @@ -334,13 +354,79 @@ private static Step cancelNonRunningStep(Job job, Step step) {
private Future<Boolean> mayExecute(Job job) {
//Check execution policies and resources
return executionPoliciesSatisfied(job)
.compose(policiesSatisfied -> {
if (!policiesSatisfied)
.compose(policiesSatisfied -> {
if (!policiesSatisfied)
return Future.succeededFuture(false);
return singleJobPerResourceSatisfied(job);
})
.compose(singleJobCanGetStarted -> {
if (!singleJobCanGetStarted)
return Future.succeededFuture(false);
return enoughResourcesAvailable(job);
});
}

/**
 * Checks the "one active job per resource" policy for the given job.
 *
 * <p>If {@link #singleJobPerResourceEnabled} is off, or the job has no resource keys, the check passes
 * right away. Otherwise the job may only execute if
 * <ol>
 *   <li>no other RUNNING job was loaded for any of its resource keys (source/target), and</li>
 *   <li>no other PENDING job loaded for these resource keys comes earlier in the scheduling order
 *       (see {@link #compareBySchedulingOrder(Job, Job)}), i.e. pending jobs on overlapping
 *       resources are started in FIFO order.</li>
 * </ol>
 *
 * @param job The job to check
 * @return A future that resolves to true if the job may execute, false if another RUNNING job
 * already occupies one of its resources or an older PENDING job exists for overlapping
 * resources (in which case the job remains in PENDING and is re-checked later).
 */
private static Future<Boolean> singleJobPerResourceSatisfied(Job job) {
  if (!isSingleJobPerResourceEnabled())
    return Future.succeededFuture(true);

  Set<String> resourceKeys = job.getResourceKeys();
  if (resourceKeys == null || resourceKeys.isEmpty())
    return Future.succeededFuture(true);

  return JobConfigClient.getInstance().loadJobs(resourceKeys, RUNNING)
      .compose(runningJobs -> {
        //Filter the job itself in case its state is already RUNNING when this check runs
        List<Job> conflictingRunning = runningJobs.stream()
            .filter(other -> !job.getId().equals(other.getId()))
            .toList();

        if (!conflictingRunning.isEmpty()) {
          logger.info("[{}] Job can not be executed (yet) because the following RUNNING job(s) already occupy at least one of "
                  + "its source/target resources: {}",
              job.getId(),
              conflictingRunning.stream().map(Job::getId).collect(Collectors.joining(", ")));
          return Future.succeededFuture(false);
        }

        //No RUNNING conflict exists. Enforce FIFO for pending jobs on overlapping resources.
        return JobConfigClient.getInstance().loadJobs(resourceKeys, PENDING)
            .map(pendingJobs -> {
              //Sorting only affects the order in which the conflicting job IDs are logged
              List<Job> olderPendingConflicts = pendingJobs.stream()
                  .filter(other -> !job.getId().equals(other.getId()))
                  .filter(other -> compareBySchedulingOrder(other, job) < 0)
                  .sorted(JobExecutor::compareBySchedulingOrder)
                  .toList();

              if (olderPendingConflicts.isEmpty())
                return true;

              logger.info("[{}] Job can not be executed (yet) because older PENDING job(s) exist for overlapping resources: {}",
                  job.getId(),
                  olderPendingConflicts.stream().map(Job::getId).collect(Collectors.joining(", ")));
              return false;
            });
      });
}

/**
 * Orders two jobs by the sequence in which they should get scheduled: primarily by their creation
 * timestamp (ascending), and for equal timestamps by their ID, where a missing ID is treated as
 * the empty string.
 *
 * @param left The first job
 * @param right The second job
 * @return A negative value if {@code left} comes first, a positive value if {@code right} comes first, 0 otherwise
 */
private static int compareBySchedulingOrder(Job left, Job right) {
  final int createdAtOrder = Long.compare(left.getCreatedAt(), right.getCreatedAt());
  if (createdAtOrder == 0) {
    //Same creation time: fall back to the IDs as tie-breaker
    final String leftKey = left.getId() != null ? left.getId() : "";
    final String rightKey = right.getId() != null ? right.getId() : "";
    return leftKey.compareTo(rightKey);
  }
  return createdAtOrder;
}

private static Future<Boolean> enoughResourcesAvailable(Job job) {
//Check for all necessary resource loads whether they can be fulfilled
logger.info("[{}] Checking whether there are enough resources to execute the job ...", job.getId());
Expand All @@ -358,6 +444,9 @@ private static Future<Boolean> enoughResourcesAvailable(Job job) {
}

private Future<Boolean> executionPoliciesSatisfied(Job job) {
if (!isSingleJobAllowedPoliciesEnabled())
return Future.succeededFuture(true);

logger.info("[{}] Checking all execution policies prior to executing the job ...", job.getId());
Set<Predicate<Job>> policiesToApply = findMatchingJobPolicies(job);
if (policiesToApply.isEmpty())
Expand Down Expand Up @@ -434,6 +523,34 @@ public static void registerSingleJobAllowedPolicy(Predicate<Job> policy) {
singleJobAllowedPolicies.add(policy);
}

/**
 * Pauses the scheduler by setting the {@code schedulerPaused} flag.
 * While the flag is set, {@code startExecution(...)} does not start jobs and {@code checkPendingJobs()}
 * skips its check.
 */
public static void pauseScheduling() {
schedulerPaused.set(true);
}

/**
 * Resumes the scheduler by clearing the {@code schedulerPaused} flag.
 */
public static void resumeScheduling() {
schedulerPaused.set(false);
}

/**
 * @return true if the scheduler is currently paused, i.e. the {@code schedulerPaused} flag is set
 */
public static boolean isSchedulingPaused() {
return schedulerPaused.get();
}

/**
 * Enables or disables the evaluation of the registered single-job-allowed policies.
 * When disabled, {@code executionPoliciesSatisfied(...)} reports the policies as satisfied without evaluating them.
 *
 * @param enabled The new value of the switch
 */
public static void setSingleJobAllowedPoliciesEnabled(boolean enabled) {
singleJobAllowedPoliciesEnabled.set(enabled);
}

/**
 * @return true if the single-job-allowed policies are currently evaluated before a job is executed
 */
public static boolean isSingleJobAllowedPoliciesEnabled() {
return singleJobAllowedPoliciesEnabled.get();
}

/**
 * Enables or disables the "one active job per resource" policy.
 * When disabled, {@code singleJobPerResourceSatisfied(...)} passes without loading any jobs.
 *
 * @param enabled The new value of the switch
 */
public static void setSingleJobPerResourceEnabled(boolean enabled) {
singleJobPerResourceEnabled.set(enabled);
}

/**
 * @return true if the "one active job per resource" policy is currently enabled
 */
public static boolean isSingleJobPerResourceEnabled() {
return singleJobPerResourceEnabled.get();
}

/**
 * Collects all registered single-job-allowed policies whose predicate accepts the given job.
 *
 * @param job The job to test the registered policies against
 * @return The set of policies for which {@code test(job)} returned true
 */
private static Set<Predicate<Job>> findMatchingJobPolicies(Job job) {
  return singleJobAllowedPolicies
      .stream()
      .filter(candidate -> candidate.test(job))
      .collect(Collectors.toSet());
}
Expand Down
Loading