【问题标题】:In the context of using Kafka and Apache Beam, what is the difference between a Coder and Kafka Deserializer?在使用 Kafka 和 Apache Beam 的情况下,Coder 和 Kafka Deserializer 有什么区别?
【发布时间】:2019-10-16 06:05:10
【问题描述】:
我是 Apache Beam 的新手。我正在尝试按照文档使用 KafKaIO 从 Kafka 读取数据。在创建PCollection 期间,withValueDeserializerAndCoder 方法允许您设置编码器和反序列化器。我不明白为什么我们可能需要反序列化器和编码器。在我看来,两者都是关于将字节流表示为 java 对象。那么为什么我们两者都需要呢?是因为 Beam 更像是一个允许多个运行器在其下运行的框架吗?
【问题讨论】:
标签:
apache-kafka
apache-beam
【解决方案1】:
是的,这有点棘手,乍一看并不明显。您需要有一个 Kafka Deserializer(或 Serializer,如果您写入 Kafka)来解释您从 Kafka 读取的 Java 对象的键和值字节。同时,Beam 要求我们提供Coders 来实现我们PCollections 在运行时的中间数据。
编码器与(反)序列化器(负责解释 Kafka 消息)无关,因此我们需要明确提供编码器。虽然,KafkaIO 会尝试从反序列化器推断编码器,并且在许多情况下它会隐式工作,但如果它失败或您想要提供特定编码器,那么您可以单独指定它。
例如,如果您的 Kafka 消息使用 Avro 格式序列化,则可以使用 KafkaAvroDeserializer 和内部 Beam AvroCoder。
public static void main(String[] args) {
...
KafkaIO.Read read = KafkaIO.<Long, MyClass>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(MyClass.class));
...
}
@DefaultCoder(AvroCoder.class)
public class MyClass {
String name;
String age;
MyClass() {}
MyClass(String n, String a) {
this.name = n;
this.age = a;
}
}