【问题标题】:Flink, TaskManager not respondingFlink,TaskManager 没有响应
【发布时间】:2019-04-03 23:56:42
【问题描述】:

在这种情况下,我们有 3 个 kafka 主题(每个有 50 个分区),它们有不同的消息,而所有这些消息都有“用户名”字段,

topic_1 --> Message01 {String username; ...}, about 50,000 messages per minute
topic_2 --> Message02 {String username; ...}, about 3,000,000 messages per minute
topic_3 --> Message03 {String username; ...}, about 70,000 messages per minute

我们已经定义了一个包装类,

MessageWrapper{
 List<Message01> list01;
 List<Message02> list02;
 List<Message03> list03;
}

我们有一个 flatMap,它将原始消息“转换”为 tuple3,

String field --> username
Integer field --> type
MessageWrapper field --> the wrapper object

所有 3 个流都由类似的 flatMap() 函数处理,

public void flatMap(Message01 value, Collector<Tuple3<String, Integer, MessageWrapper>> out)
        throws Exception {
    String name = value.getUsername();
    if (!StringUtils.isBlank(name)) {
        MessageWrapper wrapper = new MessageWrapper();
        List<Message01> list = new ArrayList<>();
        list.add(value);
        wrapper.setList01(list);
        out.collect(new Tuple3<>(name, 1, wrapper));
    }
}

在 flatMap() 之后,我们将这 3 个流合并,

stream1.union(stream2, stream3).keyBy(0).timeWindow(Time.seconds(300))
        .process(
                new ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>() {

                    @Override
                    public void process(Tuple key,
                            ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>.Context ctx,
                            Iterable<Tuple3<String, Integer, MessageWrapper>> elements,
                            Collector<MessageWrapper> out) throws Exception {
                        // merge all entities which have same username, to get a big fat wrapper object
                        MessageWrapper w = new MessageWrapper();
                        for (Tuple3<String, Integer, MessageWrapper> t3 : elements) {
                            MessageWrapper ret = t3.f2;
                            Integer type = t3.f1;
                            if (type == 1) {
                                // add to list01
                            } else if (type == 2) {
                                // add to list02
                            } else if (type == 3) {
                                // add to list03
                            }
                        }

                        if (all 3 lists are not empty) {
                            out.collect(ret);
                        }
                    }
                });

目前我们使用20个taskmanager,每个4核+16G,总共80个slot,我们使用50个并行度。

我们总是遇到taskmanager因为full gc太多而没有响应的问题,

Connecting to remote task manager + 'xxxxxxxxxxxxx' has failed. This might indicate that the remote task manager has been lost".

如果我们将时间窗口从 5 分钟缩短到 1 分钟,一切都很好。这样看来,flink集群好像资源不够用,但是80核+320G,几百万条消息(每条消息大小5KB左右),应该够了吧?

任何人都可以在这里解释一下吗?还是代码可能有问题?

【问题讨论】:

  • 这实际上是什么时候发生的。刚开始的时候?或者当第一个窗口被处理时?根据您的规格,似乎每个任务管理器的平行度为 4,对吧(20x4=80)?这意味着每个插槽只有 4GB 和一个核心,不再那么多了。也许降低任务管理器的并行度会有所帮助。
  • @TobiSH 一般情况下会在开始后几分钟,可能是3分钟或10分钟,每次都不一样。
  • 如果数据不平衡,您有什么见解吗?一些用户可能有数百万个您试图保留在内存中的事件?如果不是:与其汇总它们,不如只计算它们,看看这是否是问题
  • 您使用 MessageWrapper 中 List 的什么实现?它应该是针对插入优化的。
  • @TobiSH 数组列表

标签: apache-flink flink-streaming


【解决方案1】:

我通过在所有机器的/etc/hosts 文件上用127.0.1.1 注释行解决了这个问题。我增加了conf/flink-conf.yaml 文件的taskmanager.numberOfTaskSlots: 属性上的插槽并行度。

【讨论】:

    猜你喜欢
    • 2016-12-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多