【发布时间】:2019-01-28 01:40:05
【问题描述】:
我正在用 Java 编写一个 Kafka 流应用程序,该应用程序接受由连接器创建的输入主题,该连接器将模式注册表和 avro 用于键和值转换器。连接器产生以下模式:
key-schema: "int"
value-schema:{
"type": "record",
"name": "User",
"fields": [
{"name": "firstname", "type": "string"},
{"name": "lastname", "type": "string"}
]}
实际上,有几个主题,键模式总是“int”,值模式总是某种类型的记录(用户、产品等)。我的代码包含以下定义
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false);
起初我尝试用类似的东西来消费这个话题
Consumed.with(Serdes.Integer(), userSerde); 但这不起作用,因为 Serdes.Integer() 期望整数使用 4 个字节进行编码,但 avro 使用可变长度编码。使用 Consumed.with(Serdes.Bytes(), userSerde); 有效,但我真的想要 int 而不是字节,所以我将代码更改为这个
KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer()
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true);
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);
这使得编译器产生一个警告(它不喜欢(Serde<Integer>)(Serde) 转换)但它允许我使用
Consumed.with(keySerde, userSerde); 并获取一个整数作为键。这工作得很好,我的应用程序按预期运行(太棒了!!!)。但现在我想为键/值定义默认 serde,但我无法让它工作。
设置默认值serde很简单:
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
但是我不知道如何定义默认键 serde。
我试过了
-
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName());产生运行时错误:找不到 org.apache.kafka.common.serialization.Serdes$WrapperSerde 的公共无参数构造函数 -
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);产生运行时错误:java.lang.Integer cannot be cast to org.apache.avro.specific.SpecificRecord
我错过了什么? 谢谢。
【问题讨论】:
-
看起来像你的选角。忽略错误会使尝试 2 (SpecificAvroSerde.class) 不起作用,因为类型不匹配。所以从你所说的看来,关键实际上是avro数据。您的 avro 仅包含用户 id 的定义(现在)并不意味着它是整数类型,因为您可以稍后添加新字段。构建自定义 serde 以读取 Avro 数据并仅将用户 ID 作为整数返回是很好的(那么这就是正确的 Serde
)。如果您这样做,您还需要自定义 serde 用于写入,它采用整数 userId 值并写入 Avro 密钥数据(包含用户 ID)。
标签: java apache-kafka avro apache-kafka-streams confluent-platform