Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加 datax-example 模块对第三方 Transformer 的扫描和注册 #2209

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.transformer.ComplexTransformerProxy;
import com.alibaba.datax.core.transport.transformer.TransformerInfo;
import com.alibaba.datax.core.transport.transformer.TransformerRegistry;
import com.alibaba.datax.core.util.ConfigParser;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.transformer.ComplexTransformer;
import com.alibaba.datax.transformer.Transformer;

import java.io.File;
import java.io.IOException;
Expand All @@ -20,6 +25,9 @@ public class ExampleConfigParser {

private static final String PLUGIN_DESC_FILE = "plugin.json";

private static final String TRANSFORMER_DESC_FILE = "transformer.json";


/**
* 指定Job配置路径,ConfigParser会解析Job、Plugin、Core全部信息,并以Configuration返回
* 不同于Core的ConfigParser,这里的core,plugin 不依赖于编译后的datax.home,而是扫描程序编译后的target目录
Expand All @@ -37,6 +45,10 @@ public static Configuration parse(final String jobPath) {
pluginTypeMap.put(writerName, "writer");
Configuration pluginsDescConfig = parsePluginsConfig(pluginTypeMap);
configuration.merge(pluginsDescConfig, false);
// 扫描并注册自定义Transformer
List<Configuration> listConfiguration = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
registerTransformerConfig(listConfiguration);

return configuration;
}

Expand Down Expand Up @@ -151,4 +163,107 @@ private static Configuration coreConfig() {
"Please check whether /example/conf/core.json exists!");
}
}

/**
* 注册第三方 transformer
* 判断是否使用TransFormer
* 如果使用则将涉及的Transformer项目全部注册
*
* @param transformers
*/
private static void registerTransformerConfig(List<Configuration> transformers) {

if (transformers == null || transformers.size() == 0) {
return;
}
Set<String> transformerSet = new HashSet<>();
for (Configuration transformer : transformers) {
String name = transformer.getString("name");
if (!name.startsWith("dx_")) { // 只检测自定义Transformer
transformerSet.add(name);
}
}
for (File basePackage : runtimeBasePackages()) {
scanTransFormerByPackage(basePackage, basePackage.listFiles(), transformerSet);
}
if (!transformerSet.isEmpty()) {
String failedTransformer = transformerSet.toString();
String message = "\ntransformer %s load failed :ry to analyze the reasons from the following aspects.。\n" +
"1: Check if the name of the transformer is spelled correctly, and verify whether DataX supports this transformer\n" +
"2:Verify if the <resource></resource> tag has been added under <build></build> section in the pom file of the relevant transformer.\n<resource>" +
" <directory>src/main/resources</directory>\n" +
" <includes>\n" +
" <include>**/*.*</include>\n" +
" </includes>\n" +
" <filtering>true</filtering>\n" +
" </resource>\n [Refer to the streamreader pom file] \n" +
"3: Check that the datax-yourtransformer-example module imported your test transformer";
message = String.format(message, failedTransformer);
throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_INIT_ERROR, message);
}
}

/**
* @param packageFile 编译出来的target/classes根目录 便于找到TRANSFORMER时设置插件的TRANSFORMER 目录,设置根目录是最保险的方式
* @param files 待扫描文件
*/
private static void scanTransFormerByPackage(File packageFile, File[] files, Set<String> transformerSet) {
if (files == null) {
return;
}
for (File file : files) {
if (file.isFile() && TRANSFORMER_DESC_FILE.equals(file.getName())) {
Configuration transfomerDesc = Configuration.from(file);
String descTransformerName = transfomerDesc.getString("name");
String descTransformerClass = transfomerDesc.getString("class");
transformerSet.remove(descTransformerName);

if (verfiyExist(descTransformerName,descTransformerClass)) {
break;
}
try {
Class<?> transformerClass = Class.forName(descTransformerClass);
Object transformer = transformerClass.newInstance();
if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) {
((ComplexTransformer) transformer).setTransformerName(descTransformerName);
TransformerRegistry.registComplexTransformer((ComplexTransformer) transformer, null, false);
} else if (Transformer.class.isAssignableFrom(transformer.getClass())) {
((Transformer) transformer).setTransformerName(descTransformerName);
TransformerRegistry.registTransformer((Transformer) transformer, null, false);
} else {
throw DataXException.asDataXException(String.format("load Transformer class(%s) error, path = %s", descTransformerClass, file.getPath()));
}
} catch (Exception e) {
//错误funciton跳过
throw DataXException.asDataXException(String.format("skip transformer(%s),load Transformer class error, path = %s ", descTransformerName, file.getPath()));
}
} else {
scanTransFormerByPackage(packageFile, file.listFiles(), transformerSet);
}
}
}

/**
* 验证Transformer 是否已经加载过了
* @param transformerName
* @param transFormerClass
* @return
*/
private static Boolean verfiyExist(String transformerName,String transFormerClass){
TransformerInfo transformer = TransformerRegistry.getTransformer(transformerName);
if(transformer==null){
return false;
}
ComplexTransformerProxy proxy = (ComplexTransformerProxy)transformer.getTransformer();
if(proxy==null){
return false;
}
String className = proxy.getRealTransformer().getClass().getName();
if (transFormerClass.equals(className)){
return true;
}else{
throw DataXException.asDataXException(String.format("skip transformer(%s),load Transformer class error,There are two Transformer with the same name and different class path, class1 = %s;class2 = %s ", transformerName,transFormerClass,className));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.alibaba.datax.example.util;

import com.alibaba.datax.common.spi.ErrorCode;

/**
* TODO: 根据现有日志数据分析各类错误,进行细化。
*
* <p>请不要格式化本类代码</p>
*/
public enum TransformerErrorCode implements ErrorCode {

TRANSFORMER_INIT_ERROR("Transformer-00", "DataX Transformer 注册失败, 请联系您的运维解决 .");


private final String code;

private final String description;

private TransformerErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return this.code;
}

@Override
public String getDescription() {
return this.description;
}

@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code,
this.description);
}


}