【问题标题】:Kafka stream not printing the value in the avro formatKafka 流未以 avro 格式打印值
【发布时间】:2018-02-01 22:09:02
【问题描述】:

我的 Kafka 生产者正在以 avro 格式发送数据,我正在使用 avro-schema(SpinsAvro) 读取该数据并尝试将流打印到控制台。

但是输出如下:Key 是 null,但是 value 是一些垃圾。

[KSTREAM-SOURCE-0000000000]: null , [B@200868f5
[KSTREAM-SOURCE-0000000000]: null , [B@841bc92
[KSTREAM-SOURCE-0000000000]: null , [B@302e9607
[KSTREAM-SOURCE-0000000000]: null , [B@6f9139fb
[KSTREAM-SOURCE-0000000000]: null , [B@cbdab3c

代码如下:

public class TestStream {

public static void main(final String[] args) throws Exception {

    final String bootstrapServers = "kafka-XXX:9092";
    final String schemaRegistryUrl = "http://XXX:8081";
    final KafkaStreams streams = buildStream(
            bootstrapServers,
            schemaRegistryUrl,
            "/tmp/kafka-streams");

    streams.cleanUp();

    streams.start();

    // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}

static KafkaStreams buildStream(final String bootstrapServers,
                                   final String schemaRegistryUrl,
                                   final String stateDir) {
    final Properties streamsConfiguration = new Properties();

    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lambda-example");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "lambda-example-client");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    streamsConfiguration.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    streamsConfiguration.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);


    final KStreamBuilder builder = new KStreamBuilder();

    // read the source stream
    final KStream<String, SpinsAvro> feeds = builder.stream("spins_topic");
    feeds.print();

    return new KafkaStreams(builder, streamsConfiguration);

}

}

【问题讨论】:

  • 您是否也在使用KafkaAvroSerializer 来生成 Avro 数据?
  • 是的..我正在使用 KafkaAvroSerializer 键 -> StringSerializer 值 -> KafkaAvroSerializer

标签: apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams


【解决方案1】:

您的配置不正确。

Kafka Streams 使用了Serdes,它结合了序列化器和反序列化器,用于一个类中的单一类型。设置

streamsConfiguration.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
streamsConfiguration.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");

没有任何作用。您需要使用DEFAULT_KEY_SERDE_CLASS_CONFIGDEFAULT_VALUE_SERDE_CLASS_CONFIG 并传入适当的Serdes

基本类型的 Serdes 已经通过 Serdes 类提供(例如,Serdes.Long())。还有 WrapperSerde 类,您可以使用它插入自定义序列化器和反序列化器以创建自定义 Serde(或者您只需从头开始实现 Serde 接口)。

查看文档了解更多详情:https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html

【讨论】:

猜你喜欢
  • 2018-03-19
  • 2016-10-12
  • 1970-01-01
  • 2018-06-05
  • 1970-01-01
  • 2016-11-16
  • 2022-01-14
  • 2019-07-25
  • 2020-03-17
相关资源
最近更新 更多