DataX from Beginner to Expert 04 — Custom Transformers
一、What is a Transformer?
In data synchronization and transfer, users often need to customize the data in flight: trimming columns, transforming values, and so on. This is the T (Transform) step of ETL, which DataX implements via Transformers. DataX supports the full E (Extract), T (Transform), L (Load) pipeline. See the official documentation for more details.
二、Usage
1. Development workflow
1. Clone the DataX source from GitHub and locate the transformer folder in the project root;
2. Under the com.alibaba.datax.core.transport.transformer package, create your own class: extend Transformer, implement the interface by following the existing transformer classes, and accept whatever parameters your logic needs from the job configuration file;
3. Register your transformer class in the TransformerRegistry class under core\src\main\java\com\alibaba\datax\core\transport\transformer;
4. Build, package, and debug with Maven: mvn clean package -DskipTests assembly:assembly
2. Example
The code is as follows (example):
- The demo below uses AES encryption: extend Transformer and write your own processing class
datax/transformer/src/main/java/com/alibaba/datax/transformer/AESTransformer.java
import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;

public class AESTransformer extends Transformer {
    private static final Logger LOG = LoggerFactory.getLogger(AESTransformer.class);

    // Placeholder seed key; replace with your own key management
    public static final String ENCRYPT_KEY = "种子key";

    int columnIndex;

    public AESTransformer() {
        setTransformerName("dx_aes");
        LOG.info("Using AES transformer");
    }

    @Override
    public Record evaluate(Record record, Object... paras) {
        try {
            if (paras.length < 1) {
                throw new RuntimeException("dx_aes transformer is missing parameters");
            }
            columnIndex = (Integer) paras[0];
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
                    "paras:" + Arrays.asList(paras) + " => " + e.getMessage());
        }
        Column column = record.getColumn(columnIndex);
        try {
            String oriValue = column.asString();
            // Pass null values through untouched
            if (oriValue == null) {
                return record;
            }
            // Only encrypt string columns
            if (column.getType() == Column.Type.STRING) {
                EncryptUtil encryptUtil = EncryptUtil.getInstance();
                String newValue = encryptUtil.AESencode(oriValue, ENCRYPT_KEY);
                record.setColumn(columnIndex, new StringColumn(newValue));
            }
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
        }
        return record;
    }
}
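`EncryptUtil` above is the author's own helper class and its source isn't shown. As a rough sketch of what an `AESencode`-style method could look like on top of `javax.crypto`, here is one possible implementation; the key derivation (SHA-256 truncated to 128 bits), cipher mode, and Base64 output are all assumptions, not the author's actual code:

```java
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;

// Hypothetical stand-in for the author's EncryptUtil
public class AesSketch {

    // Derive a 128-bit AES key from an arbitrary seed string
    private static SecretKeySpec keyFor(String seed) throws Exception {
        byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(seed.getBytes(StandardCharsets.UTF_8));
        byte[] key128 = new byte[16];
        System.arraycopy(digest, 0, key128, 0, 16);
        return new SecretKeySpec(key128, "AES");
    }

    // Encrypt plaintext and return a Base64 string safe to store in a column
    public static String encode(String plain, String seed) throws Exception {
        Cipher c = Cipher.getInstance("AES/ECB/PKCS5Padding");
        c.init(Cipher.ENCRYPT_MODE, keyFor(seed));
        return Base64.getEncoder()
                .encodeToString(c.doFinal(plain.getBytes(StandardCharsets.UTF_8)));
    }

    // Reverse of encode(), for consumers that need the original value back
    public static String decode(String cipherText, String seed) throws Exception {
        Cipher c = Cipher.getInstance("AES/ECB/PKCS5Padding");
        c.init(Cipher.DECRYPT_MODE, keyFor(seed));
        return new String(c.doFinal(Base64.getDecoder().decode(cipherText)),
                StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        String ct = encode("hello", "seed-key");
        System.out.println(decode(ct, "seed-key")); // prints "hello"
    }
}
```

Note that ECB mode is used here only to keep the sketch short and deterministic; for production data you would normally prefer an authenticated mode such as AES/GCM with a random IV.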
- Registration (excerpt from the source)
/**
 * no comments.
 * Created by liqiang on 16/3/3.
 */
public class TransformerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class);
    private static Map<String, TransformerInfo> registedTransformer = new HashMap<String, TransformerInfo>();

    static {
        /**
         * add native transformer
         * local storage and from server will be delay load.
         */
        registTransformer(new SubstrTransformer());
        registTransformer(new PadTransformer());
        registTransformer(new ReplaceTransformer());
        registTransformer(new FilterTransformer());
        registTransformer(new GroovyTransformer());
        registTransformer(new AESTransformer()); // register your own transformer
    }
    // ... (rest of the class omitted)
}
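Under the hood, registration simply puts each instance into a map keyed by its transformer name, and the `"name": "dx_aes"` in the job JSON is the lookup key. A stripped-down model of that pattern (illustrative only; not the actual DataX classes):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of a name-keyed transformer registry
public class MiniRegistry {
    interface Tf { String evaluate(String value); }

    private static final Map<String, Tf> REGISTRY = new HashMap<>();

    static void register(String name, Tf tf) { REGISTRY.put(name, tf); }

    static Tf lookup(String name) { return REGISTRY.get(name); }

    static {
        // Analogous to registTransformer(new AESTransformer()),
        // which registers itself under "dx_aes" via setTransformerName
        register("dx_upper", v -> v.toUpperCase());
    }

    public static void main(String[] args) {
        System.out.println(lookup("dx_upper").evaluate("abc")); // prints "ABC"
    }
}
```

This is why step 3 of the workflow is mandatory: a transformer that compiles but is never added to the registry can never be resolved from a job file.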
- Rebuild, package, and redeploy
- Example job JSON using the transformer
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "xxxx",
            "password": "xxxx",
            "column": [
              "id",
              "les_id",
              "grade_id",
              "edition_id",
              "subject_id",
              "course_system_first_id",
              "course_system_second_id",
              "course_system_third_id",
              "course_system_four_id",
              "custom_points",
              "deleted",
              "created_at",
              "tea_id",
              "stu_id",
              "les_uid",
              "updated_at",
              "pt"
            ],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://xxxx:3306/test?useUnicode=true&characterEncoding=utf8"],
                "table": ["xxx"]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id",                      "type": "int"},
              {"name": "les_id",                  "type": "int"},
              {"name": "grade_id",                "type": "int"},
              {"name": "edition_id",              "type": "int"},
              {"name": "subject_id",              "type": "int"},
              {"name": "course_system_first_id",  "type": "int"},
              {"name": "course_system_second_id", "type": "int"},
              {"name": "course_system_third_id",  "type": "int"},
              {"name": "course_system_four_id",   "type": "int"},
              {"name": "custom_points",           "type": "string"},
              {"name": "deleted",                 "type": "TINYINT"},
              {"name": "created_at",              "type": "string"},
              {"name": "tea_id",                  "type": "int"},
              {"name": "stu_id",                  "type": "int"},
              {"name": "les_uid",                 "type": "string"},
              {"name": "updated_at",              "type": "string"}
            ],
            "defaultFS": "hdfs://nameservice1",
            "hadoopConfig": {
              "dfs.nameservices": "nameservice1",
              "dfs.ha.namenodes.nameservice1": "namenode286,namenode36",
              "dfs.namenode.rpc-address.nameservice1.namenode286": "xxxx:8020",
              "dfs.namenode.rpc-address.nameservice1.namenode36": "xxxx:8020",
              "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
            },
            "haveKerberos": "true",
            "kerberosKeytabFilePath": "/home/xx/kerberos/xxx.keytab",
            "kerberosPrincipal": "xxx@FAYSON.COM",
            "encoding": "UTF-8",
            "fileType": "orc",
            "fileName": "xxx",
            "path": "/user/hive/warehouse/ods.db/xxxxx/pt=2020-01-20",
            "writeMode": "append", // append & overwrite
            "fieldDelimiter": "\u0001"
          }
        },
        // column indexes of the fields to encrypt
        "transformer": [
          {
            "name": "dx_aes",
            "parameter": {
              "columnIndex": 9,
              "paras": [""]
            }
          },
          {
            "name": "dx_aes",
            "parameter": {
              "columnIndex": 11,
              "paras": [""]
            }
          }
        ]
      }
    ],
    // tuning-related settings; default values for now
    "setting": {
      "speed": {
        "channel": "5"
      },
      "errorLimit": {
        "record": 0
      }
    }
  }
}
Summary
This article walked through the T (transform) part of DataX as an ETL tool. Transformers can do a lot in practice: cleaning dirty data, UDF-style conversions, encrypting and decrypting data, and more. A later installment will lift the "mystery" from the source-code perspective; if you have better ideas, feel free to share them with me.