【发布时间】:2017-11-08 22:23:26
【问题描述】:
我正在使用 Kafka 和流技术;我为 KStream 创建了一个自定义序列化器和反序列化器,我将使用它来接收来自给定主题的消息。
现在,问题是我正在以这种方式创建一个 serde:
JsonSerializer<EventMessage> serializer = new JsonSerializer<>();
JsonDeserializer<EventMessage> deserializer = new JsonDeserializer<>(EventMessage.class);
Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);
序列化器实现:
public class JsonSerializer<T> implements Serializer<T> {
private Gson gson = new Gson();
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String topic, T data) {
return gson.toJson(data).getBytes(Charset.forName("UTF-8"));
}
@Override
public void close() {
}
}
反序列化器实现:
public class JsonDeserializer<T> implements Deserializer<T> {
private Gson gson = new Gson();
private Class<T> deserializedClass;
public JsonDeserializer() {
}
public JsonDeserializer(Class<T> deserializedClass) {
this.deserializedClass = deserializedClass;
}
@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> map, boolean b) {
if(deserializedClass == null) {
deserializedClass = (Class<T>) map.get("serializedClass");
}
}
@Override
public T deserialize(String topic, byte[] data) {
System.out.print(data);
if(data == null){
return null;
}
return gson.fromJson(new String(data),deserializedClass);
}
@Override
public void close() {
}
}
当我尝试执行代码时,我收到以下错误:
原因:org.apache.kafka.common.KafkaException: 无法实例化类 org.apache.kafka.common.serialization.Serdes$WrapperSerde 它是否有一个公共的无参数构造函数?
在这里完整转储:https://pastebin.com/WwpuXuxB
这是我尝试使用 serde 的方式:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(stringSerde, messageSerde, topic);
KStream<String, EventMessage> outStream = eventsStream
.mapValues(value -> EventMessage.build(value.type, value.timestamp));
outStream.to("output");
另外,我不完全确定我是否正确设置了全局设置序列化器和反序列化器的属性:
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, messageSerde.getClass());
【问题讨论】:
-
您能检查一下向
JsonSerializer添加显式默认(非参数)构造函数是否有帮助? -
例如,
StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()应该是StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()。
标签: java apache-kafka apache-kafka-streams