【问题标题】:How to produce individual key and values to kafka topic from the list of keys and values如何从键和值列表中为kafka主题生成单独的键和值
【发布时间】:2023-01-25 07:36:06
【问题描述】:

KStream<List<keys>, List<values>> mapValues = stream.selectKey((key, value) -> util.fetchKeys(key, value)) .mapValues(value -> util.fetchValues(value)) ;

以上是我的代码 sn-p。在这里我想遍历每个键和值,使用

.to("out-topic", keySerdes, valueSerdes)

这里keySerdes,valueSerdes是在avro转换的

如果有任何方法可以完成此操作,请告诉我。

提前致谢!

尝试展平以获得键和值对我不起作用

【问题讨论】:

    标签: java apache-kafka-streams


    【解决方案1】:

    我认为您可以做的是将拓扑更改为如下所示:

    stream.transform(() -> new MyTransformer()).to("out-topic", keySerde, valueSerde)

    Transformer 实例看起来像这样:

    MyTransformer<key, value> implements Transformer<key, value> {
      ProcessorContext context;
    
     void init(ProcessorContext context) {
        this.context = context;
     }
    
     Void<value> transform(key, value) {
        List<key> keys  = util.fetchKeys(key);
        List<value> values = util.fetchValues(value);
        //assumes both lists have the same length
        for(int i = 0; i < keys.size(); i++) {
          //forwards each key-value to the sink node
          context.forward(keys.get(i), values.get(i)
        }
      return null; //filtered out by Kafka Streams - not forwarded
    
      void close() { }
    }
    

    如果您使用的是 Kafka Streams v. 3.0 或更高版本,transform 已被弃用,因此在这种情况下,将 transform 换成更新的 process 运算符,但总的来说,原理是相同的。

    卫生部, 账单

    【讨论】:

      猜你喜欢
      • 2023-02-22
      • 2015-01-03
      • 2016-10-11
      • 2017-05-06
      • 2020-09-09
      • 1970-01-01
      • 1970-01-01
      • 2019-02-16
      • 1970-01-01
      相关资源
      最近更新 更多