Skip to content

Commit 74395c3

Browse files
authored
[ISSUE apache#4796] Use ThreadPoolFactory to create single thread executor for SourceWorker (apache#4798)
1 parent 2aff401 commit 74395c3

File tree

2 files changed

+10
-3
lines changed

2 files changed

+10
-3
lines changed

eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadPoolFactory.java

+5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.eventmesh.common;
1919

2020
import java.util.concurrent.BlockingQueue;
21+
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.Executors;
2223
import java.util.concurrent.LinkedBlockingQueue;
2324
import java.util.concurrent.ScheduledExecutorService;
@@ -57,4 +58,8 @@ public static ScheduledExecutorService createSingleScheduledExecutor(final Strin
5758
public static ScheduledExecutorService createScheduledExecutor(int core, ThreadFactory threadFactory) {
5859
return Executors.newScheduledThreadPool(core, threadFactory);
5960
}
61+
62+
public static ExecutorService createSingleExecutor(final String threadName) {
63+
return Executors.newSingleThreadExecutor(new EventMeshThreadFactory(threadName));
64+
}
6065
}

eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
2424
import org.apache.eventmesh.client.tcp.common.MessageUtils;
2525
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
26+
import org.apache.eventmesh.common.ThreadPoolFactory;
2627
import org.apache.eventmesh.common.exception.EventMeshException;
2728
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
2829
import org.apache.eventmesh.common.protocol.tcp.Package;
@@ -56,7 +57,6 @@
5657
import java.util.concurrent.BlockingQueue;
5758
import java.util.concurrent.ExecutionException;
5859
import java.util.concurrent.ExecutorService;
59-
import java.util.concurrent.Executors;
6060
import java.util.concurrent.Future;
6161
import java.util.concurrent.LinkedBlockingQueue;
6262
import java.util.concurrent.TimeUnit;
@@ -87,9 +87,11 @@ public class SourceWorker implements ConnectorWorker {
8787

8888
private volatile RecordOffsetManagement.CommittableOffsets committableOffsets;
8989

90-
private final ExecutorService pollService = Executors.newSingleThreadExecutor();
90+
private final ExecutorService pollService =
91+
ThreadPoolFactory.createSingleExecutor("eventMesh-sourceWorker-pollService");
9192

92-
private final ExecutorService startService = Executors.newSingleThreadExecutor();
93+
private final ExecutorService startService =
94+
ThreadPoolFactory.createSingleExecutor("eventMesh-sourceWorker-startService");
9395

9496
private final BlockingQueue<ConnectRecord> queue;
9597
private final EventMeshTCPClient<CloudEvent> eventMeshTCPClient;

0 commit comments

Comments
 (0)