【Question title】: Kafka Streams example throws a ClassCastException (Windowed -> String). How to set the proper Serde?
【Posted】: 2021-05-13 16:37:56
【Question】:

I am trying to reproduce this example. My topology is:

@Bean("myTopo")
    public KStream<Object, Object> getTopo(@Qualifier("myKConfig") StreamsBuilder builder) {
        var stream = builder.stream("my-events");
        stream.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
                .count()
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .foreach((k, v) -> {
                    System.out.println("k + v = " + k + " --- " + v);
                });
        return stream;
    }

I have already set the default serdes and the windowed inner serde classes in the configuration:

        ...
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        ...
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
        props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JsonSerde.class);
        var config = new KafkaStreamsConfiguration(props);
        return new StreamsBuilderFactoryBean(config);

The error I get is:

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. 
Do the Processor's input types match the deserialized types? 
Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. 
Make sure the Processor can accept the deserialized input of type key: 
   org.apache.kafka.streams.kstream.Windowed, 
and value: 
   org.apache.kafka.streams.kstream.internals.Change.

with root cause:

java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed 
cannot be cast to class java.lang.String (org.apache.kafka.streams.kstream.Windowed is in unnamed module of loader 'app'; 
java.lang.String is in module java.base of loader 'bootstrap')

I see that count() returns KTable<Windowed<Object>, Long>. So it looks like the problem is that it wants a Windowed<String> serde for the key. Apparently, setting DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS is not enough.

How do I create and set it?

【Comments】:

    Tags: java apache-kafka apache-kafka-streams


    【Solution 1】:

    I think I ran into this bug:

    https://issues.apache.org/jira/browse/KAFKA-9259

    I passed a Materialized with an explicit key serde to the count() method:

            var store = Stores.persistentTimestampedWindowStore(
                    "some-state-store",
                    Duration.ofMinutes(5),
                    Duration.ofMinutes(2),
                    false);
            var materialized = Materialized
                    .<String, Long>as(store)
                    .withKeySerde(Serdes.String());
    

    Now the code runs without exceptions.
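
    For completeness, here is a rough sketch of how the Materialized above might be wired into the topology from the question. It is not the exact code I ran: the class and method names (WindowedCountTopology, build) are made up for illustration, and it assumes the stream is declared with a String key so that the Materialized<String, Long, ...> type-checks against count(). The value type is left as Object and is read with the default value serde from the configuration.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.Stores;

// Hypothetical helper class, for illustration only.
public final class WindowedCountTopology {

    public static KStream<String, Object> build(StreamsBuilder builder) {
        // Assumption: keys are Strings; declaring the type here lets the
        // String-keyed Materialized below be accepted by count().
        KStream<String, Object> stream = builder.stream("my-events");

        // Same store as in the answer: 5-minute retention, 2-minute windows,
        // duplicates not retained.
        var store = Stores.persistentTimestampedWindowStore(
                "some-state-store",
                Duration.ofMinutes(5),
                Duration.ofMinutes(2),
                false);

        // Explicit key serde, so the windowed count does not depend on the
        // default key serde from StreamsConfig.
        var materialized = Materialized
                .<String, Long>as(store)
                .withKeySerde(Serdes.String());

        stream.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
                .count(materialized)
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .foreach((k, v) -> System.out.println("k + v = " + k + " --- " + v));

        return stream;
    }
}
```

    The only functional difference from the question's topology is count(materialized) instead of count(); everything else, including the window size and the suppress() call, is unchanged.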

    【Discussion】:
