【问题标题】:The implementation of the FlinkKafkaConsumer010 is not serializable errorFlinkKafkaConsumer010 的实现 is not serializable 错误
【发布时间】:2018-03-14 09:59:11
【问题描述】:

我创建了一个基于 Apache Flink 的自定义类。以下是类定义的一些部分:

public class StreamData {
    private StreamExecutionEnvironment env;
    private DataStream<byte[]> data ;
    private Properties properties;
    public StreamData(){
        env = StreamExecutionEnvironment.getExecutionEnvironment();
    }


    public StreamData(StreamExecutionEnvironment e , DataStream<byte[]> d){
    env = e ;
    data = d ;
}
    public StreamData getDataFromESB(String id, int from) {

        final Pattern TOPIC = Pattern.compile(id);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", Long.toString(System.currentTimeMillis()));
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("metadata.max.age.ms", 30000);
        properties.put("enable.auto.commit", "false");

        if (from == 0)
            properties.setProperty("auto.offset.reset", "earliest");
        else
            properties.setProperty("auto.offset.reset", "latest");


        StreamExecutionEnvironment e = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<byte[]> stream = env
                .addSource(new FlinkKafkaConsumer011<>(TOPIC, new AbstractDeserializationSchema<byte[]>() {
                    @Override
                    public byte[] deserialize(byte[] bytes) {
                        return bytes;
                    }
                }, properties));
        return new StreamData(e, stream);
    }
    public void print(){
        data.print() ;
    }

    public void execute() throws Exception {
        env.execute() ;
    }

使用类StreamData,尝试从Apache Kafka获取一些数据并在main函数中打印出来:

StreamData stream = new StreamData();
        stream.getDataFromESB("original_data", 0);
        stream.print();
        stream.execute();

我得到了错误:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumer010 is not serializable. The object probably contains or references non serializable fields.
Caused by: java.io.NotSerializableException: StreamData

here 所述,我认为这是因为getDataFromESB 函数中的某些数据类型不可序列化。但我不知道如何解决这个问题!

【问题讨论】:

    标签: serialization apache-flink flink-streaming


    【解决方案1】:

    您的 AbstractDeserializationSchema 是一个匿名内部类,因此它包含对不可序列化的外部 StreamData 类的引用。要么让 StreamData 实现 Serializable,要么将您的模式定义为顶级类。

    【讨论】:

    • 我解决了这个问题:public class StreamData implements Serializable { private transient StreamExecutionEnvironment env; private DataStream&lt;byte[]&gt; data ; private Properties properties;
    • 在我的例子中,我将DeserializationSchema 类标记为static,这样我可以将其保留为内部类并且错误消失了。
    【解决方案2】:

    您似乎在代码中导入了 FlinkKafkaConsumer010,但使用的是 FlinkKafkaConsumer011。请在您的 sbt 文件中使用以下依赖项:

    "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion
    

    【讨论】:

    • 我在 maven 中添加了这个依赖。我认为是因为flink-connector-kafka-0.11 的设计,才谈到FlinkKafkaConsumer010 类。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-07-12
    • 2012-02-16
    • 2013-11-25
    • 2019-05-06
    • 2019-10-09
    • 2021-03-02
    相关资源
    最近更新 更多