【问题标题】:Find count in WindowedStream - Flink在 WindowedStream 中查找计数 - Flink
【发布时间】:2019-09-20 08:43:36
【问题描述】:

我是 Streams 领域的新手,第一次尝试时遇到了一些问题。

更具体地说,我正在尝试使用 Flink 在滑动窗口中实现计数和分组功能。

我已经在普通的DateStream 中完成了它,但我无法让它在 WindowedStream 中工作。

您对我该怎么做有什么建议吗?

val parsedStream: DataStream[(String, Response)] = stream
      .mapWith(_.decodeOption[Response])
      .filter(_.isDefined)
      .map { record =>
        (
          s"${record.get.group.group_country}, ${record.get.group.group_state}, ${record.get.group.group_city}",
          record.get
        )
      }

val result: DataStream[((String, Response), Int)] = parsedStream
      .map((_, 1))
      .keyBy(_._1._1)
      .sum(1)

// The output of result is 
// ((us, GA, Atlanta,Response()), 14)
// ((us, SA, Atlanta,Response()), 4)

result
      .keyBy(_._1._1)
      .timeWindow(Time.seconds(5))

//the following part doesn't compile

      .apply(
        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
          def apply(
                   key: Tuple,
                   window: TimeWindow,
                   values: Iterable[(String, Response)],
                   out: Collector[(String, Int)]
                   ) {}
        }
      )

编译错误:

overloaded method value apply with alternatives:
  [R](function: (String, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[((String, com.flink.Response), Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$28: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
  [R](function: org.apache.flink.streaming.api.scala.function.WindowFunction[((String, com.flink.Response), Int),R,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$27: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
 cannot be applied to (org.apache.flink.streaming.api.functions.windowing.WindowFunction[((String, com.flink.Response), Int),(String, com.flink.Response),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]{def apply(key: String,window: org.apache.flink.streaming.api.windowing.windows.TimeWindow,input: Iterable[((String, com.flink.Response), Int)],out: org.apache.flink.util.Collector[(String, com.flink.Response)]): Unit})
      .apply(

【问题讨论】:

  • 嘿,你能粘贴你得到的编译错误吗?

标签: scala stream apache-flink flink-streaming


【解决方案1】:

我已经尝试了您的代码并发现了错误,您在为WindowFunction 声明类型时似乎有错误。

文档说WindowFunction 的预期类型是WindowFunction[IN, OUT, KEY, W &lt;: Window]。现在,如果您查看您的代码,您的IN 是您正在计算窗口的数据流的类型。流的类型是((String, Response), Int),而不是代码(String, Int) 中声明的类型。

如果您将未编译的部分更改为:

.apply(new WindowFunction[((String, Response), Int), (String, Response), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[((String, Response), Int)], out: Collector[(String, Response)]): Unit = ???
})

编辑:至于第二个示例,通常由于相同的原因而发生错误。当您将keyByTuple 一起使用时,您有两个可能的函数可以使用keyBy(fields: Int*),它使用整数来使用提供的索引访问元组的字段(这是您所使用的)。还有keyBy(fun: T =&gt; K),您提供了一个函数来提取将要使用的密钥。

但是这些函数之间有一个重要的区别,其中一个函数将键返回为JavaTuple,而另一个函数返回具有确切类型的键。 所以基本上如果您在简化示例中将String 更改为Tuple,它应该可以清楚地编译。

【讨论】:

  • 我又遇到了编译错误。我更新了我的问题。
  • 看来这个错误与AccumulatingProcessingTimeWindowOperator有关,在最新的Flink中是不存在的。
  • 下面的代码已编译但没有做正确的事情.apply( (key: String, twindow: TimeWindow, it: Iterable[((String, Response), Int)], coll: Collector[(String, Int, Date)]) =&gt; { print(key) coll.collect((key, it.size, new Date(twindow.getEnd))) })
  • 你能详细说明给定的解决方案有什么问题吗??
【解决方案2】:

这是一个我们可以研究的更简单的例子

val source: DataStream[(JsonField, Int)] = env.fromElements(("hello", 1), ("hello", 2))

    val window2 = source
      .keyBy(0)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(JsonField, Int), Int, String, TimeWindow] {})

【讨论】:

  • 嘿,我已经对我的答案进行了编辑,解释了为什么这个例子不起作用。
猜你喜欢
  • 2019-09-20
  • 2016-08-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-03-09
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多