【问题标题】:Flink expected HA behaviour when a TaskManager failsTaskManager 失败时 Flink 预期的 HA 行为
【发布时间】:2017-03-08 17:27:41
【问题描述】:

我创建了一个 HA Flink v1.2 集群,由 1 个 JobManager 和 2 个 TaskManager 组成,每个集群都在自己的 VM 中(不使用 YARN 或 hdfs)。 在 JobManager 节点上开始作业后,我会杀死一个 TaskManager 实例。立即在 Web 仪表板中,我可以看到作业被取消然后失败。如果我检查日志:

03/06/2017 16:23:50 Flat Map(1/2) switched to DEPLOYING 
03/06/2017 16:23:50 Flat Map(2/2) switched to SCHEDULED 
03/06/2017 16:23:50 Flat Map(2/2) switched to DEPLOYING 
03/06/2017 16:23:50 Flat Map(1/2) switched to RUNNING 
03/06/2017 16:23:50 Source: Custom Source -> Flat Map(1/2) switched to RUNNING 
03/06/2017 16:23:50 Flat Map(2/2) switched to RUNNING 
03/06/2017 16:23:50 Source: Custom Source -> Flat Map(2/2) switched to RUNNING 
03/06/2017 16:25:38 Flat Map(1/2) switched to FAILED 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'ip-10-106-0-238/10.106.0.238:40578'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)

03/06/2017 16:25:38 Job execution switched to status FAILING.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'ip-10-106-0-238/10.106.0.238:40578'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(1/2) switched to CANCELING 
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(2/2) switched to CANCELING 
03/06/2017 16:25:38 Flat Map(2/2) switched to CANCELING 
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(1/2) switched to CANCELED 
03/06/2017 16:26:18 Source: Custom Source -> Flat Map(2/2) switched to CANCELED 
03/06/2017 16:26:18 Flat Map(2/2) switched to CANCELED 

在我的工作实施中

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, // number
                                                                // of
                                                                // restart
                                                                // attempts
        Time.of(10, TimeUnit.SECONDS) // delay
));

我的问题是 JobManager 不应该自动将所有请求重定向到剩余/正在运行的 TaskManager 吗? 同样,如果我启动 JobManager 和 1 TaskManager 实例并运行作业,当我启动第二个 TaskManager 实例时,它是否也有助于解决正在运行的作业?

谢谢!

【问题讨论】:

    标签: apache-flink flink-streaming flink-cep


    【解决方案1】:

    首先RestartStrategy与HA模式无关。高可用性涉及JobManager 的可用性。无论如何,HA 至少需要两个 JobManagers 实例(你说你只开始一个)。

    至于RestartStrategy,当您在失败后指定fixedDelayRestart 策略时(例如在您的情况下杀死TaskManager),将尝试再次运行作业(在您的情况下10 秒后)。如果在您的安装中不是这种情况,您可能缺少运行作业的可用资源(我想您每个 TaskManager 有 1 个任务槽,所以当只剩下一个时,您无法运行并行度为 2 的作业或更多)。

    对于最后一个问题,添加 TaskManager 不会有助于运行作业。以某种方式连接的行为称为动态缩放。您可以通过获取一个保存点然后使用更多资源重新运行它来做到这一点。看看here。自动重新缩放正在进行中。

    【讨论】:

    • 嗨,戴维,谢谢你的回答,它为我解决了一些问题。我做了一个新的测试,并行度设置为 1,每个 TaskManager 设置为 1 个插槽。你是对的,在剩余的 TaskManager 上重试了工作,但我得到一个错误原因:java.io.FileNotFoundException: /home/ubuntu/Prototype/flink/flink-checkpoints/6fc6168a1e5a6a27f58f6d57deeacb65/chk-37/31c325f7-2b57-4e6b -bc20-3f6e9390a724(没有这样的文件或目录)。似乎检查点在第二个 TaskManager 上不可用。这会导致作业失败。你知道任务管理器之间是否同步检查点吗?
    • 检查点的存储位置取决于使用的 StateBackend。欲了解更多信息,请参阅:ci.apache.org/projects/flink/flink-docs-release-1.2/ops/…
    • 当然,这很清楚,我使用文件系统作为后端状态。但是我在每个 TaskManager 上设置了本地路径(它们位于不同的 VM 中),并期望框架在 JobManager 中保持状态同步。显然不是,如果 TaskManager 出现故障并且它正在本地保存检查点,则作业将失败。你知道是否所有的 TaskManagers 后端状态 localtion 都应该指向同一个路径?它在那里生成名称为 UUID 的文件夹,以确保不会发生冲突。
    • 每个TaskManager可以有不同的路径。他们是独立的。
    • 大卫,谢谢你的回答。我的最后一个问题没有完全回答,但我相信它属于一个有点不同的话题。如果可以,请查看stackoverflow.com/questions/42672579/… 致以最诚挚的问候
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-05-21
    • 1970-01-01
    • 2019-02-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多