diff --git a/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobConfiguration.java b/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobConfiguration.java index 0fe7ab486b..2111650922 100644 --- a/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobConfiguration.java +++ b/elasticjob-api/src/main/java/org/apache/shardingsphere/elasticjob/api/JobConfiguration.java @@ -80,6 +80,8 @@ public final class JobConfiguration { private final String label; private final boolean staticSharding; + + private final int maxRunTimeSeconds; /** * Create ElasticJob configuration builder. @@ -138,7 +140,9 @@ public static class Builder { private String label; private boolean staticSharding; - + + private int maxRuntimeSeconds = -1; + /** * Cron expression. * @@ -412,7 +416,23 @@ public Builder staticSharding(final boolean staticSharding) { this.staticSharding = staticSharding; return this; } - + + /** + * Set max runtime seconds. + * + *

+ * if job running for a long time more than it, will enabled interrupting. + * 0 means do not timeout. + *

+ * + * @param maxRuntimeSeconds max Runtime Seconds + * @return ElasticJob configuration builder + */ + public Builder maxRuntimeSeconds(final int maxRuntimeSeconds) { + this.maxRuntimeSeconds = maxRuntimeSeconds; + return this; + } + /** * Build ElasticJob configuration. * @@ -424,7 +444,7 @@ public final JobConfiguration build() { return new JobConfiguration(jobName, cron, timeZone, shardingTotalCount, shardingItemParameters, jobParameter, monitorExecution, failover, misfire, maxTimeDiffSeconds, reconcileIntervalMinutes, jobShardingStrategyType, jobExecutorServiceHandlerType, jobErrorHandlerType, jobListenerTypes, - extraConfigurations, description, props, disabled, overwrite, label, staticSharding); + extraConfigurations, description, props, disabled, overwrite, label, staticSharding, maxRuntimeSeconds); } } } diff --git a/elasticjob-api/src/test/java/org/apache/shardingsphere/elasticjob/api/JobConfigurationTest.java b/elasticjob-api/src/test/java/org/apache/shardingsphere/elasticjob/api/JobConfigurationTest.java index 5fdd70251d..44e45e9b41 100644 --- a/elasticjob-api/src/test/java/org/apache/shardingsphere/elasticjob/api/JobConfigurationTest.java +++ b/elasticjob-api/src/test/java/org/apache/shardingsphere/elasticjob/api/JobConfigurationTest.java @@ -37,6 +37,7 @@ public void assertBuildAllProperties() { .maxTimeDiffSeconds(1000).reconcileIntervalMinutes(60) .jobShardingStrategyType("AVG_ALLOCATION").jobExecutorServiceHandlerType("SINGLE_THREAD").jobErrorHandlerType("IGNORE") .description("desc").setProperty("key", "value") + .maxRuntimeSeconds(60) .disabled(true).overwrite(true).build(); assertThat(actual.getJobName(), is("test_job")); assertThat(actual.getCron(), is("0/1 * * * * ?")); @@ -54,6 +55,7 @@ public void assertBuildAllProperties() { assertThat(actual.getJobErrorHandlerType(), is("IGNORE")); assertThat(actual.getDescription(), is("desc")); assertThat(actual.getProps().getProperty("key"), is("value")); + assertThat(actual.getMaxRunTimeSeconds(), is(60)); assertTrue(actual.isDisabled()); assertTrue(actual.isOverwrite()); } @@ -79,6 +81,7 @@ public void assertBuildRequiredProperties() { assertTrue(actual.getProps().isEmpty()); assertFalse(actual.isDisabled()); assertFalse(actual.isOverwrite()); + assertThat(actual.getMaxRunTimeSeconds(), is(-1)); } @Test(expected = IllegalArgumentException.class) diff --git a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java index 6a879be4f1..097b6ec522 100644 --- a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java +++ b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java @@ -24,6 +24,7 @@ import org.apache.shardingsphere.elasticjob.executor.context.ExecutorContext; import org.apache.shardingsphere.elasticjob.executor.item.JobItemExecutor; import org.apache.shardingsphere.elasticjob.executor.item.JobItemExecutorFactory; +import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils; import org.apache.shardingsphere.elasticjob.infra.env.IpUtils; import org.apache.shardingsphere.elasticjob.infra.exception.ExceptionUtils; import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException; @@ -31,12 +32,16 @@ import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent; import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent.ExecutionSource; import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State; - import java.util.Collection; +import java.util.LinkedList; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; /** * ElasticJob executor. @@ -136,12 +141,7 @@ private void execute(final JobConfiguration jobConfig, final ShardingContexts sh private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) { Collection items = shardingContexts.getShardingItemParameters().keySet(); - if (1 == items.size()) { - int item = shardingContexts.getShardingItemParameters().keySet().iterator().next(); - JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item); - process(jobConfig, shardingContexts, item, jobExecutionEvent); - return; - } + Queue> futures = new LinkedList<>(); CountDownLatch latch = new CountDownLatch(items.size()); for (int each : items) { JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each); @@ -149,14 +149,44 @@ private void process(final JobConfiguration jobConfig, final ShardingContexts sh if (executorService.isShutdown()) { return; } - executorService.submit(() -> { + Future future = executorService.submit(() -> { try { process(jobConfig, shardingContexts, each, jobExecutionEvent); } finally { latch.countDown(); } + return true; }); + futures.offer(future); + } + + int sumWaitTime = 0; + int maxRunTime = jobConfig.getMaxRunTimeSeconds() > 0 ? jobConfig.getMaxRunTimeSeconds() * 1000 : 0; + while (!futures.isEmpty()) { + //waiting process Done + Future future = futures.peek(); + if (!future.isDone()) { + if (maxRunTime == 0 || sumWaitTime < maxRunTime) { + BlockUtils.sleep(100); + sumWaitTime += 100; + continue; + } else { + future.cancel(true); + } + } + + try { + future.get(); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } catch (final ExecutionException | CancellationException ignore) { + // ignore process Exception and Canceled for future + // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure + } finally { + futures.poll(); + } } + try { latch.await(); } catch (final InterruptedException ex) { diff --git a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java index 46082ab61d..913ea244c8 100644 --- a/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java +++ b/elasticjob-ecosystem/elasticjob-executor/elasticjob-executor-kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutorTest.java @@ -112,7 +112,7 @@ public void assertExecuteWhenShardingItemsIsEmpty() { verify(jobItemExecutor, times(0)).process(eq(fooJob), eq(jobConfig), eq(jobFacade), any()); } - @Test(expected = JobSystemException.class) + @Test public void assertExecuteFailureWhenThrowExceptionForSingleShardingItem() { assertExecuteFailureWhenThrowException(createSingleShardingContexts()); } diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java index 3cdfd35d49..bb20d1fe32 100644 --- a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java +++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java @@ -80,6 +80,8 @@ public final class JobConfigurationPOJO { private String label; private boolean staticSharding; + + private int maxRuntimeSeconds = -1; /** * Convert to job configuration. @@ -93,7 +95,7 @@ public JobConfiguration toJobConfiguration() { .maxTimeDiffSeconds(maxTimeDiffSeconds).reconcileIntervalMinutes(reconcileIntervalMinutes) .jobShardingStrategyType(jobShardingStrategyType).jobExecutorServiceHandlerType(jobExecutorServiceHandlerType) .jobErrorHandlerType(jobErrorHandlerType).jobListenerTypes(jobListenerTypes.toArray(new String[]{})).description(description) - .disabled(disabled).overwrite(overwrite).label(label).staticSharding(staticSharding).build(); + .disabled(disabled).overwrite(overwrite).label(label).staticSharding(staticSharding).maxRuntimeSeconds(maxRuntimeSeconds).build(); jobExtraConfigurations.stream().map(YamlConfiguration::toConfiguration).forEach(result.getExtraConfigurations()::add); for (Object each : props.keySet()) { result.getProps().setProperty(each.toString(), props.get(each.toString()).toString()); @@ -133,6 +135,7 @@ public static JobConfigurationPOJO fromJobConfiguration(final JobConfiguration j result.setOverwrite(jobConfiguration.isOverwrite()); result.setLabel(jobConfiguration.getLabel()); result.setStaticSharding(jobConfiguration.isStaticSharding()); + result.setMaxRuntimeSeconds(jobConfiguration.getMaxRunTimeSeconds()); return result; } } diff --git a/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJOTest.java b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJOTest.java index 8ae62a6819..e4c3eb5f3c 100644 --- a/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJOTest.java +++ b/elasticjob-infra/elasticjob-infra-common/src/test/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJOTest.java @@ -41,6 +41,7 @@ public final class JobConfigurationPOJOTest { + "jobName: test_job\n" + "jobParameter: param\n" + "jobShardingStrategyType: AVG_ALLOCATION\n" + + "maxRuntimeSeconds: -1\n" + "maxTimeDiffSeconds: -1\n" + "misfire: false\n" + "monitorExecution: false\n" @@ -56,6 +57,7 @@ public final class JobConfigurationPOJOTest { + "disabled: false\n" + "failover: false\n" + "jobName: test_job\n" + + "maxRuntimeSeconds: -1\n" + "maxTimeDiffSeconds: -1\n" + "misfire: false\n" + "monitorExecution: false\n" @@ -144,6 +146,7 @@ public void assertMarshal() { actual.setJobErrorHandlerType("IGNORE"); actual.setDescription("Job description"); actual.getProps().setProperty("key", "value"); + actual.setMaxRuntimeSeconds(-1); assertThat(YamlEngine.marshal(actual), is(YAML)); } diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrapTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrapTest.java index c76bf3af2d..84ac98b168 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrapTest.java +++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/bootstrap/impl/OneOffJobBootstrapTest.java @@ -89,6 +89,23 @@ public void assertShutdown() throws SchedulerException { assertTrue(getScheduler(oneOffJobBootstrap).isShutdown()); } + @Test + public void assertTimeout() { + AtomicInteger counter = new AtomicInteger(0); + final OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(zkRegCenter, (SimpleJob) shardingContext -> { + try { + Thread.sleep(50000); + } catch (InterruptedException e) { + counter.incrementAndGet(); + throw new RuntimeException("Timeout"); + } + }, JobConfiguration.newBuilder("test_timeout_job_execute", SHARDING_TOTAL_COUNT).maxRuntimeSeconds(1).build()); + oneOffJobBootstrap.execute(); + blockUtilFinish(oneOffJobBootstrap, counter); + assertThat(counter.get(), is(SHARDING_TOTAL_COUNT)); + getJobScheduler(oneOffJobBootstrap).shutdown(); + } + @SneakyThrows private JobScheduler getJobScheduler(final OneOffJobBootstrap oneOffJobBootstrap) { Field field = OneOffJobBootstrap.class.getDeclaredField("jobScheduler"); @@ -106,6 +123,7 @@ private Scheduler getScheduler(final OneOffJobBootstrap oneOffJobBootstrap) { @SneakyThrows private void blockUtilFinish(final OneOffJobBootstrap oneOffJobBootstrap, final AtomicInteger counter) { + JobScheduler jobScheduler = getJobScheduler(oneOffJobBootstrap); Scheduler scheduler = getScheduler(oneOffJobBootstrap); while (0 == counter.get() || !scheduler.getCurrentlyExecutingJobs().isEmpty()) { Thread.sleep(100); diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/fixture/LiteYamlConstants.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/fixture/LiteYamlConstants.java index fe592c5676..9d9433a3cd 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/fixture/LiteYamlConstants.java +++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/fixture/LiteYamlConstants.java @@ -34,14 +34,15 @@ public final class LiteYamlConstants { + "reconcileIntervalMinutes: 15\n" + "description: 'desc'\n" + "disabled: true\n" - + "overwrite: true"; - + + "overwrite: true\n" + + "maxRuntimeSeconds: 60\n"; + private static final boolean DEFAULT_FAILOVER = true; private static final boolean DEFAULT_MONITOR_EXECUTION = true; private static final int DEFAULT_MAX_TIME_DIFF_SECONDS = 1000; - + /** * Get the config of simple job in YAML format. * diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/config/ConfigurationServiceTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/config/ConfigurationServiceTest.java index 396704d350..f7dd766ed0 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/config/ConfigurationServiceTest.java +++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/config/ConfigurationServiceTest.java @@ -58,6 +58,7 @@ public void assertLoadDirectly() { assertThat(actual.getJobName(), is("test_job")); assertThat(actual.getCron(), is("0/1 * * * * ?")); assertThat(actual.getShardingTotalCount(), is(3)); + assertThat(actual.getMaxRunTimeSeconds(), is(60)); } @Test @@ -67,6 +68,7 @@ public void assertLoadFromCache() { assertThat(actual.getJobName(), is("test_job")); assertThat(actual.getCron(), is("0/1 * * * * ?")); assertThat(actual.getShardingTotalCount(), is(3)); + assertThat(actual.getMaxRunTimeSeconds(), is(60)); } @Test @@ -77,6 +79,7 @@ public void assertLoadFromCacheButNull() { assertThat(actual.getJobName(), is("test_job")); assertThat(actual.getCron(), is("0/1 * * * * ?")); assertThat(actual.getShardingTotalCount(), is(3)); + assertThat(actual.getMaxRunTimeSeconds(), is(60)); } @Test(expected = JobConfigurationException.class) diff --git a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/fixture/LifecycleYamlConstants.java b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/fixture/LifecycleYamlConstants.java index aaa5a0697d..281f2555c1 100644 --- a/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/fixture/LifecycleYamlConstants.java +++ b/elasticjob-lite/elasticjob-lite-lifecycle/src/test/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/fixture/LifecycleYamlConstants.java @@ -30,6 +30,7 @@ public final class LifecycleYamlConstants { + "monitorExecution: false\n" + "failover: true\n" + "misfire: false\n" + + "maxRuntimeSeconds: -1\n" + "maxTimeDiffSeconds: 100\n" + "jobShardingStrategyType: AVG_ALLOCATION\n" + "description: %s\n" @@ -42,6 +43,7 @@ public final class LifecycleYamlConstants { + "failover: false\n" + "jobName: test_job\n" + "jobParameter: param\n" + + "maxRuntimeSeconds: -1\n" + "maxTimeDiffSeconds: -1\n" + "misfire: true\n" + "monitorExecution: true\n" @@ -59,6 +61,7 @@ public final class LifecycleYamlConstants { + "monitorExecution: true\n" + "failover: false\n" + "misfire: true\n" + + "maxRuntimeSeconds: -1\n" + "maxTimeDiffSeconds: -1\n" + "reconcileIntervalMinutes: 10\n" + "description: ''\n" diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/parser/JobBeanDefinitionParser.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/parser/JobBeanDefinitionParser.java index 001543bef1..ed91a4691b 100644 --- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/parser/JobBeanDefinitionParser.java +++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/parser/JobBeanDefinitionParser.java @@ -91,6 +91,7 @@ private BeanDefinition createJobConfigurationBeanDefinition(final Element elemen result.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.OVERWRITE_ATTRIBUTE)); result.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.LABEL_ATTRIBUTE)); result.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.STATIC_SHARDING_ATTRIBUTE)); + result.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.MAX_RUNTIME_SECONDS_ATTRIBUTE)); return result.getBeanDefinition(); } diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/tag/JobBeanDefinitionTag.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/tag/JobBeanDefinitionTag.java index 3f0852b97e..0aaf3b3709 100644 --- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/tag/JobBeanDefinitionTag.java +++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/job/tag/JobBeanDefinitionTag.java @@ -73,4 +73,6 @@ public final class JobBeanDefinitionTag { public static final String LABEL_ATTRIBUTE = "label"; public static final String STATIC_SHARDING_ATTRIBUTE = "static-sharding"; + + public static final String MAX_RUNTIME_SECONDS_ATTRIBUTE = "max-runtime-seconds"; } diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/resources/META-INF/namespace/elasticjob.xsd b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/resources/META-INF/namespace/elasticjob.xsd index 308a8e4607..78e6fb161c 100644 --- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/resources/META-INF/namespace/elasticjob.xsd +++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/main/resources/META-INF/namespace/elasticjob.xsd @@ -64,6 +64,7 @@ +