【问题标题】:How to join two KTable and write the result ktable in state store如何加入两个KTable并将结果ktable写入状态存储
【发布时间】:2018-12-04 15:54:26
【问题描述】:

我有两个 KTable 对象:

KTable<Long, byte[]> firstTable = builder.table("firstTopic", Consumed.with(Serdes.Long(), Serdes.ByteArray()));

 KTable<Long, byte[]> secondTable = builder.table("secondTopic",
        Consumed.with(Serdes.Long(), Serdes.ByteArray()));

之后我想加入这两个表:

firstTable.leftJoin(secondTable,
            (leftValue, rightValue) -> {
            try {
                return utils.serializeNetwork(utils.deserializeNetwork(leftValue));
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
            }
          )

所以我有两张表,我将它们连接到一张表中,我希望通过每个键将结果表存储在 kafka 状态存储中,但我不知道该怎么做。

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    您可以通过在 leftJoin 上指定 Materialized 参数并指定状态存储的名称来强制实现到本地存储。

    firstTable.leftJoin(..., Materialized.as("my-store-name"));
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-12-28
      • 1970-01-01
      • 2020-11-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-12-06
      相关资源
      最近更新 更多