1. Kettle的简单介绍
Kettle(现名Data Integration)是一款使用Java编写的功能强大的ETL(Extract Transform and Load)工具,支持关系型数据库(PostgreSQL、MySQL、Oracle等)、非关系型数据库(MongoDB、ElasticSearch等)以及文件之间的大规模数据迁移。
2. 常用组件
Kettle提供了极为丰富的组件库,下面列举的是它的一些常用组件,以及对组件的常用参数进行简单介绍,详细的参数说明可参考Kettle的帮助文档。
2.1 Table input
指定数据库表作为输入。
-
Step name: 步骤名称,Kettle的每一个组件即一个步骤,可为该步骤取一个别名 -
Connection: 指定数据库连接 -
SQL: 编写SQL,从该数据库表中筛选出符合条件的数据
2.2 Table output
指定数据库表作为输出
-
Step name: 步骤名称 -
Connection: 指定数据库连接 -
Target schema: 输出的数据库表模式 -
Target table: 指定输出的数据库表 -
Use batch update for inserts: 是否使用批处理进行插入 -
Database fields: 配置字段映射关系-
Table field: 输出的数据库表字段 -
Stream field: 流字段(流入该组件的数据字段)
-
2.3 Sort rows
按照某字段进行排序
-
Step name: 步骤名称 -
Fields:-
Fieldname: 排序的字段名 -
Ascending: 排序方式
-
2.4 Merge join
将不同来源数据进行融合,类似于
SQL中的join,注意: 该组件接收的数据必须按照join字段按照相同规则进行排序,否则join后的数据会有丢失。
-
Step name: 步骤名称 -
First Step: 需要融合的一组数据 -
Second Step: 需要融合的另一组数据 -
Join Type: 融合的类型 -
Keys for 1st step:First Step中进行融合的字段 -
Keys for 2nd step:Second Step中进行融合的字段
2.5 Add sequence
读取指定的序列值
-
Step name: 步骤名称 -
Name of value: 序列值别名 -
Use DB to get sequence: 是否使用数据库序列 -
Connnection: 数据库连接 -
Schema name: 数据库模式名称 -
Sequence name: 序列名
2.6 Modified Java Script Value
支持编写
JavaScript脚本,用于实现必要的业务逻辑
-
Step name: 步骤名称 -
Java script functions: 提供了一些JavaScript函数 -
Java script: 脚本编辑窗口 -
Fields: 可将脚本中的定义的变量映射出去
3. 在实际场景中的应用
在软件开发中,经常会遇到这样的场景: 新开发的系统即将替换老系统,而老系统庞大的数据需迁移到新系统中,但数据结构与新系统不完全兼容,下面通过一个简单的例子来介绍
Kettle是如何处理这些老数据,完成数据迁移任务的。
3.1 老数据结构
-
company公司表: -
district区域表:该表存储了省市区,通过parent_id进行关联
-
company_district公司区域表: -
employee员工表: -
employee_company员工公司表:
3.2 新数据结构
-
company公司表:对比老数据
company表,新的company表中新增了district、city、province字段,他们可以从老数据company_district表和district表中取得;contact字段对应tel字段;addr对应address。 -
employee员工表:对比老数据
employee表,新的employee表中新增company_id字段且有外键约束;sex字段由原来的1、2变更为男、女
3.3 数据迁移
由于
employee有外键关联company,因此先迁移company表数据,新的company表需新增old_id字段来保存老的company表的id,用于员工关联公司。
3.3.1 company表
数据迁移前的分析:
company表数据来源于三张表:company、company_district、district,因此需要三个Table input组件。company和company_district需进行join,join的结果还需和district进行join,因此需要两个Merge Join组件。- 使用
Merge join组件之前需进行排序,因此需要三个Sort rows组件- 新的
company表的id来源于自增长序列,因此需要一个Add sequence组件。- 最后将结果导入新的
company表,因此需要一个Table output组件。
- 打开
Kettle,点击File->new->Transformation,新建一个转换流程 - 点击左侧
DesignTab页,将Table input组件拖拽至右侧转换流程窗口,在组件上右键点击edit,弹出该组件的编辑窗口,设置步骤名称、数据库连接和SQL语句,如下图所示:
-
将
company和company_district数据进行left join,join之前需按照join字段排序,将Sort rows组件拖拽至右侧转换流程窗口,并进行编辑,如下图所示: -
将
Merge Join组件拖拽至右侧,并进行编辑,如下图所示: -
将
company和company_districtMerge Join的结果和district数据分别进行排序,同上面步骤 -
将两者进行
join,同上面步骤 -
添加
Add sequence组件,并进行编辑,如下图所示: -
添加
Table output组件,并进行编辑,如下图所示: -
整体流程如下图所示:
-
点击启动按钮执行整个流程,直至所有步骤右上角出现绿色的箭头,
company表便完成了迁移。
3.3.2 employee表
数据迁移前的分析:
employee表数据来源三张表: 老的employee、老的employee_company和新的company,因此需要三个Table input组件- 老的
employee和employee_company需进行join,join的结果还需和新的company进行join,因此需要两个Merge join组件和三个Sort rows组件。- 新的
employee表的id来源于自增长序列,因此需要一个Add sequence组件。- 新的
employee表的sex字段存储的是'男/女',而不是'1/2',因此需要一个Modified Java Script Value组件进行简单处理。- 最后将结果导入新的
employee表,因此需要一个Table output组件。
- 与
company的数据迁移类似,添加三个Table input组件,并进行编辑 - 分别将
employee和employee_company按照join字段进行统一排序 - 将排序的结果进行
join - 分别将新的
company和join之后的结果按照join字段进行统一排序 - 将排序的结果进行
join - 编写脚本,转换
sex字段 - 读取新的
employee序列值 - 输出到新的
employee表中 -
整体流程如下图所示:
-
点击启动按钮执行整个流程,直至所有步骤右上角出现绿色的箭头,
employee表便完成了迁移。
3.4 结果
-
company表 -
employee表
至此,便完成了老数据的迁移。
4. 遇到的问题
在
Kettle使用过程中会发现,当需要进行迁移的数据量较为庞大时(千万级),常常会出现内存溢出的问题,解决方法是将Kettle内存调高些: 打开spoon.sh文件,找到PENTAHO_DI_JAVA_OPTIONS="-Xms1024m -Xmx2048m -XX:MaxPermSize=256m",将其修改为PENTAHO_DI_JAVA_OPTIONS="-Xms16384m -Xmx32768m -XX:MaxPermSize=16384m",重启即可。