【问题标题】:Flink1.5.4 exception: Corrupt stream, found tag: 105Flink1.5.4 异常:损坏的流,发现标签:105
【发布时间】:2019-03-07 12:50:02
【问题描述】:

我的程序想在没有 Flink Window 的情况下加入两个流。

我连接两个流并定义一个类 A 扩展 RichCoFlatMapFunction 来处理它们。 在 A 类中,我使用 Guava 缓存来保存来自 flatmap1/2 方法的所有数据,并通过流中的标签将它们连接起来。 然后 Guava 缓存有一个删除监听器来收集加入和过期的数据到下一个 Flink 函数。

private synchronized void collect(ReqFeatures features) {
    feaCollector.collect(features);
}

每次一开始,它都运行良好,但几个小时后,它总是因为这个异常而死机。

java.io.IOException: Corrupt stream, found tag: 105
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:220)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748)

而且有时还有另一个错误日志:

java.lang.IllegalStateException: When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:158)
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:186)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:551)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748)

如果我改用 Flink Window Function,则不会发生此异常。 为什么会发生此异常,我该如何解决?

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    我可以确认这也发生在 Flink 1.9.1 中(尽管对我们来说,它发生在我们运行 flink stop <job-id> 时)

    【讨论】:

      【解决方案2】:

      我修复了在收集输出时获取检查点锁定的相同问题。用户 flatMap 函数已经持有检查点锁,所以如果你在 flatMap 函数中收集输出也可以解决这个问题。

      在 flink 的代码中:

      synchronized (checkpointingLock) {
                      numRecordsIn.inc();
                      streamOperator.setKeyContextElement1(record);
                      streamOperator.processElement(record);
                  }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-01-10
        • 1970-01-01
        • 1970-01-01
        • 2017-11-20
        • 1970-01-01
        • 2020-12-09
        • 2019-02-12
        • 1970-01-01
        相关资源
        最近更新 更多