【发布时间】:2019-12-25 22:07:40
【问题描述】:
我正在尝试使用 Kafka Streams 编写我的第一个练习应用程序来计算主题中的单词数。但是,我认为我指的是旧 API,因为在 lambda 函数的末尾,我想将 KTable 的输出放到一个主题中,但我没有看到任何这样的方法。
我引用的代码使用了to() 方法,但我认为现在没有这种方法。我看到toStream(),但不知道如何使用它将消息发送到特定的输出主题。
有人可以看看,因为这应该是非常基本的。
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-starter-project");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Short().getClass());
StreamsBuilder builder = new StreamsBuilder();
//1- Stream from Kafka
KStream<String, String> wordCountInput = builder.stream("word-count-input");
//2 - map values to lowercase
KTable<String,Long> wordCounts = wordCountInput
.mapValues(textlines -> textlines.toLowerCase())
//or mapValues(String::toLowercase())
//3- flatMapValues split by space
.flatMapValues(lowerCasedTextLine-> Arrays.asList(lowerCasedTextLine.split(" ")))
//4- Select key to apply a key and discard old key
.selectKey((ignoredKey,word)-> word)
//5 - groupBy key before aggregation
.groupByKey()
//6- count occurences finally
.count();
**wordCounts.to**
【问题讨论】:
-
您应该使用
table#toStream()#to("topic")-- 您可能还需要考虑禁用记录缓存:docs.confluent.io/current/streams/developer-guide/… -
@MatthiasJ.Sax 你能解释一下为什么我们应该使用 toStream().to() 而不是直接使用物化的“内部变更日志主题”吗?是不是我们以后可以在不影响消费者的情况下更改拓扑?还是有更微妙的原因?
-
是的,这是主要原因(也是唯一原因)。保持事物解耦。