【问题标题】:How to publish a KTable output to a specific Kafka Topic?如何将 KTable 输出发布到特定的 Kafka 主题?
【发布时间】:2019-12-25 22:07:40
【问题描述】:

我正在尝试使用 Kafka Streams 编写我的第一个练习应用程序来计算主题中的单词数。但是,我认为我指的是旧 API,因为在 lambda 函数的末尾,我想将 KTable 的输出放到一个主题中,但我没有看到任何这样的方法。

我引用的代码使用了to() 方法,但我认为现在没有这种方法。我看到toStream(),但不知道如何使用它将消息发送到特定的输出主题。

有人可以看看,因为这应该是非常基本的。

public static void main(String[] args) {

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-starter-project");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Short().getClass());


        StreamsBuilder builder = new StreamsBuilder();
        //1- Stream from Kafka
        KStream<String, String> wordCountInput = builder.stream("word-count-input");
            //2 - map values to lowercase
            KTable<String,Long> wordCounts = wordCountInput
                    .mapValues(textlines -> textlines.toLowerCase())
                    //or mapValues(String::toLowercase())
                    //3- flatMapValues split by space
                    .flatMapValues(lowerCasedTextLine-> Arrays.asList(lowerCasedTextLine.split(" ")))
                    //4- Select key to apply a key and discard old key
                    .selectKey((ignoredKey,word)-> word)
                    //5 - groupBy key before aggregation
                    .groupByKey()
                    //6- count occurences finally
                    .count();

        **wordCounts.to**

【问题讨论】:

  • 您应该使用table#toStream()#to("topic") -- 您可能还需要考虑禁用记录缓存:docs.confluent.io/current/streams/developer-guide/…
  • @MatthiasJ.Sax 你能解释一下为什么我们应该使用 toStream().to() 而不是直接使用物化的“内部变更日志主题”吗?是不是我们以后可以在不影响消费者的情况下更改拓扑?还是有更微妙的原因?
  • 是的,这是主要原因(也是唯一原因)。保持事物解耦。

标签: apache-kafka-streams


【解决方案1】:

您可以使用以下计数方法:

KTable&lt;K, Long&gt; count(Materialized&lt;K, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt; var1);

所以在你的代码中:

KStream<String, String> wordCountInput = builder.stream("word-count-input");
        //2 - map values to lowercase
        KTable<String,Long> wordCounts = wordCountInput
                .mapValues(textlines -> textlines.toLowerCase())
                //or mapValues(String::toLowercase())
                //3- flatMapValues split by space
                .flatMapValues(lowerCasedTextLine-> Arrays.asList(lowerCasedTextLine.split(" ")))
                //4- Select key to apply a key and discard old key
                .selectKey((ignoredKey,word)-> word)
                //5 - groupBy key before aggregation
                .groupByKey()
                //6- count occurences finally
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("your-topic-name")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));

物化生成状态存储(默认为RocksDB)并同步到kafka主题“yournameapp-your-topic-name-changelog”。然后您可以创建 kafka 消费者来读取该主题,或者创建交互式查询来获取 ktable 的数据。

【讨论】:

  • 没有理由传入Materialized 参数。与此无关。
  • @MatthiasJ.Sax 感谢您的评论。我考虑过这个解决方案,但问题是“我想将 KTable 的输出放到一个主题中”,所以我认为我的回答适合这个问题。
  • 重点是,更改日志主题将独立于Materialized 参数创建。其次,它被认为是一个内部主题,创建一个适当的输出主题“更好”(比较我对问题本身的评论)。
【解决方案2】:

下面的代码非常适合我:

    // 1 stream from kafka
    StreamsBuilder streamBuilder = new StreamsBuilder();
    
    KStream<String, String> wordInputStream = streamBuilder.stream(INPUT_TOPIC);
    
    // 2 Map values to lowercase
    KTable<String, Long> wordCounts = wordInputStream.mapValues(value -> value.toLowerCase())
    // 3 flat map values split by space
    .flatMapValues(value -> Arrays.asList(value.split(SPACE)))
    //4 select key to apply as key
    .selectKey((key, value) -> value)
    //5 group by key before aggregation
    .groupByKey()
    // 6 count
    .count(Named.as("Counts"));
    
    wordCounts.toStream().to(OUTPUT_TOPIC); // here OUTPUT_TOPIC was used from string constant

使用 kafka-streams 3.0.0

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.0.0</version>
</dependency>

【讨论】:

    【解决方案3】:

    您可以使用以下代码。

    workCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-01-17
      • 2022-01-06
      • 1970-01-01
      • 2018-10-28
      • 1970-01-01
      • 2017-03-05
      • 2020-11-14
      相关资源
      最近更新 更多