楔子
java调用kettle,jar包使用maven来管理。
所需的 jar 包通常可以在 Kettle 安装目录的 lib 目录下找到,但逐个查找太麻烦,而且部分中央仓库尚未收录 Kettle 的 jar 包,因此需要在项目中额外配置 Kettle(Pentaho)的仓库。
项目pom.xml配置kettle仓库
<repositories><!-- Kettle central repository -->
<!-- Declares the Pentaho public repository so that Kettle artifacts can be resolved by Maven. -->
<repository>
<id>pentaho-public</id>
<name>Pentaho Public</name>
<!-- NOTE(review): plain-http URL; confirm it is still reachable and accepted by the Maven version in use. -->
<url>http://nexus.pentaho.org/content/groups/omni</url>
<!-- Release artifacts: enabled, with update policy "always". -->
<releases>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</releases>
<!-- Snapshot artifacts: enabled, with update policy "always". -->
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
</repositories>
java操作
/**
 * Demonstrates driving Kettle from Java: connects to a database-backed Kettle
 * repository, prints the top-level repository directories, and runs a job
 * stored in that repository.
 *
 * <p>The repository connection is always released before {@link #main} returns,
 * even when loading or executing the job fails.
 */
public class DemoJob {

    /**
     * Entry point: initializes the Kettle environment, connects to the
     * repository, lists the root directory and its children, then runs the job.
     *
     * @param args unused
     * @throws KettleException if Kettle initialization, the repository
     *         connection, or loading the job fails
     */
    public static void main(String[] args) throws KettleException {
        KettleEnvironment.init();

        // Create the database-backed repository.
        KettleDatabaseRepository repository = new KettleDatabaseRepository();
        DatabaseMeta databaseMeta = setDatabaseMeta();

        // Select the repository. Note from the original author: the arguments
        // used here did not match the local repository settings, yet execution
        // still succeeded.
        KettleDatabaseRepositoryMeta kettleDatabaseRepositoryMeta =
                new KettleDatabaseRepositoryMeta("Kettle", "Kettle", "Transformation description", databaseMeta);
        repository.init(kettleDatabaseRepositoryMeta);

        // Connect to the repository.
        repository.connect("admin", "admin");
        try {
            RepositoryDirectoryInterface directoryInterface = repository.loadRepositoryDirectoryTree();
            System.out.println(directoryInterface); // the root node is "/"
            List<RepositoryDirectoryInterface> children = directoryInterface.getChildren();
            for (RepositoryDirectoryInterface re : children) {
                System.out.println(re);
            }
            execJob(repository);
        } finally {
            // Release the repository's database connection on every exit path.
            repository.disconnect();
        }
    }

    /**
     * Loads the job {@code A_邮件job} from the repository directory
     * {@code /201811_JOB}, runs it, waits for it to finish, and prints its result.
     *
     * @param repository a connected repository to load the job from
     * @throws KettleException if the directory does not exist or the job
     *         cannot be loaded
     */
    private static void execJob(KettleDatabaseRepository repository) throws KettleException {
        RepositoryDirectoryInterface dir = repository.findDirectory("/201811_JOB");
        if (dir == null) {
            throw new KettleException("Repository directory not found: /201811_JOB");
        }
        JobMeta jobMeta = repository.loadJob(repository.getJobId("A_邮件job", dir), null);
        Job job = new Job(repository, jobMeta);
        // Parameters could be set here, e.g.:
        // jobMeta.setParameterValue("method", "update");
        job.setLogLevel(LogLevel.DETAILED);

        // Start the job and block until it has finished.
        job.start();
        job.waitUntilFinished();

        // Report failures explicitly, mirroring the check done for transformations.
        if (job.getErrors() > 0) {
            System.err.println("job finished with errors: " + job.getErrors());
        }
        System.out.println(job.getResult());
    }

    /**
     * Loads the transformation {@code A_TRAN} from the repository directory
     * {@code /201811_KTR/A_KTR}, runs it, waits for it to finish, and prints
     * whether it completed with errors.
     *
     * @param repository a connected repository to load the transformation from
     * @throws KettleException if the directory does not exist, or the
     *         transformation cannot be loaded or started
     */
    @SuppressWarnings("unused")
    private static void execTran(KettleDatabaseRepository repository) throws KettleException {
        // Resolve the directory from its path string.
        RepositoryDirectoryInterface dir = repository.findDirectory("/201811_KTR/A_KTR");
        if (dir == null) {
            throw new KettleException("Repository directory not found: /201811_KTR/A_KTR");
        }
        // Look up the transformation inside that directory.
        TransMeta tmeta = repository.loadTransformation(repository.getTransformationID("A_TRAN", dir), null);
        // Parameters could be set here, e.g.:
        // tmeta.setParameterValue("", "");
        Trans trans = new Trans(tmeta);
        trans.execute(null); // run the transformation
        trans.waitUntilFinished(); // block until processing has ended
        if (trans.getErrors() > 0) {
            System.out.println("transformation error");
        } else {
            System.out.println("transformation successfully");
        }
    }

    /**
     * Builds the connection settings for the Kettle repository database from
     * the configuration values exposed by {@code PropUtils}.
     *
     * @return the database metadata built from the keys {@code name},
     *         {@code type}, {@code access}, {@code host}, {@code db},
     *         {@code port}, {@code user} and {@code pass}
     */
    private static DatabaseMeta setDatabaseMeta() {
        return new DatabaseMeta(
                PropUtils.getConfigValByKey("name"),
                PropUtils.getConfigValByKey("type"),
                PropUtils.getConfigValByKey("access"),
                PropUtils.getConfigValByKey("host"),
                PropUtils.getConfigValByKey("db"),
                PropUtils.getConfigValByKey("port"),
                PropUtils.getConfigValByKey("user"),
                PropUtils.getConfigValByKey("pass"));
    }
}