【问题标题】:Multiple Message Types in a Single Kafka Topic with Avro使用 Avro 的单个 Kafka 主题中的多种消息类型
【发布时间】:2018-12-28 00:16:19
【问题描述】:

我有一个基于 Kafka 构建的事件源应用程序。目前我有一个主题,其中包含多种消息类型。全部用 JSON 序列化/反序列化。

confluent 中的模式注册表看起来是一种很好的消息类型维护方法,并且在 Avro 完全兼容模式下,它还提供了一种在我的事件源应用程序中进行消息版本控制的机制。

与最近的patch -- blog post 到 4.1.1 融合。使用 Avro 序列化器/反序列化器,您可以在一个主题中包含多种不同类型的消息。

但是,我还没有看到任何可行的示例。一个也没有。

我的问题是:上面的补丁真的可以在不使用 Avro 联合​​类型(将所有不同类型的消息放在一个模式中并利用联合)的情况下工作吗?

这种方法如何与需要指定 Key 和 Value Serde 的 Kafka Streaming 应用程序一起使用?

我是否应该忘记 Avro 而只使用 protobuff?

【问题讨论】:

  • 经过更多研究和运行 POC。我选择了protobuff3。我设法创建了一个 protobuff 消息模式,并且该消息具有“Any”类型的一个属性。 Any 类型是我的有效负载,通过它我设法创建了一个通用主题、通用序列化器和反序列化器,我可以在我的代码中利用它们来重构在“Any”参数中序列化的不同类型的消息。

标签: apache-kafka avro apache-kafka-streams event-sourcing confluent-schema-registry


【解决方案1】:

这是一个消费者从发布不同类型事件的主题中获取数据的示例:

package com.kafka.schema;

import com.phonebook.Employee;
import com.phonebook.Milestone;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;

public class AvroConsumer {

    private static Consumer<Long, GenericRecord> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Const.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        // Use Kafka Avro Deserializer.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Use Specific Record or else you get Avro GenericRecord.
        // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

        // Schema registry location.
        // Run Schema Registry on 8081
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Const.SCHEMA_REGISTRY);
        props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
        return new KafkaConsumer<>(props);
    }

    public static void main(String... args) {
        final Consumer<Long, GenericRecord> consumer = createConsumer();
        consumer.subscribe(Collections.singletonList(Const.TOPIC));
        IntStream.range(1, 100).forEach(index -> {
            final ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
            if (records.count() == 0) {
                System.out.println("None found");
            } else {
                records.forEach(record -> {
                    GenericRecord recValue = record.value();
                    System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(), recValue);
                });
            }
        });
    }
}

这里的重要部分是:

props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());

【讨论】:

  • 我正在尝试类似的方法。我想对 2 个主题使用相同的 SerDe,我不想使用 Confluent kafka。我正在尝试向 SerDe 提供 2 个 avros 并一个接一个地处理异常
猜你喜欢
  • 1970-01-01
  • 2019-10-09
  • 2019-11-17
  • 2018-01-12
  • 1970-01-01
  • 1970-01-01
  • 2021-05-15
  • 2023-03-07
  • 2018-10-27
相关资源
最近更新 更多