【问题标题】:How to init KeyValueStore without conversion in KafkaStreams?如何在不转换 KafkaStreams 的情况下初始化 KeyValueStore?
【发布时间】:2021-05-11 15:04:49
【问题描述】:

第27行代码:github

我们可以看看Kafka Streams in Action这本书的作者是如何初始化KeyValueStore的。他使用脏转换(KeyValueStore)

public class PurchaseRewardTransformer implements ValueTransformer<Purchase, RewardAccumulator> {

    private KeyValueStore<String, Integer> stateStore;
    private final String storeName;
    private ProcessorContext context;

    public PurchaseRewardTransformer(String storeName) {
        Objects.requireNonNull(storeName,"Store Name can't be null");
        this.storeName = storeName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        stateStore = (KeyValueStore) this.context.getStateStore(storeName);
    }

// ...

如何更有礼貌?

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    ProcessorContext.getStateStore 不返回定义的存储类型,它返回一个接口,因此如果您需要使用该实现的特定方法,则必须对其进行强制转换

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-25
      • 1970-01-01
      • 1970-01-01
      • 2019-04-09
      • 1970-01-01
      • 2022-12-07
      • 1970-01-01
      • 2020-05-21
      相关资源
      最近更新 更多