参考前文:解决 Flink 1.11.0 sql 不能指定 jobName 的问题

从 FLink 1.11 改版 sql 的执行流程后,就不能和 Stream Api 一样使用 env.execute("JobName") 来指定任务名

看了源码后发现,在 sql 任务中,直接使用了 "insert-into" 拼接 catelog/database/sink table 做为 sql 任务的 job name

String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);

使用体验当然是不好的,在 JIRA 上有个 改进的严重 issues: https://issues.apache.org/jira/browse/FLINK-18545 讨论这个问题,
最后决定在 PipelineOptions 中添加 "pipeline.name" 参数做为 job name

public class PipelineOptions {

  /**
   * The job name used for printing and logging.
   */
  public static final ConfigOption<String> NAME =
    key("pipeline.name")
      .stringType()
      .noDefaultValue()
      .withDescription("The job name used for printing and logging.");

这个 issues 在 Flink 1.12.0 终于 merge 进去了,所以升级到 Flink 1.12.0 就不再需要修改源码,直接在 TableConfig 中添加 "pipeline.name" 参数即可

由于之前为了指定 JobName 之前修改过源码,所以升级到 Flink 1.12.0 的第一件事情就是去掉之前修改的源码,使用 “pipeline.name” 配置参数指定 JobName

其他代码都和以前一样,只需要在 TableConfig 添加参数即可

val tabConf = tableEnv.getConfig
onf.setString("pipeline.name", Common.jobName)

Flink 1.12.0 sql 任务指定 job name
    




解决 Flink 1.11.0 sql 不能指定 jobName 的问题

 

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

Flink 1.12.0 sql 任务指定 job name
    




解决 Flink 1.11.0 sql 不能指定 jobName 的问题

 

相关文章: