【Posted】: 2018-03-14 09:59:11
【Question】:
I created a custom class based on Apache Flink. Here are some parts of the class definition:
public class StreamData {
    private StreamExecutionEnvironment env;
    private DataStream<byte[]> data;
    private Properties properties;

    public StreamData() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
    }

    public StreamData(StreamExecutionEnvironment e, DataStream<byte[]> d) {
        env = e;
        data = d;
    }

    public StreamData getDataFromESB(String id, int from) {
        final Pattern TOPIC = Pattern.compile(id);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", Long.toString(System.currentTimeMillis()));
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("metadata.max.age.ms", 30000);
        properties.put("enable.auto.commit", "false");
        if (from == 0)
            properties.setProperty("auto.offset.reset", "earliest");
        else
            properties.setProperty("auto.offset.reset", "latest");
        StreamExecutionEnvironment e = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<byte[]> stream = env
            .addSource(new FlinkKafkaConsumer011<>(TOPIC, new AbstractDeserializationSchema<byte[]>() {
                @Override
                public byte[] deserialize(byte[] bytes) {
                    return bytes;
                }
            }, properties));
        return new StreamData(e, stream);
    }

    public void print() {
        data.print();
    }

    public void execute() throws Exception {
        env.execute();
    }
}
Using the StreamData class, I try to fetch some data from Apache Kafka and print it in the main function:
StreamData stream = new StreamData();
stream.getDataFromESB("original_data", 0);
stream.print();
stream.execute();
I got this error:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumer010 is not serializable. The object probably contains or references non serializable fields.
Caused by: java.io.NotSerializableException: StreamData
As described here, I think this is because some data type used in the getDataFromESB function is not serializable. But I don't know how to fix it!
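One plausible reading of the stack trace is that the AbstractDeserializationSchema is created as an anonymous class inside the instance method getDataFromESB, so it carries a hidden reference to the enclosing StreamData, which is not Serializable. The sketch below reproduces that effect with plain JDK serialization only, without Flink. CaptureDemo, Holder, Schema, PassThroughSchema and isSerializable are hypothetical names invented for this illustration, not part of the original code or of any Flink API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class CaptureDemo {

    // Hypothetical stand-in for a serializable function object
    // such as a deserialization schema.
    interface Schema extends Serializable {
        byte[] deserialize(byte[] bytes);
    }

    // Hypothetical stand-in for StreamData: deliberately NOT Serializable.
    static class Holder {

        // Anonymous class declared in an instance method: it keeps a
        // hidden reference to the enclosing Holder instance.
        Schema anonymousSchema() {
            return new Schema() {
                @Override
                public byte[] deserialize(byte[] bytes) {
                    return bytes;
                }
            };
        }

        // Static nested class: no reference to any Holder instance.
        Schema staticSchema() {
            return new PassThroughSchema();
        }
    }

    static class PassThroughSchema implements Schema {
        private static final long serialVersionUID = 1L;

        @Override
        public byte[] deserialize(byte[] bytes) {
            return bytes;
        }
    }

    // Returns true if Java serialization accepts the object, and false
    // if it hits a non-serializable object in the object graph.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Holder holder = new Holder();
        System.out.println("anonymous: " + isSerializable(holder.anonymousSchema()));
        System.out.println("static nested: " + isSerializable(holder.staticSchema()));
    }
}
```

If the same mechanism is at work in StreamData, then moving the schema into a static nested or top-level class, or making getDataFromESB a static method, should keep StreamData out of the serialized consumer. This is an assumption based on the "NotSerializableException: StreamData" line and has not been verified against the Flink job itself.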
【Discussion】:
Tags: serialization apache-flink flink-streaming