【问题标题】:Apache Beam Kafka IO for Json messages - org.apache.kafka.common.errors.SerializationException用于 Json 消息的 Apache Beam Kafka IO - org.apache.kafka.common.errors.SerializationException
【发布时间】:2023-03-13 03:28:02
【问题描述】:

我正在尝试熟悉 Apache Beam Kafka IO 并遇到以下错误

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
    at com.andrewjones.KafkaConsumerExample.main(KafkaConsumerExample.java:58)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

以下是一段从 kafka 主题读取消息的代码。感谢您是否有人可以提供一些指示。

公共类 KafkaConsumerJsonExample { 静态最终字符串 TOKENIZER_PATTERN = "[^\p{L}]+";

public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();

    // Create the Pipeline object with the options we defined above.
    Pipeline p = Pipeline.create(options);

    p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("myTopic2")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)

            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))

            // We're writing to a file, which does not support unbounded data sources. This line makes it bounded to
            // the first 5 records.
            // In reality, we would likely be writing to a data source that supports unbounded data, such as BigQuery.
            .withMaxNumRecords(5)

            .withoutMetadata() // PCollection<KV<Long, String>>
    )
            .apply(Values.<String>create())

            .apply(TextIO.write().to("wordcounts"));
    System.out.println("running data pipeline");
    p.run().waitUntilFinish();
}

}

【问题讨论】:

    标签: google-cloud-platform apache-kafka google-cloud-dataflow kafka-consumer-api apache-beam


    【解决方案1】:

    这个问题是由使用LongDeserializer 来处理似乎由除 Long 之外的其他序列化程序序列化的键引起的,这取决于您如何生成记录。

    因此,您可以使用适当的反序列化器,或者,如果密钥无关紧要,作为一种解决方法,您也可以尝试使用 StringDeserializer 作为密钥。

    【讨论】:

    • 很高兴听到这个消息!
    猜你喜欢
    • 2019-07-12
    • 1970-01-01
    • 2023-03-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-04-18
    • 2017-10-15
    • 2021-07-13
    相关资源
    最近更新 更多