【问题标题】:Canceling Apache Flink job from the code从代码中取消 Apache Flink 作业
【发布时间】:2017-02-09 14:56:56
【问题描述】:

我想从代码中停止/取消 flink 作业。这是在我的集成测试中,我向我的 flink 作业提交任务并检查结果。当作业异步运行时,即使测试失败/通过,它也不会停止。我想在测试结束后停止工作。

我尝试了一些我在下面列出的东西:

  1. 获取jobmanager actor
  2. 获取正在运行的作业
  3. 对于每个正在运行的作业,向作业管理器发送取消请求

这当然没有运行,但我不确定 jobmanager actorref 是错误的还是缺少其他东西。

我得到的错误是:[flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] 消息 [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$]从 Actor[akka://flink/temp/$a] 到 Actor[akka://flink/user/jobmanager_1] 未传递。 [1] 遇到死信。可以使用配置设置“akka.log-dead-letters”和“akka.log-dead-letters-during-shutdown”关闭或调整此日志记录

这意味着作业管理器actor ref 错误或发送给它的消息不正确。

代码如下所示:

val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
 val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
    try {
      val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
      if(result.isInstanceOf[RunningJobsStatus]){
        val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
        val itr = runningJobs.iterator()
        while(itr.hasNext){
          val jobId = itr.next().getJobId
          val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
          try {
            Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
          }
          catch {
            case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
          }

        }
      }
    }
    catch{
      case e : Exception => "Could not retrieve running jobs from the JobManager." + e
    }

  }

有人可以检查这是否是正确的方法吗?

编辑: 要完全停止作业,需要先停止任务管理器,再停止任务管理器,然后停止任务管理器。

【问题讨论】:

    标签: akka apache-flink


    【解决方案1】:

    您正在创建一个新的ActorSystem,然后尝试在同一演员系统中查找名称为/user/jobmanager_1 的演员。这不起作用,因为实际的作业管理器将在不同的ActorSystem 中运行。

    如果你想获得一个ActorRef 给真正的工作经理,你要么必须使用相同的ActorSystem 进行选择(然后你​​可以使用本地地址),或者你已经找到了远程地址工作经理演员。远程地址的格式为akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]。如果您可以访问FlinkMiniCluster,那么您可以使用leaderGateway promise 获取当前领导者的ActorGateway

    【讨论】:

    • 您好,您所指出的是正确的。有没有办法获取jobmanager的远程地址?我无法访问 FlinkMiniCluster,因为它是由环境直接创建的。我想停止作为我的集成测试一部分的工作,现在我已经抽象出创建 FlinkMiniCluster 的代码,以便我可以控制它并且可以停止它。
    • 目前,Flink API 不提供这样的功能。但是我们正在努力通过引入JobClient 来改进这一点,您可以使用它更明确地控制正在运行的作业。到目前为止,我担心创建自己的 FlinkMiniCluster 并向其提交集成测试作业是实现您所描述的最佳方式。
    • 非常感谢!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多