【问题标题】:Error deserialising avro kafka message from JDBC Connector反序列化来自 JDBC 连接器的 avro kafka 消息时出错
【发布时间】:2019-08-05 22:16:48
【问题描述】:

我正在尝试收听我使用 confluent 的 kafka 连接功能发布的主题。但是,我无法反序列化它。我相信它的 avro 序列化但无法找到正确的反序列化器。

消息如下所示,在控制台主题中

null    {"c1":{"int":10},"c2":{"string":"foo"},"create_ts":1552598863000,"update_ts":1552598863000}

下面是反序列化器

public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class);

    protected final Class<T> targetType;

    public AvroDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public void close() {
        // No-op
    }

    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {
        // No-op
    }

    @SuppressWarnings("unchecked")
    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            T result = null;

            if (data != null) {
                LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));

                DatumReader<GenericRecord> datumReader =
                        new SpecificDatumReader<>(targetType.newInstance().getSchema());
                Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

                result = (T) datumReader.read(null, decoder);
                LOGGER.debug("deserialized data='{}'", result);
            }
            return result;
        } catch (Exception ex) {
            throw new SerializationException(
                    "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
        }
    }
}

例外

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition mysql-foobar-0 at offset 10. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data '[0, 0, 0, 0, 21, 2, 20, 2, 6, 102, 111, 111, -80, -78, -44, -31, -81, 90, -80, -78, -44, -31, -81, 90]' from topic 'mysql-foobar'
Caused by: java.lang.InstantiationException: null
    at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48) ~[na:1.8.0_131]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_131]
    at java.lang.Class.newInstance(Class.java:442) ~[na:1.8.0_131]
    at com.spring.kafkaexample.springbootkafkaconsumer.config.AvroDeserializer.deserialize(AvroDeserializer.java:48) ~[classes/:na]
    at com.spring.kafkaexample.springbootkafkaconsumer.config.AvroDeserializer.deserialize(AvroDeserializer.java:18) ~[classes/:na]

【问题讨论】:

    标签: java apache-kafka avro apache-kafka-connect confluent-platform


    【解决方案1】:

    如下图所示的控制台主题

    不清楚您使用的是kafka-avro-console-consumer 还是简单的kafka-console-consumer。了解您的数据是否为 ​​Avro 的方法是查看生产者/连接器配置。


    不过,您无需编写自己的反序列化器。另外,Confluent 不使用您的代码会使用的 Avro 模式 + 消息的约定(因此您会收到该错误)。您需要首先从模式注册表中查找模式。

    添加 Confluent Maven 存储库

    <repositories>
    
      <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
      </repository>
    
    </repositories>
    

    然后添加 Confluent 序列化器依赖

    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>${confluent.version}</version>
    </dependency>
    

    然后import io.confluent.kafka.serializers.KafkaAvroDeserializer,或在您的消费者配置中使用该类

    https://docs.confluent.io/current/clients/install.html#java


    或者,您可以将 MySQL 连接器切换为不使用 Avro 转换器

    【讨论】:

    • 它的 avrò。我基本上是在尝试链接confluent.io/blog/…中提到的教程的例外
    • 如何使用普通的字符串序列化程序而不是 avro。我想这与'value.converter": "io.confluent.connect.avro.AvroConverter",@的属性有关
    • 你可以使用value.converter=org.apache.kafka.connect.json.JsonConverter
    猜你喜欢
    • 2020-05-03
    • 2020-06-26
    • 2022-07-28
    • 1970-01-01
    • 2019-08-03
    • 2019-06-29
    • 2020-08-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多