【发布时间】:2021-04-13 21:42:03
【问题描述】:
我正在创建一个 Kafka Streams 应用程序,我的主题数据来自 Protobuf。我们能够为此创建 Java 代码绑定。但是,我正在努力使用正确的 serde 来使用我的主题数据。有人可以分享我在这里做错了什么。
下面是属性定义,我正在使用:
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id-config");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-broker:my-port");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
我的 Serde 课程
public class AppSerdes extends Serdes {
public static KafkaProtobufSerde<ProtobufClass1> createConfiguredSerde1() {
KafkaProtobufSerde<ProtobufClass1> serde = new KafkaProtobufSerde<ProtobufClass1>();
Map<String, Object> serdeConfig = getSerdeConfig();
serde.configure(serdeConfig, false);
return serde;
}
public static KafkaProtobufSerde<ProtobufClass2> createConfiguredSerde2() {
KafkaProtobufSerde<ProtobufClass2> serde = new KafkaProtobufSerde<ProtobufClass2>();
Map<String, Object> serdeConfig = getSerdeConfig();
serde.configure(serdeConfig, false);
return serde;
}
private static Map<String, Object> getSerdeConfig() {
Map<String, Object> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
return serdeConfig;
}
}
这就是我创建 KStream 和 KTable 实例的方式:
StreamsBuilder streamBuilder = new StreamsBuilder();
KTable<String, ProtobufClass1> table = streamBuilder.table("topic1",
Consumed.with(AppSerdes.String(), AppSerdes.createConfiguredSerde1()));
KStream<String, ProtobufClass2> stream = streamBuilder.stream("topic2".
Consumed.with(AppSerdes.String(), AppSerdes.createConfiguredSerde2()));
但是,我收到以下错误:
org.apache.kafka.streams.errors.StreamsException: ClassCastException 调用处理器。处理器的输入类型是否与反序列化类型匹配?检查 Serde 设置并更改 StreamConfig 中的默认 Serdes 或通过方法参数提供正确的 Serdes。确保处理器可以接受类型键:java.lang.String 和值:com.google.protobuf.DynamicMessage 的反序列化输入。 请注意,尽管不正确的 Serdes 是错误的常见原因,但强制转换异常可能还有其他原因(例如,在用户代码中)。例如,如果处理器在存储中连接,但不正确地转换泛型,则在处理期间可能会引发类转换异常,但原因不会是错误的 Serdes。 在 org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) 在 org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) 在 org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703) 在 org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) 在 org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703) 在 org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105) 在 org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647) 在 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) 在 org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) 原因:java.lang.ClassCastException:com.google.protobuf.DynamicMessage 无法转换为 iit.datahub.party.system_crm.v1.CustomerAddressBase$CustomerAddressBaseEntity 在 org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:234) 在 org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) 在 org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) 在 org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) 在 org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) 在 org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ... 11 更多
【问题讨论】:
-
我从未使用过 protobuf。但上面写着
com.google.protobuf.DynamicMessage cannot be cast to iit.datahub.party.system_crm.v1.CustomerAddressBase$CustomerAddressBaseEntity我想我会试着找出com.google.protobuf.DynamicMessage是什么类。 -
我有同样的错误
DynamicMessage cannot be cast to
标签: java apache-kafka apache-kafka-streams confluent-schema-registry