【问题标题】:Issue with configuring Serdes for Kafka Streams为 Kafka 流配置 Serdes 的问题
【发布时间】:2022-01-21 23:38:30
【问题描述】:

我将一个 json 对象放入我的“提交”主题。我想使用 Kafka Streams 消费消息,但是出现错误

@Configuration
@EnableKafka
@EnableKafkaStreams
public class AnalyzerConfiguration {
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, commitSerde().getClass());

        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public Serde<Commit> commitSerde() {
        return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
    }

    @Bean
    public KStream<String, Commit> kStream(StreamsBuilder builder) {
        KStream<String, Commit> stream = builder.stream("commits", Consumed.with(Serdes.String(), commitSerde()));

        KTable<String, Long> commitsCount = stream
                .mapValues(Commit::getAuthorName)
                .selectKey((key, word) -> word)
                .groupByKey()
                .count(Materialized.as("Counts"));

        commitsCount.toStream().to("commits-count", Produced.with(Serdes.String(), Serdes.Long()));

        return stream;
    }
}

日志说:

线程“test-streams-469f5ee6-d0de-472e-a602-a7b6d11f2e1c-StreamThread-1”中的异常 org.apache.kafka.streams.errors.StreamsException:无法配置值 serde 类 org.apache.kafka.common .serialization.Serdes$WrapperSerde

原因:org.apache.kafka.common.KafkaException: 找不到 org.apache.kafka.common.serialization.Serdes$WrapperSerde 的公共无参数构造函数

原因:java.lang.NoSuchMethodException

【问题讨论】:

    标签: java spring-boot apache-kafka deserialization apache-kafka-streams


    【解决方案1】:

    您的问题是StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG 的注册。首先,在您的示例中不需要这样做,因为您在创建 KStream 时使用的 Consumed 中指定了值 serde。您可以省略默认的 serde。

    如果您将一个类注册为默认 serde,Kafka Streams 将在某个时候通过反射创建该类的实例。这会调用该类的默认(无参数)构造函数。在您的示例中,将使用来自Serdes.serdeFrom(new JsonSerializer&lt;&gt;(), new JsonDeserializer&lt;&gt;(Commit.class)) 的 org.apache.kafka.common.serialization.Serdes$WrapperSerde 类。该类没有这样的构造函数,导致异常。

    如果你想为你的Commit类型注册一个默认的serde,你需要把它包装成一个小类:

    public class CommitSerde extends WrapperSerde<Commit> {
    
        public CommitSerde() {
            super(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
        }
    }
    

    这个类应该适合在你的例子中使用props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CommitSerde.class.getName());注册为默认值serde。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-10-01
      • 2017-07-22
      • 1970-01-01
      • 1970-01-01
      • 2021-02-24
      • 1970-01-01
      • 2016-11-29
      • 1970-01-01
      相关资源
      最近更新 更多