【问题标题】:Kafka Connect S3 sink throws IllegalArgumentException when loading AvroKafka Connect S3 sink 在加载 Avro 时抛出 IllegalArgumentException
【发布时间】:2017-06-08 07:38:20
【问题描述】:

我正在使用qubole's S3 sink 将 Avro 数据以 Parquet 格式加载到 S3 中。

在我的 Java 应用程序中,我创建了一个生产者

Properties props = new Properties();
props.put("bootstrap.servers", KafkaHelper.getServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<byte[], byte[]>(props);

然后将GenericRecord转换成byte[]格式:

GenericRecord avroRecord = new GenericData.Record(avroSchema);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema);

for (Map.Entry<String, ?> entry : map.entrySet()) {
    String key = entry.getKey();
    Object value = entry.getValue();
    avroRecord.put(key, value);
}

ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, recordInjection.apply(avroRecord));
producer.send(record);

我在我的 Kafka Connect 属性中使用以下值:

key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

以及我的文件接收器属性中的以下配置选项:

connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

当我运行连接器时,我收到以下错误消息:'java.lang.IllegalArgumentException: Avro schema must be a record'。

我对 Kafka Connect 很陌生,我知道可以设置 Schema Registry 服务器——但我不明白接收器是否需要注册表来将 Avro 数据转换为 Parquet,或者这是否是我的某种格式或配置问题。在此错误的上下文中,“记录”指的是哪种数据格式?任何方向或帮助将不胜感激。

【问题讨论】:

    标签: java apache-kafka avro parquet apache-kafka-connect


    【解决方案1】:

    ByteArrayConverter 不会对数据进行任何转换:它不会实际进行任何序列化/反序列化,而是假定连接器知道如何处理原始 byte[] 数据。但是,ParquetFormat(实际上是大多数格式)不能只处理原始数据。相反,他们希望将数据反序列化并结构化为记录(您可以将其视为 C 结构、POJO 等)。

    请注意,qubole streamx README 指出ByteArrayConverter 在您可以安全地直接复制数据的情况下很有用。例如,如果您拥有 JSON 或 CSV 格式的数据。这些不需要反序列化,因为每个 Kafka 记录值的字节可以简单地复制到输出文件中。在这些情况下这是一个很好的优化,但通常不适用于所有输出文件格式。

    【讨论】:

    • 感谢您的解释——我实际上已经能够使用io.confluent.kafka.serializers.KafkaAvroSerializer 找到一个解决方案。为了这个问题,您能否详细说明您的答案并提供一个解决方案问题的解释?
    猜你喜欢
    • 1970-01-01
    • 2021-09-08
    • 2018-06-16
    • 2020-06-15
    • 2018-10-20
    • 2019-05-12
    • 2020-12-23
    • 1970-01-01
    • 2020-08-11
    相关资源
    最近更新 更多