Skip to content

Commit 38908b3

Browse files
authored
Merge pull request #556 from yanhom1314/master
optimize alarm related
2 parents 026fb45 + 5fdff7f commit 38908b3

File tree

14 files changed

+90
-32
lines changed

14 files changed

+90
-32
lines changed

benchmark/src/main/java/org/dromara/dynamictp/benchmark/ExecutorBenchmark.java

+2
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ public Thread newThread(Runnable r) {
9393
.maximumPoolSize(8)
9494
.keepAliveTime(60)
9595
.threadFactory("dtp-test-pool")
96+
.runTimeout(100)
97+
.queueTimeout(100)
9698
.queueCapacity(1024)
9799
.taskWrappers(TaskWrappers.getInstance().getByNames(Sets.newHashSet("ttl", "mdc")))
98100
.rejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())

common/src/main/java/org/dromara/dynamictp/common/entity/NotifyItem.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -51,22 +51,22 @@ public class NotifyItem {
5151
private String type;
5252

5353
/**
54-
* Alarm threshold.
54+
* Indicator detection threshold.
5555
*/
5656
private int threshold;
5757

5858
/**
59-
* Within a cycle window, if the number of occurrences exceeding the threshold reaches count, an alarm will be triggered.
59+
* Within a cycle window, when the number of occurrences surpassing the threshold reaches the specified count, an alarm will be triggered.
6060
*/
6161
private int count;
6262

6363
/**
64-
* The size of cache in seconds for checking the alarm conditions.
64+
* The time span (in seconds) of the cache applied to check alarm conditions.
6565
*/
6666
private int period = 120;
6767

6868
/**
69-
* After the alarm is triggered at Time-N (TN), there will be silence during the TN -> TN + silencePeriod.
69+
* When the alarm is triggered at Time - N (TN), it will stay silent from TN to TN + silencePeriod.
7070
*/
7171
private int silencePeriod = 120;
7272

common/src/main/java/org/dromara/dynamictp/common/entity/TpExecutorProps.java

+10
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,21 @@ public class TpExecutorProps {
112112
*/
113113
private boolean notifyEnabled = true;
114114

115+
/**
116+
* Task execute timeout, unit (ms).
117+
*/
118+
private long runTimeout = 0;
119+
115120
/**
116121
* If try interrupt thread when task run timeout.
117122
*/
118123
private boolean tryInterrupt = false;
119124

125+
/**
126+
* Task queue wait timeout, unit (ms).
127+
*/
128+
private long queueTimeout = 0;
129+
120130
/**
121131
* Whether to wait for scheduled tasks to complete on shutdown,
122132
* not interrupting running tasks and executing all tasks in the queue.

core/src/main/java/org/dromara/dynamictp/core/DtpRegistry.java

+2
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,8 @@ private static void doRefreshDtp(ExecutorWrapper executorWrapper, DtpExecutorPro
294294
}
295295

296296
// update timeout related
297+
executor.setRunTimeout(props.getRunTimeout());
298+
executor.setQueueTimeout(props.getQueueTimeout());
297299
executor.setTryInterrupt(props.isTryInterrupt());
298300

299301
// update shutdown related

core/src/main/java/org/dromara/dynamictp/core/aware/TaskTimeoutAware.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import lombok.extern.slf4j.Slf4j;
2121
import org.dromara.dynamictp.common.entity.TpExecutorProps;
22-
import org.dromara.dynamictp.core.support.ExecutorWrapper;
2322
import org.dromara.dynamictp.core.support.ThreadPoolStatProvider;
2423

2524
import java.util.Objects;
@@ -51,10 +50,9 @@ public String getName() {
5150
@Override
5251
protected void refresh(TpExecutorProps props, ThreadPoolStatProvider statProvider) {
5352
super.refresh(props, statProvider);
54-
ExecutorWrapper executorWrapper = statProvider.getExecutorWrapper();
55-
if (Objects.nonNull(executorWrapper)) {
56-
statProvider.setRunTimeout(executorWrapper.getRunTimeout());
57-
statProvider.setQueueTimeout(executorWrapper.getQueueTimeout());
53+
if (Objects.nonNull(props)) {
54+
statProvider.setRunTimeout(props.getRunTimeout());
55+
statProvider.setQueueTimeout(props.getQueueTimeout());
5856
statProvider.setTryInterrupt(props.isTryInterrupt());
5957
}
6058
}

core/src/main/java/org/dromara/dynamictp/core/executor/DtpExecutor.java

+26
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,21 @@ public class DtpExecutor extends ThreadPoolExecutor implements TaskEnhanceAware,
104104
*/
105105
private boolean rejectEnhanced = true;
106106

107+
/**
108+
* for manual builder thread pools only
109+
*/
110+
private long runTimeout = 0;
111+
107112
/**
108113
* for manual builder thread pools only
109114
*/
110115
private boolean tryInterrupt = false;
111116

117+
/**
118+
* for manual builder thread pools only
119+
*/
120+
private long queueTimeout = 0;
121+
112122
/**
113123
* Whether to wait for scheduled tasks to complete on shutdown,
114124
* not interrupting running tasks and executing all tasks in the queue.
@@ -314,6 +324,14 @@ public void setRejectHandlerType(String rejectHandlerType) {
314324
this.rejectHandlerType = rejectHandlerType;
315325
}
316326

327+
public long getRunTimeout() {
328+
return runTimeout;
329+
}
330+
331+
public void setRunTimeout(long runTimeout) {
332+
this.runTimeout = runTimeout;
333+
}
334+
317335
public boolean isTryInterrupt() {
318336
return tryInterrupt;
319337
}
@@ -322,6 +340,14 @@ public void setTryInterrupt(boolean tryInterrupt) {
322340
this.tryInterrupt = tryInterrupt;
323341
}
324342

343+
public long getQueueTimeout() {
344+
return queueTimeout;
345+
}
346+
347+
public void setQueueTimeout(long queueTimeout) {
348+
this.queueTimeout = queueTimeout;
349+
}
350+
325351
public boolean isWaitForTasksToCompleteOnShutdown() {
326352
return waitForTasksToCompleteOnShutdown;
327353
}

core/src/main/java/org/dromara/dynamictp/core/support/ExecutorWrapper.java

-15
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import lombok.Data;
2222
import org.dromara.dynamictp.common.em.NotifyItemEnum;
2323
import org.dromara.dynamictp.common.entity.NotifyItem;
24-
import org.dromara.dynamictp.common.util.StreamUtil;
2524
import org.dromara.dynamictp.core.aware.AwareManager;
2625
import org.dromara.dynamictp.core.aware.RejectHandlerAware;
2726
import org.dromara.dynamictp.core.aware.TaskEnhanceAware;
@@ -227,18 +226,4 @@ public void setRejectHandler(RejectedExecutionHandler handler) {
227226
executor.setRejectedExecutionHandler(handler);
228227
}
229228
}
230-
231-
/**
232-
* Task execute timeout, unit (ms).
233-
*/
234-
public int getRunTimeout() {
235-
return StreamUtil.toMap(notifyItems, NotifyItem::getType).get(NotifyItemEnum.RUN_TIMEOUT.getValue()).getThreshold();
236-
}
237-
238-
/**
239-
* Task queue wait timeout, unit (ms), just for statistics.
240-
*/
241-
public int getQueueTimeout() {
242-
return StreamUtil.toMap(notifyItems, NotifyItem::getType).get(NotifyItemEnum.QUEUE_TIMEOUT.getValue()).getThreshold();
243-
}
244229
}

core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolBuilder.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,20 @@ public class ThreadPoolBuilder {
173173
private boolean notifyEnabled = true;
174174

175175
/**
176-
* If try interrupt thread when run timeout.
176+
* Task execute timeout, unit (ms).
177+
*/
178+
private long runTimeout = 0;
179+
180+
/**
181+
* If try interrupt thread when task run timeout.
177182
*/
178183
private boolean tryInterrupt = false;
179184

185+
/**
186+
* Task queue wait timeout, unit (ms), just for statistics.
187+
*/
188+
private long queueTimeout = 0;
189+
180190
/**
181191
* Task wrappers.
182192
*/
@@ -432,11 +442,21 @@ public ThreadPoolBuilder notifyEnabled(boolean notifyEnabled) {
432442
return this;
433443
}
434444

445+
public ThreadPoolBuilder runTimeout(long runTimeout) {
446+
this.runTimeout = runTimeout;
447+
return this;
448+
}
449+
435450
public ThreadPoolBuilder tryInterrupt(boolean tryInterrupt) {
436451
this.tryInterrupt = tryInterrupt;
437452
return this;
438453
}
439454

455+
public ThreadPoolBuilder queueTimeout(long queueTimeout) {
456+
this.queueTimeout = queueTimeout;
457+
return this;
458+
}
459+
440460
public ThreadPoolBuilder taskWrappers(List<TaskWrapper> taskWrappers) {
441461
this.taskWrappers.addAll(taskWrappers);
442462
return this;
@@ -566,7 +586,9 @@ private DtpExecutor buildDtpExecutor(ThreadPoolBuilder builder) {
566586
dtpExecutor.setAwaitTerminationSeconds(builder.awaitTerminationSeconds);
567587
dtpExecutor.setPreStartAllCoreThreads(builder.preStartAllCoreThreads);
568588
dtpExecutor.setRejectEnhanced(builder.rejectEnhanced);
589+
dtpExecutor.setRunTimeout(builder.runTimeout);
569590
dtpExecutor.setTryInterrupt(builder.tryInterrupt);
591+
dtpExecutor.setQueueTimeout(builder.queueTimeout);
570592
dtpExecutor.setTaskWrappers(builder.taskWrappers);
571593
dtpExecutor.setNotifyItems(builder.notifyItems);
572594
dtpExecutor.setPlatformIds(builder.platformIds);

core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolStatProvider.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
package org.dromara.dynamictp.core.support;
1919

2020
import lombok.val;
21+
import org.dromara.dynamictp.common.manager.ContextManagerHelper;
2122
import org.dromara.dynamictp.common.timer.HashedWheelTimer;
2223
import org.dromara.dynamictp.common.timer.Timeout;
2324
import org.dromara.dynamictp.core.executor.DtpExecutor;
2425
import org.dromara.dynamictp.core.monitor.PerformanceProvider;
25-
import org.dromara.dynamictp.common.manager.ContextManagerHelper;
2626
import org.dromara.dynamictp.core.timer.QueueTimeoutTimerTask;
2727
import org.dromara.dynamictp.core.timer.RunTimeoutTimerTask;
2828

@@ -100,9 +100,9 @@ private ThreadPoolStatProvider(ExecutorWrapper executorWrapper) {
100100
public static ThreadPoolStatProvider of(ExecutorWrapper executorWrapper) {
101101
val provider = new ThreadPoolStatProvider(executorWrapper);
102102
if (executorWrapper.isDtpExecutor()) {
103-
val dtpExecutor = (DtpExecutor) executorWrapper.getExecutor();
104-
provider.setRunTimeout(executorWrapper.getRunTimeout());
105-
provider.setQueueTimeout(executorWrapper.getQueueTimeout());
103+
DtpExecutor dtpExecutor = (DtpExecutor) executorWrapper.getExecutor();
104+
provider.setRunTimeout(dtpExecutor.getRunTimeout());
105+
provider.setQueueTimeout(dtpExecutor.getQueueTimeout());
106106
provider.setTryInterrupt(dtpExecutor.isTryInterrupt());
107107
}
108108
return provider;

spring/src/main/java/org/dromara/dynamictp/spring/annotation/DtpBeanDefinitionRegistrar.java

+4
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,10 @@
5050
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLATFORM_IDS;
5151
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLUGIN_NAMES;
5252
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PRE_START_ALL_CORE_THREADS;
53+
import static org.dromara.dynamictp.common.constant.DynamicTpConst.QUEUE_TIMEOUT;
5354
import static org.dromara.dynamictp.common.constant.DynamicTpConst.REJECT_ENHANCED;
5455
import static org.dromara.dynamictp.common.constant.DynamicTpConst.REJECT_HANDLER_TYPE;
56+
import static org.dromara.dynamictp.common.constant.DynamicTpConst.RUN_TIMEOUT;
5557
import static org.dromara.dynamictp.common.constant.DynamicTpConst.TASK_WRAPPERS;
5658
import static org.dromara.dynamictp.common.constant.DynamicTpConst.THREAD_POOL_ALIAS_NAME;
5759
import static org.dromara.dynamictp.common.constant.DynamicTpConst.THREAD_POOL_NAME;
@@ -107,7 +109,9 @@ private Map<String, Object> buildPropertyValues(DtpExecutorProps props) {
107109
propertyValues.put(PRE_START_ALL_CORE_THREADS, props.isPreStartAllCoreThreads());
108110
propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType());
109111
propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced());
112+
propertyValues.put(RUN_TIMEOUT, props.getRunTimeout());
110113
propertyValues.put(TRY_INTERRUPT_WHEN_TIMEOUT, props.isTryInterrupt());
114+
propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout());
111115
val notifyItems = mergeAllNotifyItems(props.getNotifyItems());
112116
propertyValues.put(NOTIFY_ITEMS, notifyItems);
113117
propertyValues.put(PLATFORM_IDS, props.getPlatformIds());

test/test-core/src/test/java/org/dromara/dynamictp/test/core/notify/capture/CapturedExecutorTest.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717

1818
package org.dromara.dynamictp.test.core.notify.capture;
1919

20-
import org.dromara.dynamictp.core.executor.DtpExecutor;
2120
import org.dromara.dynamictp.core.notifier.capture.CapturedExecutor;
21+
import org.dromara.dynamictp.core.support.adapter.ExecutorAdapter;
2222
import org.dromara.dynamictp.core.support.ThreadPoolBuilder;
2323
import org.dromara.dynamictp.core.support.ThreadPoolCreator;
24-
import org.dromara.dynamictp.core.support.adapter.ExecutorAdapter;
2524
import org.dromara.dynamictp.core.support.adapter.ThreadPoolExecutorAdapter;
25+
import org.dromara.dynamictp.core.executor.DtpExecutor;
2626
import org.junit.jupiter.api.Assertions;
2727
import org.junit.jupiter.api.BeforeAll;
2828
import org.junit.jupiter.api.RepeatedTest;
@@ -54,6 +54,8 @@ public static void setUp() {
5454
.workQueue(VARIABLE_LINKED_BLOCKING_QUEUE.getName(), 100, false, null)
5555
.waitForTasksToCompleteOnShutdown(true)
5656
.awaitTerminationSeconds(5)
57+
.runTimeout(200)
58+
.queueTimeout(200)
5759
.buildDynamic();
5860
}
5961

test/test-core/src/test/java/org/dromara/dynamictp/test/core/support/task/runnable/MdcRunnableTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.junit.Assert;
2525
import org.junit.Test;
2626
import org.slf4j.MDC;
27-
2827
import java.util.concurrent.CountDownLatch;
2928
import java.util.concurrent.TimeUnit;
3029

@@ -52,6 +51,8 @@ public void test() throws InterruptedException {
5251
.workQueue(VARIABLE_LINKED_BLOCKING_QUEUE.getName(), 20, false, null)
5352
.waitForTasksToCompleteOnShutdown(true)
5453
.awaitTerminationSeconds(5)
54+
.runTimeout(200)
55+
.queueTimeout(200)
5556
.buildDynamic();
5657

5758
CountDownLatch latch = new CountDownLatch(2);
@@ -83,6 +84,8 @@ public void testReject() throws InterruptedException {
8384
.workQueue(VARIABLE_LINKED_BLOCKING_QUEUE.getName(), 1, false, null)
8485
.waitForTasksToCompleteOnShutdown(true)
8586
.awaitTerminationSeconds(5)
87+
.runTimeout(200)
88+
.queueTimeout(200)
8689
.rejectedExecutionHandler("CallerRunsPolicy")
8790
.buildDynamic();
8891

test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorStaticTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public static void setUp() {
4949
.timeUnit(TimeUnit.MILLISECONDS)
5050
.waitForTasksToCompleteOnShutdown(true)
5151
.awaitTerminationSeconds(5)
52+
.runTimeout(10000)
53+
.queueTimeout(10000)
5254
.buildPriority();
5355
}
5456

test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/proxy/ThreadPoolExecutorProxyTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ public void rejectTest() throws InterruptedException {
138138

139139
private TpExecutorProps buildProps() {
140140
TpExecutorProps props = new TpExecutorProps();
141+
props.setRunTimeout(10);
142+
props.setQueueTimeout(10);
141143
return props;
142144
}
143145
}

0 commit comments

Comments
 (0)