【问题标题】:Can't restore savepoint from 1.2.1 to 1.4无法将保存点从 1.2.1 恢复到 1.4
【发布时间】:2018-01-05 23:12:58
【问题描述】:

我们已经部署了一个新的 Flink 实例,版本为 1.4。 在尝试从旧的 1.2.1 部署中恢复保存点时,我们尝试恢复的所有作业都出现相同的错误:

org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1360)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1336)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1336)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is no longer supported starting from Flink 1.4. Please rewrite your job to use 'CheckpointedFunction' instead!
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserializeSubtaskState(SavepointV1Serializer.java:171)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:96)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:54)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepointWithHandle(SavepointStore.java:278)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:70)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1141)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1350)
    ... 10 more

错误信息:

从 Flink 1.4 开始不再支持旧状态(来自 Flink

但似乎是错误的,因为我们的其他部署正在运行 1.2.1。

文档页面仍未更新 1.4:https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html,但似乎并行性在过去一直是个问题。我已经尝试使用与保存点即将到来的工作相同的工作,但仍然是同样的问题。

关于可能导致此问题的原因以及如何解决此问题的任何提示?

谢谢!

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    在 1.4.0 版本中,Flink 不再支持从使用Checkpointed 接口获取的状态恢复。为了进行有状态升级,您必须执行以下操作:

    1. 保存在 Flink 1.2.1 上运行的作业的保存点
    2. 在所有有状态函数中将 Checkpointed 替换为 CheckpointedFunction
    3. 实现CheckpointedRestoring接口从Checkpointed保存点恢复
    4. 在 Flink 1.2.1 上执行修改后的作业并获取第二个保存点
    5. 从所有有状态函数中移除 CheckpointedRestoring 接口
    6. 在 Flink 1.4.0 上使用第二个保存点运行修改后的作业

    在迁移您的工作时是否还有其他问题,请告诉我。

    【讨论】:

    • 感谢您的回答。我们自己并没有显式地实现这些功能,所以不可能做出这样的改变。我已经用我们解决问题的方式添加了一个答案,但请把你的答案留在这里,它可能对某些人有用。
    【解决方案2】:

    所以,终于解决了这个问题。

    我们开始在 Flink 1.1 中运行我们的任务,然后将它们的保存点迁移到 1.2.1。

    似乎 Flink 1.2.1 没有对保存点进行任何升级,因此它们仍将具有旧格式,即 Flink 1.4 不支持的格式。

    解决方案是在 Flink 1.3 中运行我们的任务 + 保存点,并在那里创建一个新的保存点,它将以新格式保存。这个终于兼容 Flink 1.4 了:)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-10-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-03-28
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多