【问题标题】:Why does Spark Streaming execute foreachRDD even when no new data is available?为什么即使没有新数据可用,Spark Streaming 也会执行 foreachRDD?
【发布时间】:2018-01-03 19:53:09
【问题描述】:

我有以下 Spark 流式传输示例:

val conf = new SparkConf().setAppName("Name").setMaster("local")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))

val directoryStream = ssc.textFileStream("""file:///C:/Users/something/something""")
directoryStream.foreachRDD(file => {
  println(file.count())
})

ssc.start()
ssc.awaitTermination()

即使文件夹是空的,它也会每 2 秒打印一次 0,就像文件夹中有一个空文件一样。我希望它仅在文件夹中存在新文件时进入foreachRDD。是不是我做错了什么?

我正在使用 Spark 1.6 和 Scala 2.10.7。

【问题讨论】:

    标签: scala apache-spark spark-streaming


    【解决方案1】:

    由于您的批处理持续时间为 2 秒,因此作业将每 2 秒触发一次,基本上触发点不是数据可用性,而是批处理持续时间,如果 DStream 时存在的数据包含数据,否则它将为空(使用下面的代码检查是否相同)

     dstream.foreachRDD{ rdd => if (!rdd.isEmpty) {// do something } }
    

    【讨论】:

    • 我明白了,感谢您提供的简单解决方案。现在我想知道,如果让我们说两个文本文件在 2 秒窗口内出现,它们是否结合在一起成为 foreachRDD 的一个迭代?或者 Spark 会在那个时候运行两次 foreachRDD 迭代?
    • 一个 RDD 一个批次,所以两个文本文件“联合”在一起。
    猜你喜欢
    • 2021-12-01
    • 2021-11-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多