【问题标题】:Convert Avro into Byte and Store Byte data into MySQL将 Avro 转换为 Byte 并将 Byte 数据存储到 MySQL
【发布时间】:2021-05-13 13:13:54
【问题描述】:

我有一个 Avro 架构文件 customer.avsc。我已经使用 builder 成功创建了 Avro 对象,并且可以读取 avro 对象。我想知道如何将客户 avro 对象转换为 Byte 并将其存储在数据库中。非常感谢!

    public static void main(String[] args) {

        // we can now build a customer in a "safe" way
        Customer.Builder customerBuilder = Customer.newBuilder();
        customerBuilder.setAge(30);
        customerBuilder.setFirstName("Mark");
        customerBuilder.setLastName("Simpson");
        customerBuilder.setAutomatedEmail(true);
        customerBuilder.setHeight(180f);
        customerBuilder.setWeight(90f);

        Customer customer = customerBuilder.build();
        System.out.println(customer);
        System.out.println(111111);

        // write it out to a file
        final DatumWriter<Customer> datumWriter = new SpecificDatumWriter<>(Customer.class);

        try (DataFileWriter<Customer> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            dataFileWriter.create(customer.getSchema(), new File("customer-specific.avro"));
            dataFileWriter.append(customer);
            System.out.println("successfully wrote customer-specific.avro");
        } catch (IOException e){
            e.printStackTrace();
        }

【问题讨论】:

  • 你找到 Avro BinaryEncoder 类了吗?
  • 感谢您的回复,我已成功使用 BinaryEncoder 将 Avro 转换为 Byte。我现在正在使用特定记录,并且我认为需要为 Avro 模式生成 Java 类。如果我不想生成 Java 类,你知道如何为消费者使用特定的版本/模式 id 吗?我看到有人使用 Python 来做到这一点。 stackoverflow.com/questions/60467878/…不知道Java有没有类似的方法?如果我错了,请纠正我。非常感谢!
  • 您可以使用Schema Registry Maven plugin 下载最新版本,然后使用standard Avro Maven plugin 为其生成SpecificRecord 子类。不过,我认为 Maven 插件不支持下载特定版本,请查看源代码。您可能会发现 Jackson Avro 库很有用,但它并没有真正与注册表集成
  • 我们使用 Avro 的方式是,我们让生产者将版本化模式+类推送到 Maven 存储库注册表,它允许消费者像常规 Maven 依赖项一样提取这些。但是,如果您想将数据写入 MySQL 并拥有 Confluent Schema Registry,那么您最好使用 Kafka Connect,它不需要特定的类或自定义消费者应用程序
  • 感谢您的详细解释!我需要将其转换为字节的原因是因为我使用Debezium连接器来实​​现发件箱模式,并且发件箱表中的一列具有嵌套格式(需要json(转换为字符串)/avro(转换为字节)) , mysql db 是源。根据您提供的信息,我认为从注册表中提取模式作为 Maven 依赖项以在消费者项目中使用它是未来模式演变的最佳想法。

标签: java byte avro


【解决方案1】:

我正在使用 BinaryEncoder 来解决这个问题。在这种情况下,可以将 avro 转换为 Byte 并保存到 MySQL 数据库中。然后,当从 kafka 接收数据(字节 -> MySQL -> Debezium 连接器 -> Kafka -> Consumer API)时,我可以使用相同的模式再次将该字节列的有效负载解码为 avro / Java 对象。 这是代码。

        Customer.Builder customerBuilder = Customer.newBuilder();
        customerBuilder.setAge(20);
        customerBuilder.setFirstName("first");
        customerBuilder.setLastName("last");
        customerBuilder.setAutomatedEmail(true);
        customerBuilder.setHeight(180f);
        customerBuilder.setWeight(90f);

        Customer customer = customerBuilder.build();

        DatumWriter<SpecificRecord> writer = new SpecificDatumWriter<SpecificRecord>(
            customer.getSchema());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(customer, encoder);
        encoder.flush();
        out.close();

        byte[] serializedBytes = out.toByteArray();
        System.out.println("Sending message in bytes : " + serializedBytes);
//        //String serializedHex = Hex.encodeHexString(serializedBytes);
//        //System.out.println("Serialized Hex String : " + serializedHex);
//        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("page_views", serializedBytes);
//        producer.send(message);
//        producer.close();

        DatumReader<Customer> userDatumReader = new SpecificDatumReader<Customer>(Customer.class);

        Decoder decoder = DecoderFactory.get().binaryDecoder(serializedBytes, null);

        SpecificRecord datum = userDatumReader.read(null, decoder);
        System.out.println(datum);

【讨论】:

    猜你喜欢
    • 2011-06-08
    • 2014-07-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-03
    • 2011-10-15
    • 1970-01-01
    • 2023-03-10
    相关资源
    最近更新 更多