【发布时间】:2017-02-09 14:56:56
【问题描述】:
我想从代码中停止/取消 flink 作业。这是在我的集成测试中,我向我的 flink 作业提交任务并检查结果。当作业异步运行时,即使测试失败/通过,它也不会停止。我想在测试结束后停止工作。
我尝试了一些我在下面列出的东西:
- 获取jobmanager actor
- 获取正在运行的作业
- 对于每个正在运行的作业,向作业管理器发送取消请求
这当然没有运行,但我不确定 jobmanager actorref 是错误的还是缺少其他东西。
我得到的错误是:[flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] 消息 [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$]从 Actor[akka://flink/temp/$a] 到 Actor[akka://flink/user/jobmanager_1] 未传递。 [1] 遇到死信。可以使用配置设置“akka.log-dead-letters”和“akka.log-dead-letters-during-shutdown”关闭或调整此日志记录
这意味着作业管理器actor ref 错误或发送给它的消息不正确。
代码如下所示:
val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
try {
val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
if(result.isInstanceOf[RunningJobsStatus]){
val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
val itr = runningJobs.iterator()
while(itr.hasNext){
val jobId = itr.next().getJobId
val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
try {
Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
}
catch {
case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
}
}
}
}
catch{
case e : Exception => "Could not retrieve running jobs from the JobManager." + e
}
}
有人可以检查这是否是正确的方法吗?
编辑: 要完全停止作业,需要先停止任务管理器,再停止任务管理器,然后停止任务管理器。
【问题讨论】:
标签: akka apache-flink