【问题标题】:Newly build KTable returns nothing新建 KTable 不返回任何内容
【发布时间】:2018-04-25 14:10:00
【问题描述】:

我正在尝试使用 KTable 来使用来自 Kafka 主题的事件。但是,它什么也不返回。当我使用 KStream 时,它会返回并打印对象。这真的很奇怪。 Producer and Consumer can be found here

//Not working    
KTable<String, Customer> customerKTable = streamsBuilder.table("customer", Consumed.with(Serdes.String(), customerSerde),Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name()));
customerKTable.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)));

//KStream working
KStream<String, Customer> customerKStream= streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde));
customerKStream.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)))

【问题讨论】:

    标签: apache-kafka-streams


    【解决方案1】:

    经过大量研究,我发现我的语法存在问题。我使用的语法是有效的,基于 Confluent/Kafka 文档,但它不起作用。将向 Kafka 团队提出一个错误。现在,正在运行的新语法是

    KTable<String, Customer> customerKTable = streamsBuilder.table("customer",Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name())
                                                                .withKeySerde(Serdes.String())
                                                                .withValueSerde(customerSerde));
    

    我应该包含 withKeySerde()withValueSerde() 以使 KTable 工作。但这并没有提到 Confluent/Kafka 文档

    【讨论】:

      猜你喜欢
      • 2020-04-27
      • 2019-04-06
      • 2012-02-15
      • 2020-09-07
      • 2021-11-22
      • 2015-11-06
      • 2013-11-14
      • 2014-03-23
      • 2012-08-25
      相关资源
      最近更新 更多