【问题标题】:Avro to json converter in javajava中的Avro到json转换器
【发布时间】:2021-10-02 12:35:43
【问题描述】:

我正在从 kafka 读取 Avro 作为字符串,我正在尝试使用 java 代码将字符串 avro 转换为 Json。

    @KafkaListener(topics = "#{'${kafka.consumer.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    void listener(String message, Acknowledgment acknowledgment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws CacheServiceException, JsonProcessingException {

//some code here
byte[] data = message.getBytes(); //This seems to be the issue

avroToJson(schema,data)


}

//Code to convert avro to json
public String avroToJson(Schema schema, byte[] avroBinary) throws IOException {
        DatumReader<Object> datumReader = new GenericDatumReader<>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(avroBinary, null);
        Object avroDatum = datumReader.read(null, decoder);
        System.out.println("Initiating loop");
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            DatumWriter<Object> writer = new GenericDatumWriter<>(schema);
            JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos, false);
            writer.write(avroDatum, encoder);
            encoder.flush();
            baos.flush();
            return new String(baos.toByteArray(), StandardCharsets.UTF_8);
        }
    }

我想避免从 kafka 以 AVRO 的形式读取数据,因为我正在从同一个 kafka 中具有不同架构的不同主题读取数据。

【问题讨论】:

  • 1) 为什么使用字符串而不使用 ByteArrayDeserializer? 2) DecoderFactory 不能与 Confluent 序列化的 Avro 消息一起使用
  • @OneCricketeer 我将其更改为 ByteArrayDeserializer,现在我将其作为 byte[] 获取。代码如下所示。但是正如您在第 2 点中提到的,DecoderFactory 不起作用,您能帮我解决如何更改我的 avroToJson 函数吗? @KafkaListener(topics = "#{'${kafka.consumer.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory") void listener(byte[] message, Acknowledgment acknowledgment, @Header(KafkaHeaders. RECEIVED_TOPIC) String topic) throws CacheServiceException, JsonProcessingException {
  • 另外,当使用 ByteArrayDeserializer 将数据读取为字节数组时,它的读取与我在早期代码中尝试执行 message.getBytes() 时的读取相同;
  • 我的意思是,如果您已经收到byte[],则不需要.getBytes()。您需要手动解析出存在原始 avro 数据的字节数组。 docs.confluent.io/platform/current/schema-registry/… 或者您可以使用 KafkaAvroDeserializer 将数据消费到 GenericRecord 对象中,toString 方法类似于 JSON

标签: java json apache-kafka avro


【解决方案1】:

Avro 用于根据架构验证您的数据。如果您不想要这个,只需删除 Avro 并仅使用 Kafka。从生产者端发送 json 数据而不是 avro 序列化数据。

【讨论】:

  • 是的,但就我而言,生产者是一个不同的团队,我无法控制生产者的格式,因为其他消费者也使用相同的 kafka 主题作为他们的来源。
  • 在这种情况下请不要使用 avro,否则请设计一个包装器 avro 并将数据作为字符串传递或使用通用的东西。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-11-05
  • 2020-09-05
相关资源
最近更新 更多