【发布时间】:2021-05-13 16:37:56
【问题描述】:
我正在尝试复制 this example。我的拓扑是:
@Bean("myTopo")
public KStream<Object, Object> getTopo(@Qualifier("myKConfig") StreamsBuilder builder) {
var stream = builder.stream("my-events");
stream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
.count()
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.foreach((k, v) -> {
System.out.println("k + v = " + k + " --- " + v);
});
我已经在配置中设置了 serde 和窗口化 serde 内部类:
...
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
...
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JsonSerde.class);
var config = new KafkaStreamsConfiguration(props);
return new StreamsBuilderFactoryBean(config);
我得到的错误是
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor.
Do the Processor's input types match the deserialized types?
Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
Make sure the Processor can accept the deserialized input of type key:
org.apache.kafka.streams.kstream.Windowed,
and value:
org.apache.kafka.streams.kstream.internals.Change.
有根本原因
java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed
cannot be cast to class java.lang.String (org.apache.kafka.streams.kstream.Windowed is in unnamed module of loader 'app';
java.lang.String is in module java.base of loader 'bootstrap')
我看到count() 返回KTable<Windowed<Object>, Long>。所以看起来问题是它想要一个Windowed<String> serde 作为密钥。显然,DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS 是不够的。
如何创建和设置它?
【问题讨论】:
标签: java apache-kafka apache-kafka-streams