【问题标题】:What is the difference between implementing Deserializer and Serde in Kafka Consumer API?在 Kafka Consumer API 中实现 Deserializer 和 Serde 有什么区别?
【发布时间】:2019-10-11 01:26:28
【问题描述】:

我尝试在 GitHub (https://github.com/onurtokat/kafka-clickstream-enrich) 上模拟 Gwen (Chen) Shapira 的 kafka-clickstream-enrich kafka-stream 项目。当我使用 Deserializers 使用消费者类使用主题时,我遇到了错误。定制的 Serde 类有序列化器和反序列化器。但是,我试图理解为什么自定义 serde 用于反序列化器,然后消费者 API 给出错误,因为它不是 org.apache.kafka.common.serialization.Deserializer 的实例

可以使用带有 Serdes.Integer() Serializer 和 new ProfileSerde() Deserializer 的 KTable 来使用主题,如下所示。

KTable<Integer, UserProfile> profiles = builder.table(Constants.USER_PROFILE_TOPIC,
                Consumed.with(Serdes.Integer(), new ProfileSerde()),
                Materialized.as("profile-store"));

自定义 Serde 定义为;

static public final class ProfileSerde extends WrapperSerde<UserProfile> {
        public ProfileSerde() {
            super(new JsonSerializer<UserProfile>(), new JsonDeserializer<UserProfile>(UserProfile.class));
        }
    }

通用 Serde 是自定义的,如下所示;

package com.onurtokat.serde;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;


public class WrapperSerde<T> implements Serde<T> {

    final private Serializer<T> serializer;
    final private Deserializer<T> deserializer;

    public WrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}

我的消费者就是这么简单,如下图所示;

package com.onurtokat.consumers;

import com.onurtokat.ClickstreamEnrichment;
import com.onurtokat.Constants;
import com.onurtokat.model.UserProfile;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumeProfileData {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClickstreamEnrichment.ProfileSerde.class);

        KafkaConsumer<Integer, UserProfile> consumerProfileTopic = new KafkaConsumer<>(config);
        consumerProfileTopic.subscribe(Arrays.asList(Constants.USER_PROFILE_TOPIC));
        while (true) {
            ConsumerRecords<Integer, UserProfile> records = consumerProfileTopic.poll(Duration.ofMillis(100));
            for (ConsumerRecord<Integer, UserProfile> record : records) {
                System.out.println(record.key() + " " + record.value());
            }
        }
    }
}

当我尝试与消费者一起消费主题时的错误是;

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
    at com.onurtokat.consumers.ConsumeProfileData.main(ConsumeProfileData.java:25)
Caused by: org.apache.kafka.common.KafkaException: com.onurtokat.ClickstreamEnrichment$ProfileSerde is not an instance of org.apache.kafka.common.serialization.Deserializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:712)
    ... 3 more

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api apache-kafka-streams


    【解决方案1】:

    区别在于:

    • Serdes 被 Kafka 的 Streams API(又名 Kafka Streams)使用。 Serde 是一对 (1) serializer 和 (2) deserializer 的包装器,用于相同的数据类型——请参阅接下来的两个要点。也就是说,Serde&lt;T&gt; 有一个Serializer&lt;T&gt; 和一个Deserializer&lt;T&gt;。您发布的第一个代码 sn-p(例如 KTable)是 Kafka Streams 代码 sn-p,这就是它需要 Serde 的原因。 Kafka Streams 需要Serde,因为它既可以生成消息(为此需要Serializer),也可以读取消息(需要Deserializer)。
    • 反序列化器被 Kafka 的消费者 API(又名消费者客户端)用于读取消息。您的最后一个代码 sn-p(例如 KafkaConsumer)正在使用消费者客户端,因此需要 Deserializer,而不是 Serde
    • 序列化器被 Kafka 的生产者 API(又名生产者客户端)用于编写消息。

    关于:

    Caused by: org.apache.kafka.common.KafkaException: com.onurtokat.ClickstreamEnrichment$ProfileSerde is not an instance of org.apache.kafka.common.serialization.Deserializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:712)
        ... 3 more
    

    您的 Kafka 消费者客户端代码获得了 Serde,而它原本期望的是 Deserializer

    【讨论】:

    • 谢谢 Micheal G. Noll。很好的解释。
    • 确实很好的解释。
    【解决方案2】:

    你好像误会了:

    可以使用带有 Serdes.Integer() Serializer 和 new ProfileSerde() Deserializer 的 KTable 来使用该主题,如下所示。

    您必须向Consumed.with() 提供 KeySerde 和 ValueSerde。

    关于例外情况:

    很清楚 - 你必须设置 Deserializer 的实现(不是 Serde)

    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, //here);
    

    【讨论】:

    • 我想你应该这样获得反序列化器 ClickstreamEnrichment.ProfileSerde.deserializer()
    • 你是对的。我应该实现反序列化器而不是使用 serde。感谢您的回复。
    猜你喜欢
    • 2016-11-03
    • 2017-06-20
    • 2023-03-02
    • 2018-07-15
    • 1970-01-01
    • 1970-01-01
    • 2019-09-04
    • 2013-06-24
    • 2018-12-07
    相关资源
    最近更新 更多