【问题标题】:Kafka connect custom transforms to convert schema-less Json to AvroKafka 连接自定义转换以将无模式 Json 转换为 Avro
【发布时间】:2021-04-07 13:42:19
【问题描述】:

我正在尝试构建一个从 Kafka 读取 json 数据(无模式)的系统,将其转换为 avro 并将其推送到 s3。

我已经能够使用 KStreams 和 KSQL 实现 json 到 avro 的转换。我想知道使用 Kafka Connect 的自定义转换是否可以实现同样的事情。

这是我迄今为止尝试过的:

public class JsontoAvroConverter<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String OVERVIEW_DOC = "Transform Payload to Custom Format";
    private static final String PURPOSE = "transforming payload";
    public static final ConfigDef CONFIG_DEF = new ConfigDef();
    @Override
    public void configure(Map<String, ?> props) {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }

    @Override
    public R apply(R record) {

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "1");
        properties.setProperty("retries", "10");

        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");

        avro_Schema updatedSchema = makeUpdatedSchema();

        return newRecord(record, updatedSchema);
    }

    private avro_Schema makeUpdatedSchema() {
        avro_Schema.Builder avro_record = avro_Schema.newBuilder()
                .setName("test")
                .setTry$(1);

        return avro_record.build();
    }

    protected Object operatingValue(R record) {
        return record.value();
    }

    protected R newRecord(R record, avro_Schema updatedSchema) {
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp());
    }
}

其中 avro_schema 是我在 avsc 文件中指定的模式的名称。

我不确定这是否是正确的方法,但我面临的问题是,当调用 newRecord() 函数时,它期望 updatedSchema 为 Schema 类型,但我提供了它自定义 avro_Schema 类型。

另外,我保存到 updatedSchema 中的 avro_record.build() 并不是真正的模式,而是转换后的记录本身。但我不能只将记录主题、key(=null) 和 updatedRecord 传递给 newRecord 函数。它需要单独的架构和值。

我的问题是:

  1. 是否可以使用 KafkaConnect 将 json 转换为 avro,而无需使用 KStreams 或 KSQL? - 因为这两种选择都需要设置独立的服务。
  2. 如何将自定义 avro 架构传递给 newRecord 函数,然后单独提供数据。

如果这个问题已经得到解答,我深表歉意,我确实遇到了一些其他问题,但似乎没有一个能够回答我的疑问。如果您需要任何其他详细信息,请告诉我。谢谢!

【问题讨论】:

  • 您的转换器代码需要知道 JSON 的确切格式才能将其转换为 Avro。话虽如此,您应该能够使用与您的 kstreams 方法完全相同的代码,并且您不需要任何设置生产者属性或模式注册表的属性对象
  • @OneCricketeer,谢谢!你有什么我可以参考的例子吗?我理解你的意思,我能够为 json 构建一个模式。问题是如何将其转换为 Avro。我确实尝试使用 KStreams 代码,但正如我所提到的,这里的问题是 Kafka Connect 提供的内置 newRecord 函数需要一个模式而不是自定义模式。还有一个问题,就是要指定什么serializer-deserializer和s3存储格式,因为KConnect是读json写avro的。
  • 如果我使用类型、命名空间、名称和字段键创建模式并使用此模式创建 json,只需添加 format.class=io.confluent.connect.s3.format.avro.AvroFormat做这个把戏?谢谢!
  • 没有例子。我的主要观点是,您将无法为任意 JSON 提供一些通用转换函数,并且如果输入数据发生更改,架构也会中断。我不认识您拥有的 avro_Schema 课程; Kafka Connect 附带的 Schema 类是您应该构建的(更改 record.valueSchema()),然后 AvroFormat 类获取该对象并转换/写入 Avro 对象
  • 你提到你已经在使用ksql了?你不能部署它来代替连接吗?或者您已经有一个正在运行的连接集群?

标签: java apache-kafka avro apache-kafka-connect


【解决方案1】:

KafkaConnect 自定义转换器只需要向传入的 JSON 添加一个模式。接收器属性 format.class=io.confluent.connect.s3.format.avro.AvroFormat 将处理其余部分。

没有架构,记录值是一个映射,而有了架构,它就变成了一个结构。我不得不修改我的代码如下:

    @Override
    public R apply(R record) {
        final Map<String,?> value = requireMap(record.value(),PURPOSE);
        Schema updatedSchema = makeUpdatedSchema();

        final Struct updatedValue = new Struct(updatedSchema);


        for (Field field : updatedSchema.fields()) {

            updatedValue.put(field.name(), value.get(field.name()));
        }


        return newRecord(record, updatedSchema, updatedValue);
    }

    private Schema makeUpdatedSchema() {
        final SchemaBuilder builder = SchemaBuilder.struct()
                .name("json_schema")
                .field("name",Schema.STRING_SCHEMA)
                .field("try",Schema.INT64_SCHEMA);

        return builder.build();
    }

感谢@OneCricketeer 澄清了我的疑问!

【讨论】:

    猜你喜欢
    • 2018-02-13
    • 2020-04-09
    • 2020-01-17
    • 2019-05-11
    • 1970-01-01
    • 2019-07-02
    • 2020-02-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多