Java执行kettle文件
添加依赖:
<!-- Kettle (Pentaho Data Integration) engine artifact.
     NOTE(review): presumably supplies the org.pentaho.di.job / org.pentaho.di.trans classes used by the utility class - confirm.
     NOTE(review): pentaho-kettle artifacts may require a Pentaho repository entry in the POM - confirm. -->
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>7.0.0.0-25</version>
</dependency>
<!-- Kettle core artifact; kept at the same version as kettle-engine.
     NOTE(review): presumably supplies the org.pentaho.di.core classes (Const, KettleEnvironment, EnvUtil) - confirm. -->
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>7.0.0.0-25</version>
</dependency>
<!-- NOTE(review): jackson-databind is not referenced by the code shown in this document;
     presumably needed by other project code or at runtime - confirm. -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.6</version>
</dependency>
工具类
package com.example.kettledemo.util;
import java.io.File;
import java.util.Map;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
/**
 * Helpers for running Kettle transformation and job files from Java.
 *
 * <p>Each entry point initialises the Kettle environment, loads the given file, starts it,
 * waits until it has finished, and throws if Kettle reports one or more errors.
 *
 * @since 2020/5/13
 * @version V1.0
 */
public class kettleUtil {

    /** Name of the directory (under the classpath root) that holds jdbc.properties. */
    private static final String JNDI_DIR_NAME = "simple-jndi";

    /**
     * Runs a transformation file without passing any arguments.
     *
     * @param transFileName path of the transformation file to load
     * @throws Exception if the environment cannot be initialised, the file cannot be loaded,
     *                   or the transformation finishes with errors
     */
    public static void callNativeTrans(String transFileName) throws Exception {
        callNativeTransWithParams(null, transFileName);
    }

    /**
     * Runs a transformation file, passing the given arguments to it.
     *
     * @param params        arguments handed to {@code Trans.execute}; may be {@code null}
     *                      (this is what {@link #callNativeTrans(String)} passes)
     * @param transFileName path of the transformation file to load
     * @throws Exception if the environment cannot be initialised, the file cannot be loaded,
     *                   or the transformation finishes with errors
     */
    public static void callNativeTransWithParams(String[] params, String transFileName) throws Exception {
        // Initialise the Kettle environment before any metadata is loaded.
        KettleEnvironment.init();
        EnvUtil.environmentInit();

        TransMeta transMeta = new TransMeta(transFileName);
        Trans trans = new Trans(transMeta);

        // Start the transformation and block until it has finished.
        trans.execute(params);
        trans.waitUntilFinished();

        // Surface Kettle-reported errors to the caller.
        if (trans.getErrors() > 0) {
            throw new Exception("There are errors during transformation exception!(传输过程中发生异常)");
        }
    }

    /**
     * Runs a job file, injecting the given variables into it first.
     *
     * <p>Before initialising the environment, {@code Const.JNDI_DIRECTORY} is pointed at the
     * {@code simple-jndi} directory under the classpath root.
     *
     * @param jobName path of the job file to load
     * @param map     variables to inject into the job; inside the job script a value is read
     *                with <code>${variableName}</code>. Keys and values are expected to be
     *                {@code String}s. When {@code null}, no variables are injected.
     * @throws Exception if the classpath root cannot be located, the environment cannot be
     *                   initialised, the file cannot be loaded, or the job finishes with errors
     */
    public static void callNativeJob(String jobName, Map map) throws Exception {
        // The JNDI directory must be set before KettleEnvironment.init() runs.
        Const.JNDI_DIRECTORY = resolveJndiDirectory().getCanonicalPath();
        KettleEnvironment.init();

        JobMeta jobMeta = new JobMeta(jobName, null);
        Job job = new Job(null, jobMeta);

        if (map != null) {
            // The raw Map stays in the signature for backward compatibility with existing
            // callers; the unchecked conversion is confined to this one local.
            @SuppressWarnings("unchecked")
            Map<String, String> variables = map;
            job.injectVariables(variables);
        }

        job.start();
        job.waitUntilFinished();

        if (job.getErrors() > 0) {
            throw new Exception("There are errors during job exception!(执行job发生异常)");
        }
    }

