【Question title】: Apache Kafka - KStream with KStream Join latest messages
【Posted】: 2020-05-02 08:09:48
【Question】:

I have created two KStreams that I want to join. Sample output from the two streams is shown below:

Stream 1:

2    {"CODE":"AAAA96","STATUS":"SUBMITTED","ID":2}

Stream 2:

26   {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100926","ID":26}

I want an inner join of these two streams, so I created the following KStream:

// Inner join over a 30-second window; the joiner simply concatenates the two JSON strings.
KStream<String, String> s_joined = s_order
        .join(s_order_item, (left,right) -> left + right,
                JoinWindows.of(Duration.ofSeconds(30)))
        .mapValues(value -> {
            // Split after each '}' to recover the left and right JSON documents.
            String[] arrOfstr = value.split("(?<=})");
            JSONObject jl = new JSONObject(arrOfstr[0]);
            JSONObject jr = new JSONObject(arrOfstr[1]);
            JSONObject json = new JSONObject();
            // Copy the left fields first.
            Iterator<String> keys = jl.keys();
            while(keys.hasNext()) {
                String key = keys.next();
                json.put(key, jl.get(key));
            }
            // Then copy the right fields; a duplicate key such as ID takes the right value.
            keys = jr.keys();
            while(keys.hasNext()) {
                String key = keys.next();
                json.put(key, jr.get(key));
            }
            return json.toString();
        });

This KStream only performs the join and reformats the output message, nothing more.
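
The merge step above can also be pictured without the string concatenation and split, which would break if either value contained a nested JSON object. The following is a minimal plain-Java sketch of the same "left fields, then right fields" merge. It assumes the two values have already been parsed into maps; the class name MergeFields and the method merge are hypothetical and are not part of Kafka Streams or org.json.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeFields {

    /**
     * Copies the left fields, then the right fields.
     * A key present in both maps (for example ID) keeps the right value.
     */
    static Map<String, Object> merge(Map<String, Object> left, Map<String, Object> right) {
        Map<String, Object> merged = new LinkedHashMap<>(left);
        merged.putAll(right);
        return merged;
    }

    public static void main(String[] args) {
        // Fields taken from the example messages in the question.
        Map<String, Object> order = new LinkedHashMap<>();
        order.put("CODE", "AAAA98");
        order.put("STATUS", "CANCELED");
        order.put("ID", "9");

        Map<String, Object> item = new LinkedHashMap<>();
        item.put("DESCRIPTION", "blah blah blah");
        item.put("QUANTITY", 3);
        item.put("ID_CUSTOMER_ORDER", "GR0100121");
        item.put("ID", "9");

        System.out.println(merge(order, item));
    }
}
```

In a real topology the same logic could live directly in the join's value joiner, so that no intermediate concatenated string is needed.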

Let me explain what I want with an example.

The following messages are published within the window:

Stream 1

9 {"CODE":"AAAA98","STATUS":"CANCELED","ID":"9"}

Stream 2

9 {"DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}

Joined stream

What is published

9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}

What I want published

9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}

In short, I want to publish only the latest message within the window, not all of them. Is this possible?

【Question discussion】:

    Tags: join apache-kafka apache-kafka-streams


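    【Solution 1】:

    You can use the groupByKey function, which returns a KGroupedStream, and then use map/reduce functions to transform it the way you need. See the Kafka Streams DSL documentation for more information.

    【Discussion】: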

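      【Solution 2】:

      I found the answer. What I wanted can be achieved with the suppress function. In more detail: call groupByKey() on the KStream, apply a window with windowedBy, then aggregate the grouped data and call suppress so that only the final result of each window is emitted:

      s_joined.groupByKey()
              .windowedBy(...)
              .aggregate(...)
              .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
              .toStream();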
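
      To make the effect of this chain concrete, here is a small plain-Java simulation of "keep only the last record per key per window". It is a sketch under stated assumptions, not Kafka Streams code: the class LatestPerWindow, the Joined record type, the timestamps, and the 30-second tumbling window are all hypothetical. In Kafka Streams terms, overwriting the stored value corresponds to an aggregation that always returns the newer value, and returning the results only at the end stands in for suppress holding output until the window closes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestPerWindow {

    /** Hypothetical stand-in for one joined record: key, value, event time in ms. */
    static final class Joined {
        final String key;
        final String value;
        final long timestampMs;

        Joined(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Returns one value per (window, key): the last one seen in input order. */
    static List<String> latestPerWindow(List<Joined> records, long windowSizeMs) {
        // LinkedHashMap keeps (window, key) pairs in first-seen order.
        Map<String, String> latest = new LinkedHashMap<>();
        for (Joined r : records) {
            // Tumbling window: align the timestamp down to a window boundary.
            long windowStart = (r.timestampMs / windowSizeMs) * windowSizeMs;
            // Overwrite any earlier value, so the last record processed wins.
            latest.put(windowStart + "|" + r.key, r.value);
        }
        // Results are handed out only after all input is consumed,
        // mimicking "emit once the window has closed".
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        // The five joined results from the question, identified by ID_CUSTOMER_ORDER;
        // the timestamps are made up for illustration.
        List<Joined> joined = Arrays.asList(
                new Joined("9", "GR0100121", 1_000L),
                new Joined("9", "GR0100480", 2_000L),
                new Joined("9", "GR0100606", 3_000L),
                new Joined("9", "GR0100339", 4_000L),
                new Joined("9", "GR0100911", 5_000L));

        for (String value : latestPerWindow(joined, 30_000L)) {
            System.out.println(value);
        }
    }
}
```

      Note that in a real topology suppress only emits once stream time has moved past the end of the window plus its grace period, so results appear only after newer records arrive.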
      
      

      【Discussion】:
