【发布时间】:2021-10-02 12:35:43
【问题描述】:
我正在从 kafka 读取 Avro 作为字符串,我正在尝试使用 java 代码将字符串 avro 转换为 Json。
@KafkaListener(topics = "#{'${kafka.consumer.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
void listener(String message, Acknowledgment acknowledgment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws CacheServiceException, JsonProcessingException {
//some code here
byte[] data = message.getBytes(); //This seems to be the issue
avroToJson(schema,data)
}
//Code to convert avro to json
public String avroToJson(Schema schema, byte[] avroBinary) throws IOException {
DatumReader<Object> datumReader = new GenericDatumReader<>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(avroBinary, null);
Object avroDatum = datumReader.read(null, decoder);
System.out.println("Initiating loop");
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
DatumWriter<Object> writer = new GenericDatumWriter<>(schema);
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos, false);
writer.write(avroDatum, encoder);
encoder.flush();
baos.flush();
return new String(baos.toByteArray(), StandardCharsets.UTF_8);
}
}
我想避免从 kafka 以 AVRO 的形式读取数据,因为我正在从同一个 kafka 中具有不同架构的不同主题读取数据。
【问题讨论】:
-
1) 为什么使用字符串而不使用 ByteArrayDeserializer? 2)
DecoderFactory不能与 Confluent 序列化的 Avro 消息一起使用 -
@OneCricketeer 我将其更改为 ByteArrayDeserializer,现在我将其作为 byte[] 获取。代码如下所示。但是正如您在第 2 点中提到的,DecoderFactory 不起作用,您能帮我解决如何更改我的 avroToJson 函数吗? @KafkaListener(topics = "#{'${kafka.consumer.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory") void listener(byte[] message, Acknowledgment acknowledgment, @Header(KafkaHeaders. RECEIVED_TOPIC) String topic) throws CacheServiceException, JsonProcessingException {
-
另外,当使用 ByteArrayDeserializer 将数据读取为字节数组时,它的读取与我在早期代码中尝试执行 message.getBytes() 时的读取相同;
-
我的意思是,如果您已经收到
byte[],则不需要.getBytes()。您需要手动解析出存在原始 avro 数据的字节数组。 docs.confluent.io/platform/current/schema-registry/… 或者您可以使用 KafkaAvroDeserializer 将数据消费到 GenericRecord 对象中,toString方法类似于 JSON
标签: java json apache-kafka avro