【问题标题】:KafkaStreams serde exceptionKafkaStreams serde 异常
【发布时间】:2017-11-08 22:23:26
【问题描述】:

我正在使用 Kafka 和流技术;我为 KStream 创建了一个自定义序列化器和反序列化器,我将使用它来接收来自给定主题的消息。

现在,问题是我正在以这种方式创建一个 serde:

JsonSerializer<EventMessage> serializer = new JsonSerializer<>();
JsonDeserializer<EventMessage> deserializer = new JsonDeserializer<>(EventMessage.class);
Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);

序列化器实现:

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        return gson.toJson(data).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {

    }
}  

反序列化器实现:

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer() {

    }

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if(deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        System.out.print(data);
        if(data == null){
            return null;
        }

        return gson.fromJson(new String(data),deserializedClass);
    }

    @Override
    public void close() {

    }
}

当我尝试执行代码时,我收到以下错误:

原因:org.apache.kafka.common.KafkaException: 无法实例化类 org.apache.kafka.common.serialization.Serdes$WrapperSerde 它是否有一个公共的无参数构造函数?

在这里完整转储:https://pastebin.com/WwpuXuxB

这是我尝试使用 serde 的方式:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(stringSerde, messageSerde, topic);

KStream<String, EventMessage> outStream = eventsStream
            .mapValues(value -> EventMessage.build(value.type, value.timestamp));

outStream.to("output");

另外,我不完全确定我是否正确设置了全局设置序列化器和反序列化器的属性:

streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, messageSerde.getClass());

【问题讨论】:

  • 您能检查一下向JsonSerializer 添加显式默认(非参数)构造函数是否有帮助?
  • 例如,StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass() 应该是 StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()

标签: java apache-kafka apache-kafka-streams


【解决方案1】:

为了完成 Matthias 的回答,我刚刚编写了一个简单示例,说明如何在 Kafka Stream App 中创建自定义 Serde(序列化器/反序列化器)。可以克隆和试用:https://github.com/Davidcorral94/Kafka-Streams-Custom-Seder

首先我创建了两个类,一个用于 Serializer,另一个用于 Deserializer。在这种情况下,我使用Gson library 来执行序列化/反序列化。

序列化器

public class PersonSerializer implements Closeable, AutoCloseable, Serializer<Person> {

    private static final Charset CHARSET = Charset.forName("UTF-8");
    static private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String s, Person person) {
        // Transform the Person object to String
        String line = gson.toJson(person);
        // Return the bytes from the String 'line'
        return line.getBytes(CHARSET);
    }

    @Override
    public void close() {

    }
}

反序列化器

public class PersonDeserializer implements Closeable, AutoCloseable, Deserializer<Person> {

    private static final Charset CHARSET = Charset.forName("UTF-8");
    static private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public Person deserialize(String topic, byte[] bytes) {
        try {
            // Transform the bytes to String
            String person = new String(bytes, CHARSET);
            // Return the Person object created from the String 'person'
            return gson.fromJson(person, Person.class);
        } catch (Exception e) {
            throw new IllegalArgumentException("Error reading bytes", e);
        }
    }

    @Override
    public void close() {

    }
}

然后,我将它们都包装到 Serde 中,以便能够将其用于我的 Kafka Stream 应用程序中。

Serde

public class PersonSerde implements Serde<Person> {
    private PersonSerializer serializer = new PersonSerializer();
    private PersonDeserializer deserializer = new PersonDeserializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<Person> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<Person> deserializer() {
        return deserializer;
    }
}

最后,您可以通过下一行在您的 Kafka Stream App 中使用这个 Serde 类:

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSerde.class);

这实际上适用于目前可用的最新 Kafka 版本,即 1.0.0!

【讨论】:

    【解决方案2】:

    如果你调用Serdes.serdeFrom(...),你会得到一个供内部使用的WrappedSerde 类型(并且WrappedSerde 没有非参数构造函数)。目前没有可以调用的 API 来获取自定义 Serde。相反,您需要实现自己的 Serde 类并“手动”包装您的序列化器和反序列化器。

    public class EventMessageSerde implements Serde<EventMessage> {
        final private JsonSerializer<EventMessage> serializer;
        final private JsonDeserializer<EventMessage> deserializer;
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            serializer.configure(configs, isKey);
            deserializer.configure(configs, isKey);
        }
    
        @Override
        public void close() {
            serializer.close();
            deserializer.close();
        }
    
        @Override
        public Serializer<EventMessage> serializer() {
            return serializer;
        }
    
        @Override
        public Deserializer<EventMessage> deserializer() {
            return deserializer;
        }
    }
    

    在你的Properties你可以设置:

    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, EventMessageSerde.class);
    

    【讨论】:

    • 感谢您的更新,我会尽快尝试并给您反馈!
    【解决方案3】:

    另一种方法是使用StreamsBuilder 而不是KStreamBuilderKStreamBuilder 在 1.0.0 中已弃用。您可以在创建流时使用Consumed.with 直接传递 serde 对象。在这种情况下,您无需创建自定义 Serde 类。

    Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);
    
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, EventMessage> eventsStream = builder.stream(topic, Consumed.with(Serdes.String(), messageSerde));
    

    您可以在下面的代码中保留StringSerde,而不是使用失败的messageSerde.getClass(),因为messageSerde 只是一个没有非参数构造函数的WrappedSerde

    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName());
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-05-26
      • 2018-05-19
      • 1970-01-01
      • 2020-06-28
      • 1970-01-01
      相关资源
      最近更新 更多