【问题标题】:Find Top K elements in WindowedStream - Flink在 WindowedStream 中查找 Top K 元素 - Flink
【发布时间】:2019-09-20 22:28:11
【问题描述】:

我是 Streams 领域的新手,第一次尝试时遇到了一些问题。

我想做的是在下面的window: WindowdStream 中找到Top K 元素。 我试图实现自己的功能,但不确定它实际上是如何工作的。

好像什么都没有打印出来

你有什么提示吗?

val parsedStream: DataStream[(String, Response)] = stream
      .mapWith(_.decodeOption[Response])
      .filter(_.isDefined)
      .map { record =>
        (
          s"${record.get.group.group_country}, ${record.get.group.group_city}",
          record.get
        )
      }

val topLocations = parsedStream
      .keyBy(_._1)
      .timeWindow(Time.days(7))
      .process(new SortByCountFunction)

SortByCountFunction

class SortByCountFunction
    extends ProcessWindowFunction[(String, Response), MeetUpLocationWindow, String, TimeWindow] {

    override def process(key: String,
                         context: Context,
                         elements: Iterable[(String, Response)],
                         out: Collector[MeetUpLocationWindow]): Unit = {

      val count: Map[String, Iterable[(String, Response)]] = elements.groupBy(_._1)

      val locAndCount: Seq[MeetUpLocation] = count.toList.map(tmp => {
        val location: String = tmp._1
        val meetUpList: Iterable[(String, Response)] = tmp._2
        MeetUpLocation(location, tmp._2.size, meetUpList.map(_._2).toList)
      })

      val output: List[MeetUpLocation] = locAndCount.sortBy(tup => tup.count).take(20).toList

      val windowEnd = context.window.getEnd

      out.collect(MeetUpLocationWindow(windowEnd, output))
    }
  }

case class MeetUpLocationWindow(endTs: Long, locations: List[MeetUpLocation])

case class MeetUpLocation(location: String, count: Int, meetUps: List[Response])

【问题讨论】:

    标签: scala stream streaming apache-flink flink-streaming


    【解决方案1】:

    当您的 Flink DataStream 作业无法产生任何输出时,通常的嫌疑人是:

    • 作业未在 StreamExecutionEnvironment 上调用 execute()(例如,env.execute()
    • 作业没有附加接收器(例如,TopLocations.print()
    • 该作业旨在使用事件时间,但水印设置不正确或空闲源阻止水印前进
    • 作业正在写入任务管理器日志,但没有人注意到
    • 输出类型的序列化器不产生输出

    如果没有更多信息,很难猜测在这种情况下哪些可能是问题所在。

    【讨论】:

    • 我认为问题出在SortByCountFunction 但无法检测到。
    • 在我看来,SortByCountFunction 总是会收集一些东西到输出,即使它是错误的结果。如果作业完全没有打印任何内容,那么肯定还有其他问题。
    猜你喜欢
    • 2019-09-20
    • 2019-10-09
    • 2022-11-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-24
    • 2021-01-13
    • 2021-08-29
    相关资源
    最近更新 更多