Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public final class JobConfiguration {
private final String label;

private final boolean staticSharding;

private final int maxRunTimeSeconds;

/**
* Create ElasticJob configuration builder.
Expand Down Expand Up @@ -138,7 +140,9 @@ public static class Builder {
private String label;

private boolean staticSharding;


private int maxRuntimeSeconds = -1;

/**
* Cron expression.
*
Expand Down Expand Up @@ -412,7 +416,23 @@ public Builder staticSharding(final boolean staticSharding) {
this.staticSharding = staticSharding;
return this;
}


/**
* Set max runtime seconds.
*
* <p>
* if job running for a long time more than it, will enabled interrupting.
* 0 means do not timeout.
Comment on lines +424 to +425
Copy link

Copilot AI Sep 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment has grammatical errors. Should be 'If job runs for a long time more than it, interrupting will be enabled.' Also, the documentation is inconsistent - it says '0 means no timeout' but the default value is -1.

Suggested change
* if job running for a long time more than it, will enabled interrupting.
* 0 means do not timeout.
* If job runs for a long time more than this value, interrupting will be enabled.
* A value of -1 means no timeout.

Copilot uses AI. Check for mistakes.
* </p>
*
* @param maxRuntimeSeconds max Runtime Seconds
* @return ElasticJob configuration builder
*/
public Builder maxRuntimeSeconds(final int maxRuntimeSeconds) {
this.maxRuntimeSeconds = maxRuntimeSeconds;
return this;
}

/**
* Build ElasticJob configuration.
*
Expand All @@ -424,7 +444,7 @@ public final JobConfiguration build() {
return new JobConfiguration(jobName, cron, timeZone, shardingTotalCount, shardingItemParameters, jobParameter,
monitorExecution, failover, misfire, maxTimeDiffSeconds, reconcileIntervalMinutes,
jobShardingStrategyType, jobExecutorServiceHandlerType, jobErrorHandlerType, jobListenerTypes,
extraConfigurations, description, props, disabled, overwrite, label, staticSharding);
extraConfigurations, description, props, disabled, overwrite, label, staticSharding, maxRuntimeSeconds);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public void assertBuildAllProperties() {
.maxTimeDiffSeconds(1000).reconcileIntervalMinutes(60)
.jobShardingStrategyType("AVG_ALLOCATION").jobExecutorServiceHandlerType("SINGLE_THREAD").jobErrorHandlerType("IGNORE")
.description("desc").setProperty("key", "value")
.maxRuntimeSeconds(60)
.disabled(true).overwrite(true).build();
assertThat(actual.getJobName(), is("test_job"));
assertThat(actual.getCron(), is("0/1 * * * * ?"));
Expand All @@ -54,6 +55,7 @@ public void assertBuildAllProperties() {
assertThat(actual.getJobErrorHandlerType(), is("IGNORE"));
assertThat(actual.getDescription(), is("desc"));
assertThat(actual.getProps().getProperty("key"), is("value"));
assertThat(actual.getMaxRunTimeSeconds(), is(60));
assertTrue(actual.isDisabled());
assertTrue(actual.isOverwrite());
}
Expand All @@ -79,6 +81,7 @@ public void assertBuildRequiredProperties() {
assertTrue(actual.getProps().isEmpty());
assertFalse(actual.isDisabled());
assertFalse(actual.isOverwrite());
assertThat(actual.getMaxRunTimeSeconds(), is(-1));
}

@Test(expected = IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,24 @@
import org.apache.shardingsphere.elasticjob.executor.context.ExecutorContext;
import org.apache.shardingsphere.elasticjob.executor.item.JobItemExecutor;
import org.apache.shardingsphere.elasticjob.executor.item.JobItemExecutorFactory;
import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
import org.apache.shardingsphere.elasticjob.infra.exception.ExceptionUtils;
import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent.ExecutionSource;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;

import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
* ElasticJob executor.
Expand Down Expand Up @@ -136,27 +141,52 @@ private void execute(final JobConfiguration jobConfig, final ShardingContexts sh

private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
if (1 == items.size()) {
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
process(jobConfig, shardingContexts, item, jobExecutionEvent);
return;
}
Queue<Future<Boolean>> futures = new LinkedList<>();
CountDownLatch latch = new CountDownLatch(items.size());
for (int each : items) {
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
ExecutorService executorService = executorContext.get(ExecutorService.class);
if (executorService.isShutdown()) {
return;
}
executorService.submit(() -> {
Future<Boolean> future = executorService.submit(() -> {
try {
process(jobConfig, shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
return true;
});
futures.offer(future);
}

int sumWaitTime = 0;
int maxRunTime = jobConfig.getMaxRunTimeSeconds() > 0 ? jobConfig.getMaxRunTimeSeconds() * 1000 : 0;
while (!futures.isEmpty()) {
//waiting process Done
Copy link

Copilot AI Sep 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment has incorrect capitalization and grammar. Should be '// Waiting for process to complete' or '// Wait for process done'.

Suggested change
//waiting process Done
// Waiting for process to complete

Copilot uses AI. Check for mistakes.
Future<Boolean> future = futures.peek();
if (!future.isDone()) {
if (maxRunTime == 0 || sumWaitTime < maxRunTime) {
BlockUtils.sleep(100);
sumWaitTime += 100;
Comment on lines +170 to +171
Copy link

Copilot AI Sep 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic number 100 (milliseconds) is hardcoded and used in multiple places. Consider extracting it as a named constant like TIMEOUT_CHECK_INTERVAL_MILLIS = 100 to improve maintainability.

Copilot uses AI. Check for mistakes.
continue;
} else {
future.cancel(true);
}
}

try {
future.get();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (final ExecutionException | CancellationException ignore) {
// ignore process Exception and Canceled for future
// TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
Comment on lines +183 to +184
Copy link

Copilot AI Sep 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment has grammatical errors. Should be 'ignore process exceptions and cancellations for futures' and 'TODO: Consider tracking job failure status and how to handle overall job failure loops'.

Suggested change
// ignore process Exception and Canceled for future
// TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
// ignore process exceptions and cancellations for futures
// TODO: Consider tracking job failure status and how to handle overall job failure loops

Copilot uses AI. Check for mistakes.
} finally {
futures.poll();
}
}

try {
latch.await();
} catch (final InterruptedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void assertExecuteWhenShardingItemsIsEmpty() {
verify(jobItemExecutor, times(0)).process(eq(fooJob), eq(jobConfig), eq(jobFacade), any());
}

@Test(expected = JobSystemException.class)
@Test
public void assertExecuteFailureWhenThrowExceptionForSingleShardingItem() {
assertExecuteFailureWhenThrowException(createSingleShardingContexts());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public final class JobConfigurationPOJO {
private String label;

private boolean staticSharding;

private int maxRuntimeSeconds = -1;

/**
* Convert to job configuration.
Expand All @@ -93,7 +95,7 @@ public JobConfiguration toJobConfiguration() {
.maxTimeDiffSeconds(maxTimeDiffSeconds).reconcileIntervalMinutes(reconcileIntervalMinutes)
.jobShardingStrategyType(jobShardingStrategyType).jobExecutorServiceHandlerType(jobExecutorServiceHandlerType)
.jobErrorHandlerType(jobErrorHandlerType).jobListenerTypes(jobListenerTypes.toArray(new String[]{})).description(description)
.disabled(disabled).overwrite(overwrite).label(label).staticSharding(staticSharding).build();
.disabled(disabled).overwrite(overwrite).label(label).staticSharding(staticSharding).maxRuntimeSeconds(maxRuntimeSeconds).build();
jobExtraConfigurations.stream().map(YamlConfiguration::toConfiguration).forEach(result.getExtraConfigurations()::add);
for (Object each : props.keySet()) {
result.getProps().setProperty(each.toString(), props.get(each.toString()).toString());
Expand Down Expand Up @@ -133,6 +135,7 @@ public static JobConfigurationPOJO fromJobConfiguration(final JobConfiguration j
result.setOverwrite(jobConfiguration.isOverwrite());
result.setLabel(jobConfiguration.getLabel());
result.setStaticSharding(jobConfiguration.isStaticSharding());
result.setMaxRuntimeSeconds(jobConfiguration.getMaxRunTimeSeconds());
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class JobConfigurationPOJOTest {
+ "jobName: test_job\n"
+ "jobParameter: param\n"
+ "jobShardingStrategyType: AVG_ALLOCATION\n"
+ "maxRuntimeSeconds: -1\n"
+ "maxTimeDiffSeconds: -1\n"
+ "misfire: false\n"
+ "monitorExecution: false\n"
Expand All @@ -56,6 +57,7 @@ public final class JobConfigurationPOJOTest {
+ "disabled: false\n"
+ "failover: false\n"
+ "jobName: test_job\n"
+ "maxRuntimeSeconds: -1\n"
+ "maxTimeDiffSeconds: -1\n"
+ "misfire: false\n"
+ "monitorExecution: false\n"
Expand Down Expand Up @@ -144,6 +146,7 @@ public void assertMarshal() {
actual.setJobErrorHandlerType("IGNORE");
actual.setDescription("Job description");
actual.getProps().setProperty("key", "value");
actual.setMaxRuntimeSeconds(-1);
assertThat(YamlEngine.marshal(actual), is(YAML));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ public void assertShutdown() throws SchedulerException {
assertTrue(getScheduler(oneOffJobBootstrap).isShutdown());
}

@Test
public void assertTimeout() {
AtomicInteger counter = new AtomicInteger(0);
final OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(zkRegCenter, (SimpleJob) shardingContext -> {
try {
Thread.sleep(50000);
} catch (InterruptedException e) {
counter.incrementAndGet();
throw new RuntimeException("Timeout");
}
}, JobConfiguration.newBuilder("test_timeout_job_execute", SHARDING_TOTAL_COUNT).maxRuntimeSeconds(1).build());
oneOffJobBootstrap.execute();
blockUtilFinish(oneOffJobBootstrap, counter);
assertThat(counter.get(), is(SHARDING_TOTAL_COUNT));
getJobScheduler(oneOffJobBootstrap).shutdown();
}

@SneakyThrows
private JobScheduler getJobScheduler(final OneOffJobBootstrap oneOffJobBootstrap) {
Field field = OneOffJobBootstrap.class.getDeclaredField("jobScheduler");
Expand All @@ -106,6 +123,7 @@ private Scheduler getScheduler(final OneOffJobBootstrap oneOffJobBootstrap) {

@SneakyThrows
private void blockUtilFinish(final OneOffJobBootstrap oneOffJobBootstrap, final AtomicInteger counter) {
JobScheduler jobScheduler = getJobScheduler(oneOffJobBootstrap);
Scheduler scheduler = getScheduler(oneOffJobBootstrap);
while (0 == counter.get() || !scheduler.getCurrentlyExecutingJobs().isEmpty()) {
Thread.sleep(100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ public final class LiteYamlConstants {
+ "reconcileIntervalMinutes: 15\n"
+ "description: 'desc'\n"
+ "disabled: true\n"
+ "overwrite: true";

+ "overwrite: true\n"
+ "maxRuntimeSeconds: 60\n";

private static final boolean DEFAULT_FAILOVER = true;

private static final boolean DEFAULT_MONITOR_EXECUTION = true;

private static final int DEFAULT_MAX_TIME_DIFF_SECONDS = 1000;

/**
* Get the config of simple job in YAML format.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void assertLoadDirectly() {
assertThat(actual.getJobName(), is("test_job"));
assertThat(actual.getCron(), is("0/1 * * * * ?"));
assertThat(actual.getShardingTotalCount(), is(3));
assertThat(actual.getMaxRunTimeSeconds(), is(60));
}

@Test
Expand All @@ -67,6 +68,7 @@ public void assertLoadFromCache() {
assertThat(actual.getJobName(), is("test_job"));
assertThat(actual.getCron(), is("0/1 * * * * ?"));
assertThat(actual.getShardingTotalCount(), is(3));
assertThat(actual.getMaxRunTimeSeconds(), is(60));
}

@Test
Expand All @@ -77,6 +79,7 @@ public void assertLoadFromCacheButNull() {
assertThat(actual.getJobName(), is("test_job"));
assertThat(actual.getCron(), is("0/1 * * * * ?"));
assertThat(actual.getShardingTotalCount(), is(3));
assertThat(actual.getMaxRunTimeSeconds(), is(60));
}

@Test(expected = JobConfigurationException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public final class LifecycleYamlConstants {
+ "monitorExecution: false\n"
+ "failover: true\n"
+ "misfire: false\n"
+ "maxRuntimeSeconds: -1\n"
+ "maxTimeDiffSeconds: 100\n"
+ "jobShardingStrategyType: AVG_ALLOCATION\n"
+ "description: %s\n"
Expand All @@ -42,6 +43,7 @@ public final class LifecycleYamlConstants {
+ "failover: false\n"
+ "jobName: test_job\n"
+ "jobParameter: param\n"
+ "maxRuntimeSeconds: -1\n"
+ "maxTimeDiffSeconds: -1\n"
+ "misfire: true\n"
+ "monitorExecution: true\n"
Expand All @@ -59,6 +61,7 @@ public final class LifecycleYamlConstants {
+ "monitorExecution: true\n"
+ "failover: false\n"
+ "misfire: true\n"
+ "maxRuntimeSeconds: -1\n"
+ "maxTimeDiffSeconds: -1\n"
+ "reconcileIntervalMinutes: 10\n"
+ "description: ''\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private BeanDefinition createJobConfigurationBeanDefinition(final Element elemen
result.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.OVERWRITE_ATTRIBUTE));
result.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.LABEL_ATTRIBUTE));
result.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.STATIC_SHARDING_ATTRIBUTE));
result.addConstructorArgValue(element.getAttribute(JobBeanDefinitionTag.MAX_RUNTIME_SECONDS_ATTRIBUTE));
return result.getBeanDefinition();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,6 @@ public final class JobBeanDefinitionTag {
public static final String LABEL_ATTRIBUTE = "label";

public static final String STATIC_SHARDING_ATTRIBUTE = "static-sharding";

public static final String MAX_RUNTIME_SECONDS_ATTRIBUTE = "max-runtime-seconds";
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
<xsd:attribute name="overwrite" type="xsd:string" default="false" />
<xsd:attribute name="label" type="xsd:string" />
<xsd:attribute name="static-sharding" type="xsd:string" default="false" />
<xsd:attribute name="max-runtime-seconds" type="xsd:string" default="-1" />
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down