    /**
     * Locates the {@code simple-jndi} directory under the classpath root.
     *
     * <p>{@code URL.getPath()} keeps percent-escapes (for example {@code %20} for a space), so
     * for {@code file:} URLs the location is converted through a URI, which decodes them.
     */
    private static File resolveJndiDirectory() {
        java.net.URL classpathRoot = Thread.currentThread().getContextClassLoader().getResource("");
        if (classpathRoot == null) {
            throw new IllegalStateException(
                    "Classpath root not found; cannot locate the " + JNDI_DIR_NAME + " directory");
        }
        if ("file".equals(classpathRoot.getProtocol())) {
            try {
                return new File(new File(classpathRoot.toURI()), JNDI_DIR_NAME);
            } catch (java.net.URISyntaxException | IllegalArgumentException ignored) {
                // The URL is not a well-formed file URI; fall back to its raw path below.
            }
        }
        return new File(classpathRoot.getPath(), JNDI_DIR_NAME);
    }
}
XXX.kjb中配置的变量(在作业中通过${变量名}引用,变量名需与代码中传入map的key一致,例如下文测试代码中的param):
测试:执行kettle任务
filePath为kettle任务文件(.kjb)的路径,param为传递给任务的变量值(对应的变量名为param)
/**
 * Test endpoint: runs the Kettle job at {@code filePath}, passing {@code param} to it as the
 * variable named "param".
 *
 * NOTE(review): filePath is presumably bound straight from the request, so any caller could
 * make the server run an arbitrary job file - restrict it to a known directory before using
 * this outside a demo.
 *
 * @param filePath path of the Kettle job file to run
 * @param param    value of the "param" variable handed to the job
 * @throws Exception if the job cannot be loaded or finishes with errors
 */
@GetMapping("test")
public void test(String filePath, String param) throws Exception {
    // Job variables are name -> value strings, so the map is typed accordingly.
    HashMap<String, String> map = Maps.newHashMap();
    map.put("param", param);
    kettleUtil.callNativeJob(filePath, map);
}
配置JNDI
1.将JNDI配置文件(可从data-integration/simple-jndi/jdbc.properties复制)放在类路径(classpath)根目录下的simple-jndi目录中(下文代码通过getResource("")定位类路径根目录),例如Maven项目的src/main/resources/simple-jndi/jdbc.properties:
(示例)类路径根目录下simple-jndi/jdbc.properties:
2.添加simple-jndi的jar依赖。
<!-- simple-jndi library; step 3 below points Const.JNDI_DIRECTORY at the directory holding jdbc.properties.
     NOTE(review): presumably this is the JNDI implementation Kettle uses to read that file - confirm. -->
<dependency>
<groupId>simple-jndi</groupId>
<artifactId>simple-jndi</artifactId>
<version>0.11.4.1</version>
</dependency>
3.先将Const.JNDI_DIRECTORY设置为simple-jndi目录的路径,再调用kettle的环境初始化方法KettleEnvironment.init()(等同于KettleEnvironment.init(true)),加载JNDI:
// Locate the classpath root; getResource("") returns null when it cannot be resolved.
java.net.URL classpathRoot = Thread.currentThread().getContextClassLoader().getResource("");
if (classpathRoot == null) {
    throw new IllegalStateException("Classpath root not found; cannot locate the simple-jndi directory");
}
// URL.getPath() keeps percent-escapes (e.g. %20 for a space), so convert file: URLs through a URI,
// which decodes them; other protocols keep the raw path.
File rootDir = "file".equals(classpathRoot.getProtocol())
        ? new File(classpathRoot.toURI())
        : new File(classpathRoot.getPath());
File file = new File(rootDir, "simple-jndi"); // directory that contains jdbc.properties
String sysPath = file.getCanonicalPath();
// Must be set before KettleEnvironment.init() is called.
Const.JNDI_DIRECTORY = sysPath;
KettleEnvironment.init();