【Posted】: 2018-09-25 16:06:30
【Question】:
UserRecord.java (auto-generated by the Maven Avro plugin)
UserRecord extends SpecificRecordBase implements SpecificRecord
UserRecordSerde.java
UserRecordSerde extends SpecificAvroSerde
application.yml
spring.cloud.stream.bindings.input.destination: userTopic
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: UserRecordSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde: LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: SpecificAvroSerde
Class - StreamListener - the raw stream carries null keys and Avro-encoded UserRecord values
@StreamListener
public KStream<Long, ArrayList<UserRecord>> handleUserRecords(@Input KStream<?, UserRecord> userRecordStream) {
    // Serde configuration: schema registry location and specific-record decoding
    Map<String, Object> serdeConfig = new HashMap<>();
    serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    serdeConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

    Serde<ArrayList<UserRecord>> userRecordListSerde = new SpecificAvroSerde();
    userRecordListSerde.configure(serdeConfig, false);

    // userRecordSerde is used below but its declaration is not shown in the post
    return userRecordStream
        .map((key, value) -> new KeyValue<>(value.getUserID(), value))
        .groupByKey(Serialized.with(Serdes.Long(), userRecordSerde))
        .aggregate(ArrayList::new, (Long key, UserRecord value, ArrayList<UserRecord> agg) -> {
            agg.add(value);
            return agg;
        }, userRecordListSerde)
        .toStream();
}
Exception
java.lang.ClassCastException: com.example.UserRecord cannot be cast to com.example.UserRecord
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
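【Discussion】:
- It sounds like the ClassLoader through which your UserRecordSerde loads UserRecord is different from the one used by the class that contains the handleUserRecords() method.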
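The diagnosis in the comment above can be illustrated with a minimal, Kafka-free sketch. A class loaded by two different loaders has the same name but is two distinct runtime types, which is why the message reads "X cannot be cast to X". Everything in this block is hypothetical and only for illustration: `ClassLoaderIdentityDemo`, `Payload` (a stand-in for `UserRecord`), and the helpers `loadIsolated`, `castFails` and `describe`. It also assumes the compiled classes sit in a directory or jar that a `URLClassLoader` can read.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderIdentityDemo {

    /** Hypothetical stand-in for a generated class such as UserRecord. */
    public static class Payload {}

    /** Loads Payload a second time through a loader that does not delegate to the application loader. */
    static Class<?> loadIsolated() {
        try {
            URL codeSource = Payload.class.getProtectionDomain().getCodeSource().getLocation();
            // A null parent means only bootstrap classes are delegated, so Payload is defined afresh.
            URLClassLoader isolated = new URLClassLoader(new URL[] {codeSource}, null);
            return Class.forName(Payload.class.getName(), true, isolated);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true when casting an instance from the isolated loader to the local Payload fails. */
    static boolean castFails() {
        try {
            Object foreign = loadIsolated().getDeclaredConstructor().newInstance();
            Payload local = (Payload) foreign; // same class name, different defining loader
            return local == null;              // only reached if the cast succeeds; then false
        } catch (ClassCastException e) {
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Formats a class together with its defining loader, useful for logging inside a processor. */
    static String describe(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        return type.getName() + " loaded by " + (loader == null ? "bootstrap" : loader.toString());
    }

    public static void main(String[] args) {
        System.out.println(describe(Payload.class));
        System.out.println(describe(loadIsolated()));
        System.out.println("cast fails: " + castFails());
    }
}
```

In the application from the question, logging something like `describe(value.getClass())` next to `describe(UserRecord.class)` inside the `map` step would be one way to check whether two loaders are actually involved. Where the second loader comes from is not stated in the post.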
Tags: apache-kafka avro apache-kafka-streams spring-cloud-stream