【问题标题】:Collect results from RDDs in a dstream driver program从 dstream 驱动程序中的 RDD 收集结果
【发布时间】:2015-04-28 03:10:16
【问题描述】:

我在驱动程序中有这个函数,它将 rdds 的结果收集到一个数组中并将其发送回。但是,即使 RDD(在 dstream 中)有数据,该函数也会返回一个空数组...我做错了什么?

def runTopFunction() : Array[(String, Int)] = {
        val topSearches = some function....
        val summary = new ArrayBuffer[(String,Int)]()
        topSearches.foreachRDD(rdd => {
            summary = summary.++(rdd.collect())
        })    

    return summary.toArray
}

【问题讨论】:

    标签: apache-spark spark-streaming rdd dstream


    【解决方案1】:

    所以虽然foreachRDD 会做你想做的事,但它也是非阻塞的,这意味着它不会等到所有流都被处理完。由于您在调用foreachRDD 后立即在缓冲区上调用toArray,因此尚未处理任何元素。

    【讨论】:

    • 而不是“非阻塞”,计算是惰性的并安排到稍后的时刻。因此,这个答案在术语上是不正确的。
    • foreachrdd 不像 DStream 或 RDD 上的转换那样惰性,它是一个动作而不是转换。
    • @user2888475 然而在streamingContext.start() 被调用之前什么都不会发生,并且在每个streaming interval 时间段都会发生一些事情。 Spark Streaming 中的操作导致调度的方式与 Spark 中它们导致执行的方式相同。即没有动作的 Dstream 不会做任何事情。
    【解决方案2】:

    DStream.forEachRDD 是对给定DStream 的操作,将安排在每个流式批处理间隔上执行。这是稍后执行的作业的声明式构造。

    不支持以这种方式累加值,因为虽然 Dstream.forEachRDD 只是说“在每次迭代中执行此操作”,但周围的累加代码会立即执行,从而导致空数组。

    根据summary 数据在计算后发生的情况,关于如何实现这一点的选项很少:

    • 如果需要由另一个进程检索数据,请使用共享线程安全结构。优先级队列非常适合 top-k 使用。
    • 如果要存储数据(fs, db),您可以在将topSearches 函数应用于 dstream 后写入存储。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-06-07
      • 2020-06-03
      • 2020-06-18
      • 1970-01-01
      • 2011-08-19
      相关资源
      最近更新 更多