Skip to content

Commit dcb76b9

Browse files
authored
[Fix][Plugin] Optimize the plugin discovery mechanism (#8603)
1 parent 55ed90e commit dcb76b9

File tree

3 files changed

+133
-106
lines changed

3 files changed

+133
-106
lines changed

Diff for: seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java

+57-103
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.seatunnel.common.constants.PluginType;
3737
import org.apache.seatunnel.common.utils.FileUtils;
3838
import org.apache.seatunnel.common.utils.ReflectionUtils;
39+
import org.apache.seatunnel.common.utils.SeaTunnelException;
3940

4041
import org.apache.commons.lang3.ArrayUtils;
4142
import org.apache.commons.lang3.StringUtils;
@@ -44,7 +45,6 @@
4445
import lombok.extern.slf4j.Slf4j;
4546

4647
import java.io.File;
47-
import java.io.FileFilter;
4848
import java.io.IOException;
4949
import java.net.MalformedURLException;
5050
import java.net.URL;
@@ -55,13 +55,11 @@
5555
import java.util.Collection;
5656
import java.util.Collections;
5757
import java.util.HashMap;
58-
import java.util.HashSet;
5958
import java.util.LinkedHashMap;
6059
import java.util.List;
6160
import java.util.Map;
6261
import java.util.Optional;
6362
import java.util.ServiceLoader;
64-
import java.util.Set;
6563
import java.util.concurrent.ConcurrentHashMap;
6664
import java.util.function.BiConsumer;
6765
import java.util.stream.Collectors;
@@ -90,6 +88,9 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
9088
private final BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer;
9189
protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> pluginJarPath =
9290
new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
91+
protected final Map<PluginIdentifier, String> sourcePluginInstance;
92+
protected final Map<PluginIdentifier, String> sinkPluginInstance;
93+
protected final Map<PluginIdentifier, String> transformPluginInstance;
9394

9495
public AbstractPluginDiscovery(BiConsumer<ClassLoader, URL> addURLToClassloader) {
9596
this(Common.connectorDir(), loadConnectorPluginConfig(), addURLToClassloader);
@@ -114,6 +115,9 @@ public AbstractPluginDiscovery(
114115
this.pluginDir = pluginDir;
115116
this.pluginMappingConfig = pluginMappingConfig;
116117
this.addURLToClassLoaderConsumer = addURLToClassLoaderConsumer;
118+
this.sourcePluginInstance = getAllSupportedPlugins(PluginType.SOURCE);
119+
this.sinkPluginInstance = getAllSupportedPlugins(PluginType.SINK);
120+
this.transformPluginInstance = getAllSupportedPlugins(PluginType.TRANSFORM);
117121
log.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
118122
}
119123

@@ -423,14 +427,10 @@ private Optional<URL> findPluginJarPath(PluginIdentifier pluginIdentifier) {
423427
pluginDir
424428
.toFile()
425429
.listFiles(
426-
new FileFilter() {
427-
@Override
428-
public boolean accept(File pathname) {
429-
return pathname.getName().endsWith(".jar")
430+
pathname ->
431+
pathname.getName().endsWith(".jar")
430432
&& StringUtils.startsWithIgnoreCase(
431-
pathname.getName(), pluginJarPrefix);
432-
}
433-
});
433+
pathname.getName(), pluginJarPrefix));
434434
if (ArrayUtils.isEmpty(targetPluginFiles)) {
435435
return Optional.empty();
436436
}
@@ -439,10 +439,9 @@ public boolean accept(File pathname) {
439439
if (targetPluginFiles.length == 1) {
440440
pluginJarPath = targetPluginFiles[0].toURI().toURL();
441441
} else {
442+
PluginType type = PluginType.valueOf(pluginType.toUpperCase());
442443
pluginJarPath =
443-
findMostSimlarPluginJarFile(targetPluginFiles, pluginJarPrefix)
444-
.toURI()
445-
.toURL();
444+
selectPluginJar(targetPluginFiles, pluginJarPrefix, pluginName, type).get();
446445
}
447446
log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier, pluginJarPath);
448447
return Optional.of(pluginJarPath);
@@ -455,104 +454,59 @@ public boolean accept(File pathname) {
455454
}
456455
}
457456

458-
private static File findMostSimlarPluginJarFile(
459-
File[] targetPluginFiles, String pluginJarPrefix) {
460-
String splitRegex = "\\-|\\_|\\.";
461-
double maxSimlarity = -Integer.MAX_VALUE;
462-
int mostSimlarPluginJarFileIndex = -1;
463-
for (int i = 0; i < targetPluginFiles.length; i++) {
464-
File file = targetPluginFiles[i];
465-
String fileName = file.getName();
466-
double similarity =
467-
CosineSimilarityUtil.cosineSimilarity(pluginJarPrefix, fileName, splitRegex);
468-
if (similarity > maxSimlarity) {
469-
maxSimlarity = similarity;
470-
mostSimlarPluginJarFileIndex = i;
471-
}
457+
private Optional<URL> selectPluginJar(
458+
File[] targetPluginFiles, String pluginJarPrefix, String pluginName, PluginType type) {
459+
List<URL> resMatchedUrls = new ArrayList<>();
460+
for (File file : targetPluginFiles) {
461+
Optional<URL> matchedUrl = findMatchingUrl(file, type);
462+
matchedUrl.ifPresent(resMatchedUrls::add);
463+
}
464+
if (resMatchedUrls.size() != 1) {
465+
throw new SeaTunnelException(
466+
String.format(
467+
"Cannot find unique plugin jar for pluginIdentifier: %s -> %s. Possible impact jar: %s",
468+
pluginName, pluginJarPrefix, Arrays.asList(targetPluginFiles)));
469+
} else {
470+
return Optional.of(resMatchedUrls.get(0));
472471
}
473-
return targetPluginFiles[mostSimlarPluginJarFileIndex];
474472
}
475473

476-
static class CosineSimilarityUtil {
477-
public static double cosineSimilarity(String textA, String textB, String splitRegrex) {
478-
Set<String> words1 =
479-
new HashSet<>(Arrays.asList(textA.toLowerCase().split(splitRegrex)));
480-
Set<String> words2 =
481-
new HashSet<>(Arrays.asList(textB.toLowerCase().split(splitRegrex)));
482-
int[] termFrequency1 = calculateTermFrequencyVector(textA, words1, splitRegrex);
483-
int[] termFrequency2 = calculateTermFrequencyVector(textB, words2, splitRegrex);
484-
return calculateCosineSimilarity(termFrequency1, termFrequency2);
474+
private Optional<URL> findMatchingUrl(File file, PluginType type) {
475+
Map<PluginIdentifier, String> pluginInstanceMap = null;
476+
switch (type) {
477+
case SINK:
478+
pluginInstanceMap = sinkPluginInstance;
479+
break;
480+
case SOURCE:
481+
pluginInstanceMap = sourcePluginInstance;
482+
break;
483+
case TRANSFORM:
484+
pluginInstanceMap = transformPluginInstance;
485+
break;
485486
}
486-
487-
private static int[] calculateTermFrequencyVector(
488-
String text, Set<String> words, String splitRegrex) {
489-
int[] termFrequencyVector = new int[words.size()];
490-
String[] textArray = text.toLowerCase().split(splitRegrex);
491-
List<String> orderedWords = new ArrayList<String>();
492-
words.clear();
493-
for (String word : textArray) {
494-
if (!words.contains(word)) {
495-
orderedWords.add(word);
496-
words.add(word);
497-
}
498-
}
499-
for (String word : textArray) {
500-
if (words.contains(word)) {
501-
int index = 0;
502-
for (String w : orderedWords) {
503-
if (w.equals(word)) {
504-
termFrequencyVector[index]++;
505-
break;
506-
}
507-
index++;
508-
}
509-
}
510-
}
511-
return termFrequencyVector;
487+
if (pluginInstanceMap == null) {
488+
return Optional.empty();
512489
}
513-
514-
private static double calculateCosineSimilarity(int[] vectorA, int[] vectorB) {
515-
double dotProduct = 0.0;
516-
double magnitudeA = 0.0;
517-
double magnitudeB = 0.0;
518-
int vectorALength = vectorA.length;
519-
int vectorBLength = vectorB.length;
520-
if (vectorALength < vectorBLength) {
521-
int[] vectorTemp = new int[vectorBLength];
522-
for (int i = 0; i < vectorB.length; i++) {
523-
if (i <= vectorALength - 1) {
524-
vectorTemp[i] = vectorA[i];
525-
} else {
526-
vectorTemp[i] = 0;
527-
}
528-
}
529-
vectorA = vectorTemp;
530-
}
531-
if (vectorALength > vectorBLength) {
532-
int[] vectorTemp = new int[vectorALength];
533-
for (int i = 0; i < vectorA.length; i++) {
534-
if (i <= vectorBLength - 1) {
535-
vectorTemp[i] = vectorB[i];
536-
} else {
537-
vectorTemp[i] = 0;
538-
}
539-
}
540-
vectorB = vectorTemp;
541-
}
542-
for (int i = 0; i < vectorA.length; i++) {
543-
dotProduct += vectorA[i] * vectorB[i];
544-
magnitudeA += Math.pow(vectorA[i], 2);
545-
magnitudeB += Math.pow(vectorB[i], 2);
490+
List<PluginIdentifier> matchedIdentifier = new ArrayList<>();
491+
for (Map.Entry<PluginIdentifier, String> entry : pluginInstanceMap.entrySet()) {
492+
if (file.getName().startsWith(entry.getValue())) {
493+
matchedIdentifier.add(entry.getKey());
546494
}
495+
}
547496

548-
magnitudeA = Math.sqrt(magnitudeA);
549-
magnitudeB = Math.sqrt(magnitudeB);
550-
551-
if (magnitudeA == 0 || magnitudeB == 0) {
552-
return 0.0; // Avoid dividing by 0
553-
} else {
554-
return dotProduct / (magnitudeA * magnitudeB);
497+
if (matchedIdentifier.size() == 1) {
498+
try {
499+
return Optional.of(file.toURI().toURL());
500+
} catch (MalformedURLException e) {
501+
log.warn("Cannot get plugin URL for pluginIdentifier: {}", file, e);
555502
}
556503
}
504+
if (log.isDebugEnabled()) {
505+
log.debug(
506+
"File found: {}, matches more than one PluginIdentifier: {}",
507+
file.getName(),
508+
matchedIdentifier);
509+
}
510+
return Optional.empty();
557511
}
558512
}

Diff for: seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java

+61-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.seatunnel.common.config.Common;
2424
import org.apache.seatunnel.common.config.DeployMode;
2525
import org.apache.seatunnel.common.constants.PluginType;
26+
import org.apache.seatunnel.common.utils.SeaTunnelException;
2627

2728
import org.junit.jupiter.api.AfterEach;
2829
import org.junit.jupiter.api.Assertions;
@@ -53,7 +54,13 @@ class SeaTunnelSourcePluginDiscoveryTest {
5354
Paths.get(seatunnelHome, "connectors", "connector-http.jar"),
5455
Paths.get(seatunnelHome, "connectors", "connector-kafka.jar"),
5556
Paths.get(seatunnelHome, "connectors", "connector-kafka-alcs.jar"),
56-
Paths.get(seatunnelHome, "connectors", "connector-kafka-blcs.jar"));
57+
Paths.get(seatunnelHome, "connectors", "connector-kafka-blcs.jar"),
58+
Paths.get(seatunnelHome, "connectors", "connector-jdbc-release-1.1.jar"),
59+
Paths.get(seatunnelHome, "connectors", "connector-jdbc-hive1.jar"),
60+
Paths.get(seatunnelHome, "connectors", "connector-odbc-baidu-v1.jar"),
61+
Paths.get(seatunnelHome, "connectors", "connector-odbc-baidu-release-1.1.jar"),
62+
Paths.get(seatunnelHome, "connectors", "seatunnel-transforms-v2.jar"),
63+
Paths.get(seatunnelHome, "connectors", "seatunnel-transforms-v1.jar"));
5764

5865
@BeforeEach
5966
public void before() throws IOException {
@@ -75,7 +82,8 @@ void getPluginBaseClass() {
7582
PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "HttpJira"),
7683
PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "HttpBase"),
7784
PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "Kafka"),
78-
PluginIdentifier.of("seatunnel", PluginType.SINK.getType(), "Kafka-Blcs"));
85+
PluginIdentifier.of("seatunnel", PluginType.SINK.getType(), "Kafka-Blcs"),
86+
PluginIdentifier.of("seatunnel", PluginType.SINK.getType(), "Jdbc"));
7987
SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
8088
new SeaTunnelSourcePluginDiscovery();
8189
Assertions.assertIterableEquals(
@@ -87,6 +95,57 @@ void getPluginBaseClass() {
8795
Paths.get(seatunnelHome, "connectors", "connector-kafka.jar")
8896
.toString(),
8997
Paths.get(seatunnelHome, "connectors", "connector-kafka-blcs.jar")
98+
.toString(),
99+
Paths.get(
100+
seatunnelHome,
101+
"connectors",
102+
"connector-jdbc-release-1.1.jar")
103+
.toString())
104+
.collect(Collectors.toList()),
105+
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers).stream()
106+
.map(URL::getPath)
107+
.collect(Collectors.toList()));
108+
}
109+
110+
@Test
111+
void getPluginBaseClassFailureScenario() {
112+
List<PluginIdentifier> pluginIdentifiers =
113+
Lists.newArrayList(
114+
PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "Odbc"));
115+
SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
116+
new SeaTunnelSourcePluginDiscovery();
117+
Exception exception =
118+
Assertions.assertThrows(
119+
SeaTunnelException.class,
120+
() -> seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers));
121+
System.out.println(exception.getMessage());
122+
Assertions.assertTrue(
123+
exception
124+
.getMessage()
125+
.matches(
126+
"Cannot find unique plugin jar for pluginIdentifier: odbc -> connector-odbc. "
127+
+ "Possible impact jar: \\[.*.jar, .*.jar]"));
128+
}
129+
130+
@Test
131+
void getTransformClass() {
132+
List<PluginIdentifier> pluginIdentifiers =
133+
Lists.newArrayList(
134+
PluginIdentifier.of("seatunnel", PluginType.TRANSFORM.getType(), "Sql"),
135+
PluginIdentifier.of("seatunnel", PluginType.TRANSFORM.getType(), "Filter"));
136+
SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
137+
new SeaTunnelSourcePluginDiscovery();
138+
Assertions.assertIterableEquals(
139+
Stream.of(
140+
Paths.get(
141+
seatunnelHome,
142+
"connectors",
143+
"seatunnel-transforms-v2.jar")
144+
.toString(),
145+
Paths.get(
146+
seatunnelHome,
147+
"connectors",
148+
"seatunnel-transforms-v1.jar")
90149
.toString())
91150
.collect(Collectors.toList()),
92151
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers).stream()

Diff for: seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties

+15-1
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,18 @@ seatunnel.sink.Kafka = connector-kafka
2424
seatunnel.source.Kafka-Alcs = connector-kafka-alcs
2525
seatunnel.sink.Kafka-Alcs = connector-kafka-alcs
2626
seatunnel.source.Kafka-Blcs = connector-kafka-blcs
27-
seatunnel.sink.Kafka-Blcs = connector-kafka-blcs
27+
seatunnel.sink.Kafka-Blcs = connector-kafka-blcs
28+
seatunnel.source.Jdbc = connector-jdbc
29+
seatunnel.sink.Jdbc = connector-jdbc
30+
seatunnel.source.Hive1-Jdbc = connector-jdbc-hive1
31+
seatunnel.sink.Hive1-Jdbc = connector-jdbc-hive1
32+
seatunnel.source.Odbc = connector-odbc
33+
seatunnel.sink.Odbc = connector-odbc
34+
seatunnel.source.Baidu-Odbc = connector-odbc-baidu
35+
seatunnel.sink.Baidu-Odbc = connector-odbc-baidu
36+
37+
seatunnel.transform.Sql = seatunnel-transforms-v2
38+
seatunnel.transform.FieldMapper = seatunnel-transforms-v2
39+
seatunnel.transform.Filter = seatunnel-transforms-v1
40+
seatunnel.transform.FilterRowKind = seatunnel-transforms-v1
41+

0 commit comments

Comments
 (0)