【问题标题】:In the context of using Kafka and Apache Beam, what is the difference between a Coder and Kafka Deserializer?在使用 Kafka 和 Apache Beam 的情况下,Coder 和 Kafka Deserializer 有什么区别?
【发布时间】:2019-10-16 06:05:10
【问题描述】:

我是 Apache Beam 的新手。我正在尝试按照文档使用 KafKaIO 从 Kafka 读取数据。在创建PCollection 期间,withValueDeserializerAndCoder 方法允许您设置编码器和反序列化器。我不明白为什么我们可能需要反序列化器和编码器。在我看来,两者都是关于将字节流表示为 java 对象。那么为什么我们两者都需要呢?是因为 Beam 更像是一个允许多个运行器在其下运行的框架吗?

【问题讨论】:

    标签: apache-kafka apache-beam


    【解决方案1】:

    是的,这有点棘手,乍一看并不明显。您需要有一个 Kafka Deserializer(或 Serializer,如果您写入 Kafka)来解释您从 Kafka 读取的 Java 对象的键和值字节。同时,Beam 要求我们提供Coders 来实现我们PCollections 在运行时的中间数据。

    编码器与(反)序列化器(负责解释 Kafka 消息)无关,因此我们需要明确提供编码器。虽然,KafkaIO 会尝试从反序列化器推断编码器,并且在许多情况下它会隐式工作,但如果它失败或您想要提供特定编码器,那么您可以单独指定它。

    例如,如果您的 Kafka 消息使用 Avro 格式序列化,则可以使用 KafkaAvroDeserializer 和内部 Beam AvroCoder

    public static void main(String[] args) {
    ...
      KafkaIO.Read read = KafkaIO.<Long, MyClass>read()
          .withKeyDeserializer(LongDeserializer.class)
          .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
              AvroCoder.of(MyClass.class));
    ...
    }
    
    @DefaultCoder(AvroCoder.class) 
    public class MyClass {
    
      String name;
      String age;
    
      MyClass() {}
    
      MyClass(String n, String a) {
        this.name = n;
        this.age = a;
      }
    }
    

    【讨论】:

      猜你喜欢
      • 2019-10-11
      • 2018-12-07
      • 2021-09-03
      • 2016-09-20
      • 2016-11-03
      • 2020-08-20
      • 2020-12-27
      • 2020-11-19
      • 2021-02-17
      相关资源
      最近更新 更多