【问题标题】:How to extend the transformer in Kafka Scala?如何在 Kafka Scala 中扩展变压器?
【发布时间】:2020-02-21 02:43:01
【问题描述】:

我正在 Scala 中开发一个单词计数器的 Kafka 流式实现,我在其中扩展了转换器:

class WordCounter extends Transformer[String, String, (String, Long)]

然后在流中调用如下:

val counter: KStream[String, Long] = filtered_record.transform(new WordCounter, "count")

但是,通过 sbt 运行我的程序时出现以下错误:

[error]  required: org.apache.kafka.streams.kstream.TransformerSupplier[String,String,org.apache.kafka.streams.KeyValue[String,Long]]

我似乎无法弄清楚如何修复它,也找不到任何合适的 Kafka 类似实现示例。 有人知道我做错了什么吗?

【问题讨论】:

  • 试试filtered_record.transform(() => new WordCounter, "count")

标签: scala apache-kafka apache-kafka-streams


【解决方案1】:

transform()的签名是:

  def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
                        stateStoreNames: String*): KStream[K1, V1]

因此,transform()TransformerSupplier 作为第一个参数,而不是Transformer

另见javadocs

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-03-13
    • 1970-01-01
    • 1970-01-01
    • 2012-03-08
    • 2020-05-26
    • 2011-06-29
    • 2016-07-12
    • 1970-01-01
    相关资源
    最近更新 更多