From aea9c7f27f940eaeec0ca95f393f4b64bb0d2179 Mon Sep 17 00:00:00 2001 From: mytang0 Date: Thu, 16 Feb 2023 15:23:56 +0800 Subject: [PATCH] The lite-core module decouples ZooKeeper --- elasticjob-lite/elasticjob-lite-core/pom.xml | 22 +++--- .../api/registry/JobInstanceRegistry.java | 40 +++++----- .../listener/ListenerNotifierManager.java | 3 +- .../internal/snapshot/SnapshotService.java | 73 +++++++------------ .../lite/internal/util/ThreadUtils.java | 52 +++++++++++++ .../elasticjob-lite-spring-core/pom.xml | 5 ++ 6 files changed, 117 insertions(+), 78 deletions(-) create mode 100644 elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ThreadUtils.java diff --git a/elasticjob-lite/elasticjob-lite-core/pom.xml b/elasticjob-lite/elasticjob-lite-core/pom.xml index 32ff0c09a3..2acbd2e54f 100644 --- a/elasticjob-lite/elasticjob-lite-core/pom.xml +++ b/elasticjob-lite/elasticjob-lite-core/pom.xml @@ -15,7 +15,8 @@ ~ limitations under the License. --> - + 4.0.0 org.apache.shardingsphere.elasticjob @@ -24,7 +25,7 @@ elasticjob-lite-core ${project.artifactId} - + org.apache.shardingsphere.elasticjob @@ -41,11 +42,6 @@ elasticjob-registry-center-api ${project.parent.version} - - org.apache.shardingsphere.elasticjob - elasticjob-registry-center-zookeeper-curator - ${project.parent.version} - org.apache.shardingsphere.elasticjob elasticjob-simple-executor @@ -71,7 +67,7 @@ elasticjob-tracing-rdb ${project.parent.version} - + org.apache.commons commons-lang3 @@ -88,12 +84,12 @@ org.quartz-scheduler quartz - + org.projectlombok lombok - + org.apache.curator curator-test @@ -125,5 +121,11 @@ logback-classic test + + org.apache.shardingsphere.elasticjob + elasticjob-registry-center-zookeeper-curator + ${project.parent.version} + test + diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/registry/JobInstanceRegistry.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/registry/JobInstanceRegistry.java index 1af396349d..a17a052350 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/registry/JobInstanceRegistry.java +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/registry/JobInstanceRegistry.java @@ -17,9 +17,14 @@ package org.apache.shardingsphere.elasticjob.lite.api.registry; +import java.util.Arrays; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import lombok.RequiredArgsConstructor; - -import org.apache.curator.utils.ThreadUtils; import org.apache.shardingsphere.elasticjob.api.ElasticJob; import org.apache.shardingsphere.elasticjob.api.JobConfiguration; import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance; @@ -28,30 +33,23 @@ import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap; import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap; import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath; +import org.apache.shardingsphere.elasticjob.lite.internal.util.ThreadUtils; import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEvent; import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEventListener; -import java.util.Arrays; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - /** * Job instance registry. */ @RequiredArgsConstructor public final class JobInstanceRegistry { - + private static final Pattern JOB_CONFIG_COMPILE = Pattern.compile("/(\\w+)/config"); - + private final CoordinatorRegistryCenter regCenter; - + private final JobInstance jobInstance; - + /** * Register. */ @@ -60,9 +58,9 @@ public void register() { Executor executor = Executors.newSingleThreadExecutor(threadFactory); regCenter.watch("/", new JobInstanceRegistryListener(), executor); } - + public class JobInstanceRegistryListener implements DataChangedEventListener { - + @Override public void onChange(final DataChangedEvent event) { if (event.getType() != DataChangedEvent.Type.ADDED || !isJobConfigPath(event.getKey())) { @@ -78,13 +76,13 @@ public void onChange(final DataChangedEvent event) { new OneOffJobBootstrap(regCenter, newElasticJobInstance(jobConfig), jobConfig).execute(); } } - + private boolean isAllShardingItemsCompleted(final JobConfiguration jobConfig) { JobNodePath jobNodePath = new JobNodePath(jobConfig.getJobName()); return IntStream.range(0, jobConfig.getShardingTotalCount()) - .allMatch(each -> regCenter.isExisted(jobNodePath.getShardingNodePath(String.valueOf(each), "completed"))); + .allMatch(each -> regCenter.isExisted(jobNodePath.getShardingNodePath(String.valueOf(each), "completed"))); } - + private ElasticJob newElasticJobInstance(final JobConfiguration jobConfig) { String clazz = regCenter.get(String.format("/%s", jobConfig.getJobName())); try { @@ -95,7 +93,7 @@ private ElasticJob newElasticJobInstance(final JobConfiguration jobConfig) { throw new RuntimeException(String.format("new elastic job instance by class '%s' failure", clazz), ex); } } - + private boolean isLabelMatch(final JobConfiguration jobConfig) { if (jobConfig.getLabel() == null) { return false; @@ -105,7 +103,7 @@ private boolean isLabelMatch(final JobConfiguration jobConfig) { } return Arrays.stream(jobInstance.getLabels().split(",")).collect(Collectors.toSet()).contains(jobConfig.getLabel()); } - + private boolean isJobConfigPath(final String path) { return JOB_CONFIG_COMPILE.matcher(path).matches(); } diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java index 6f32a4251e..8a3a18cb35 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java @@ -17,8 +17,6 @@ package org.apache.shardingsphere.elasticjob.lite.internal.listener; -import org.apache.curator.utils.ThreadUtils; - import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -26,6 +24,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import org.apache.shardingsphere.elasticjob.lite.internal.util.ThreadUtils; /** * Manage listener's notify executor, diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/snapshot/SnapshotService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/snapshot/SnapshotService.java index fe109f5b0c..e755e4dd82 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/snapshot/SnapshotService.java +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/snapshot/SnapshotService.java @@ -18,13 +18,6 @@ package org.apache.shardingsphere.elasticjob.lite.internal.snapshot; import com.google.common.base.Preconditions; -import lombok.extern.slf4j.Slf4j; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.CuratorCache; -import org.apache.shardingsphere.elasticjob.lite.internal.util.SensitiveInfoUtils; -import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; -import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter; - import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; @@ -35,29 +28,32 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.elasticjob.lite.internal.util.SensitiveInfoUtils; +import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; /** * Snapshot service. */ @Slf4j public final class SnapshotService { - + public static final String DUMP_COMMAND = "dump@"; private final int port; - + private final CoordinatorRegistryCenter regCenter; - + private ServerSocket serverSocket; - + private volatile boolean closed; - + public SnapshotService(final CoordinatorRegistryCenter regCenter, final int port) { Preconditions.checkArgument(port >= 0 && port <= 0xFFFF, "Port value out of range: " + port); this.regCenter = regCenter; this.port = port; } - + /** * Start to listen. */ @@ -68,7 +64,7 @@ public void listen() { log.error("ElasticJob: Snapshot service listen failure, error is: ", ex); } } - + private int openSocket(final int port) throws IOException { closed = false; serverSocket = new ServerSocket(port); @@ -88,16 +84,16 @@ private int openSocket(final int port) throws IOException { }, threadName).start(); return localPort; } - + private boolean isIgnoredException() { return serverSocket.isClosed(); } - + private void process(final Socket socket) throws IOException { try ( - BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); - BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); - Socket ignored = socket) { + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); + Socket ignored = socket) { String cmdLine = reader.readLine(); if (null != cmdLine && cmdLine.startsWith(DUMP_COMMAND) && cmdLine.split("@").length == 2) { String jobName = cmdLine.split("@")[1]; @@ -106,45 +102,32 @@ private void process(final Socket socket) throws IOException { } } } - - private void dumpDirectly(final String path, final String jobName, final List result) { + + private void dumpDirectly(final String path, final List result) { for (String each : regCenter.getChildrenKeys(path)) { - String zkPath = path + "/" + each; - String zkValue = Optional.ofNullable(regCenter.get(zkPath)).orElse(""); - String cachePath = zkPath; - String cacheValue = zkValue; - // TODO Decoupling ZooKeeper - if (regCenter instanceof ZookeeperRegistryCenter) { - CuratorCache cache = (CuratorCache) regCenter.getRawCache("/" + jobName); - if (null != cache) { - Optional cacheData = cache.get(zkPath); - cachePath = cacheData.map(ChildData::getPath).orElse(""); - cacheValue = cacheData.map(ChildData::getData).map(String::new).orElse(""); - } - } - if (zkValue.equals(cacheValue) && zkPath.equals(cachePath)) { - result.add(String.join(" | ", zkPath, zkValue)); - } else { - result.add(String.join(" | ", zkPath, zkValue, cachePath, cacheValue)); - } - dumpDirectly(zkPath, jobName, result); + String childrenPath = path + "/" + each; + String childrenValue = Optional.ofNullable(regCenter.get(childrenPath)).orElse(""); + result.add(String.join(" | ", childrenPath, childrenValue)); + dumpDirectly(childrenPath, result); } } /** * Dump job. + * * @param jobName job's name * @return dump job's info */ public String dumpJobDirectly(final String jobName) { String path = "/" + jobName; final List result = new ArrayList<>(); - dumpDirectly(path, jobName, result); + dumpDirectly(path, result); return String.join("\n", SensitiveInfoUtils.filterSensitiveIps(result)) + "\n"; } /** * Dump job. + * * @param instanceIp job instance ip addr * @param dumpPort dump port * @param jobName job's name @@ -153,9 +136,9 @@ public String dumpJobDirectly(final String jobName) { */ public static String dumpJob(final String instanceIp, final int dumpPort, final String jobName) throws IOException { try ( - Socket socket = new Socket(instanceIp, dumpPort); - BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); - BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())) + Socket socket = new Socket(instanceIp, dumpPort); + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())) ) { writer.write(DUMP_COMMAND + jobName); writer.newLine(); @@ -173,7 +156,7 @@ private void outputMessage(final BufferedWriter outputWriter, final String msg) outputWriter.append(msg); outputWriter.flush(); } - + /** * Close listener. */ diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ThreadUtils.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ThreadUtils.java new file mode 100644 index 0000000000..28fbcef8e7 --- /dev/null +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ThreadUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.elasticjob.lite.internal.util; + +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.ThreadFactory; +import lombok.extern.slf4j.Slf4j; + +/** + * Thread utility. + */ +@Slf4j +public final class ThreadUtils { + + private ThreadUtils() { + + } + + /** + * Create a new generic thread factory instance. + * + * @param processName Process thread name prefix. + * @return Return generic thread factory instance. + */ + public static ThreadFactory newGenericThreadFactory(final String processName) { + Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (t, e) -> { + log.error("Unexpected exception in thread: " + t, e); + Throwables.throwIfUnchecked(e); + }; + return new ThreadFactoryBuilder() + .setNameFormat(processName + "-%d") + .setDaemon(true) + .setUncaughtExceptionHandler(uncaughtExceptionHandler) + .build(); + } +} diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-core/pom.xml b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-core/pom.xml index b48d41b71d..d297a70928 100644 --- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-core/pom.xml +++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-core/pom.xml @@ -38,6 +38,11 @@ + + org.apache.shardingsphere.elasticjob + elasticjob-registry-center-zookeeper-curator + ${project.parent.version} + org.springframework spring-context