【问题标题】:Kafka Stream aggregation by two fieldsKafka Stream 通过两个字段聚合
【发布时间】:2020-05-22 02:24:13
【问题描述】:

我正在使用 kafka 流为仓库中的项目创建聚合(总和)。
可以添加(例如从供应商处购买)或删除(例如出售的物品)该项目。
在应用中,一个仓库可以服务多个门店,公司有多个仓库。

在这种情况下,我需要使用两个字段对交易进行汇总和分组:商品名称和商店名称。

仅使用商品名称(一个字段)求和很简单,但如何使用其他分组(例如,每个商店的每件商品的总库存)或(每个仓库的总库存,每个商品名称)?

我的(过于简单的)代码

InventoryKafkaMessage.java

public class InventoryKafkaMessage {

    private String warehouseId;  // warehouse ID
    private String itemName;  // item name
    private long quantity;  // always positive
    private String type;  // ADD or REMOVE
    private String storeLocation; // store ID
    private long transactionTimestamp;
    // ... some others, but not relevant for this question
}

使用项目名称作为键发送到源主题的消息。

InventoryAggregatorStream.java
流是

        var inventorySerde = new JsonSerde<>(InventoryKafkaMessage.class);
        var sourceStream = builder.stream("supplychain-warehouse-inventory", Consumed.with(Serdes.String(), inventorySerde));

        // aggregating by key (item name)
        logisticStream.mapValues((k, v) -> v.getType().equalsIgnoreCase("ADD") ? v.getQuantity() : -1 * v.getQuantity())
                .groupByKey()
                .aggregate(() -> 0l, (aggKey, newValue, aggValue) -> aggValue + newValue,
                        Materialized.with(Serdes.String(), Serdes.Long()))
                .toStream().to("stream-supplychain-wharehouse-inventory-total", Produced.with(Serdes.String(), Serdes.Long()));

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    要对多个属性进行分组,您可以定义一个包含两个属性的组合类型并将其设置为键。例如,您可以定义一个类型:

    public class GroupingKey {
      private String warehouseId;
      private String itemName;
    
      public GroupingKey(String warehouseId, String itemName) {
        // set fields
      }
      // etc
    }
    
    // usage:
    
    sourceStream = builder.stream("supplychain-warehouse-inventory",
                                  Consumed.with(Serdes.String(), inventorySerde));
    newKeyStream = sourceStream.selectKey((k, v) -> new GroupingKey(v.warehouseId, v.itemName));
    
    newKeyStream.groupByKey()...
    

    【讨论】:

      猜你喜欢
      • 2023-03-26
      • 2016-11-23
      • 2020-12-10
      • 2022-10-02
      • 1970-01-01
      • 1970-01-01
      • 2021-04-01
      • 2023-03-14
      • 2017-09-13
      相关资源
      最近更新 更多