楔子

java调用kettle,jar包使用maven来管理。
通常需要的jar可以在kettle的lib目录下找到,但是一个个找太麻烦,而且一些中央仓库里还没有kettle的jar。

项目pom.xml配置kettle仓库

<repositories><!-- Kettle (Pentaho) Maven repository -->
	<!-- Extra repository from which the Kettle jars are resolved. -->
	<repository>
		<id>pentaho-public</id>
		<name>Pentaho Public</name>
		<!-- NOTE(review): plain-http URL; confirm this host is still reachable and whether an https endpoint should be used instead. -->
		<url>http://nexus.pentaho.org/content/groups/omni</url>
		<!-- Release artifacts are enabled; updatePolicy "always" asks Maven to check the remote repository on every build. -->
		<releases>
			<enabled>true</enabled>
			<updatePolicy>always</updatePolicy>
		</releases>
		<!-- Snapshot artifacts are enabled with the same update policy. -->
		<snapshots>
			<enabled>true</enabled>
			<updatePolicy>always</updatePolicy>
		</snapshots>
	</repository>
</repositories>

此仓库包含kettle jar
java(maven管理jar)调用kettle

java操作

/**
 * Demo that connects to a Kettle database repository, prints the top-level
 * repository directories and runs a job stored in that repository.
 */
public class DemoJob {
	/**
	 * Initializes the Kettle environment, connects to the database repository,
	 * lists the directories directly under the root and executes the demo job.
	 * The repository is always disconnected before this method returns.
	 *
	 * @param args unused
	 * @throws KettleException if initialization, connection or execution fails
	 */
	public static void main(String[] args) throws KettleException {

		KettleEnvironment.init();
		// Create the database-backed repository.
		KettleDatabaseRepository repository = new KettleDatabaseRepository();
		DatabaseMeta databaseMeta = setDatabaseMeta();
		// Select the repository. Original author's note: these arguments did not
		// match the local setup, yet execution still succeeded.
		KettleDatabaseRepositoryMeta kettleDatabaseRepositoryMeta = new KettleDatabaseRepositoryMeta("Kettle", "Kettle", "Transformation description", databaseMeta);
		repository.init(kettleDatabaseRepositoryMeta);
		// Connect to the repository.
		repository.connect("admin", "admin");
		try {
			RepositoryDirectoryInterface directoryInterface = repository.loadRepositoryDirectoryTree();
			System.out.println(directoryInterface);// the root node is "/"
			List<RepositoryDirectoryInterface> children = directoryInterface.getChildren();
			for (RepositoryDirectoryInterface re : children) {
				System.out.println(re);
			}
			execJob(repository);
		} finally {
			// Release the repository connection even when listing or execution fails.
			repository.disconnect();
		}

	}

	/**
	 * Loads the job "A_邮件job" from the repository directory "/201811_JOB",
	 * runs it, waits for it to finish and prints its result.
	 *
	 * @param repository a connected repository
	 * @throws KettleException if the directory is missing or the job cannot be loaded
	 */
	private static void execJob(KettleDatabaseRepository repository) throws KettleException {
		RepositoryDirectoryInterface dir = requireDirectory(repository, "/201811_JOB");
		JobMeta jobMeta = repository.loadJob(repository.getJobId("A_邮件job", dir), null);
		Job job = new Job(repository, jobMeta);
		// Set parameters here if the job needs any, e.g. via jobMeta.setParameterValue(...).

		job.setLogLevel(LogLevel.DETAILED);
		// Start executing the job.
		job.start();

		job.waitUntilFinished();// block until the job has completed
		// Kept from the original; presumably redundant after waitUntilFinished() - TODO confirm.
		job.setFinished(true);
		System.out.println(job.getResult());

	}

	/**
	 * Loads the transformation "A_TRAN" from the repository directory
	 * "/201811_KTR/A_KTR", executes it, waits for completion and prints
	 * whether it finished with errors.
	 *
	 * @param repository a connected repository
	 * @throws KettleException if the directory is missing or the transformation cannot be loaded or run
	 */
	@SuppressWarnings("unused")
	private static void execTran(KettleDatabaseRepository repository) throws KettleException {
		// Locate the directory by its string path.
		RepositoryDirectoryInterface dir = requireDirectory(repository, "/201811_KTR/A_KTR");
		// Look up the transformation inside that directory.
		TransMeta tmeta = repository.loadTransformation(repository.getTransformationID("A_TRAN", dir), null);
		// Set parameters here if the transformation needs any, e.g. via tmeta.setParameterValue(...).
		Trans trans = new Trans(tmeta);
		trans.execute(null);// run the transformation
		trans.waitUntilFinished();// block until all steps have finished
		if (trans.getErrors() > 0) {
			System.out.println("transformation error");
		} else {
			System.out.println("transformation successfully");
		}
	}

	/**
	 * Resolves a repository directory by path and fails with a descriptive
	 * exception instead of handing a null directory to later lookups.
	 *
	 * @param repository a connected repository
	 * @param path       repository directory path, e.g. "/201811_JOB"
	 * @return the directory found at {@code path}, never null
	 * @throws KettleException if the lookup fails or no directory exists at {@code path}
	 */
	private static RepositoryDirectoryInterface requireDirectory(KettleDatabaseRepository repository, String path) throws KettleException {
		RepositoryDirectoryInterface dir = repository.findDirectory(path);
		if (dir == null) {
			throw new KettleException("Repository directory not found: " + path);
		}
		return dir;
	}

	/**
	 * Builds the connection settings for the Kettle repository database from
	 * the configuration keys "name", "type", "access", "host", "db", "port",
	 * "user" and "pass" read through {@code PropUtils}.
	 *
	 * @return the database metadata used to reach the repository
	 */
	private static DatabaseMeta setDatabaseMeta() {
		return new DatabaseMeta(PropUtils.getConfigValByKey("name"), PropUtils.getConfigValByKey("type"), PropUtils.getConfigValByKey("access"), PropUtils.getConfigValByKey("host"), PropUtils.getConfigValByKey("db"), PropUtils.getConfigValByKey("port"), PropUtils.getConfigValByKey("user"), PropUtils.getConfigValByKey("pass"));
	}
}

相关文章: