【问题标题】:Apache Kafka - Implementing a KTableApache Kafka - 实现 KTable
【发布时间】:2020-04-24 22:45:40
【问题描述】:

我是 Kafka Streams API 的新手,我正在尝试创建一个 KTable。我有一个输入主题:s-order-topic,是一个json格式的消息,如下图。

{ "current_ts": "2019-12-24 13:16:40.316952",
  "primary_keys": ["ID"],
  "before": null,
  "tokens": {"txid":"3.17.2493", 
             "csn":"64913009"},
  "op_type":"I",
  "after":  { "CODE":"AAAA41",
              "STATUS":"COMPLETED",
              "ID":24},
  "op_ts":"2019-12-24 13:16:40.316941",
  "table":"S_ORDER"} 

我阅读了来自该主题的消息,我想创建一个 KTable,它具有 key、字段 "after":"ID"value "after" 字段内的所有字段("ID" 除外)。

仅当我使用默认聚合函数(即计数)时,我才成功创建了 KTable。但是我很难创建自己的聚合函数。下面我介绍我尝试创建 KTable 的部分代码。

KTable<Long, String> s_table = builder.stream("s-order-topic",  Consumed.with(Serdes.Long(),Serdes.String()))
                .mapValues(value -> {
                    String time;
                    JSONObject json = new JSONObject(value);
                    if (json.getString("op_type").equals("I")) {
                        time = "after";
                    }else {
                        time = "before";
                    }
                    JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
                    return json2.toString();
                })
               .groupBy((key, value) -> {
                    JSONObject json = new JSONObject(value);
                    return json.getLong("ID");
                }, Grouped.with(Serdes.Long(), Serdes.String()))
                .aggregate( ... );

如何实现这个 KTable?

我是否正确地解决了问题?

(mapValues -> 只保留“before”/“after”字段。groupBy -> 使ID成为消息的键。Aggregate -> ?)

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams ktable


    【解决方案1】:

    我为我的情况找到了解决方案。我实现的KTable如下所示:

     KTable<String, String> s_table = builder.stream("s-order-topic",  Consumed.with(Serdes.String(),Serdes.String()))
                    .mapValues(value -> {
                        String time;
                        JSONObject json = new JSONObject(value);
                        if (json.getString("op_type").equals("I")) {
                            time = "after";
                        }else {
                            time = "before";
                        }
                        JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
                        return json2.toString();
                    })
                    .groupBy((key, value) -> {
                        JSONObject json = new JSONObject(value);
                        return String.valueOf(json.getLong("ID"));
                    }, Grouped.with(Serdes.String(), Serdes.String()))
                    .reduce((prev,newval)->newval);
    

    aggregate 函数不适合这种情况,而是我使用了reduce 函数。

    控制台消费者的输出如下所示:

    15   {"CODE":"AAAA17","STATUS":"PENDING","ID":15}
    18   {"CODE":"AAAA50","STATUS":"SUBMITTED","ID":18}
    4    {"CODE":"AAAA80","STATUS":"SUBMITTED","ID":4}
    19   {"CODE":"AAAA83","STATUS":"SUBMITTED","ID":19}
    18   {"CODE":"AAAA33","STATUS":"COMPLETED","ID":18}
    5    {"CODE":"AAAA38","STATUS":"PENDING","ID":5}
    10   {"CODE":"AAAA1","STATUS":"COMPLETED","ID":10}
    3    {"CODE":"AAAA68","STATUS":"NOT COMPLETED","ID":3}
    9    {"CODE":"AAAA89","STATUS":"PENDING","ID":9}
    

    【讨论】:

    • 附注:聚合是reduce() 的泛化,允许您更改输出类型(对于reduce,输入值类型== 输出值类型)。因此,使用聚合实际上也应该起作用。
    • @MatthiasJ.Sax 感谢您的更正!所以我的问题应该是我没有为我的案例正确实施聚合。
    猜你喜欢
    • 2022-12-02
    • 2018-05-23
    • 1970-01-01
    • 2020-01-31
    • 1970-01-01
    • 2016-12-11
    • 1970-01-01
    • 2017-08-13
    • 1970-01-01
    相关资源
    最近更新 更多