diff --git a/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java index 6bbb4a23c9..854c155da7 100644 --- a/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java +++ b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/ExampleConfigParser.java @@ -2,9 +2,14 @@ import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.transport.transformer.ComplexTransformerProxy; +import com.alibaba.datax.core.transport.transformer.TransformerInfo; +import com.alibaba.datax.core.transport.transformer.TransformerRegistry; import com.alibaba.datax.core.util.ConfigParser; import com.alibaba.datax.core.util.FrameworkErrorCode; import com.alibaba.datax.core.util.container.CoreConstant; +import com.alibaba.datax.transformer.ComplexTransformer; +import com.alibaba.datax.transformer.Transformer; import java.io.File; import java.io.IOException; @@ -20,6 +25,9 @@ public class ExampleConfigParser { private static final String PLUGIN_DESC_FILE = "plugin.json"; + private static final String TRANSFORMER_DESC_FILE = "transformer.json"; + + /** * 指定Job配置路径,ConfigParser会解析Job、Plugin、Core全部信息,并以Configuration返回 * 不同于Core的ConfigParser,这里的core,plugin 不依赖于编译后的datax.home,而是扫描程序编译后的target目录 @@ -37,6 +45,10 @@ public static Configuration parse(final String jobPath) { pluginTypeMap.put(writerName, "writer"); Configuration pluginsDescConfig = parsePluginsConfig(pluginTypeMap); configuration.merge(pluginsDescConfig, false); + // 扫描并注册自定义Transformer + List listConfiguration = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER); + registerTransformerConfig(listConfiguration); + return configuration; } @@ -151,4 +163,107 @@ private static Configuration coreConfig() { "Please check whether /example/conf/core.json exists!"); } } + + /** + * 注册第三方 transformer + * 判断是否使用TransFormer + * 如果使用则将涉及的Transformer项目全部注册 + * + * @param transformers + */ + private static void registerTransformerConfig(List transformers) { + + if (transformers == null || transformers.size() == 0) { + return; + } + Set transformerSet = new HashSet<>(); + for (Configuration transformer : transformers) { + String name = transformer.getString("name"); + if (!name.startsWith("dx_")) { // 只检测自定义Transformer + transformerSet.add(name); + } + } + for (File basePackage : runtimeBasePackages()) { + scanTransFormerByPackage(basePackage, basePackage.listFiles(), transformerSet); + } + if (!transformerSet.isEmpty()) { + String failedTransformer = transformerSet.toString(); + String message = "\ntransformer %s load failed :ry to analyze the reasons from the following aspects.。\n" + + "1: Check if the name of the transformer is spelled correctly, and verify whether DataX supports this transformer\n" + + "2:Verify if the tag has been added under section in the pom file of the relevant transformer.\n" + + " src/main/resources\n" + + " \n" + + " **/*.*\n" + + " \n" + + " true\n" + + " \n [Refer to the streamreader pom file] \n" + + "3: Check that the datax-yourtransformer-example module imported your test transformer"; + message = String.format(message, failedTransformer); + throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_INIT_ERROR, message); + } + } + + /** + * @param packageFile 编译出来的target/classes根目录 便于找到TRANSFORMER时设置插件的TRANSFORMER 目录,设置根目录是最保险的方式 + * @param files 待扫描文件 + */ + private static void scanTransFormerByPackage(File packageFile, File[] files, Set transformerSet) { + if (files == null) { + return; + } + for (File file : files) { + if (file.isFile() && TRANSFORMER_DESC_FILE.equals(file.getName())) { + Configuration transfomerDesc = Configuration.from(file); + String descTransformerName = transfomerDesc.getString("name"); + String descTransformerClass = transfomerDesc.getString("class"); + transformerSet.remove(descTransformerName); + + if (verfiyExist(descTransformerName,descTransformerClass)) { + break; + } + try { + Class transformerClass = Class.forName(descTransformerClass); + Object transformer = transformerClass.newInstance(); + if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) { + ((ComplexTransformer) transformer).setTransformerName(descTransformerName); + TransformerRegistry.registComplexTransformer((ComplexTransformer) transformer, null, false); + } else if (Transformer.class.isAssignableFrom(transformer.getClass())) { + ((Transformer) transformer).setTransformerName(descTransformerName); + TransformerRegistry.registTransformer((Transformer) transformer, null, false); + } else { + throw DataXException.asDataXException(String.format("load Transformer class(%s) error, path = %s", descTransformerClass, file.getPath())); + } + } catch (Exception e) { + //错误funciton跳过 + throw DataXException.asDataXException(String.format("skip transformer(%s),load Transformer class error, path = %s ", descTransformerName, file.getPath())); + } + } else { + scanTransFormerByPackage(packageFile, file.listFiles(), transformerSet); + } + } + } + + /** + * 验证Transformer 是否已经加载过了 + * @param transformerName + * @param transFormerClass + * @return + */ + private static Boolean verfiyExist(String transformerName,String transFormerClass){ + TransformerInfo transformer = TransformerRegistry.getTransformer(transformerName); + if(transformer==null){ + return false; + } + ComplexTransformerProxy proxy = (ComplexTransformerProxy)transformer.getTransformer(); + if(proxy==null){ + return false; + } + String className = proxy.getRealTransformer().getClass().getName(); + if (transFormerClass.equals(className)){ + return true; + }else{ + throw DataXException.asDataXException(String.format("skip transformer(%s),load Transformer class error,There are two Transformer with the same name and different class path, class1 = %s;class2 = %s ", transformerName,transFormerClass,className)); + } + } + } diff --git a/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/TransformerErrorCode.java b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/TransformerErrorCode.java new file mode 100644 index 0000000000..bf368e6428 --- /dev/null +++ b/datax-example/datax-example-core/src/main/java/com/alibaba/datax/example/util/TransformerErrorCode.java @@ -0,0 +1,41 @@ +package com.alibaba.datax.example.util; + +import com.alibaba.datax.common.spi.ErrorCode; + +/** + * TODO: 根据现有日志数据分析各类错误,进行细化。 + * + *

请不要格式化本类代码

+ */ +public enum TransformerErrorCode implements ErrorCode { + + TRANSFORMER_INIT_ERROR("Transformer-00", "DataX Transformer 注册失败, 请联系您的运维解决 ."); + + + private final String code; + + private final String description; + + private TransformerErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s]. ", this.code, + this.description); + } + + +}