【问题标题】:Kafka Join 2 Streams (Kafka 1.0.1)Kafka Join 2 Streams (Kafka 1.0.1)
【发布时间】:2019-01-31 06:09:55
【问题描述】:

我有一个简单的问题。我想在同一个键上加入 2 个KStreams,并以 GenericRecord 作为值:

final KStream<String, GenericRecord> obs = builder.stream("Observations");
final KStream<String, GenericRecord> foI = builder.stream("FeaturesOfInterest");

final KStream<String, GenericRecord> transformfoIT = foIT
    .map((key, value) -> KeyValue.pair(value.get("Observation").toString(), value)); 

final KStream<String, GenericRecord> merged = obsT.join(
    transformfoIT,
    (value, location) -> {
        value.put("FeatureOfInterest", location);
        System.out.println();
        return value;

    });

tranfsfromfoIT 就是这么简单的设置按键。这样obs.keytranformfoIT.key 对于消息是相同的。

但我的加入不起作用,因为我得到了:

KStream 类型中的方法 join(KTable, ValueJoiner) 不适用于参数 (KStream, (value, location) -> {})"

我不知道如何解决它。

我希望你能帮助我。

【问题讨论】:

    标签: java join apache-kafka apache-kafka-streams


    【解决方案1】:

    要加入两个KStream,您还需要通过JoinWindows 参数提供一个加入窗口。比较:

    https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#kstream-kstream-join

    因为缺少第三个JoinWindows参数,编译器认为你想调用join(KTable,...)而不是join(KStream,...),类型明显不兼容。

    【讨论】:

      猜你喜欢
      • 2018-01-19
      • 2020-04-25
      • 2019-09-07
      • 2019-03-30
      • 1970-01-01
      • 2020-07-30
      • 1970-01-01
      • 2018-02-28
      相关资源
      最近更新 更多