【问题标题】:Scala: Cannot resolve overloaded methods (Flink WatermarkStrategy)Scala:无法解析重载方法(Flink WatermarkStrategy)
【发布时间】:2021-03-06 12:16:57
【问题描述】:

我正在关注 Flink 关于如何将 WatermarkStrategy 与 KafkaConsumer 一起使用的文档。代码如下所示

val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(20)))

val stream: DataStream[MyType] = env.addSource(kafkaSource)

每当我尝试编译上面的代码时,我都会收到一个错误提示

错误:使用替代方法重载了方法值 assignTimestampsAndWatermarks:

error: overloaded method value assignTimestampsAndWatermarks with alternatives:
[ERROR]   (x$1: org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR]   (x$1: org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR]   (x$1: org.apache.flink.api.common.eventtime.WatermarkStrategy[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String]
[ERROR]  cannot be applied to (org.apache.flink.api.common.eventtime.WatermarkStrategy[Nothing])
[ERROR]         consumer.assignTimestampsAndWatermarks(

【问题讨论】:

  • 你用的是哪个flink版本?能否请您粘贴完整的错误消息。
  • 我添加了错误。我正在使用 flink 1.11.2
  • 如果没有我自己的测试,我会说你在这里缺少一个类型。在 Watermarker 分配的一个测试用例中有一个很好的例子:github.com/apache/flink/blob/…。在那里您可以看到 WatermarkStrategy 采用一种类型(在您的示例中为 MyType )。希望有帮助

标签: apache-kafka apache-flink flink-streaming


【解决方案1】:

下面的代码返回 WatermarkStrategyy[Nothing] 而不是 WatermarkStrategy[String]

  WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(20)))

我用这段代码解决了这个问题

val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
watermark: Watermark[String] = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))
kafkaSource.assignTimestampsAndWatermarks(watermark)

【讨论】:

  • 这不适用于最新版本的 Flink。
【解决方案2】:

@Mayokun 是对的。但为了使代码更简单,您可以将类型信息放在静态方法之后:

val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props) 
kafkaSource.assignTimestampsAndWatermarks(
     WatermarkStrategy.forBoundedOutOfOrderness[MyType](Duration.ofSeconds(20))
)

【讨论】:

  • 是不是像获取20secs窗口的元素一样?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-09-17
  • 1970-01-01
  • 2021-03-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多