【发布时间】:2019-01-31 06:09:55
【问题描述】:
我有一个简单的问题。我想在同一个键上加入 2 个KStreams,并以 GenericRecord 作为值:
final KStream<String, GenericRecord> obs = builder.stream("Observations");
final KStream<String, GenericRecord> foI = builder.stream("FeaturesOfInterest");
final KStream<String, GenericRecord> transformfoIT = foIT
.map((key, value) -> KeyValue.pair(value.get("Observation").toString(), value));
final KStream<String, GenericRecord> merged = obsT.join(
transformfoIT,
(value, location) -> {
value.put("FeatureOfInterest", location);
System.out.println();
return value;
});
tranfsfromfoIT 就是这么简单的设置按键。这样obs.key 和tranformfoIT.key 对于消息是相同的。
但我的加入不起作用,因为我得到了:
KStream 类型中的方法 join(KTable, ValueJoiner) 不适用于参数 (KStream, (value, location) -> {})"
我不知道如何解决它。
我希望你能帮助我。
【问题讨论】:
标签: java join apache-kafka apache-kafka-streams