【问题标题】:How to produce individual key and values to kafka topic from the list of keys and values如何从键和值列表中为kafka主题生成单独的键和值
【发布时间】:2023-01-25 07:36:06
【问题描述】:
KStream<List<keys>, List<values>> mapValues = stream.selectKey((key, value) -> util.fetchKeys(key, value)) .mapValues(value -> util.fetchValues(value)) ;
以上是我的代码 sn-p。在这里我想遍历每个键和值,使用
.to("out-topic", keySerdes, valueSerdes)。
这里keySerdes,valueSerdes是在avro转换的
如果有任何方法可以完成此操作,请告诉我。
提前致谢!
尝试展平以获得键和值对我不起作用
【问题讨论】:
标签:
java
apache-kafka-streams
【解决方案1】:
我认为您可以做的是将拓扑更改为如下所示:
stream.transform(() -> new MyTransformer()).to("out-topic", keySerde, valueSerde)
Transformer 实例看起来像这样:
MyTransformer<key, value> implements Transformer<key, value> {
ProcessorContext context;
void init(ProcessorContext context) {
this.context = context;
}
Void<value> transform(key, value) {
List<key> keys = util.fetchKeys(key);
List<value> values = util.fetchValues(value);
//assumes both lists have the same length
for(int i = 0; i < keys.size(); i++) {
//forwards each key-value to the sink node
context.forward(keys.get(i), values.get(i)
}
return null; //filtered out by Kafka Streams - not forwarded
void close() { }
}
如果您使用的是 Kafka Streams v. 3.0 或更高版本,transform 已被弃用,因此在这种情况下,将 transform 换成更新的 process 运算符,但总的来说,原理是相同的。
卫生部,
账单